diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-02-11 12:02:58 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-02-13 19:42:39 +0100 |
commit | 1259b236704e790fa1284a63ec537525bce23841 (patch) | |
tree | 85cd0491e56d2f604cc8aa291a49d20f8f73c684 /common/queryrunner.cpp | |
parent | b4bd3932aa2a8e841ed204b341bcbf65ba59c5b2 (diff) | |
download | sink-1259b236704e790fa1284a63ec537525bce23841.tar.gz sink-1259b236704e790fa1284a63ec537525bce23841.zip |
Fixed reduction updates with stateful query.
Some filters need to maintain state between runs in order to be able to
emit only what has changed. This now also makes reduction work for live
queries.
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r-- | common/queryrunner.cpp | 119 |
1 files changed, 66 insertions, 53 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 748320f..40880eb 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -33,6 +33,7 @@ struct ReplayResult { | |||
33 | qint64 newRevision; | 33 | qint64 newRevision; |
34 | qint64 replayedEntities; | 34 | qint64 replayedEntities; |
35 | bool replayedAll; | 35 | bool replayedAll; |
36 | DataStoreQuery::State::Ptr queryState; | ||
36 | }; | 37 | }; |
37 | 38 | ||
38 | /* | 39 | /* |
@@ -49,7 +50,7 @@ public: | |||
49 | QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx); | 50 | QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx); |
50 | virtual ~QueryWorker(); | 51 | virtual ~QueryWorker(); |
51 | 52 | ||
52 | ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | 53 | ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state); |
53 | ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); | 54 | ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); |
54 | 55 | ||
55 | private: | 56 | private: |
@@ -69,45 +70,41 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
69 | SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; | 70 | SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; |
70 | } | 71 | } |
71 | auto guardPtr = QPointer<QObject>(&guard); | 72 | auto guardPtr = QPointer<QObject>(&guard); |
72 | // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. | 73 | auto fetcher = [=](const typename DomainType::Ptr &parent) { |
73 | mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) { | ||
74 | const QByteArray parentId = parent ? parent->identifier() : QByteArray(); | 74 | const QByteArray parentId = parent ? parent->identifier() : QByteArray(); |
75 | SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; | 75 | SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; |
76 | auto resultProvider = mResultProvider; | 76 | auto resultProvider = mResultProvider; |
77 | if (query.synchronousQuery()) { | 77 | auto resultTransformation = mResultTransformation; |
78 | QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation, mLogCtx); | 78 | auto offset = mOffset[parentId]; |
79 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); | 79 | auto batchSize = mBatchSize; |
80 | mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; | 80 | auto resourceContext = mResourceContext; |
81 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | 81 | auto logCtx = mLogCtx; |
82 | resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); | 82 | const bool runAsync = !query.synchronousQuery(); |
83 | } else { | 83 | //The lambda will be executed in a separate thread, so copy all arguments |
84 | auto resultTransformation = mResultTransformation; | 84 | async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { |
85 | auto offset = mOffset[parentId]; | 85 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); |
86 | auto batchSize = mBatchSize; | 86 | return worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); |
87 | auto resourceContext = mResourceContext; | 87 | }, runAsync) |
88 | auto logCtx = mLogCtx; | 88 | .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) { |
89 | //The lambda will be executed in a separate thread, so copy all arguments | 89 | if (!guardPtr) { |
90 | async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { | 90 | qWarning() << "The parent object is already gone"; |
91 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | 91 | return; |
92 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); | 92 | } |
93 | return newRevisionAndReplayedEntities; | 93 | mInitialQueryComplete = true; |
94 | mQueryState = result.queryState; | ||
95 | mOffset[parentId] += result.replayedEntities; | ||
96 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
97 | if (query.liveQuery()) { | ||
98 | mResourceAccess->sendRevisionReplayedCommand(result.newRevision); | ||
99 | } | ||
100 | resultProvider->setRevision(result.newRevision); | ||
101 | resultProvider->initialResultSetComplete(parent, result.replayedAll); | ||
94 | }) | 102 | }) |
95 | .template then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { | 103 | .exec(); |
96 | if (!guardPtr) { | 104 | }; |
97 | qWarning() << "The parent object is already gone"; | 105 | |
98 | return; | 106 | // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. |
99 | } | 107 | mResultProvider->setFetcher(fetcher); |
100 | mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; | ||
101 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
102 | if (query.liveQuery()) { | ||
103 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); | ||
104 | } | ||
105 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | ||
106 | resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); | ||
107 | }) | ||
108 | .exec(); | ||
109 | } | ||
110 | }); | ||
111 | 108 | ||
112 | // In case of a live query we keep the runner for as long alive as the result provider exists | 109 | // In case of a live query we keep the runner for as long alive as the result provider exists |
113 | if (query.liveQuery()) { | 110 | if (query.liveQuery()) { |
@@ -117,16 +114,26 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
117 | auto resultProvider = mResultProvider; | 114 | auto resultProvider = mResultProvider; |
118 | auto resourceContext = mResourceContext; | 115 | auto resourceContext = mResourceContext; |
119 | auto logCtx = mLogCtx; | 116 | auto logCtx = mLogCtx; |
120 | return async::run<ReplayResult>([=]() { | 117 | auto state = mQueryState; |
118 | if (!mInitialQueryComplete) { | ||
119 | SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; | ||
120 | fetcher({}); | ||
121 | return KAsync::null(); | ||
122 | } | ||
123 | Q_ASSERT(!mQueryInProgress); | ||
124 | return KAsync::syncStart<void>([&] { | ||
125 | mQueryInProgress = true; | ||
126 | }) | ||
127 | .then(async::run<ReplayResult>([=]() { | ||
121 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation, logCtx); | 128 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation, logCtx); |
122 | const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); | 129 | return worker.executeIncrementalQuery(query, *resultProvider, state); |
123 | return newRevisionAndReplayedEntities; | 130 | })) |
124 | }) | 131 | .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { |
125 | .template then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { | ||
126 | if (!guardPtr) { | 132 | if (!guardPtr) { |
127 | qWarning() << "The parent object is already gone"; | 133 | qWarning() << "The parent object is already gone"; |
128 | return; | 134 | return; |
129 | } | 135 | } |
136 | mQueryInProgress = false; | ||
130 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 137 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
131 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); | 138 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); |
132 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | 139 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); |
@@ -201,14 +208,20 @@ void QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, S | |||
201 | } | 208 | } |
202 | 209 | ||
203 | template <class DomainType> | 210 | template <class DomainType> |
204 | ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | 211 | ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state) |
205 | { | 212 | { |
206 | QTime time; | 213 | QTime time; |
207 | time.start(); | 214 | time.start(); |
208 | 215 | ||
209 | const qint64 baseRevision = resultProvider.revision() + 1; | 216 | const qint64 baseRevision = resultProvider.revision() + 1; |
217 | SinkTraceCtx(mLogCtx) << "Running query update from revision: " << baseRevision; | ||
218 | |||
210 | auto entityStore = EntityStore{mResourceContext, mLogCtx}; | 219 | auto entityStore = EntityStore{mResourceContext, mLogCtx}; |
211 | auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | 220 | if (!state) { |
221 | SinkWarningCtx(mLogCtx) << "No previous query state."; | ||
222 | return {0, 0, false, DataStoreQuery::State::Ptr{}}; | ||
223 | } | ||
224 | auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | ||
212 | auto resultSet = preparedQuery.update(baseRevision); | 225 | auto resultSet = preparedQuery.update(baseRevision); |
213 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 226 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
214 | auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { | 227 | auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { |
@@ -218,7 +231,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query | |||
218 | SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" | 231 | SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" |
219 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") | 232 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") |
220 | << "Incremental query took: " << Log::TraceTime(time.elapsed()); | 233 | << "Incremental query took: " << Log::TraceTime(time.elapsed()); |
221 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; | 234 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()}; |
222 | } | 235 | } |
223 | 236 | ||
224 | template <class DomainType> | 237 | template <class DomainType> |
@@ -251,14 +264,14 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery( | |||
251 | SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" | 264 | SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" |
252 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") | 265 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") |
253 | << "Initial query took: " << Log::TraceTime(time.elapsed()); | 266 | << "Initial query took: " << Log::TraceTime(time.elapsed()); |
254 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; | 267 | |
268 | auto state = preparedQuery.getState(); | ||
269 | |||
270 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, state}; | ||
255 | } | 271 | } |
256 | 272 | ||
257 | template class QueryRunner<Sink::ApplicationDomain::Contact>; | 273 | #define REGISTER_TYPE(T) \ |
258 | template class QueryRunner<Sink::ApplicationDomain::Folder>; | 274 | template class QueryRunner<T>; \ |
259 | template class QueryRunner<Sink::ApplicationDomain::Mail>; | 275 | template class QueryWorker<T>; \ |
260 | template class QueryRunner<Sink::ApplicationDomain::Event>; | 276 | |
261 | template class QueryWorker<Sink::ApplicationDomain::Contact>; | 277 | SINK_REGISTER_TYPES() |
262 | template class QueryWorker<Sink::ApplicationDomain::Folder>; | ||
263 | template class QueryWorker<Sink::ApplicationDomain::Mail>; | ||
264 | template class QueryWorker<Sink::ApplicationDomain::Event>; | ||