summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-02-11 12:02:58 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-02-13 19:42:39 +0100
commit1259b236704e790fa1284a63ec537525bce23841 (patch)
tree85cd0491e56d2f604cc8aa291a49d20f8f73c684 /common/queryrunner.cpp
parentb4bd3932aa2a8e841ed204b341bcbf65ba59c5b2 (diff)
downloadsink-1259b236704e790fa1284a63ec537525bce23841.tar.gz
sink-1259b236704e790fa1284a63ec537525bce23841.zip
Fixed reduction updates with stateful query.
Some filters need to maintain state between runs in order to be able to emit only what has changed. This now also make reduction work for live queries.
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp119
1 files changed, 66 insertions, 53 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 748320f..40880eb 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -33,6 +33,7 @@ struct ReplayResult {
33 qint64 newRevision; 33 qint64 newRevision;
34 qint64 replayedEntities; 34 qint64 replayedEntities;
35 bool replayedAll; 35 bool replayedAll;
36 DataStoreQuery::State::Ptr queryState;
36}; 37};
37 38
38/* 39/*
@@ -49,7 +50,7 @@ public:
49 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx); 50 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx);
50 virtual ~QueryWorker(); 51 virtual ~QueryWorker();
51 52
52 ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 53 ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state);
53 ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); 54 ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize);
54 55
55private: 56private:
@@ -69,45 +70,41 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
69 SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; 70 SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get.";
70 } 71 }
71 auto guardPtr = QPointer<QObject>(&guard); 72 auto guardPtr = QPointer<QObject>(&guard);
72 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. 73 auto fetcher = [=](const typename DomainType::Ptr &parent) {
73 mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) {
74 const QByteArray parentId = parent ? parent->identifier() : QByteArray(); 74 const QByteArray parentId = parent ? parent->identifier() : QByteArray();
75 SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; 75 SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize;
76 auto resultProvider = mResultProvider; 76 auto resultProvider = mResultProvider;
77 if (query.synchronousQuery()) { 77 auto resultTransformation = mResultTransformation;
78 QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation, mLogCtx); 78 auto offset = mOffset[parentId];
79 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); 79 auto batchSize = mBatchSize;
80 mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; 80 auto resourceContext = mResourceContext;
81 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); 81 auto logCtx = mLogCtx;
82 resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); 82 const bool runAsync = !query.synchronousQuery();
83 } else { 83 //The lambda will be executed in a separate thread, so copy all arguments
84 auto resultTransformation = mResultTransformation; 84 async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() {
85 auto offset = mOffset[parentId]; 85 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
86 auto batchSize = mBatchSize; 86 return worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize);
87 auto resourceContext = mResourceContext; 87 }, runAsync)
88 auto logCtx = mLogCtx; 88 .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) {
89 //The lambda will be executed in a separate thread, so copy all arguments 89 if (!guardPtr) {
90 async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { 90 qWarning() << "The parent object is already gone";
91 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); 91 return;
92 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); 92 }
93 return newRevisionAndReplayedEntities; 93 mInitialQueryComplete = true;
94 mQueryState = result.queryState;
95 mOffset[parentId] += result.replayedEntities;
96 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
97 if (query.liveQuery()) {
98 mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
99 }
100 resultProvider->setRevision(result.newRevision);
101 resultProvider->initialResultSetComplete(parent, result.replayedAll);
94 }) 102 })
95 .template then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { 103 .exec();
96 if (!guardPtr) { 104 };
97 qWarning() << "The parent object is already gone"; 105
98 return; 106 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load.
99 } 107 mResultProvider->setFetcher(fetcher);
100 mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities;
101 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
102 if (query.liveQuery()) {
103 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
104 }
105 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
106 resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll);
107 })
108 .exec();
109 }
110 });
111 108
112 // In case of a live query we keep the runner for as long alive as the result provider exists 109 // In case of a live query we keep the runner for as long alive as the result provider exists
113 if (query.liveQuery()) { 110 if (query.liveQuery()) {
@@ -117,16 +114,26 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
117 auto resultProvider = mResultProvider; 114 auto resultProvider = mResultProvider;
118 auto resourceContext = mResourceContext; 115 auto resourceContext = mResourceContext;
119 auto logCtx = mLogCtx; 116 auto logCtx = mLogCtx;
120 return async::run<ReplayResult>([=]() { 117 auto state = mQueryState;
118 if (!mInitialQueryComplete) {
119 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete";
120 fetcher({});
121 return KAsync::null();
122 }
123 Q_ASSERT(!mQueryInProgress);
124 return KAsync::syncStart<void>([&] {
125 mQueryInProgress = true;
126 })
127 .then(async::run<ReplayResult>([=]() {
121 QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation, logCtx); 128 QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation, logCtx);
122 const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); 129 return worker.executeIncrementalQuery(query, *resultProvider, state);
123 return newRevisionAndReplayedEntities; 130 }))
124 }) 131 .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
125 .template then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
126 if (!guardPtr) { 132 if (!guardPtr) {
127 qWarning() << "The parent object is already gone"; 133 qWarning() << "The parent object is already gone";
128 return; 134 return;
129 } 135 }
136 mQueryInProgress = false;
130 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 137 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
131 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); 138 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
132 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); 139 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
@@ -201,14 +208,20 @@ void QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, S
201} 208}
202 209
203template <class DomainType> 210template <class DomainType>
204ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 211ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state)
205{ 212{
206 QTime time; 213 QTime time;
207 time.start(); 214 time.start();
208 215
209 const qint64 baseRevision = resultProvider.revision() + 1; 216 const qint64 baseRevision = resultProvider.revision() + 1;
217 SinkTraceCtx(mLogCtx) << "Running query update from revision: " << baseRevision;
218
210 auto entityStore = EntityStore{mResourceContext, mLogCtx}; 219 auto entityStore = EntityStore{mResourceContext, mLogCtx};
211 auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore}; 220 if (!state) {
221 SinkWarningCtx(mLogCtx) << "No previous query state.";
222 return {0, 0, false, DataStoreQuery::State::Ptr{}};
223 }
224 auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore};
212 auto resultSet = preparedQuery.update(baseRevision); 225 auto resultSet = preparedQuery.update(baseRevision);
213 SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 226 SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
214 auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { 227 auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) {
@@ -218,7 +231,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query
218 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" 231 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n"
219 << (replayResult.replayedAll ? "Replayed all available results.\n" : "") 232 << (replayResult.replayedAll ? "Replayed all available results.\n" : "")
220 << "Incremental query took: " << Log::TraceTime(time.elapsed()); 233 << "Incremental query took: " << Log::TraceTime(time.elapsed());
221 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; 234 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()};
222} 235}
223 236
224template <class DomainType> 237template <class DomainType>
@@ -251,14 +264,14 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery(
251 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" 264 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n"
252 << (replayResult.replayedAll ? "Replayed all available results.\n" : "") 265 << (replayResult.replayedAll ? "Replayed all available results.\n" : "")
253 << "Initial query took: " << Log::TraceTime(time.elapsed()); 266 << "Initial query took: " << Log::TraceTime(time.elapsed());
254 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; 267
268 auto state = preparedQuery.getState();
269
270 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, state};
255} 271}
256 272
257template class QueryRunner<Sink::ApplicationDomain::Contact>; 273#define REGISTER_TYPE(T) \
258template class QueryRunner<Sink::ApplicationDomain::Folder>; 274 template class QueryRunner<T>; \
259template class QueryRunner<Sink::ApplicationDomain::Mail>; 275 template class QueryWorker<T>; \
260template class QueryRunner<Sink::ApplicationDomain::Event>; 276
261template class QueryWorker<Sink::ApplicationDomain::Contact>; 277SINK_REGISTER_TYPES()
262template class QueryWorker<Sink::ApplicationDomain::Folder>;
263template class QueryWorker<Sink::ApplicationDomain::Mail>;
264template class QueryWorker<Sink::ApplicationDomain::Event>;