diff options
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r-- | common/queryrunner.cpp | 119 |
1 files changed, 66 insertions, 53 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 748320f..40880eb 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -33,6 +33,7 @@ struct ReplayResult { | |||
33 | qint64 newRevision; | 33 | qint64 newRevision; |
34 | qint64 replayedEntities; | 34 | qint64 replayedEntities; |
35 | bool replayedAll; | 35 | bool replayedAll; |
36 | DataStoreQuery::State::Ptr queryState; | ||
36 | }; | 37 | }; |
37 | 38 | ||
38 | /* | 39 | /* |
@@ -49,7 +50,7 @@ public: | |||
49 | QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx); | 50 | QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx); |
50 | virtual ~QueryWorker(); | 51 | virtual ~QueryWorker(); |
51 | 52 | ||
52 | ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | 53 | ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state); |
53 | ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); | 54 | ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); |
54 | 55 | ||
55 | private: | 56 | private: |
@@ -69,45 +70,41 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
69 | SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; | 70 | SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; |
70 | } | 71 | } |
71 | auto guardPtr = QPointer<QObject>(&guard); | 72 | auto guardPtr = QPointer<QObject>(&guard); |
72 | // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. | 73 | auto fetcher = [=](const typename DomainType::Ptr &parent) { |
73 | mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) { | ||
74 | const QByteArray parentId = parent ? parent->identifier() : QByteArray(); | 74 | const QByteArray parentId = parent ? parent->identifier() : QByteArray(); |
75 | SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; | 75 | SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; |
76 | auto resultProvider = mResultProvider; | 76 | auto resultProvider = mResultProvider; |
77 | if (query.synchronousQuery()) { | 77 | auto resultTransformation = mResultTransformation; |
78 | QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation, mLogCtx); | 78 | auto offset = mOffset[parentId]; |
79 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); | 79 | auto batchSize = mBatchSize; |
80 | mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; | 80 | auto resourceContext = mResourceContext; |
81 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | 81 | auto logCtx = mLogCtx; |
82 | resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); | 82 | const bool runAsync = !query.synchronousQuery(); |
83 | } else { | 83 | //The lambda will be executed in a separate thread, so copy all arguments |
84 | auto resultTransformation = mResultTransformation; | 84 | async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { |
85 | auto offset = mOffset[parentId]; | 85 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); |
86 | auto batchSize = mBatchSize; | 86 | return worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); |
87 | auto resourceContext = mResourceContext; | 87 | }, runAsync) |
88 | auto logCtx = mLogCtx; | 88 | .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) { |
89 | //The lambda will be executed in a separate thread, so copy all arguments | 89 | if (!guardPtr) { |
90 | async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { | 90 | qWarning() << "The parent object is already gone"; |
91 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | 91 | return; |
92 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); | 92 | } |
93 | return newRevisionAndReplayedEntities; | 93 | mInitialQueryComplete = true; |
94 | mQueryState = result.queryState; | ||
95 | mOffset[parentId] += result.replayedEntities; | ||
96 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
97 | if (query.liveQuery()) { | ||
98 | mResourceAccess->sendRevisionReplayedCommand(result.newRevision); | ||
99 | } | ||
100 | resultProvider->setRevision(result.newRevision); | ||
101 | resultProvider->initialResultSetComplete(parent, result.replayedAll); | ||
94 | }) | 102 | }) |
95 | .template then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { | 103 | .exec(); |
96 | if (!guardPtr) { | 104 | }; |
97 | qWarning() << "The parent object is already gone"; | 105 | |
98 | return; | 106 | // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. |
99 | } | 107 | mResultProvider->setFetcher(fetcher); |
100 | mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; | ||
101 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
102 | if (query.liveQuery()) { | ||
103 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); | ||
104 | } | ||
105 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | ||
106 | resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); | ||
107 | }) | ||
108 | .exec(); | ||
109 | } | ||
110 | }); | ||
111 | 108 | ||
112 | // In case of a live query we keep the runner for as long alive as the result provider exists | 109 | // In case of a live query we keep the runner for as long alive as the result provider exists |
113 | if (query.liveQuery()) { | 110 | if (query.liveQuery()) { |
@@ -117,16 +114,26 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
117 | auto resultProvider = mResultProvider; | 114 | auto resultProvider = mResultProvider; |
118 | auto resourceContext = mResourceContext; | 115 | auto resourceContext = mResourceContext; |
119 | auto logCtx = mLogCtx; | 116 | auto logCtx = mLogCtx; |
120 | return async::run<ReplayResult>([=]() { | 117 | auto state = mQueryState; |
118 | if (!mInitialQueryComplete) { | ||
119 | SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; | ||
120 | fetcher({}); | ||
121 | return KAsync::null(); | ||
122 | } | ||
123 | Q_ASSERT(!mQueryInProgress); | ||
124 | return KAsync::syncStart<void>([&] { | ||
125 | mQueryInProgress = true; | ||
126 | }) | ||
127 | .then(async::run<ReplayResult>([=]() { | ||
121 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation, logCtx); | 128 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation, logCtx); |
122 | const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); | 129 | return worker.executeIncrementalQuery(query, *resultProvider, state); |
123 | return newRevisionAndReplayedEntities; | 130 | })) |
124 | }) | 131 | .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { |
125 | .template then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { | ||
126 | if (!guardPtr) { | 132 | if (!guardPtr) { |
127 | qWarning() << "The parent object is already gone"; | 133 | qWarning() << "The parent object is already gone"; |
128 | return; | 134 | return; |
129 | } | 135 | } |
136 | mQueryInProgress = false; | ||
130 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 137 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
131 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); | 138 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); |
132 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | 139 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); |
@@ -201,14 +208,20 @@ void QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, S | |||
201 | } | 208 | } |
202 | 209 | ||
203 | template <class DomainType> | 210 | template <class DomainType> |
204 | ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | 211 | ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state) |
205 | { | 212 | { |
206 | QTime time; | 213 | QTime time; |
207 | time.start(); | 214 | time.start(); |
208 | 215 | ||
209 | const qint64 baseRevision = resultProvider.revision() + 1; | 216 | const qint64 baseRevision = resultProvider.revision() + 1; |
217 | SinkTraceCtx(mLogCtx) << "Running query update from revision: " << baseRevision; | ||
218 | |||
210 | auto entityStore = EntityStore{mResourceContext, mLogCtx}; | 219 | auto entityStore = EntityStore{mResourceContext, mLogCtx}; |
211 | auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | 220 | if (!state) { |
221 | SinkWarningCtx(mLogCtx) << "No previous query state."; | ||
222 | return {0, 0, false, DataStoreQuery::State::Ptr{}}; | ||
223 | } | ||
224 | auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | ||
212 | auto resultSet = preparedQuery.update(baseRevision); | 225 | auto resultSet = preparedQuery.update(baseRevision); |
213 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 226 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
214 | auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { | 227 | auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { |
@@ -218,7 +231,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query | |||
218 | SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" | 231 | SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" |
219 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") | 232 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") |
220 | << "Incremental query took: " << Log::TraceTime(time.elapsed()); | 233 | << "Incremental query took: " << Log::TraceTime(time.elapsed()); |
221 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; | 234 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()}; |
222 | } | 235 | } |
223 | 236 | ||
224 | template <class DomainType> | 237 | template <class DomainType> |
@@ -251,14 +264,14 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery( | |||
251 | SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" | 264 | SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" |
252 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") | 265 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") |
253 | << "Initial query took: " << Log::TraceTime(time.elapsed()); | 266 | << "Initial query took: " << Log::TraceTime(time.elapsed()); |
254 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; | 267 | |
268 | auto state = preparedQuery.getState(); | ||
269 | |||
270 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, state}; | ||
255 | } | 271 | } |
256 | 272 | ||
257 | template class QueryRunner<Sink::ApplicationDomain::Contact>; | 273 | #define REGISTER_TYPE(T) \ |
258 | template class QueryRunner<Sink::ApplicationDomain::Folder>; | 274 | template class QueryRunner<T>; \ |
259 | template class QueryRunner<Sink::ApplicationDomain::Mail>; | 275 | template class QueryWorker<T>; \ |
260 | template class QueryRunner<Sink::ApplicationDomain::Event>; | 276 | |
261 | template class QueryWorker<Sink::ApplicationDomain::Contact>; | 277 | SINK_REGISTER_TYPES() |
262 | template class QueryWorker<Sink::ApplicationDomain::Folder>; | ||
263 | template class QueryWorker<Sink::ApplicationDomain::Mail>; | ||
264 | template class QueryWorker<Sink::ApplicationDomain::Event>; | ||