summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-03-21 22:06:12 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-03-22 11:32:53 +0100
commit86045e308c10c60cd7c4339d305cee1acb084760 (patch)
treeb8fd3e91ffbdeb8ad2e78a61fe1d3426eb5874be /common/queryrunner.cpp
parentdaf96f7efec0538e161eab8e906a291015842e1e (diff)
downloadsink-86045e308c10c60cd7c4339d305cee1acb084760.tar.gz
sink-86045e308c10c60cd7c4339d305cee1acb084760.zip
Maintain the query state instead of using the offset.
Instead of using the offset to skip over old results requires recalculating them, and resulted in some cases in results being added multiple times to the model. By just maintaining the state we can apply the offset directly to the base-set, and maintain the state in reduction etc. which is necessary to continue streaming results while making sure we don't report anything twice.
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp38
1 files changed, 21 insertions, 17 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 802fc48..43f48c0 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -51,7 +51,7 @@ public:
51 virtual ~QueryWorker(); 51 virtual ~QueryWorker();
52 52
53 ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state); 53 ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state);
54 ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); 54 ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state);
55 55
56private: 56private:
57 void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result); 57 void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result);
@@ -72,18 +72,18 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
72 auto guardPtr = QPointer<QObject>(&guard); 72 auto guardPtr = QPointer<QObject>(&guard);
73 auto fetcher = [=](const typename DomainType::Ptr &parent) { 73 auto fetcher = [=](const typename DomainType::Ptr &parent) {
74 const QByteArray parentId = parent ? parent->identifier() : QByteArray(); 74 const QByteArray parentId = parent ? parent->identifier() : QByteArray();
75 SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; 75 SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize;
76 auto resultProvider = mResultProvider; 76 auto resultProvider = mResultProvider;
77 auto resultTransformation = mResultTransformation; 77 auto resultTransformation = mResultTransformation;
78 auto offset = mOffset[parentId];
79 auto batchSize = mBatchSize; 78 auto batchSize = mBatchSize;
80 auto resourceContext = mResourceContext; 79 auto resourceContext = mResourceContext;
81 auto logCtx = mLogCtx; 80 auto logCtx = mLogCtx;
81 auto state = mQueryState.value(parentId);
82 const bool runAsync = !query.synchronousQuery(); 82 const bool runAsync = !query.synchronousQuery();
83 //The lambda will be executed in a separate thread, so copy all arguments 83 //The lambda will be executed in a separate thread, so copy all arguments
84 async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { 84 async::run<ReplayResult>([=]() {
85 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); 85 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
86 return worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); 86 return worker.executeInitialQuery(query, parent, *resultProvider, batchSize, state);
87 }, runAsync) 87 }, runAsync)
88 .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) { 88 .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) {
89 if (!guardPtr) { 89 if (!guardPtr) {
@@ -91,8 +91,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
91 return; 91 return;
92 } 92 }
93 mInitialQueryComplete = true; 93 mInitialQueryComplete = true;
94 mQueryState = result.queryState; 94 mQueryState[parentId] = result.queryState;
95 mOffset[parentId] += result.replayedEntities;
96 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 95 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
97 if (query.liveQuery()) { 96 if (query.liveQuery()) {
98 mResourceAccess->sendRevisionReplayedCommand(result.newRevision); 97 mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
@@ -111,10 +110,11 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
111 Q_ASSERT(!query.synchronousQuery()); 110 Q_ASSERT(!query.synchronousQuery());
112 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 111 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
113 setQuery([=]() -> KAsync::Job<void> { 112 setQuery([=]() -> KAsync::Job<void> {
113 const QByteArray parentId;
114 auto resultProvider = mResultProvider; 114 auto resultProvider = mResultProvider;
115 auto resourceContext = mResourceContext; 115 auto resourceContext = mResourceContext;
116 auto logCtx = mLogCtx; 116 auto logCtx = mLogCtx;
117 auto state = mQueryState; 117 auto state = mQueryState.value(parentId);
118 if (!mInitialQueryComplete) { 118 if (!mInitialQueryComplete) {
119 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; 119 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete";
120 fetcher({}); 120 fetcher({});
@@ -225,7 +225,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query
225 SinkWarningCtx(mLogCtx) << "No previous query state."; 225 SinkWarningCtx(mLogCtx) << "No previous query state.";
226 return {0, 0, false, DataStoreQuery::State::Ptr{}}; 226 return {0, 0, false, DataStoreQuery::State::Ptr{}};
227 } 227 }
228 auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore}; 228 auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, true};
229 auto resultSet = preparedQuery.update(baseRevision); 229 auto resultSet = preparedQuery.update(baseRevision);
230 SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 230 SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
231 auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { 231 auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) {
@@ -240,7 +240,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query
240 240
241template <class DomainType> 241template <class DomainType>
242ReplayResult QueryWorker<DomainType>::executeInitialQuery( 242ReplayResult QueryWorker<DomainType>::executeInitialQuery(
243 const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize) 243 const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state)
244{ 244{
245 QTime time; 245 QTime time;
246 time.start(); 246 time.start();
@@ -257,11 +257,17 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery(
257 } 257 }
258 258
259 auto entityStore = EntityStore{mResourceContext, mLogCtx}; 259 auto entityStore = EntityStore{mResourceContext, mLogCtx};
260 auto preparedQuery = DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore}; 260 auto preparedQuery = [&] {
261 auto resultSet = preparedQuery.execute(); 261 if (state) {
262 return DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, false};
263 } else {
264 return DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore};
265 }
266 }();
267 auto resultSet = preparedQuery.execute();;
262 268
263 SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 269 SinkTraceCtx(mLogCtx) << "Filtered set retrieved." << Log::TraceTime(time.elapsed());
264 auto replayResult = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { 270 auto replayResult = resultSet.replaySet(0, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) {
265 resultProviderCallback(query, resultProvider, result); 271 resultProviderCallback(query, resultProvider, result);
266 }); 272 });
267 273
@@ -269,9 +275,7 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery(
269 << (replayResult.replayedAll ? "Replayed all available results.\n" : "") 275 << (replayResult.replayedAll ? "Replayed all available results.\n" : "")
270 << "Initial query took: " << Log::TraceTime(time.elapsed()); 276 << "Initial query took: " << Log::TraceTime(time.elapsed());
271 277
272 auto state = preparedQuery.getState(); 278 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()};
273
274 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, state};
275} 279}
276 280
277#define REGISTER_TYPE(T) \ 281#define REGISTER_TYPE(T) \