summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2018-04-01 09:41:28 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2018-04-01 10:02:41 +0200
commit8d2d52fcfd3c42a82b7f86c6f3c5009461f1de9f (patch)
tree042e2db12927d289fce3a2fab8486e03ac4c9049 /common/queryrunner.cpp
parent53b76e66f5527a5f9442173279b0a01f1f07da46 (diff)
downloadsink-8d2d52fcfd3c42a82b7f86c6f3c5009461f1de9f.tar.gz
sink-8d2d52fcfd3c42a82b7f86c6f3c5009461f1de9f.zip
Avoid missing revision updates while a query is running.
Instead we have to remember that something has changed and rerun an incremental query.
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp53
1 files changed, 45 insertions, 8 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 2c50fca..c0c1d00 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -21,6 +21,8 @@
21#include <limits> 21#include <limits>
22#include <QTime> 22#include <QTime>
23#include <QPointer> 23#include <QPointer>
24#include <thread>
25#include <chrono>
24 26
25#include "commands.h" 27#include "commands.h"
26#include "asyncutils.h" 28#include "asyncutils.h"
@@ -97,6 +99,13 @@ QueryRunner<DomainType>::~QueryRunner()
97 SinkTraceCtx(mLogCtx) << "Stopped query"; 99 SinkTraceCtx(mLogCtx) << "Stopped query";
98} 100}
99 101
102
103template <class DomainType>
104void QueryRunner<DomainType>::delayNextQuery()
105{
106 mDelayNextQuery = true;
107}
108
100//This function triggers the initial fetch, and then subsequent calls will simply fetch more data of mBatchSize. 109//This function triggers the initial fetch, and then subsequent calls will simply fetch more data of mBatchSize.
101template <class DomainType> 110template <class DomainType>
102void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType) 111void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType)
@@ -115,11 +124,20 @@ void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &
115 auto resourceContext = mResourceContext; 124 auto resourceContext = mResourceContext;
116 auto logCtx = mLogCtx; 125 auto logCtx = mLogCtx;
117 auto state = mQueryState; 126 auto state = mQueryState;
127 bool addDelay = mDelayNextQuery;
128 mDelayNextQuery = false;
118 const bool runAsync = !query.synchronousQuery(); 129 const bool runAsync = !query.synchronousQuery();
119 //The lambda will be executed in a separate thread, so copy all arguments 130 //The lambda will be executed in a separate thread, so copy all arguments
120 async::run<ReplayResult>([=]() { 131 async::run<ReplayResult>([=]() {
121 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); 132 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
122 return worker.executeInitialQuery(query, *resultProvider, batchSize, state); 133 const auto result = worker.executeInitialQuery(query, *resultProvider, batchSize, state);
134
135 //For testing only
136 if (addDelay) {
137 std::this_thread::sleep_for(std::chrono::seconds(1));
138 }
139
140 return result;
123 }, runAsync) 141 }, runAsync)
124 .then([=](const ReplayResult &result) { 142 .then([=](const ReplayResult &result) {
125 if (!guardPtr) { 143 if (!guardPtr) {
@@ -137,7 +155,12 @@ void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &
137 resultProvider->initialResultSetComplete(result.replayedAll); 155 resultProvider->initialResultSetComplete(result.replayedAll);
138 if (mRequestFetchMore) { 156 if (mRequestFetchMore) {
139 mRequestFetchMore = false; 157 mRequestFetchMore = false;
158 //This code exists for incemental fetches, so we don't skip loading another set.
140 fetch(query, bufferType); 159 fetch(query, bufferType);
160 return;
161 }
162 if (mRevisionChangedMeanwhile) {
163 incrementalFetch(query, bufferType).exec();
141 } 164 }
142 }) 165 })
143 .exec(); 166 .exec();
@@ -146,15 +169,17 @@ void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &
146template <class DomainType> 169template <class DomainType>
147KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &query, const QByteArray &bufferType) 170KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &query, const QByteArray &bufferType)
148{ 171{
149 if (!mInitialQueryComplete) { 172 if (!mInitialQueryComplete && !mQueryInProgress) {
150 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; 173 //We rely on this codepath in the case of newly added resources to trigger the initial fetch.
151 fetch(query, bufferType); 174 fetch(query, bufferType);
152 return KAsync::null(); 175 return KAsync::null();
153 } 176 }
154 if (mQueryInProgress) { 177 if (mQueryInProgress) {
155 //Can happen if the revision come in quicker than we process them. 178 //If a query is already in progress we just remember to fetch again once the current query is done.
179 mRevisionChangedMeanwhile = true;
156 return KAsync::null(); 180 return KAsync::null();
157 } 181 }
182 mRevisionChangedMeanwhile = false;
158 auto resultProvider = mResultProvider; 183 auto resultProvider = mResultProvider;
159 auto resourceContext = mResourceContext; 184 auto resourceContext = mResourceContext;
160 auto logCtx = mLogCtx; 185 auto logCtx = mLogCtx;
@@ -162,22 +187,34 @@ KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &q
162 auto resultTransformation = mResultTransformation; 187 auto resultTransformation = mResultTransformation;
163 Q_ASSERT(!mQueryInProgress); 188 Q_ASSERT(!mQueryInProgress);
164 auto guardPtr = QPointer<QObject>(&guard); 189 auto guardPtr = QPointer<QObject>(&guard);
190 bool addDelay = mDelayNextQuery;
191 mDelayNextQuery = false;
165 return KAsync::start([&] { 192 return KAsync::start([&] {
166 mQueryInProgress = true; 193 mQueryInProgress = true;
167 }) 194 })
168 .then(async::run<ReplayResult>([=]() { 195 .then(async::run<ReplayResult>([=]() {
169 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); 196 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
170 return worker.executeIncrementalQuery(query, *resultProvider, state); 197 const auto result = worker.executeIncrementalQuery(query, *resultProvider, state);
198 ////For testing only
199 if (addDelay) {
200 SinkWarning() << "Sleeping in incremental query";
201 std::this_thread::sleep_for(std::chrono::seconds(1));
202 }
203
204 return result;
171 })) 205 }))
172 .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { 206 .then([query, this, resultProvider, guardPtr, bufferType](const ReplayResult &newRevisionAndReplayedEntities) {
173 if (!guardPtr) { 207 if (!guardPtr) {
174 //Not an error, the query can vanish at any time. 208 //Not an error, the query can vanish at any time.
175 return; 209 return KAsync::null();
176 } 210 }
177 mQueryInProgress = false; 211 mQueryInProgress = false;
178 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
179 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); 212 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
180 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); 213 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
214 if (mRevisionChangedMeanwhile) {
215 return incrementalFetch(query, bufferType);
216 }
217 return KAsync::null();
181 }); 218 });
182} 219}
183 220