From 1259b236704e790fa1284a63ec537525bce23841 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sat, 11 Feb 2017 12:02:58 +0100 Subject: Fixed reduction updates with stateful query. Some filters need to maintain state between runs in order to be able to emit only what has changed. This now also makes reduction work for live queries. --- common/datastorequery.cpp | 163 +++++++++++++++++++++++++++++----------------- common/datastorequery.h | 18 +++-- common/queryrunner.cpp | 119 ++++++++++++++++++--------------- common/queryrunner.h | 4 ++ 4 files changed, 188 insertions(+), 116 deletions(-) (limited to 'common') diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 41d962c..3ba8f40 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp @@ -72,7 +72,7 @@ class Source : public FilterBase { if (mIt == mIds.constEnd()) { return false; } - readEntity(*mIt, [callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { + readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); callback({entity, operation}); }); @@ -115,7 +115,7 @@ public: virtual bool next(const std::function &callback) Q_DECL_OVERRIDE { bool foundValue = false; while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { - SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << result.operation; + SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << operationName(result.operation); //Always accept removals. They can't match the filter since the data is gone. 
if (result.operation == Sink::Operation_Removal) { @@ -167,17 +167,11 @@ public: } - void process() { - if (operation == QueryBase::Reduce::Aggregator::Count) { - mResult = mResult.toInt() + 1; - } else { - Q_ASSERT(false); - } - } - void process(const QVariant &value) { if (operation == QueryBase::Reduce::Aggregator::Collect) { mResult = mResult.toList() << value; + } else if (operation == QueryBase::Reduce::Aggregator::Count) { + mResult = mResult.toInt() + 1; } else { Q_ASSERT(false); } @@ -196,8 +190,8 @@ public: QVariant mResult; }; - QHash mAggregateValues; QSet mReducedValues; + QHash mSelectedValues; QByteArray mReductionProperty; QByteArray mSelectionProperty; QueryBase::Reduce::Selector::Comparator mSelectionComparator; @@ -231,51 +225,80 @@ public: return false; } + QByteArray reduceOnValue(const QVariant &reductionValue, QMap &aggregateValues) + { + QVariant selectionResultValue; + QByteArray selectionResult; + auto results = indexLookup(mReductionProperty, reductionValue); + for (auto &aggregator : mAggregators) { + aggregator.reset(); + } + + for (const auto &r : results) { + readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { + Q_ASSERT(operation != Sink::Operation_Removal); + for (auto &aggregator : mAggregators) { + if (!aggregator.property.isEmpty()) { + aggregator.process(entity.getProperty(aggregator.property)); + } else { + aggregator.process(QVariant{}); + } + } + auto selectionValue = entity.getProperty(mSelectionProperty); + if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) { + selectionResultValue = selectionValue; + selectionResult = entity.identifier(); + } + }); + } + + for (auto &aggregator : mAggregators) { + aggregateValues.insert(aggregator.resultProperty, aggregator.result()); + } + return selectionResult; + } + bool next(const std::function &callback) Q_DECL_OVERRIDE { bool foundValue = false; while(!foundValue 
&& mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { - auto reductionValue = result.entity.getProperty(mReductionProperty); if (result.operation == Sink::Operation_Removal) { callback(result); return false; } - if (!mReducedValues.contains(getByteArray(reductionValue))) { + auto reductionValue = result.entity.getProperty(mReductionProperty); + const auto &reductionValueBa = getByteArray(reductionValue); + if (!mReducedValues.contains(reductionValueBa)) { //Only reduce every value once. - mReducedValues.insert(getByteArray(reductionValue)); - QVariant selectionResultValue; - QByteArray selectionResult; - auto results = indexLookup(mReductionProperty, reductionValue); - for (auto &aggregator : mAggregators) { - aggregator.reset(); - } - - QVariantList list; - for (const auto &r : results) { - readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { - for (auto &aggregator : mAggregators) { - if (!aggregator.property.isEmpty()) { - aggregator.process(entity.getProperty(aggregator.property)); - } else { - aggregator.process(); - } - } - auto selectionValue = entity.getProperty(mSelectionProperty); - if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) { - selectionResultValue = selectionValue; - selectionResult = entity.identifier(); - } - }); - } - + mReducedValues.insert(reductionValueBa); QMap aggregateValues; - for (auto &aggregator : mAggregators) { - aggregateValues.insert(aggregator.resultProperty, aggregator.result()); - } + auto selectionResult = reduceOnValue(reductionValue, aggregateValues); + mSelectedValues.insert(reductionValueBa, selectionResult); readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { callback({entity, operation, aggregateValues}); foundValue = true; }); + } else { + //During initial query, do nothing. 
The lookup above will take care of it. + //During updates adjust the reduction according to the modification/addition or removal + if (mIncremental) { + //redo the reduction + QMap aggregateValues; + auto selectionResult = reduceOnValue(reductionValue, aggregateValues); + + //TODO if old and new are the same a modification would be enough + auto oldSelectionResult = mSelectedValues.take(reductionValueBa); + //remove old result + readEntity(oldSelectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { + callback({entity, Sink::Operation_Removal}); + }); + + //add new result + mSelectedValues.insert(reductionValueBa, selectionResult); + readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { + callback({entity, Sink::Operation_Creation, aggregateValues}); + }); + } } return false; })) @@ -330,9 +353,23 @@ public: }; DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store) - : mQuery(query), mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) + : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) +{ + setupQuery(query); +} + +DataStoreQuery::DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store) + : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) { - setupQuery(); + mCollector = state.mCollector; + mSource = state.mSource; + + auto source = mCollector; + while (source) { + source->mDatastore = this; + source->mIncremental = true; + source = source->mSource; + } } DataStoreQuery::~DataStoreQuery() @@ -340,6 +377,14 @@ DataStoreQuery::~DataStoreQuery() } +DataStoreQuery::State::Ptr DataStoreQuery::getState() +{ + auto state = State::Ptr::create(); + state->mSource = mSource; + state->mCollector = mCollector; + return state; +} + void 
DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) { mStore.readLatest(mType, key, resultCallback); @@ -443,9 +488,10 @@ QByteArrayList DataStoreQuery::executeSubquery(const QueryBase &subquery) return ids; } -void DataStoreQuery::setupQuery() +void DataStoreQuery::setupQuery(const Sink::QueryBase &query_) { - auto baseFilters = mQuery.getBaseFilters(); + auto query = query_; + auto baseFilters = query.getBaseFilters(); for (const auto &k : baseFilters.keys()) { const auto comparator = baseFilters.value(k); if (comparator.value.canConvert()) { @@ -454,44 +500,43 @@ void DataStoreQuery::setupQuery() baseFilters.insert(k, Query::Comparator(QVariant::fromValue(result), Query::Comparator::In)); } } - mQuery.setBaseFilters(baseFilters); + query.setBaseFilters(baseFilters); FilterBase::Ptr baseSet; - QSet remainingFilters = mQuery.getBaseFilters().keys().toSet(); + QSet remainingFilters = query.getBaseFilters().keys().toSet(); QByteArray appliedSorting; - if (!mQuery.ids().isEmpty()) { - mSource = Source::Ptr::create(mQuery.ids().toVector(), this); + if (!query.ids().isEmpty()) { + mSource = Source::Ptr::create(query.ids().toVector(), this); baseSet = mSource; } else { QSet appliedFilters; - auto resultSet = mStore.indexLookup(mType, mQuery, appliedFilters, appliedSorting); + auto resultSet = mStore.indexLookup(mType, query, appliedFilters, appliedSorting); remainingFilters = remainingFilters - appliedFilters; // We do a full scan if there were no indexes available to create the initial set. 
if (appliedFilters.isEmpty()) { - // TODO this should be replaced by an index lookup on the uid index mSource = Source::Ptr::create(mStore.fullScan(mType), this); } else { mSource = Source::Ptr::create(resultSet, this); } baseSet = mSource; } - if (!mQuery.getBaseFilters().isEmpty()) { + if (!query.getBaseFilters().isEmpty()) { auto filter = Filter::Ptr::create(baseSet, this); //For incremental queries the remaining filters are not sufficient - for (const auto &f : mQuery.getBaseFilters().keys()) { - filter->propertyFilter.insert(f, mQuery.getFilter(f)); + for (const auto &f : query.getBaseFilters().keys()) { + filter->propertyFilter.insert(f, query.getFilter(f)); } baseSet = filter; } - /* if (appliedSorting.isEmpty() && !mQuery.sortProperty.isEmpty()) { */ + /* if (appliedSorting.isEmpty() && !query.sortProperty.isEmpty()) { */ /* //Apply manual sorting */ - /* baseSet = Sort::Ptr::create(baseSet, mQuery.sortProperty); */ + /* baseSet = Sort::Ptr::create(baseSet, query.sortProperty); */ /* } */ //Setup the rest of the filter stages on top of the base set - for (const auto &stage : mQuery.getFilterStages()) { + for (const auto &stage : query.getFilterStages()) { if (auto filter = stage.dynamicCast()) { auto f = Filter::Ptr::create(baseSet, this); f->propertyFilter = filter->propertyFilter; @@ -521,7 +566,7 @@ QVector DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision ResultSet DataStoreQuery::update(qint64 baseRevision) { - SinkTraceCtx(mLogCtx) << "Executing query update to revision " << baseRevision; + SinkTraceCtx(mLogCtx) << "Executing query update from revision " << baseRevision; auto incrementalResultSet = loadIncrementalResultSet(baseRevision); SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet; mSource->add(incrementalResultSet); diff --git a/common/datastorequery.h b/common/datastorequery.h index 5a47685..a797782 100644 --- a/common/datastorequery.h +++ b/common/datastorequery.h @@ -26,6 +26,7 @@ class Source; class Bloom; 
+class Reduce; class Filter; class FilterBase; @@ -33,15 +34,25 @@ class DataStoreQuery { friend class FilterBase; friend class Source; friend class Bloom; + friend class Reduce; friend class Filter; public: typedef QSharedPointer Ptr; + struct State { + typedef QSharedPointer Ptr; + QSharedPointer mCollector; + QSharedPointer mSource; + }; + DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, Sink::Storage::EntityStore &store); + DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store); ~DataStoreQuery(); ResultSet execute(); ResultSet update(qint64 baseRevision); + State::Ptr getState(); + private: typedef std::function FilterFunction; @@ -54,12 +65,10 @@ private: ResultSet createFilteredSet(ResultSet &resultSet, const FilterFunction &); QVector loadIncrementalResultSet(qint64 baseRevision); - void setupQuery(); + void setupQuery(const Sink::QueryBase &query_); QByteArrayList executeSubquery(const Sink::QueryBase &subquery); - Sink::QueryBase mQuery; const QByteArray mType; - bool mInitialQuery; QSharedPointer mCollector; QSharedPointer mSource; @@ -102,7 +111,8 @@ public: //Returns true for as long as a result is available virtual bool next(const std::function &callback) = 0; - QSharedPointer mSource; + FilterBase::Ptr mSource; DataStoreQuery *mDatastore; + bool mIncremental = false; }; diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 748320f..40880eb 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -33,6 +33,7 @@ struct ReplayResult { qint64 newRevision; qint64 replayedEntities; bool replayedAll; + DataStoreQuery::State::Ptr queryState; }; /* @@ -49,7 +50,7 @@ public: QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx); virtual ~QueryWorker(); - ReplayResult executeIncrementalQuery(const Sink::Query &query, 
Sink::ResultProviderInterface &resultProvider); + ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider, DataStoreQuery::State::Ptr state); ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider, int offset, int batchsize); private: @@ -69,45 +70,41 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; } auto guardPtr = QPointer(&guard); - // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. - mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) { + auto fetcher = [=](const typename DomainType::Ptr &parent) { const QByteArray parentId = parent ? parent->identifier() : QByteArray(); SinkTraceCtx(mLogCtx) << "Running fetcher. 
Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; auto resultProvider = mResultProvider; - if (query.synchronousQuery()) { - QueryWorker worker(query, mResourceContext, bufferType, mResultTransformation, mLogCtx); - const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); - mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; - resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); - resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); - } else { - auto resultTransformation = mResultTransformation; - auto offset = mOffset[parentId]; - auto batchSize = mBatchSize; - auto resourceContext = mResourceContext; - auto logCtx = mLogCtx; - //The lambda will be executed in a separate thread, so copy all arguments - async::run([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { - QueryWorker worker(query, resourceContext, bufferType, resultTransformation, logCtx); - const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); - return newRevisionAndReplayedEntities; + auto resultTransformation = mResultTransformation; + auto offset = mOffset[parentId]; + auto batchSize = mBatchSize; + auto resourceContext = mResourceContext; + auto logCtx = mLogCtx; + const bool runAsync = !query.synchronousQuery(); + //The lambda will be executed in a separate thread, so copy all arguments + async::run([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { + QueryWorker worker(query, resourceContext, bufferType, resultTransformation, logCtx); + return worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); + }, runAsync) + .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) { + if 
(!guardPtr) { + qWarning() << "The parent object is already gone"; + return; + } + mInitialQueryComplete = true; + mQueryState = result.queryState; + mOffset[parentId] += result.replayedEntities; + // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. + if (query.liveQuery()) { + mResourceAccess->sendRevisionReplayedCommand(result.newRevision); + } + resultProvider->setRevision(result.newRevision); + resultProvider->initialResultSetComplete(parent, result.replayedAll); }) - .template then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { - if (!guardPtr) { - qWarning() << "The parent object is already gone"; - return; - } - mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; - // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. - if (query.liveQuery()) { - mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); - } - resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); - resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); - }) - .exec(); - } - }); + .exec(); + }; + + // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. 
+ mResultProvider->setFetcher(fetcher); // In case of a live query we keep the runner for as long alive as the result provider exists if (query.liveQuery()) { @@ -117,16 +114,26 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou auto resultProvider = mResultProvider; auto resourceContext = mResourceContext; auto logCtx = mLogCtx; - return async::run([=]() { + auto state = mQueryState; + if (!mInitialQueryComplete) { + SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; + fetcher({}); + return KAsync::null(); + } + Q_ASSERT(!mQueryInProgress); + return KAsync::syncStart([&] { + mQueryInProgress = true; + }) + .then(async::run([=]() { QueryWorker worker(query, resourceContext, bufferType, mResultTransformation, logCtx); - const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); - return newRevisionAndReplayedEntities; - }) - .template then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { + return worker.executeIncrementalQuery(query, *resultProvider, state); + })) + .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { if (!guardPtr) { qWarning() << "The parent object is already gone"; return; } + mQueryInProgress = false; // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 
mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); @@ -201,14 +208,20 @@ void QueryWorker::resultProviderCallback(const Sink::Query &query, S } template -ReplayResult QueryWorker::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider) +ReplayResult QueryWorker::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider, DataStoreQuery::State::Ptr state) { QTime time; time.start(); const qint64 baseRevision = resultProvider.revision() + 1; + SinkTraceCtx(mLogCtx) << "Running query update from revision: " << baseRevision; + auto entityStore = EntityStore{mResourceContext, mLogCtx}; - auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName(), entityStore}; + if (!state) { + SinkWarningCtx(mLogCtx) << "No previous query state."; + return {0, 0, false, DataStoreQuery::State::Ptr{}}; + } + auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName(), entityStore}; auto resultSet = preparedQuery.update(baseRevision); SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { @@ -218,7 +231,7 @@ ReplayResult QueryWorker::executeIncrementalQuery(const Sink::Query SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" << (replayResult.replayedAll ? 
"Replayed all available results.\n" : "") << "Incremental query took: " << Log::TraceTime(time.elapsed()); - return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; + return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()}; } template @@ -251,14 +264,14 @@ ReplayResult QueryWorker::executeInitialQuery( SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" << (replayResult.replayedAll ? "Replayed all available results.\n" : "") << "Initial query took: " << Log::TraceTime(time.elapsed()); - return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; + + auto state = preparedQuery.getState(); + + return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, state}; } -template class QueryRunner; -template class QueryRunner; -template class QueryRunner; -template class QueryRunner; -template class QueryWorker; -template class QueryWorker; -template class QueryWorker; -template class QueryWorker; +#define REGISTER_TYPE(T) \ + template class QueryRunner; \ + template class QueryWorker; \ + +SINK_REGISTER_TYPES() diff --git a/common/queryrunner.h b/common/queryrunner.h index 66dc68f..f5c7ead 100644 --- a/common/queryrunner.h +++ b/common/queryrunner.h @@ -24,6 +24,7 @@ #include "resultprovider.h" #include "query.h" #include "log.h" +#include "datastorequery.h" /** * Base clase because you can't have the Q_OBJECT macro in template classes @@ -101,4 +102,7 @@ private: int mBatchSize; QObject guard; Sink::Log::Context mLogCtx; + DataStoreQuery::State::Ptr mQueryState; + bool mInitialQueryComplete = false; + bool mQueryInProgress = false; }; -- cgit v1.2.3