From e513ee41adb6061aa72de8bfe49d117f47c1545b Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 22 Jan 2017 18:25:31 +0100 Subject: Support dependencies between sync requests. If one sync task depends on the previous sync task we want to flush in between, so we can query for the results of the previous sync request locally. If we detect such a dependency we temporarily halt all processing of synchronization requests until the flush completes, so we can continue processing. --- common/synchronizer.cpp | 77 +++++++++++++++++++++++++++++++++++++------------ 1 file changed, 59 insertions(+), 18 deletions(-) (limited to 'common/synchronizer.cpp') diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index d94083b..3863cc4 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -274,12 +274,32 @@ void Synchronizer::flush(int commandId, const QByteArray &flushId) processSyncQueue().exec(); } +void Synchronizer::flushComplete(const QByteArray &flushId) +{ + SinkTraceCtx(mLogCtx) << "Flush complete: " << flushId; + if (mPendingSyncRequests.contains(flushId)) { + const auto requests = mPendingSyncRequests.values(flushId); + for (const auto &r : requests) { + //We want to process the pending request before any others in the queue + mSyncRequestQueue.prepend(r); + } + mPendingSyncRequests.remove(flushId); + processSyncQueue().exec(); + } +} + KAsync::Job Synchronizer::processSyncQueue() { if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { SinkTrace() << "Sync still in progress or nothing to do."; return KAsync::null(); } + //Don't process any new requests until we're done with the pending ones. + //Otherwise we might process a flush before the previous request actually completed. + if (!mPendingSyncRequests.isEmpty()) { + SinkTrace() << "We still have pending sync requests. Not executing next request."; + return KAsync::null(); + } auto job = KAsync::syncStart([this] { mMessageQueue->startTransaction(); @@ -287,7 +307,26 @@ KAsync::Job Synchronizer::processSyncQueue() }); while (!mSyncRequestQueue.isEmpty()) { const auto request = mSyncRequestQueue.takeFirst(); - if (request.requestType == Synchronizer::SyncRequest::Synchronization) { + if (request.options & SyncRequest::RequestFlush) { + job = job.then([=] { + //Trigger a flush and record original request without flush option + auto modifiedRequest = request; + modifiedRequest.options = SyncRequest::NoOptions; + //Normally we won't have a requestId here + if (modifiedRequest.requestId.isEmpty()) { + modifiedRequest.requestId = QUuid::createUuid().toRfc4122(); + } + + //The sync request will be executed once the flush has completed + mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest); + + flatbuffers::FlatBufferBuilder fbb; + auto flushId = fbb.CreateString(modifiedRequest.requestId.toStdString()); + auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); + Sink::Commands::FinishFlushBuffer(fbb, location); + enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); + }); + } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { job = job.then([this, request] { Sink::Notification n; n.id = request.requestId; @@ -295,14 +334,14 @@ KAsync::Job Synchronizer::processSyncQueue() n.message = "Synchronization has started."; n.code = ApplicationDomain::BusyStatus; emit notify(n); - SinkLogCtx(mLogCtx) << "Synchronizing " << request.query; + SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; }).then(synchronizeWithSource(request.query)).then([this] { //Commit after every request, so implementations only have to commit more if they add a lot of data. commit(); }).then([this, request](const KAsync::Error &error) { if (error) { //Emit notification with error - SinkWarning() << "Synchronization failed: " << error.errorMessage; + SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error.errorMessage; Sink::Notification n; n.id = request.requestId; n.type = Notification::Status; @@ -311,7 +350,7 @@ KAsync::Job Synchronizer::processSyncQueue() emit notify(n); return KAsync::error(error); } else { - SinkLog() << "Done Synchronizing"; + SinkLogCtx(mLogCtx) << "Done Synchronizing"; Sink::Notification n; n.id = request.requestId; n.type = Notification::Status; @@ -322,20 +361,22 @@ KAsync::Job Synchronizer::processSyncQueue() } }); } else if (request.requestType == Synchronizer::SyncRequest::Flush) { - Q_ASSERT(!request.requestId.isEmpty()); - if (request.flushType == Flush::FlushReplayQueue) { - SinkTrace() << "Emitting flush completion."; - Sink::Notification n; - n.type = Sink::Notification::FlushCompletion; - n.id = request.requestId; - emit notify(n); - } else { - flatbuffers::FlatBufferBuilder fbb; - auto flushId = fbb.CreateString(request.requestId.toStdString()); - auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); - Sink::Commands::FinishFlushBuffer(fbb, location); - enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); - } + job = job.then([=] { + Q_ASSERT(!request.requestId.isEmpty()); + if (request.flushType == Flush::FlushReplayQueue) { + SinkTrace() << "Emitting flush completion."; + Sink::Notification n; + n.type = Sink::Notification::FlushCompletion; + n.id = request.requestId; + emit notify(n); + } else { + flatbuffers::FlatBufferBuilder fbb; + auto flushId = fbb.CreateString(request.requestId.toStdString()); + auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); + Sink::Commands::FinishFlushBuffer(fbb, location); + enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); + } + }); } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { job = job.then(replayNextRevision()); } else { -- cgit v1.2.3