diff options
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r-- | common/queryrunner.cpp | 38 |
1 files changed, 21 insertions, 17 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 802fc48..43f48c0 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -51,7 +51,7 @@ public: | |||
51 | virtual ~QueryWorker(); | 51 | virtual ~QueryWorker(); |
52 | 52 | ||
53 | ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state); | 53 | ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state); |
54 | ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); | 54 | ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state); |
55 | 55 | ||
56 | private: | 56 | private: |
57 | void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result); | 57 | void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result); |
@@ -72,18 +72,18 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
72 | auto guardPtr = QPointer<QObject>(&guard); | 72 | auto guardPtr = QPointer<QObject>(&guard); |
73 | auto fetcher = [=](const typename DomainType::Ptr &parent) { | 73 | auto fetcher = [=](const typename DomainType::Ptr &parent) { |
74 | const QByteArray parentId = parent ? parent->identifier() : QByteArray(); | 74 | const QByteArray parentId = parent ? parent->identifier() : QByteArray(); |
75 | SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; | 75 | SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize; |
76 | auto resultProvider = mResultProvider; | 76 | auto resultProvider = mResultProvider; |
77 | auto resultTransformation = mResultTransformation; | 77 | auto resultTransformation = mResultTransformation; |
78 | auto offset = mOffset[parentId]; | ||
79 | auto batchSize = mBatchSize; | 78 | auto batchSize = mBatchSize; |
80 | auto resourceContext = mResourceContext; | 79 | auto resourceContext = mResourceContext; |
81 | auto logCtx = mLogCtx; | 80 | auto logCtx = mLogCtx; |
81 | auto state = mQueryState.value(parentId); | ||
82 | const bool runAsync = !query.synchronousQuery(); | 82 | const bool runAsync = !query.synchronousQuery(); |
83 | //The lambda will be executed in a separate thread, so copy all arguments | 83 | //The lambda will be executed in a separate thread, so copy all arguments |
84 | async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { | 84 | async::run<ReplayResult>([=]() { |
85 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | 85 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); |
86 | return worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); | 86 | return worker.executeInitialQuery(query, parent, *resultProvider, batchSize, state); |
87 | }, runAsync) | 87 | }, runAsync) |
88 | .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) { | 88 | .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) { |
89 | if (!guardPtr) { | 89 | if (!guardPtr) { |
@@ -91,8 +91,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
91 | return; | 91 | return; |
92 | } | 92 | } |
93 | mInitialQueryComplete = true; | 93 | mInitialQueryComplete = true; |
94 | mQueryState = result.queryState; | 94 | mQueryState[parentId] = result.queryState; |
95 | mOffset[parentId] += result.replayedEntities; | ||
96 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 95 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
97 | if (query.liveQuery()) { | 96 | if (query.liveQuery()) { |
98 | mResourceAccess->sendRevisionReplayedCommand(result.newRevision); | 97 | mResourceAccess->sendRevisionReplayedCommand(result.newRevision); |
@@ -111,10 +110,11 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
111 | Q_ASSERT(!query.synchronousQuery()); | 110 | Q_ASSERT(!query.synchronousQuery()); |
112 | // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | 111 | // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting |
113 | setQuery([=]() -> KAsync::Job<void> { | 112 | setQuery([=]() -> KAsync::Job<void> { |
113 | const QByteArray parentId; | ||
114 | auto resultProvider = mResultProvider; | 114 | auto resultProvider = mResultProvider; |
115 | auto resourceContext = mResourceContext; | 115 | auto resourceContext = mResourceContext; |
116 | auto logCtx = mLogCtx; | 116 | auto logCtx = mLogCtx; |
117 | auto state = mQueryState; | 117 | auto state = mQueryState.value(parentId); |
118 | if (!mInitialQueryComplete) { | 118 | if (!mInitialQueryComplete) { |
119 | SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; | 119 | SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; |
120 | fetcher({}); | 120 | fetcher({}); |
@@ -225,7 +225,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query | |||
225 | SinkWarningCtx(mLogCtx) << "No previous query state."; | 225 | SinkWarningCtx(mLogCtx) << "No previous query state."; |
226 | return {0, 0, false, DataStoreQuery::State::Ptr{}}; | 226 | return {0, 0, false, DataStoreQuery::State::Ptr{}}; |
227 | } | 227 | } |
228 | auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | 228 | auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, true}; |
229 | auto resultSet = preparedQuery.update(baseRevision); | 229 | auto resultSet = preparedQuery.update(baseRevision); |
230 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 230 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
231 | auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { | 231 | auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { |
@@ -240,7 +240,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query | |||
240 | 240 | ||
241 | template <class DomainType> | 241 | template <class DomainType> |
242 | ReplayResult QueryWorker<DomainType>::executeInitialQuery( | 242 | ReplayResult QueryWorker<DomainType>::executeInitialQuery( |
243 | const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize) | 243 | const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state) |
244 | { | 244 | { |
245 | QTime time; | 245 | QTime time; |
246 | time.start(); | 246 | time.start(); |
@@ -257,11 +257,17 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery( | |||
257 | } | 257 | } |
258 | 258 | ||
259 | auto entityStore = EntityStore{mResourceContext, mLogCtx}; | 259 | auto entityStore = EntityStore{mResourceContext, mLogCtx}; |
260 | auto preparedQuery = DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | 260 | auto preparedQuery = [&] { |
261 | auto resultSet = preparedQuery.execute(); | 261 | if (state) { |
262 | return DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, false}; | ||
263 | } else { | ||
264 | return DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | ||
265 | } | ||
266 | }(); | ||
267 | auto resultSet = preparedQuery.execute();; | ||
262 | 268 | ||
263 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 269 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved." << Log::TraceTime(time.elapsed()); |
264 | auto replayResult = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { | 270 | auto replayResult = resultSet.replaySet(0, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { |
265 | resultProviderCallback(query, resultProvider, result); | 271 | resultProviderCallback(query, resultProvider, result); |
266 | }); | 272 | }); |
267 | 273 | ||
@@ -269,9 +275,7 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery( | |||
269 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") | 275 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") |
270 | << "Initial query took: " << Log::TraceTime(time.elapsed()); | 276 | << "Initial query took: " << Log::TraceTime(time.elapsed()); |
271 | 277 | ||
272 | auto state = preparedQuery.getState(); | 278 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()}; |
273 | |||
274 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, state}; | ||
275 | } | 279 | } |
276 | 280 | ||
277 | #define REGISTER_TYPE(T) \ | 281 | #define REGISTER_TYPE(T) \ |