From 8d2d52fcfd3c42a82b7f86c6f3c5009461f1de9f Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 1 Apr 2018 09:41:28 +0200 Subject: Avoid missing revision updates while a query is running. Instead we have to remember that something has changed and rerun an incremental query. --- common/queryrunner.cpp | 53 ++++++++++++++++++++++++++++++++++++++++++-------- common/queryrunner.h | 9 ++++++++- tests/querytest.cpp | 51 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 104 insertions(+), 9 deletions(-) diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 2c50fca..c0c1d00 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -21,6 +21,8 @@ #include #include #include +#include +#include #include "commands.h" #include "asyncutils.h" @@ -97,6 +99,13 @@ QueryRunner::~QueryRunner() SinkTraceCtx(mLogCtx) << "Stopped query"; } + +template +void QueryRunner::delayNextQuery() +{ + mDelayNextQuery = true; +} + //This function triggers the initial fetch, and then subsequent calls will simply fetch more data of mBatchSize. template void QueryRunner::fetch(const Sink::Query &query, const QByteArray &bufferType) @@ -115,11 +124,20 @@ void QueryRunner::fetch(const Sink::Query &query, const QByteArray & auto resourceContext = mResourceContext; auto logCtx = mLogCtx; auto state = mQueryState; + bool addDelay = mDelayNextQuery; + mDelayNextQuery = false; const bool runAsync = !query.synchronousQuery(); //The lambda will be executed in a separate thread, so copy all arguments async::run([=]() { QueryWorker worker(query, resourceContext, bufferType, resultTransformation, logCtx); - return worker.executeInitialQuery(query, *resultProvider, batchSize, state); + const auto result = worker.executeInitialQuery(query, *resultProvider, batchSize, state); + + //For testing only + if (addDelay) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + return result; }, runAsync) .then([=](const ReplayResult &result) { if (!guardPtr) { @@ -137,7 +155,12 @@ void QueryRunner::fetch(const Sink::Query &query, const QByteArray & resultProvider->initialResultSetComplete(result.replayedAll); if (mRequestFetchMore) { mRequestFetchMore = false; + //This code exists for incemental fetches, so we don't skip loading another set. fetch(query, bufferType); + return; + } + if (mRevisionChangedMeanwhile) { + incrementalFetch(query, bufferType).exec(); } }) .exec(); @@ -146,15 +169,17 @@ void QueryRunner::fetch(const Sink::Query &query, const QByteArray & template KAsync::Job QueryRunner::incrementalFetch(const Sink::Query &query, const QByteArray &bufferType) { - if (!mInitialQueryComplete) { - SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; + if (!mInitialQueryComplete && !mQueryInProgress) { + //We rely on this codepath in the case of newly added resources to trigger the initial fetch. fetch(query, bufferType); return KAsync::null(); } if (mQueryInProgress) { - //Can happen if the revision come in quicker than we process them. + //If a query is already in progress we just remember to fetch again once the current query is done. + mRevisionChangedMeanwhile = true; return KAsync::null(); } + mRevisionChangedMeanwhile = false; auto resultProvider = mResultProvider; auto resourceContext = mResourceContext; auto logCtx = mLogCtx; @@ -162,22 +187,34 @@ KAsync::Job QueryRunner::incrementalFetch(const Sink::Query &q auto resultTransformation = mResultTransformation; Q_ASSERT(!mQueryInProgress); auto guardPtr = QPointer(&guard); + bool addDelay = mDelayNextQuery; + mDelayNextQuery = false; return KAsync::start([&] { mQueryInProgress = true; }) .then(async::run([=]() { QueryWorker worker(query, resourceContext, bufferType, resultTransformation, logCtx); - return worker.executeIncrementalQuery(query, *resultProvider, state); + const auto result = worker.executeIncrementalQuery(query, *resultProvider, state); + ////For testing only + if (addDelay) { + SinkWarning() << "Sleeping in incremental query"; + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + return result; })) - .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { + .then([query, this, resultProvider, guardPtr, bufferType](const ReplayResult &newRevisionAndReplayedEntities) { if (!guardPtr) { //Not an error, the query can vanish at any time. - return; + return KAsync::null(); } mQueryInProgress = false; - // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); + if (mRevisionChangedMeanwhile) { + return incrementalFetch(query, bufferType); + } + return KAsync::null(); }); } diff --git a/common/queryrunner.h b/common/queryrunner.h index 35093e2..e449570 100644 --- a/common/queryrunner.h +++ b/common/queryrunner.h @@ -77,7 +77,7 @@ private: * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. */ template -class QueryRunner : public QueryRunnerBase +class SINK_EXPORT QueryRunner : public QueryRunnerBase { public: QueryRunner(const Sink::Query &query, const Sink::ResourceContext &context, const QByteArray &bufferType, const Sink::Log::Context &logCtx); @@ -91,6 +91,11 @@ public: typename Sink::ResultEmitter::Ptr emitter(); + /** + * For testing only. + */ + void delayNextQuery(); + private: void fetch(const Sink::Query &query, const QByteArray &bufferType); KAsync::Job incrementalFetch(const Sink::Query &query, const QByteArray &bufferType); @@ -106,4 +111,6 @@ private: bool mInitialQueryComplete = false; bool mQueryInProgress = false; bool mRequestFetchMore = false; + bool mDelayNextQuery = false; + bool mRevisionChangedMeanwhile = false; }; diff --git a/tests/querytest.cpp b/tests/querytest.cpp index 2a12979..81b7cdc 100644 --- a/tests/querytest.cpp +++ b/tests/querytest.cpp @@ -13,6 +13,8 @@ #include "test.h" #include "testutils.h" #include "applicationdomaintype.h" +#include "queryrunner.h" +#include "adaptorfactoryregistry.h" #include @@ -1229,6 +1231,55 @@ private slots: } } + void testQueryRunnerDontMissUpdates() + { + // Setup + auto folder1 = Folder::createEntity("sink.dummy.instance1"); + VERIFYEXEC(Sink::Store::create(folder1)); + + QDateTime now{QDate{2017, 2, 3}, QTime{10, 0, 0}}; + + auto createMail = [] (const QByteArray &messageid, const Folder &folder, const QDateTime &date, bool important) { + auto mail = Mail::createEntity("sink.dummy.instance1"); + mail.setExtractedMessageId(messageid); + mail.setFolder(folder); + mail.setExtractedDate(date); + mail.setImportant(important); + return mail; + }; + + VERIFYEXEC(Sink::Store::create(createMail("mail1", folder1, now, false))); + + // Ensure all local data is processed + VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); + + Query query; + query.setFlags(Query::LiveQuery); + + Sink::ResourceContext resourceContext{"sink.dummy.instance1", "sink.dummy", Sink::AdaptorFactoryRegistry::instance().getFactories("sink.dummy")}; + Sink::Log::Context logCtx; + auto runner = new QueryRunner(query, resourceContext, ApplicationDomain::getTypeName(), logCtx); + runner->delayNextQuery(); + + auto emitter = runner->emitter(); + QList added; + emitter->onAdded([&](Mail::Ptr mail) { + added << mail; + }); + + emitter->fetch(); + VERIFYEXEC(Sink::Store::create(createMail("mail2", folder1, now, false))); + QTRY_COMPARE(added.size(), 2); + + runner->delayNextQuery(); + VERIFYEXEC(Sink::Store::create(createMail("mail3", folder1, now, false))); + //The second revision update is supposed to come in while the initial revision update is still in the query. + //So wait a bit to make sure the query is currently runnning. + QTest::qWait(500); + VERIFYEXEC(Sink::Store::create(createMail("mail4", folder1, now, false))); + QTRY_COMPARE(added.size(), 4); + } + /* * This test is here to ensure we don't crash if we call removeFromDisk with a running query. */ -- cgit v1.2.3