From e382924f1a90b5a27eba2e8c5981f6a4fe7892c9 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sat, 10 Jun 2017 09:37:08 +0200 Subject: Fixed incremental queries The incremental querying broke as soon as a revision update came in since it would nuke the base-set. This fixes it, but it's definitely not pretty. --- common/datastorequery.cpp | 41 +++++++++++++++++++++++++++++++---------- common/datastorequery.h | 1 + common/queryrunner.cpp | 6 +++--- tests/querytest.cpp | 27 ++++++++++++++++++++++++++- 4 files changed, 61 insertions(+), 14 deletions(-) diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 2e0c348..4c95606 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp @@ -43,6 +43,8 @@ class Source : public FilterBase { QVector mIds; QVector::ConstIterator mIt; + QVector mIncrementalIds; + QVector::ConstIterator mIncrementalIt; Source (const QVector &ids, DataStoreQuery *store) : FilterBase(store), @@ -63,21 +65,36 @@ class Source : public FilterBase { void add(const QVector &ids) { - mIds = ids; - mIt = mIds.constBegin(); + mIncrementalIds = ids; + mIncrementalIt = mIncrementalIds.constBegin(); } bool next(const std::function &callback) Q_DECL_OVERRIDE { - if (mIt == mIds.constEnd()) { - return false; + if (!mIncrementalIds.isEmpty()) { + if (mIncrementalIt == mIncrementalIds.constEnd()) { + return false; + } + readEntity(*mIncrementalIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { + SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); + callback({entity, operation}); + }); + mIncrementalIt++; + if (mIncrementalIt == mIncrementalIds.constEnd()) { + return false; + } + return true; + } else { + if (mIt == mIds.constEnd()) { + return false; + } + readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { + SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); + callback({entity, operation}); + }); + mIt++; + return mIt != mIds.constEnd(); } - readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { - SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); - callback({entity, operation}); - }); - mIt++; - return mIt != mIds.constEnd(); } }; @@ -599,6 +616,10 @@ ResultSet DataStoreQuery::update(qint64 baseRevision) return ResultSet(generator, [this]() { mCollector->skip(); }); } +void DataStoreQuery::updateComplete() +{ + mSource->mIncrementalIds.clear(); +} ResultSet DataStoreQuery::execute() { diff --git a/common/datastorequery.h b/common/datastorequery.h index ee5f99e..de4ae26 100644 --- a/common/datastorequery.h +++ b/common/datastorequery.h @@ -50,6 +50,7 @@ public: ~DataStoreQuery(); ResultSet execute(); ResultSet update(qint64 baseRevision); + void updateComplete(); State::Ptr getState(); diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 43f48c0..f196965 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -231,11 +231,11 @@ ReplayResult QueryWorker::executeIncrementalQuery(const Sink::Query auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { resultProviderCallback(query, resultProvider, result); }); - + preparedQuery.updateComplete(); SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" << (replayResult.replayedAll ? "Replayed all available results.\n" : "") << "Incremental query took: " << Log::TraceTime(time.elapsed()); - return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()}; + return {entityStore.maxRevision(), replayResult.replayedEntities, false, preparedQuery.getState()}; } template @@ -264,7 +264,7 @@ ReplayResult QueryWorker::executeInitialQuery( return DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName(), entityStore}; } }(); - auto resultSet = preparedQuery.execute();; + auto resultSet = preparedQuery.execute(); SinkTraceCtx(mLogCtx) << "Filtered set retrieved." << Log::TraceTime(time.elapsed()); auto replayResult = resultSet.replaySet(0, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { diff --git a/tests/querytest.cpp b/tests/querytest.cpp index 4ff1be8..714e549 100644 --- a/tests/querytest.cpp +++ b/tests/querytest.cpp @@ -381,6 +381,7 @@ private slots: { // Setup Folder::Ptr folderEntity; + const auto date = QDateTime(QDate(2015, 7, 7), QTime(12, 0)); { Folder folder("sink.dummy.instance1"); Sink::Store::create(folder).exec().waitForFinished(); @@ -398,7 +399,6 @@ private slots: folderEntity = model->index(0, 0).data(Sink::Store::DomainObjectRole).value(); QVERIFY(!folderEntity->identifier().isEmpty()); - const auto date = QDateTime(QDate(2015, 7, 7), QTime(12, 0)); { Mail mail("sink.dummy.instance1"); mail.setExtractedMessageId("testSecond"); @@ -428,6 +428,11 @@ private slots: query.filter(*folderEntity); query.sort(); query.limit(1); + query.setFlags(Query::LiveQuery); + query.reduce(Query::Reduce::Selector::max()) + .count("count") + .collect("unreadCollected") + .collect("importantCollected"); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(QByteArrayList() << "sink.dummy.instance1")); @@ -443,6 +448,26 @@ private slots: QCOMPARE(model->rowCount(), 2); // We can't make any assumptions about the order of the indexes // QCOMPARE(model->index(1, 0).data(Sink::Store::DomainObjectRole).value()->getProperty("messageId").toByteArray(), QByteArray("testSecond")); + + //New revisions always go through + { + Mail mail("sink.dummy.instance1"); + mail.setExtractedMessageId("testInjected"); + mail.setFolder(folderEntity->identifier()); + mail.setExtractedDate(date.addDays(-2)); + Sink::Store::create(mail).exec().waitForFinished(); + } + QTRY_COMPARE(model->rowCount(), 3); + + //Ensure we can continue fetching after the incremental update + model->fetchMore(QModelIndex()); + QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); + QCOMPARE(model->rowCount(), 4); + + //Ensure we have fetched all + model->fetchMore(QModelIndex()); + QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); + QCOMPARE(model->rowCount(), 4); } void testReactToNewResource() -- cgit v1.2.3