diff options
-rw-r--r-- | common/queryrunner.cpp | 43 |
1 files changed, 23 insertions, 20 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index c0c1d00..3710ec8 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -110,7 +110,6 @@ void QueryRunner<DomainType>::delayNextQuery() | |||
110 | template <class DomainType> | 110 | template <class DomainType> |
111 | void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType) | 111 | void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType) |
112 | { | 112 | { |
113 | auto guardPtr = QPointer<QObject>(&guard); | ||
114 | SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize; | 113 | SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize; |
115 | if (mQueryInProgress) { | 114 | if (mQueryInProgress) { |
116 | SinkTraceCtx(mLogCtx) << "Query is already in progress, postponing: " << mBatchSize; | 115 | SinkTraceCtx(mLogCtx) << "Query is already in progress, postponing: " << mBatchSize; |
@@ -118,17 +117,19 @@ void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray & | |||
118 | return; | 117 | return; |
119 | } | 118 | } |
120 | mQueryInProgress = true; | 119 | mQueryInProgress = true; |
121 | auto resultProvider = mResultProvider; | ||
122 | auto resultTransformation = mResultTransformation; | ||
123 | auto batchSize = mBatchSize; | ||
124 | auto resourceContext = mResourceContext; | ||
125 | auto logCtx = mLogCtx; | ||
126 | auto state = mQueryState; | ||
127 | bool addDelay = mDelayNextQuery; | 120 | bool addDelay = mDelayNextQuery; |
128 | mDelayNextQuery = false; | 121 | mDelayNextQuery = false; |
129 | const bool runAsync = !query.synchronousQuery(); | 122 | const bool runAsync = !query.synchronousQuery(); |
130 | //The lambda will be executed in a separate thread, so copy all arguments | 123 | //The lambda will be executed in a separate thread, so copy all arguments |
131 | async::run<ReplayResult>([=]() { | 124 | async::run<ReplayResult>([query, |
125 | bufferType, | ||
126 | resultProvider = mResultProvider, | ||
127 | resourceContext = mResourceContext, | ||
128 | logCtx = mLogCtx, | ||
129 | state = mQueryState, | ||
130 | resultTransformation = mResultTransformation, | ||
131 | batchSize = mBatchSize, | ||
132 | addDelay]() { | ||
132 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | 133 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); |
133 | const auto result = worker.executeInitialQuery(query, *resultProvider, batchSize, state); | 134 | const auto result = worker.executeInitialQuery(query, *resultProvider, batchSize, state); |
134 | 135 | ||
@@ -139,7 +140,7 @@ void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray & | |||
139 | 140 | ||
140 | return result; | 141 | return result; |
141 | }, runAsync) | 142 | }, runAsync) |
142 | .then([=](const ReplayResult &result) { | 143 | .then([this, query, bufferType, guardPtr = QPointer<QObject>(&guard)](const ReplayResult &result) { |
143 | if (!guardPtr) { | 144 | if (!guardPtr) { |
144 | //Not an error, the query can vanish at any time. | 145 | //Not an error, the query can vanish at any time. |
145 | return; | 146 | return; |
@@ -151,8 +152,8 @@ void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray & | |||
151 | if (query.liveQuery()) { | 152 | if (query.liveQuery()) { |
152 | mResourceAccess->sendRevisionReplayedCommand(result.newRevision); | 153 | mResourceAccess->sendRevisionReplayedCommand(result.newRevision); |
153 | } | 154 | } |
154 | resultProvider->setRevision(result.newRevision); | 155 | mResultProvider->setRevision(result.newRevision); |
155 | resultProvider->initialResultSetComplete(result.replayedAll); | 156 | mResultProvider->initialResultSetComplete(result.replayedAll); |
156 | if (mRequestFetchMore) { | 157 | if (mRequestFetchMore) { |
157 | mRequestFetchMore = false; | 158 | mRequestFetchMore = false; |
158 | //This code exists for incemental fetches, so we don't skip loading another set. | 159 | //This code exists for incemental fetches, so we don't skip loading another set. |
@@ -180,19 +181,21 @@ KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &q | |||
180 | return KAsync::null(); | 181 | return KAsync::null(); |
181 | } | 182 | } |
182 | mRevisionChangedMeanwhile = false; | 183 | mRevisionChangedMeanwhile = false; |
183 | auto resultProvider = mResultProvider; | ||
184 | auto resourceContext = mResourceContext; | ||
185 | auto logCtx = mLogCtx; | ||
186 | auto state = mQueryState; | ||
187 | auto resultTransformation = mResultTransformation; | ||
188 | Q_ASSERT(!mQueryInProgress); | 184 | Q_ASSERT(!mQueryInProgress); |
189 | auto guardPtr = QPointer<QObject>(&guard); | ||
190 | bool addDelay = mDelayNextQuery; | 185 | bool addDelay = mDelayNextQuery; |
191 | mDelayNextQuery = false; | 186 | mDelayNextQuery = false; |
192 | return KAsync::start([&] { | 187 | return KAsync::start([&] { |
193 | mQueryInProgress = true; | 188 | mQueryInProgress = true; |
194 | }) | 189 | }) |
195 | .then(async::run<ReplayResult>([=]() { | 190 | //The lambda will be executed in a separate thread, so copy all arguments |
191 | .then(async::run<ReplayResult>([query, | ||
192 | bufferType, | ||
193 | resultProvider = mResultProvider, | ||
194 | resourceContext = mResourceContext, | ||
195 | logCtx = mLogCtx, | ||
196 | state = mQueryState, | ||
197 | resultTransformation = mResultTransformation, | ||
198 | addDelay]() { | ||
196 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | 199 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); |
197 | const auto result = worker.executeIncrementalQuery(query, *resultProvider, state); | 200 | const auto result = worker.executeIncrementalQuery(query, *resultProvider, state); |
198 | ////For testing only | 201 | ////For testing only |
@@ -203,14 +206,14 @@ KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &q | |||
203 | 206 | ||
204 | return result; | 207 | return result; |
205 | })) | 208 | })) |
206 | .then([query, this, resultProvider, guardPtr, bufferType](const ReplayResult &newRevisionAndReplayedEntities) { | 209 | .then([this, query, bufferType, guardPtr = QPointer<QObject>(&guard)](const ReplayResult &newRevisionAndReplayedEntities) { |
207 | if (!guardPtr) { | 210 | if (!guardPtr) { |
208 | //Not an error, the query can vanish at any time. | 211 | //Not an error, the query can vanish at any time. |
209 | return KAsync::null(); | 212 | return KAsync::null(); |
210 | } | 213 | } |
211 | mQueryInProgress = false; | 214 | mQueryInProgress = false; |
212 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); | 215 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); |
213 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | 216 | mResultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); |
214 | if (mRevisionChangedMeanwhile) { | 217 | if (mRevisionChangedMeanwhile) { |
215 | return incrementalFetch(query, bufferType); | 218 | return incrementalFetch(query, bufferType); |
216 | } | 219 | } |