From 2beb09e8b3f4922a15ec0abde737d4cf9b9d4f8b Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 22 Feb 2018 10:16:18 +0100 Subject: Deal with removals in reduced queries --- common/datastorequery.cpp | 22 ++++++++++++++++----- common/datastorequery.h | 7 +++++++ common/storage/entitystore.cpp | 2 +- tests/querytest.cpp | 44 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 69 insertions(+), 6 deletions(-) diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 218796f..dacc8ab 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp @@ -300,11 +300,18 @@ public: bool next(const std::function &callback) Q_DECL_OVERRIDE { bool foundValue = false; while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { - if (result.operation == Sink::Operation_Removal) { - callback(result); - return false; - } - auto reductionValue = result.entity.getProperty(mReductionProperty); + const auto reductionValue = [&] { + if (result.operation == Sink::Operation_Removal) { + //For removals we have to read the last revision to get a value, and thus be able to find the correct thread. + QVariant reductionValue; + readPrevious(result.entity.identifier(), [&] (const ApplicationDomain::ApplicationDomainType &prev) { + reductionValue = prev.getProperty(mReductionProperty); + }); + return reductionValue; + } else { + return result.entity.getProperty(mReductionProperty); + } + }(); const auto &reductionValueBa = getByteArray(reductionValue); if (!mReducedValues.contains(reductionValueBa)) { //Only reduce every value once. @@ -439,6 +446,11 @@ void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &res mStore.readLatest(mType, key, resultCallback); } +void DataStoreQuery::readPrevious(const QByteArray &key, const std::function &callback) +{ + mStore.readPrevious(mType, key, mStore.maxRevision(), callback); +} + QVector DataStoreQuery::indexLookup(const QByteArray &property, const QVariant &value) { return mStore.indexLookup(mType, property, value); diff --git a/common/datastorequery.h b/common/datastorequery.h index 8800644..2311585 100644 --- a/common/datastorequery.h +++ b/common/datastorequery.h @@ -62,6 +62,7 @@ private: QVector indexLookup(const QByteArray &property, const QVariant &value); void readEntity(const QByteArray &key, const BufferCallback &resultCallback); + void readPrevious(const QByteArray &key, const std::function &callback); ResultSet createFilteredSet(ResultSet &resultSet, const FilterFunction &); QVector loadIncrementalResultSet(qint64 baseRevision); @@ -107,6 +108,12 @@ public: return mDatastore->indexLookup(property, value); } + void readPrevious(const QByteArray &key, const std::function &callback) + { + Q_ASSERT(mDatastore); + mDatastore->readPrevious(key, callback); + } + virtual void skip() { mSource->skip(); } //Returns true for as long as a result is available diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index 020f3fd..7da7efa 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp @@ -563,7 +563,7 @@ void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qi return true; }, [&](const Sink::Storage::DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read current value from storage: " << error.message; }, true); - return readEntity(type, Sink::Storage::DataStore::assembleKey(uid, latestRevision), callback); + readEntity(type, Sink::Storage::DataStore::assembleKey(uid, latestRevision), callback); } void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function callback) diff --git a/tests/querytest.cpp b/tests/querytest.cpp index ec6438d..f65d477 100644 --- a/tests/querytest.cpp +++ b/tests/querytest.cpp @@ -760,6 +760,50 @@ private slots: QTRY_COMPARE(model->rowCount(), 0); } + void testLivequeryRemoveOneInThread() + { + // Setup + auto folder1 = Folder::createEntity("sink.dummy.instance1"); + VERIFYEXEC(Sink::Store::create(folder1)); + + auto mail1 = Mail::createEntity("sink.dummy.instance1"); + mail1.setExtractedMessageId("mail1"); + mail1.setFolder(folder1); + VERIFYEXEC(Sink::Store::create(mail1)); + auto mail2 = Mail::createEntity("sink.dummy.instance1"); + mail2.setExtractedMessageId("mail2"); + mail2.setFolder(folder1); + VERIFYEXEC(Sink::Store::create(mail2)); + // Ensure all local data is processed + VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); + + //Setup two folders with a mail each, ensure we only get the mail from the folder that matches the folder filter. + Query query; + query.setId("testLivequeryUnmatch"); + query.reduce(Query::Reduce::Selector::max()).count("count").collect("senders"); + query.sort(); + query.setFlags(Query::LiveQuery); + auto model = Sink::Store::loadModel(query); + QTRY_COMPARE(model->rowCount(), 1); + QCOMPARE(model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value()->getProperty("count").toInt(), 2); + + //After the removal, the thread size should be reduced by one + { + + VERIFYEXEC(Sink::Store::remove(mail1)); + } + VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); + QTRY_COMPARE(model->rowCount(), 1); + QTRY_COMPARE(model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value()->getProperty("count").toInt(), 1); + + //After the second removal, the thread should be gone + { + VERIFYEXEC(Sink::Store::remove(mail2)); + } + VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); + QTRY_COMPARE(model->rowCount(), 0); + } + void testDontUpdateNonLiveQuery() { // Setup -- cgit v1.2.3