From e382924f1a90b5a27eba2e8c5981f6a4fe7892c9 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sat, 10 Jun 2017 09:37:08 +0200 Subject: Fixed incremental queries The incremental querying broke as soon as a revision update came in since it would nuke the base-set. This fixes it, but it's definitely not pretty. --- common/datastorequery.cpp | 41 +++++++++++++++++++++++++++++++---------- common/datastorequery.h | 1 + common/queryrunner.cpp | 6 +++--- 3 files changed, 35 insertions(+), 13 deletions(-) (limited to 'common') diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 2e0c348..4c95606 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp @@ -43,6 +43,8 @@ class Source : public FilterBase { QVector mIds; QVector::ConstIterator mIt; + QVector mIncrementalIds; + QVector::ConstIterator mIncrementalIt; Source (const QVector &ids, DataStoreQuery *store) : FilterBase(store), @@ -63,21 +65,36 @@ class Source : public FilterBase { void add(const QVector &ids) { - mIds = ids; - mIt = mIds.constBegin(); + mIncrementalIds = ids; + mIncrementalIt = mIncrementalIds.constBegin(); } bool next(const std::function &callback) Q_DECL_OVERRIDE { - if (mIt == mIds.constEnd()) { - return false; + if (!mIncrementalIds.isEmpty()) { + if (mIncrementalIt == mIncrementalIds.constEnd()) { + return false; + } + readEntity(*mIncrementalIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { + SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); + callback({entity, operation}); + }); + mIncrementalIt++; + if (mIncrementalIt == mIncrementalIds.constEnd()) { + return false; + } + return true; + } else { + if (mIt == mIds.constEnd()) { + return false; + } + readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { + SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); + callback({entity, operation}); + }); + mIt++; + return mIt != mIds.constEnd(); } - readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { - SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); - callback({entity, operation}); - }); - mIt++; - return mIt != mIds.constEnd(); } }; @@ -599,6 +616,10 @@ ResultSet DataStoreQuery::update(qint64 baseRevision) return ResultSet(generator, [this]() { mCollector->skip(); }); } +void DataStoreQuery::updateComplete() +{ + mSource->mIncrementalIds.clear(); +} ResultSet DataStoreQuery::execute() { diff --git a/common/datastorequery.h b/common/datastorequery.h index ee5f99e..de4ae26 100644 --- a/common/datastorequery.h +++ b/common/datastorequery.h @@ -50,6 +50,7 @@ public: ~DataStoreQuery(); ResultSet execute(); ResultSet update(qint64 baseRevision); + void updateComplete(); State::Ptr getState(); diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 43f48c0..f196965 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -231,11 +231,11 @@ ReplayResult QueryWorker::executeIncrementalQuery(const Sink::Query auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { resultProviderCallback(query, resultProvider, result); }); - + preparedQuery.updateComplete(); SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" << (replayResult.replayedAll ? "Replayed all available results.\n" : "") << "Incremental query took: " << Log::TraceTime(time.elapsed()); - return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()}; + return {entityStore.maxRevision(), replayResult.replayedEntities, false, preparedQuery.getState()}; } template @@ -264,7 +264,7 @@ ReplayResult QueryWorker::executeInitialQuery( return DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName(), entityStore}; } }(); - auto resultSet = preparedQuery.execute();; + auto resultSet = preparedQuery.execute(); SinkTraceCtx(mLogCtx) << "Filtered set retrieved." << Log::TraceTime(time.elapsed()); auto replayResult = resultSet.replaySet(0, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { -- cgit v1.2.3