From 2beb09e8b3f4922a15ec0abde737d4cf9b9d4f8b Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 22 Feb 2018 10:16:18 +0100 Subject: Deal with removals in reduced queries --- common/datastorequery.cpp | 22 +++++++++++++++++----- common/datastorequery.h | 7 +++++++ common/storage/entitystore.cpp | 2 +- 3 files changed, 25 insertions(+), 6 deletions(-) (limited to 'common') diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 218796f..dacc8ab 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp @@ -300,11 +300,18 @@ public: bool next(const std::function &callback) Q_DECL_OVERRIDE { bool foundValue = false; while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { - if (result.operation == Sink::Operation_Removal) { - callback(result); - return false; - } - auto reductionValue = result.entity.getProperty(mReductionProperty); + const auto reductionValue = [&] { + if (result.operation == Sink::Operation_Removal) { + //For removals we have to read the last revision to get a value, and thus be able to find the correct thread. + QVariant reductionValue; + readPrevious(result.entity.identifier(), [&] (const ApplicationDomain::ApplicationDomainType &prev) { + reductionValue = prev.getProperty(mReductionProperty); + }); + return reductionValue; + } else { + return result.entity.getProperty(mReductionProperty); + } + }(); const auto &reductionValueBa = getByteArray(reductionValue); if (!mReducedValues.contains(reductionValueBa)) { //Only reduce every value once. @@ -439,6 +446,11 @@ void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &res mStore.readLatest(mType, key, resultCallback); } +void DataStoreQuery::readPrevious(const QByteArray &key, const std::function &callback) +{ + mStore.readPrevious(mType, key, mStore.maxRevision(), callback); +} + QVector DataStoreQuery::indexLookup(const QByteArray &property, const QVariant &value) { return mStore.indexLookup(mType, property, value); diff --git a/common/datastorequery.h b/common/datastorequery.h index 8800644..2311585 100644 --- a/common/datastorequery.h +++ b/common/datastorequery.h @@ -62,6 +62,7 @@ private: QVector indexLookup(const QByteArray &property, const QVariant &value); void readEntity(const QByteArray &key, const BufferCallback &resultCallback); + void readPrevious(const QByteArray &key, const std::function &callback); ResultSet createFilteredSet(ResultSet &resultSet, const FilterFunction &); QVector loadIncrementalResultSet(qint64 baseRevision); @@ -107,6 +108,12 @@ public: return mDatastore->indexLookup(property, value); } + void readPrevious(const QByteArray &key, const std::function &callback) + { + Q_ASSERT(mDatastore); + mDatastore->readPrevious(key, callback); + } + virtual void skip() { mSource->skip(); } //Returns true for as long as a result is available diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index 020f3fd..7da7efa 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp @@ -563,7 +563,7 @@ void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qi return true; }, [&](const Sink::Storage::DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read current value from storage: " << error.message; }, true); - return readEntity(type, Sink::Storage::DataStore::assembleKey(uid, latestRevision), callback); + readEntity(type, Sink::Storage::DataStore::assembleKey(uid, latestRevision), callback); } void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function callback) -- cgit v1.2.3