Diffstat (limited to 'common/queryrunner.cpp')
 common/queryrunner.cpp | 38 +++++++++++++++++++++-----------------
 1 file changed, 21 insertions(+), 17 deletions(-)
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 802fc48..43f48c0 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -51,7 +51,7 @@ public:
     virtual ~QueryWorker();
 
     ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state);
-    ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize);
+    ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state);
 
 private:
     void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result);
@@ -72,18 +72,18 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
     auto guardPtr = QPointer<QObject>(&guard);
     auto fetcher = [=](const typename DomainType::Ptr &parent) {
         const QByteArray parentId = parent ? parent->identifier() : QByteArray();
-        SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize;
+        SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize;
         auto resultProvider = mResultProvider;
         auto resultTransformation = mResultTransformation;
-        auto offset = mOffset[parentId];
         auto batchSize = mBatchSize;
         auto resourceContext = mResourceContext;
         auto logCtx = mLogCtx;
+        auto state = mQueryState.value(parentId);
         const bool runAsync = !query.synchronousQuery();
         //The lambda will be executed in a separate thread, so copy all arguments
-        async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() {
+        async::run<ReplayResult>([=]() {
             QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
-            return worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize);
+            return worker.executeInitialQuery(query, parent, *resultProvider, batchSize, state);
         }, runAsync)
         .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) {
             if (!guardPtr) {
@@ -91,8 +91,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
                 return;
             }
             mInitialQueryComplete = true;
-            mQueryState = result.queryState;
-            mOffset[parentId] += result.replayedEntities;
+            mQueryState[parentId] = result.queryState;
             // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
             if (query.liveQuery()) {
                 mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
@@ -111,10 +110,11 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
         Q_ASSERT(!query.synchronousQuery());
         // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
         setQuery([=]() -> KAsync::Job<void> {
+            const QByteArray parentId;
             auto resultProvider = mResultProvider;
             auto resourceContext = mResourceContext;
             auto logCtx = mLogCtx;
-            auto state = mQueryState;
+            auto state = mQueryState.value(parentId);
             if (!mInitialQueryComplete) {
                 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete";
                 fetcher({});
@@ -225,7 +225,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query
         SinkWarningCtx(mLogCtx) << "No previous query state.";
         return {0, 0, false, DataStoreQuery::State::Ptr{}};
     }
-    auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore};
+    auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, true};
     auto resultSet = preparedQuery.update(baseRevision);
     SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
     auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) {
@@ -240,7 +240,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query
 
 template <class DomainType>
 ReplayResult QueryWorker<DomainType>::executeInitialQuery(
-    const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize)
+    const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state)
 {
     QTime time;
     time.start();
@@ -257,11 +257,17 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery(
     }
 
     auto entityStore = EntityStore{mResourceContext, mLogCtx};
-    auto preparedQuery = DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore};
-    auto resultSet = preparedQuery.execute();
+    auto preparedQuery = [&] {
+        if (state) {
+            return DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, false};
+        } else {
+            return DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore};
+        }
+    }();
+    auto resultSet = preparedQuery.execute();;
 
-    SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
-    auto replayResult = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) {
+    SinkTraceCtx(mLogCtx) << "Filtered set retrieved." << Log::TraceTime(time.elapsed());
+    auto replayResult = resultSet.replaySet(0, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) {
         resultProviderCallback(query, resultProvider, result);
     });
 
@@ -269,9 +275,7 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery(
         << (replayResult.replayedAll ? "Replayed all available results.\n" : "")
         << "Initial query took: " << Log::TraceTime(time.elapsed());
 
-    auto state = preparedQuery.getState();
-
-    return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, state};
+    return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()};
 }
 
 #define REGISTER_TYPE(T) \
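
The initialization of preparedQuery in executeInitialQuery above uses an immediately-invoked lambda so that the query is constructed in a single expression, branching on whether previously saved query state exists. A minimal standalone sketch of that pattern follows; QueryState and PreparedQuery are hypothetical stand-ins, not Sink's DataStoreQuery API.

// Illustrative sketch only -- the types here are hypothetical stand-ins.
#include <iostream>
#include <memory>

struct QueryState { int lastRevision; };

struct PreparedQuery { int startRevision; };

PreparedQuery prepare(const std::shared_ptr<QueryState> &state)
{
    // Immediately-invoked lambda: initialize the query in one expression,
    // resuming from saved state when it exists, otherwise starting fresh.
    auto preparedQuery = [&] {
        if (state) {
            return PreparedQuery{state->lastRevision}; // resume from saved state
        } else {
            return PreparedQuery{0};                   // fresh query from scratch
        }
    }();
    return preparedQuery;
}

int main()
{
    auto saved = std::make_shared<QueryState>(QueryState{42});
    std::cout << prepare(saved).startRevision << "\n";   // prints 42
    std::cout << prepare(nullptr).startRevision << "\n"; // prints 0
}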