summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp119
1 files changed, 66 insertions, 53 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 748320f..40880eb 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -33,6 +33,7 @@ struct ReplayResult {
33 qint64 newRevision; 33 qint64 newRevision;
34 qint64 replayedEntities; 34 qint64 replayedEntities;
35 bool replayedAll; 35 bool replayedAll;
36 DataStoreQuery::State::Ptr queryState;
36}; 37};
37 38
38/* 39/*
@@ -49,7 +50,7 @@ public:
49 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx); 50 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx);
50 virtual ~QueryWorker(); 51 virtual ~QueryWorker();
51 52
52 ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 53 ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state);
53 ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); 54 ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize);
54 55
55private: 56private:
@@ -69,45 +70,41 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
69 SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; 70 SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get.";
70 } 71 }
71 auto guardPtr = QPointer<QObject>(&guard); 72 auto guardPtr = QPointer<QObject>(&guard);
72 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. 73 auto fetcher = [=](const typename DomainType::Ptr &parent) {
73 mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) {
74 const QByteArray parentId = parent ? parent->identifier() : QByteArray(); 74 const QByteArray parentId = parent ? parent->identifier() : QByteArray();
75 SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; 75 SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize;
76 auto resultProvider = mResultProvider; 76 auto resultProvider = mResultProvider;
77 if (query.synchronousQuery()) { 77 auto resultTransformation = mResultTransformation;
78 QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation, mLogCtx); 78 auto offset = mOffset[parentId];
79 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); 79 auto batchSize = mBatchSize;
80 mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; 80 auto resourceContext = mResourceContext;
81 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); 81 auto logCtx = mLogCtx;
82 resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); 82 const bool runAsync = !query.synchronousQuery();
83 } else { 83 //The lambda will be executed in a separate thread, so copy all arguments
84 auto resultTransformation = mResultTransformation; 84 async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() {
85 auto offset = mOffset[parentId]; 85 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
86 auto batchSize = mBatchSize; 86 return worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize);
87 auto resourceContext = mResourceContext; 87 }, runAsync)
88 auto logCtx = mLogCtx; 88 .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) {
89 //The lambda will be executed in a separate thread, so copy all arguments 89 if (!guardPtr) {
90 async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { 90 qWarning() << "The parent object is already gone";
91 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); 91 return;
92 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); 92 }
93 return newRevisionAndReplayedEntities; 93 mInitialQueryComplete = true;
94 mQueryState = result.queryState;
95 mOffset[parentId] += result.replayedEntities;
96 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
97 if (query.liveQuery()) {
98 mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
99 }
100 resultProvider->setRevision(result.newRevision);
101 resultProvider->initialResultSetComplete(parent, result.replayedAll);
94 }) 102 })
95 .template then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { 103 .exec();
96 if (!guardPtr) { 104 };
97 qWarning() << "The parent object is already gone"; 105
98 return; 106 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load.
99 } 107 mResultProvider->setFetcher(fetcher);
100 mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities;
101 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
102 if (query.liveQuery()) {
103 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
104 }
105 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
106 resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll);
107 })
108 .exec();
109 }
110 });
111 108
112 // In case of a live query we keep the runner for as long alive as the result provider exists 109 // In case of a live query we keep the runner for as long alive as the result provider exists
113 if (query.liveQuery()) { 110 if (query.liveQuery()) {
@@ -117,16 +114,26 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
117 auto resultProvider = mResultProvider; 114 auto resultProvider = mResultProvider;
118 auto resourceContext = mResourceContext; 115 auto resourceContext = mResourceContext;
119 auto logCtx = mLogCtx; 116 auto logCtx = mLogCtx;
120 return async::run<ReplayResult>([=]() { 117 auto state = mQueryState;
118 if (!mInitialQueryComplete) {
119 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete";
120 fetcher({});
121 return KAsync::null();
122 }
123 Q_ASSERT(!mQueryInProgress);
124 return KAsync::syncStart<void>([&] {
125 mQueryInProgress = true;
126 })
127 .then(async::run<ReplayResult>([=]() {
121 QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation, logCtx); 128 QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation, logCtx);
122 const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); 129 return worker.executeIncrementalQuery(query, *resultProvider, state);
123 return newRevisionAndReplayedEntities; 130 }))
124 }) 131 .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
125 .template then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
126 if (!guardPtr) { 132 if (!guardPtr) {
127 qWarning() << "The parent object is already gone"; 133 qWarning() << "The parent object is already gone";
128 return; 134 return;
129 } 135 }
136 mQueryInProgress = false;
130 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 137 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
131 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); 138 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
132 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); 139 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
@@ -201,14 +208,20 @@ void QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, S
201} 208}
202 209
203template <class DomainType> 210template <class DomainType>
204ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 211ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state)
205{ 212{
206 QTime time; 213 QTime time;
207 time.start(); 214 time.start();
208 215
209 const qint64 baseRevision = resultProvider.revision() + 1; 216 const qint64 baseRevision = resultProvider.revision() + 1;
217 SinkTraceCtx(mLogCtx) << "Running query update from revision: " << baseRevision;
218
210 auto entityStore = EntityStore{mResourceContext, mLogCtx}; 219 auto entityStore = EntityStore{mResourceContext, mLogCtx};
211 auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore}; 220 if (!state) {
221 SinkWarningCtx(mLogCtx) << "No previous query state.";
222 return {0, 0, false, DataStoreQuery::State::Ptr{}};
223 }
224 auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore};
212 auto resultSet = preparedQuery.update(baseRevision); 225 auto resultSet = preparedQuery.update(baseRevision);
213 SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 226 SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
214 auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { 227 auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) {
@@ -218,7 +231,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query
218 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" 231 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n"
219 << (replayResult.replayedAll ? "Replayed all available results.\n" : "") 232 << (replayResult.replayedAll ? "Replayed all available results.\n" : "")
220 << "Incremental query took: " << Log::TraceTime(time.elapsed()); 233 << "Incremental query took: " << Log::TraceTime(time.elapsed());
221 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; 234 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()};
222} 235}
223 236
224template <class DomainType> 237template <class DomainType>
@@ -251,14 +264,14 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery(
251 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" 264 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n"
252 << (replayResult.replayedAll ? "Replayed all available results.\n" : "") 265 << (replayResult.replayedAll ? "Replayed all available results.\n" : "")
253 << "Initial query took: " << Log::TraceTime(time.elapsed()); 266 << "Initial query took: " << Log::TraceTime(time.elapsed());
254 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; 267
268 auto state = preparedQuery.getState();
269
270 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, state};
255} 271}
256 272
257template class QueryRunner<Sink::ApplicationDomain::Contact>; 273#define REGISTER_TYPE(T) \
258template class QueryRunner<Sink::ApplicationDomain::Folder>; 274 template class QueryRunner<T>; \
259template class QueryRunner<Sink::ApplicationDomain::Mail>; 275 template class QueryWorker<T>; \
260template class QueryRunner<Sink::ApplicationDomain::Event>; 276
261template class QueryWorker<Sink::ApplicationDomain::Contact>; 277SINK_REGISTER_TYPES()
262template class QueryWorker<Sink::ApplicationDomain::Folder>;
263template class QueryWorker<Sink::ApplicationDomain::Mail>;
264template class QueryWorker<Sink::ApplicationDomain::Event>;