From 8d2d52fcfd3c42a82b7f86c6f3c5009461f1de9f Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 1 Apr 2018 09:41:28 +0200 Subject: Avoid missing revision updates while a query is running. Instead we have to remember that something has changed and rerun an incremental query. --- common/queryrunner.cpp | 53 ++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 45 insertions(+), 8 deletions(-) (limited to 'common/queryrunner.cpp') diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 2c50fca..c0c1d00 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -21,6 +21,8 @@ #include #include #include +#include +#include #include "commands.h" #include "asyncutils.h" @@ -97,6 +99,13 @@ QueryRunner::~QueryRunner() SinkTraceCtx(mLogCtx) << "Stopped query"; } + +template +void QueryRunner::delayNextQuery() +{ + mDelayNextQuery = true; +} + //This function triggers the initial fetch, and then subsequent calls will simply fetch more data of mBatchSize. template void QueryRunner::fetch(const Sink::Query &query, const QByteArray &bufferType) @@ -115,11 +124,20 @@ void QueryRunner::fetch(const Sink::Query &query, const QByteArray & auto resourceContext = mResourceContext; auto logCtx = mLogCtx; auto state = mQueryState; + bool addDelay = mDelayNextQuery; + mDelayNextQuery = false; const bool runAsync = !query.synchronousQuery(); //The lambda will be executed in a separate thread, so copy all arguments async::run([=]() { QueryWorker worker(query, resourceContext, bufferType, resultTransformation, logCtx); - return worker.executeInitialQuery(query, *resultProvider, batchSize, state); + const auto result = worker.executeInitialQuery(query, *resultProvider, batchSize, state); + + //For testing only + if (addDelay) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + return result; }, runAsync) .then([=](const ReplayResult &result) { if (!guardPtr) { @@ -137,7 +155,12 @@ void QueryRunner::fetch(const Sink::Query &query, const QByteArray & resultProvider->initialResultSetComplete(result.replayedAll); if (mRequestFetchMore) { mRequestFetchMore = false; + //This code exists for incemental fetches, so we don't skip loading another set. fetch(query, bufferType); + return; + } + if (mRevisionChangedMeanwhile) { + incrementalFetch(query, bufferType).exec(); } }) .exec(); @@ -146,15 +169,17 @@ void QueryRunner::fetch(const Sink::Query &query, const QByteArray & template KAsync::Job QueryRunner::incrementalFetch(const Sink::Query &query, const QByteArray &bufferType) { - if (!mInitialQueryComplete) { - SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; + if (!mInitialQueryComplete && !mQueryInProgress) { + //We rely on this codepath in the case of newly added resources to trigger the initial fetch. fetch(query, bufferType); return KAsync::null(); } if (mQueryInProgress) { - //Can happen if the revision come in quicker than we process them. + //If a query is already in progress we just remember to fetch again once the current query is done. + mRevisionChangedMeanwhile = true; return KAsync::null(); } + mRevisionChangedMeanwhile = false; auto resultProvider = mResultProvider; auto resourceContext = mResourceContext; auto logCtx = mLogCtx; @@ -162,22 +187,34 @@ KAsync::Job QueryRunner::incrementalFetch(const Sink::Query &q auto resultTransformation = mResultTransformation; Q_ASSERT(!mQueryInProgress); auto guardPtr = QPointer(&guard); + bool addDelay = mDelayNextQuery; + mDelayNextQuery = false; return KAsync::start([&] { mQueryInProgress = true; }) .then(async::run([=]() { QueryWorker worker(query, resourceContext, bufferType, resultTransformation, logCtx); - return worker.executeIncrementalQuery(query, *resultProvider, state); + const auto result = worker.executeIncrementalQuery(query, *resultProvider, state); + ////For testing only + if (addDelay) { + SinkWarning() << "Sleeping in incremental query"; + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + return result; })) - .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { + .then([query, this, resultProvider, guardPtr, bufferType](const ReplayResult &newRevisionAndReplayedEntities) { if (!guardPtr) { //Not an error, the query can vanish at any time. - return; + return KAsync::null(); } mQueryInProgress = false; - // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); + if (mRevisionChangedMeanwhile) { + return incrementalFetch(query, bufferType); + } + return KAsync::null(); }); } -- cgit v1.2.3