From 1259b236704e790fa1284a63ec537525bce23841 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sat, 11 Feb 2017 12:02:58 +0100 Subject: Fixed reduction updates with stateful query. Some filters need to maintain state between runs in order to be able to emit only what has changed. This now also makes reduction work for live queries. --- common/queryrunner.cpp | 119 +++++++++++++++++++++++++++---------------------- 1 file changed, 66 insertions(+), 53 deletions(-) (limited to 'common/queryrunner.cpp') diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 748320f..40880eb 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -33,6 +33,7 @@ struct ReplayResult { qint64 newRevision; qint64 replayedEntities; bool replayedAll; + DataStoreQuery::State::Ptr queryState; }; /* @@ -49,7 +50,7 @@ public: QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx); virtual ~QueryWorker(); - ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider); + ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider, DataStoreQuery::State::Ptr state); ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider, int offset, int batchsize); private: @@ -69,45 +70,41 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; } auto guardPtr = QPointer(&guard); - // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. 
- mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) { + auto fetcher = [=](const typename DomainType::Ptr &parent) { const QByteArray parentId = parent ? parent->identifier() : QByteArray(); SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; auto resultProvider = mResultProvider; - if (query.synchronousQuery()) { - QueryWorker worker(query, mResourceContext, bufferType, mResultTransformation, mLogCtx); - const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); - mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; - resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); - resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); - } else { - auto resultTransformation = mResultTransformation; - auto offset = mOffset[parentId]; - auto batchSize = mBatchSize; - auto resourceContext = mResourceContext; - auto logCtx = mLogCtx; - //The lambda will be executed in a separate thread, so copy all arguments - async::run([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { - QueryWorker worker(query, resourceContext, bufferType, resultTransformation, logCtx); - const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); - return newRevisionAndReplayedEntities; + auto resultTransformation = mResultTransformation; + auto offset = mOffset[parentId]; + auto batchSize = mBatchSize; + auto resourceContext = mResourceContext; + auto logCtx = mLogCtx; + const bool runAsync = !query.synchronousQuery(); + //The lambda will be executed in a separate thread, so copy all arguments + async::run([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { + QueryWorker worker(query, 
resourceContext, bufferType, resultTransformation, logCtx); + return worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); + }, runAsync) + .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) { + if (!guardPtr) { + qWarning() << "The parent object is already gone"; + return; + } + mInitialQueryComplete = true; + mQueryState = result.queryState; + mOffset[parentId] += result.replayedEntities; + // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. + if (query.liveQuery()) { + mResourceAccess->sendRevisionReplayedCommand(result.newRevision); + } + resultProvider->setRevision(result.newRevision); + resultProvider->initialResultSetComplete(parent, result.replayedAll); }) - .template then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { - if (!guardPtr) { - qWarning() << "The parent object is already gone"; - return; - } - mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; - // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. - if (query.liveQuery()) { - mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); - } - resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); - resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); - }) - .exec(); - } - }); + .exec(); + }; + + // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. 
+ mResultProvider->setFetcher(fetcher); // In case of a live query we keep the runner for as long alive as the result provider exists if (query.liveQuery()) { @@ -117,16 +114,26 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou auto resultProvider = mResultProvider; auto resourceContext = mResourceContext; auto logCtx = mLogCtx; - return async::run([=]() { + auto state = mQueryState; + if (!mInitialQueryComplete) { + SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; + fetcher({}); + return KAsync::null(); + } + Q_ASSERT(!mQueryInProgress); + return KAsync::syncStart([&] { + mQueryInProgress = true; + }) + .then(async::run([=]() { QueryWorker worker(query, resourceContext, bufferType, mResultTransformation, logCtx); - const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); - return newRevisionAndReplayedEntities; - }) - .template then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { + return worker.executeIncrementalQuery(query, *resultProvider, state); + })) + .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { if (!guardPtr) { qWarning() << "The parent object is already gone"; return; } + mQueryInProgress = false; // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 
mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); @@ -201,14 +208,20 @@ void QueryWorker::resultProviderCallback(const Sink::Query &query, S } template -ReplayResult QueryWorker::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider) +ReplayResult QueryWorker::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider, DataStoreQuery::State::Ptr state) { QTime time; time.start(); const qint64 baseRevision = resultProvider.revision() + 1; + SinkTraceCtx(mLogCtx) << "Running query update from revision: " << baseRevision; + auto entityStore = EntityStore{mResourceContext, mLogCtx}; - auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName(), entityStore}; + if (!state) { + SinkWarningCtx(mLogCtx) << "No previous query state."; + return {0, 0, false, DataStoreQuery::State::Ptr{}}; + } + auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName(), entityStore}; auto resultSet = preparedQuery.update(baseRevision); SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { @@ -218,7 +231,7 @@ ReplayResult QueryWorker::executeIncrementalQuery(const Sink::Query SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" << (replayResult.replayedAll ? 
"Replayed all available results.\n" : "") << "Incremental query took: " << Log::TraceTime(time.elapsed()); - return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; + return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()}; } template @@ -251,14 +264,14 @@ ReplayResult QueryWorker::executeInitialQuery( SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" << (replayResult.replayedAll ? "Replayed all available results.\n" : "") << "Initial query took: " << Log::TraceTime(time.elapsed()); - return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; + + auto state = preparedQuery.getState(); + + return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, state}; } -template class QueryRunner; -template class QueryRunner; -template class QueryRunner; -template class QueryRunner; -template class QueryWorker; -template class QueryWorker; -template class QueryWorker; -template class QueryWorker; +#define REGISTER_TYPE(T) \ + template class QueryRunner; \ + template class QueryWorker; \ + +SINK_REGISTER_TYPES() -- cgit v1.2.3