diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2018-02-15 10:19:08 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2018-02-15 11:08:52 +0100 |
commit | 531972042d4b610258c8af8a17ec3a99cd063dda (patch) | |
tree | ef4356ec141f0f1ccd756e8610b08553b866bf78 /common/queryrunner.cpp | |
parent | f51963f057bcbdd175114433913a1c5f0eebd546 (diff) | |
download | sink-531972042d4b610258c8af8a17ec3a99cd063dda.tar.gz sink-531972042d4b610258c8af8a17ec3a99cd063dda.zip |
Fixed crashes due to concurrently running queries.
A single QueryRunner should never have multiple workers running at the
same time. We did not properly enforce this in case of incremental
updates coming in.
The only way I managed to reproduce the crash:
* Open a large folder with lots of unread mail in kube
* Select a mail in the maillist and hold the down button
* This will:
* Repeatedly call fetch more
* Trigger lot's of mark as read modifications that result in
notifications.
* Eventually it crashes somewhere in EntityStore, likely because
of concurrent access of the filter structure which is shared through
the state.
We now ensure in the single threaded portion of the code that we only
ever run one worker at a time. If we did receive an update during,
we remember that change and fetch more once we're done.
To be able to call fetch again that portion was also factored out into a
separate function.
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r-- | common/queryrunner.cpp | 153 |
1 files changed, 86 insertions, 67 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 928e1e0..0ed4cb5 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -69,79 +69,14 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
69 | if (query.limit() && query.sortProperty().isEmpty()) { | 69 | if (query.limit() && query.sortProperty().isEmpty()) { |
70 | SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; | 70 | SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; |
71 | } | 71 | } |
72 | auto guardPtr = QPointer<QObject>(&guard); | ||
73 | auto fetcher = [=]() { | ||
74 | SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize; | ||
75 | auto resultProvider = mResultProvider; | ||
76 | auto resultTransformation = mResultTransformation; | ||
77 | auto batchSize = mBatchSize; | ||
78 | auto resourceContext = mResourceContext; | ||
79 | auto logCtx = mLogCtx; | ||
80 | auto state = mQueryState; | ||
81 | const bool runAsync = !query.synchronousQuery(); | ||
82 | //The lambda will be executed in a separate thread, so copy all arguments | ||
83 | async::run<ReplayResult>([=]() { | ||
84 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | ||
85 | return worker.executeInitialQuery(query, *resultProvider, batchSize, state); | ||
86 | }, runAsync) | ||
87 | .then([this, query, resultProvider, guardPtr](const ReplayResult &result) { | ||
88 | if (!guardPtr) { | ||
89 | //Not an error, the query can vanish at any time. | ||
90 | return; | ||
91 | } | ||
92 | mInitialQueryComplete = true; | ||
93 | mQueryState = result.queryState; | ||
94 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
95 | if (query.liveQuery()) { | ||
96 | mResourceAccess->sendRevisionReplayedCommand(result.newRevision); | ||
97 | } | ||
98 | resultProvider->setRevision(result.newRevision); | ||
99 | resultProvider->initialResultSetComplete(result.replayedAll); | ||
100 | }) | ||
101 | .exec(); | ||
102 | }; | ||
103 | |||
104 | // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. | 72 | // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. |
105 | mResultProvider->setFetcher(fetcher); | 73 | mResultProvider->setFetcher([this, query, bufferType] { fetch(query, bufferType); }); |
106 | 74 | ||
107 | // In case of a live query we keep the runner for as long alive as the result provider exists | 75 | // In case of a live query we keep the runner for as long alive as the result provider exists |
108 | if (query.liveQuery()) { | 76 | if (query.liveQuery()) { |
109 | Q_ASSERT(!query.synchronousQuery()); | 77 | Q_ASSERT(!query.synchronousQuery()); |
110 | // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | 78 | // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting |
111 | setQuery([=]() -> KAsync::Job<void> { | 79 | setQuery([=]() { return incrementalFetch(query, bufferType); }); |
112 | auto resultProvider = mResultProvider; | ||
113 | auto resourceContext = mResourceContext; | ||
114 | auto logCtx = mLogCtx; | ||
115 | auto state = mQueryState; | ||
116 | auto resultTransformation = mResultTransformation; | ||
117 | if (!mInitialQueryComplete) { | ||
118 | SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; | ||
119 | fetcher(); | ||
120 | return KAsync::null(); | ||
121 | } | ||
122 | if (mQueryInProgress) { | ||
123 | //Can happen if the revision come in quicker than we process them. | ||
124 | return KAsync::null(); | ||
125 | } | ||
126 | Q_ASSERT(!mQueryInProgress); | ||
127 | return KAsync::start([&] { | ||
128 | mQueryInProgress = true; | ||
129 | }) | ||
130 | .then(async::run<ReplayResult>([=]() { | ||
131 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | ||
132 | return worker.executeIncrementalQuery(query, *resultProvider, state); | ||
133 | })) | ||
134 | .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { | ||
135 | if (!guardPtr) { | ||
136 | //Not an error, the query can vanish at any time. | ||
137 | return; | ||
138 | } | ||
139 | mQueryInProgress = false; | ||
140 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
141 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); | ||
142 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | ||
143 | }); | ||
144 | }); | ||
145 | // Ensure the connection is open, if it wasn't already opened | 80 | // Ensure the connection is open, if it wasn't already opened |
146 | // TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates | 81 | // TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates |
147 | mResourceAccess->open(); | 82 | mResourceAccess->open(); |
@@ -158,6 +93,90 @@ QueryRunner<DomainType>::~QueryRunner() | |||
158 | SinkTraceCtx(mLogCtx) << "Stopped query"; | 93 | SinkTraceCtx(mLogCtx) << "Stopped query"; |
159 | } | 94 | } |
160 | 95 | ||
96 | //This function triggers the initial fetch, and then subsequent calls will simply fetch more data of mBatchSize. | ||
97 | template <class DomainType> | ||
98 | void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType) | ||
99 | { | ||
100 | auto guardPtr = QPointer<QObject>(&guard); | ||
101 | SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize; | ||
102 | if (mQueryInProgress) { | ||
103 | SinkTraceCtx(mLogCtx) << "Query is already in progress, postponing: " << mBatchSize; | ||
104 | mRequestFetchMore = true; | ||
105 | return; | ||
106 | } | ||
107 | mQueryInProgress = true; | ||
108 | auto resultProvider = mResultProvider; | ||
109 | auto resultTransformation = mResultTransformation; | ||
110 | auto batchSize = mBatchSize; | ||
111 | auto resourceContext = mResourceContext; | ||
112 | auto logCtx = mLogCtx; | ||
113 | auto state = mQueryState; | ||
114 | const bool runAsync = !query.synchronousQuery(); | ||
115 | //The lambda will be executed in a separate thread, so copy all arguments | ||
116 | async::run<ReplayResult>([=]() { | ||
117 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | ||
118 | return worker.executeInitialQuery(query, *resultProvider, batchSize, state); | ||
119 | }, runAsync) | ||
120 | .then([=](const ReplayResult &result) { | ||
121 | if (!guardPtr) { | ||
122 | //Not an error, the query can vanish at any time. | ||
123 | return; | ||
124 | } | ||
125 | mInitialQueryComplete = true; | ||
126 | mQueryInProgress = false; | ||
127 | mQueryState = result.queryState; | ||
128 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
129 | if (query.liveQuery()) { | ||
130 | mResourceAccess->sendRevisionReplayedCommand(result.newRevision); | ||
131 | } | ||
132 | resultProvider->setRevision(result.newRevision); | ||
133 | resultProvider->initialResultSetComplete(result.replayedAll); | ||
134 | if (mRequestFetchMore) { | ||
135 | mRequestFetchMore = false; | ||
136 | fetch(query, bufferType); | ||
137 | } | ||
138 | }) | ||
139 | .exec(); | ||
140 | } | ||
141 | |||
142 | template <class DomainType> | ||
143 | KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &query, const QByteArray &bufferType) | ||
144 | { | ||
145 | if (!mInitialQueryComplete) { | ||
146 | SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; | ||
147 | fetch(query, bufferType); | ||
148 | return KAsync::null(); | ||
149 | } | ||
150 | if (mQueryInProgress) { | ||
151 | //Can happen if the revision come in quicker than we process them. | ||
152 | return KAsync::null(); | ||
153 | } | ||
154 | auto resultProvider = mResultProvider; | ||
155 | auto resourceContext = mResourceContext; | ||
156 | auto logCtx = mLogCtx; | ||
157 | auto state = mQueryState; | ||
158 | auto resultTransformation = mResultTransformation; | ||
159 | Q_ASSERT(!mQueryInProgress); | ||
160 | auto guardPtr = QPointer<QObject>(&guard); | ||
161 | return KAsync::start([&] { | ||
162 | mQueryInProgress = true; | ||
163 | }) | ||
164 | .then(async::run<ReplayResult>([=]() { | ||
165 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | ||
166 | return worker.executeIncrementalQuery(query, *resultProvider, state); | ||
167 | })) | ||
168 | .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { | ||
169 | if (!guardPtr) { | ||
170 | //Not an error, the query can vanish at any time. | ||
171 | return; | ||
172 | } | ||
173 | mQueryInProgress = false; | ||
174 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
175 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); | ||
176 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | ||
177 | }); | ||
178 | } | ||
179 | |||
161 | template <class DomainType> | 180 | template <class DomainType> |
162 | void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation) | 181 | void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation) |
163 | { | 182 | { |