summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp43
1 files changed, 23 insertions, 20 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index c0c1d00..3710ec8 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -110,7 +110,6 @@ void QueryRunner<DomainType>::delayNextQuery()
110template <class DomainType> 110template <class DomainType>
111void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType) 111void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType)
112{ 112{
113 auto guardPtr = QPointer<QObject>(&guard);
114 SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize; 113 SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize;
115 if (mQueryInProgress) { 114 if (mQueryInProgress) {
116 SinkTraceCtx(mLogCtx) << "Query is already in progress, postponing: " << mBatchSize; 115 SinkTraceCtx(mLogCtx) << "Query is already in progress, postponing: " << mBatchSize;
@@ -118,17 +117,19 @@ void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &
118 return; 117 return;
119 } 118 }
120 mQueryInProgress = true; 119 mQueryInProgress = true;
121 auto resultProvider = mResultProvider;
122 auto resultTransformation = mResultTransformation;
123 auto batchSize = mBatchSize;
124 auto resourceContext = mResourceContext;
125 auto logCtx = mLogCtx;
126 auto state = mQueryState;
127 bool addDelay = mDelayNextQuery; 120 bool addDelay = mDelayNextQuery;
128 mDelayNextQuery = false; 121 mDelayNextQuery = false;
129 const bool runAsync = !query.synchronousQuery(); 122 const bool runAsync = !query.synchronousQuery();
130 //The lambda will be executed in a separate thread, so copy all arguments 123 //The lambda will be executed in a separate thread, so copy all arguments
131 async::run<ReplayResult>([=]() { 124 async::run<ReplayResult>([query,
125 bufferType,
126 resultProvider = mResultProvider,
127 resourceContext = mResourceContext,
128 logCtx = mLogCtx,
129 state = mQueryState,
130 resultTransformation = mResultTransformation,
131 batchSize = mBatchSize,
132 addDelay]() {
132 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); 133 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
133 const auto result = worker.executeInitialQuery(query, *resultProvider, batchSize, state); 134 const auto result = worker.executeInitialQuery(query, *resultProvider, batchSize, state);
134 135
@@ -139,7 +140,7 @@ void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &
139 140
140 return result; 141 return result;
141 }, runAsync) 142 }, runAsync)
142 .then([=](const ReplayResult &result) { 143 .then([this, query, bufferType, guardPtr = QPointer<QObject>(&guard)](const ReplayResult &result) {
143 if (!guardPtr) { 144 if (!guardPtr) {
144 //Not an error, the query can vanish at any time. 145 //Not an error, the query can vanish at any time.
145 return; 146 return;
@@ -151,8 +152,8 @@ void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &
151 if (query.liveQuery()) { 152 if (query.liveQuery()) {
152 mResourceAccess->sendRevisionReplayedCommand(result.newRevision); 153 mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
153 } 154 }
154 resultProvider->setRevision(result.newRevision); 155 mResultProvider->setRevision(result.newRevision);
155 resultProvider->initialResultSetComplete(result.replayedAll); 156 mResultProvider->initialResultSetComplete(result.replayedAll);
156 if (mRequestFetchMore) { 157 if (mRequestFetchMore) {
157 mRequestFetchMore = false; 158 mRequestFetchMore = false;
158 //This code exists for incemental fetches, so we don't skip loading another set. 159 //This code exists for incemental fetches, so we don't skip loading another set.
@@ -180,19 +181,21 @@ KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &q
180 return KAsync::null(); 181 return KAsync::null();
181 } 182 }
182 mRevisionChangedMeanwhile = false; 183 mRevisionChangedMeanwhile = false;
183 auto resultProvider = mResultProvider;
184 auto resourceContext = mResourceContext;
185 auto logCtx = mLogCtx;
186 auto state = mQueryState;
187 auto resultTransformation = mResultTransformation;
188 Q_ASSERT(!mQueryInProgress); 184 Q_ASSERT(!mQueryInProgress);
189 auto guardPtr = QPointer<QObject>(&guard);
190 bool addDelay = mDelayNextQuery; 185 bool addDelay = mDelayNextQuery;
191 mDelayNextQuery = false; 186 mDelayNextQuery = false;
192 return KAsync::start([&] { 187 return KAsync::start([&] {
193 mQueryInProgress = true; 188 mQueryInProgress = true;
194 }) 189 })
195 .then(async::run<ReplayResult>([=]() { 190 //The lambda will be executed in a separate thread, so copy all arguments
191 .then(async::run<ReplayResult>([query,
192 bufferType,
193 resultProvider = mResultProvider,
194 resourceContext = mResourceContext,
195 logCtx = mLogCtx,
196 state = mQueryState,
197 resultTransformation = mResultTransformation,
198 addDelay]() {
196 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); 199 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
197 const auto result = worker.executeIncrementalQuery(query, *resultProvider, state); 200 const auto result = worker.executeIncrementalQuery(query, *resultProvider, state);
198 ////For testing only 201 ////For testing only
@@ -203,14 +206,14 @@ KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &q
203 206
204 return result; 207 return result;
205 })) 208 }))
206 .then([query, this, resultProvider, guardPtr, bufferType](const ReplayResult &newRevisionAndReplayedEntities) { 209 .then([this, query, bufferType, guardPtr = QPointer<QObject>(&guard)](const ReplayResult &newRevisionAndReplayedEntities) {
207 if (!guardPtr) { 210 if (!guardPtr) {
208 //Not an error, the query can vanish at any time. 211 //Not an error, the query can vanish at any time.
209 return KAsync::null(); 212 return KAsync::null();
210 } 213 }
211 mQueryInProgress = false; 214 mQueryInProgress = false;
212 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); 215 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
213 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); 216 mResultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
214 if (mRevisionChangedMeanwhile) { 217 if (mRevisionChangedMeanwhile) {
215 return incrementalFetch(query, bufferType); 218 return incrementalFetch(query, bufferType);
216 } 219 }