From 86045e308c10c60cd7c4339d305cee1acb084760 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 21 Mar 2017 22:06:12 +0100 Subject: Maintain the query state instead of using the offset. Instead of using the offset to skip over old results requires recalculating them, and resulted in some cases in results being added multiple times to the model. By just maintaining the state we can apply the offset directly to the base-set, and maintain the state in reduction etc. which is necessary to continue streaming results while making sure we don't report anything twice. --- common/queryrunner.cpp | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) (limited to 'common/queryrunner.cpp') diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 802fc48..43f48c0 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -51,7 +51,7 @@ public: virtual ~QueryWorker(); ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider, DataStoreQuery::State::Ptr state); - ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider, int offset, int batchsize); + ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider, int batchsize, DataStoreQuery::State::Ptr state); private: void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider, const ResultSet::Result &result); @@ -72,18 +72,18 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou auto guardPtr = QPointer(&guard); auto fetcher = [=](const typename DomainType::Ptr &parent) { const QByteArray parentId = parent ? parent->identifier() : QByteArray(); - SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; + SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize; auto resultProvider = mResultProvider; auto resultTransformation = mResultTransformation; - auto offset = mOffset[parentId]; auto batchSize = mBatchSize; auto resourceContext = mResourceContext; auto logCtx = mLogCtx; + auto state = mQueryState.value(parentId); const bool runAsync = !query.synchronousQuery(); //The lambda will be executed in a separate thread, so copy all arguments - async::run([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { + async::run([=]() { QueryWorker worker(query, resourceContext, bufferType, resultTransformation, logCtx); - return worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); + return worker.executeInitialQuery(query, parent, *resultProvider, batchSize, state); }, runAsync) .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) { if (!guardPtr) { @@ -91,8 +91,7 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou return; } mInitialQueryComplete = true; - mQueryState = result.queryState; - mOffset[parentId] += result.replayedEntities; + mQueryState[parentId] = result.queryState; // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. if (query.liveQuery()) { mResourceAccess->sendRevisionReplayedCommand(result.newRevision); @@ -111,10 +110,11 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou Q_ASSERT(!query.synchronousQuery()); // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting setQuery([=]() -> KAsync::Job { + const QByteArray parentId; auto resultProvider = mResultProvider; auto resourceContext = mResourceContext; auto logCtx = mLogCtx; - auto state = mQueryState; + auto state = mQueryState.value(parentId); if (!mInitialQueryComplete) { SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; fetcher({}); @@ -225,7 +225,7 @@ ReplayResult QueryWorker::executeIncrementalQuery(const Sink::Query SinkWarningCtx(mLogCtx) << "No previous query state."; return {0, 0, false, DataStoreQuery::State::Ptr{}}; } - auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName(), entityStore}; + auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName(), entityStore, true}; auto resultSet = preparedQuery.update(baseRevision); SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { @@ -240,7 +240,7 @@ ReplayResult QueryWorker::executeIncrementalQuery(const Sink::Query template ReplayResult QueryWorker::executeInitialQuery( - const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider, int offset, int batchsize) + const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider, int batchsize, DataStoreQuery::State::Ptr state) { QTime time; time.start(); @@ -257,11 +257,17 @@ ReplayResult QueryWorker::executeInitialQuery( } auto entityStore = EntityStore{mResourceContext, mLogCtx}; - auto preparedQuery = DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName(), entityStore}; - auto resultSet = preparedQuery.execute(); + auto preparedQuery = [&] { + if (state) { + return DataStoreQuery{*state, ApplicationDomain::getTypeName(), entityStore, false}; + } else { + return DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName(), entityStore}; + } + }(); + auto resultSet = preparedQuery.execute();; - SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); - auto replayResult = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { + SinkTraceCtx(mLogCtx) << "Filtered set retrieved." << Log::TraceTime(time.elapsed()); + auto replayResult = resultSet.replaySet(0, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { resultProviderCallback(query, resultProvider, result); }); @@ -269,9 +275,7 @@ ReplayResult QueryWorker::executeInitialQuery( << (replayResult.replayedAll ? "Replayed all available results.\n" : "") << "Initial query took: " << Log::TraceTime(time.elapsed()); - auto state = preparedQuery.getState(); - - return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, state}; + return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()}; } #define REGISTER_TYPE(T) \ -- cgit v1.2.3