From e513ee41adb6061aa72de8bfe49d117f47c1545b Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 22 Jan 2017 18:25:31 +0100 Subject: Support dependencies between sync requests. If one sync task depends on the previous sync task we want to flush in between, so we can query for the results of the previous sync request locally. If we detect such a dependency we temporarily halt all processing of synchronization requests until the flush completes, so we can continue processing. --- common/commandprocessor.cpp | 1 + common/synchronizer.cpp | 77 ++++++++++++++++++++++++++-------- common/synchronizer.h | 11 ++++- examples/imapresource/imapresource.cpp | 3 +- 4 files changed, 72 insertions(+), 20 deletions(-) diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index a6371be..5d5261f 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp @@ -331,6 +331,7 @@ KAsync::Job CommandProcessor::flush(void const *command, size_t size) mSynchronizer->flush(flushType, flushId); } else { SinkTraceCtx(mLogCtx) << "Emitting flush completion" << flushId; + mSynchronizer->flushComplete(flushId); Sink::Notification n; n.type = Sink::Notification::FlushCompletion; n.id = flushId; diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index d94083b..3863cc4 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -274,12 +274,32 @@ void Synchronizer::flush(int commandId, const QByteArray &flushId) processSyncQueue().exec(); } +void Synchronizer::flushComplete(const QByteArray &flushId) +{ + SinkTraceCtx(mLogCtx) << "Flush complete: " << flushId; + if (mPendingSyncRequests.contains(flushId)) { + const auto requests = mPendingSyncRequests.values(flushId); + for (const auto &r : requests) { + //We want to process the pending request before any others in the queue + mSyncRequestQueue.prepend(r); + } + mPendingSyncRequests.remove(flushId); + processSyncQueue().exec(); + } +} + KAsync::Job Synchronizer::processSyncQueue() { if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { SinkTrace() << "Sync still in progress or nothing to do."; return KAsync::null(); } + //Don't process any new requests until we're done with the pending ones. + //Otherwise we might process a flush before the previous request actually completed. + if (!mPendingSyncRequests.isEmpty()) { + SinkTrace() << "We still have pending sync requests. Not executing next request."; + return KAsync::null(); + } auto job = KAsync::syncStart([this] { mMessageQueue->startTransaction(); @@ -287,7 +307,26 @@ KAsync::Job Synchronizer::processSyncQueue() }); while (!mSyncRequestQueue.isEmpty()) { const auto request = mSyncRequestQueue.takeFirst(); - if (request.requestType == Synchronizer::SyncRequest::Synchronization) { + if (request.options & SyncRequest::RequestFlush) { + job = job.then([=] { + //Trigger a flush and record original request without flush option + auto modifiedRequest = request; + modifiedRequest.options = SyncRequest::NoOptions; + //Normally we won't have a requestId here + if (modifiedRequest.requestId.isEmpty()) { + modifiedRequest.requestId = QUuid::createUuid().toRfc4122(); + } + + //The sync request will be executed once the flush has completed + mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest); + + flatbuffers::FlatBufferBuilder fbb; + auto flushId = fbb.CreateString(modifiedRequest.requestId.toStdString()); + auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); + Sink::Commands::FinishFlushBuffer(fbb, location); + enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); + }); + } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { job = job.then([this, request] { Sink::Notification n; n.id = request.requestId; @@ -295,14 +334,14 @@ KAsync::Job Synchronizer::processSyncQueue() n.message = "Synchronization has started."; n.code = ApplicationDomain::BusyStatus; emit notify(n); - SinkLogCtx(mLogCtx) << "Synchronizing " << request.query; + SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; }).then(synchronizeWithSource(request.query)).then([this] { //Commit after every request, so implementations only have to commit more if they add a lot of data. commit(); }).then([this, request](const KAsync::Error &error) { if (error) { //Emit notification with error - SinkWarning() << "Synchronization failed: " << error.errorMessage; + SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error.errorMessage; Sink::Notification n; n.id = request.requestId; n.type = Notification::Status; @@ -311,7 +350,7 @@ KAsync::Job Synchronizer::processSyncQueue() emit notify(n); return KAsync::error(error); } else { - SinkLog() << "Done Synchronizing"; + SinkLogCtx(mLogCtx) << "Done Synchronizing"; Sink::Notification n; n.id = request.requestId; n.type = Notification::Status; @@ -322,20 +361,22 @@ KAsync::Job Synchronizer::processSyncQueue() } }); } else if (request.requestType == Synchronizer::SyncRequest::Flush) { - Q_ASSERT(!request.requestId.isEmpty()); - if (request.flushType == Flush::FlushReplayQueue) { - SinkTrace() << "Emitting flush completion."; - Sink::Notification n; - n.type = Sink::Notification::FlushCompletion; - n.id = request.requestId; - emit notify(n); - } else { - flatbuffers::FlatBufferBuilder fbb; - auto flushId = fbb.CreateString(request.requestId.toStdString()); - auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); - Sink::Commands::FinishFlushBuffer(fbb, location); - enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); - } + job = job.then([=] { + Q_ASSERT(!request.requestId.isEmpty()); + if (request.flushType == Flush::FlushReplayQueue) { + SinkTrace() << "Emitting flush completion."; + Sink::Notification n; + n.type = Sink::Notification::FlushCompletion; + n.id = request.requestId; + emit notify(n); + } else { + flatbuffers::FlatBufferBuilder fbb; + auto flushId = fbb.CreateString(request.requestId.toStdString()); + auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); + Sink::Commands::FinishFlushBuffer(fbb, location); + enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); + } + }); } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { job = job.then(replayNextRevision()); } else { diff --git a/common/synchronizer.h b/common/synchronizer.h index a0a432c..be90293 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h @@ -57,6 +57,7 @@ public: Sink::Storage::DataStore::Transaction &syncTransaction(); bool allChangesReplayed() Q_DECL_OVERRIDE; + void flushComplete(const QByteArray &flushId); signals: void notify(Notification); @@ -123,9 +124,15 @@ protected: Flush }; - SyncRequest(const Sink::QueryBase &q, const QByteArray &requestId_ = QByteArray()) + enum RequestOptions { + NoOptions, + RequestFlush + }; + + SyncRequest(const Sink::QueryBase &q, const QByteArray &requestId_ = QByteArray(), RequestOptions o = NoOptions) : requestId(requestId_), requestType(Synchronization), + options(o), query(q) { } @@ -145,6 +152,7 @@ protected: int flushType = 0; QByteArray requestId; RequestType requestType; + RequestOptions options = NoOptions; Sink::QueryBase query; }; @@ -181,6 +189,7 @@ private: QList mSyncRequestQueue; MessageQueue *mMessageQueue; bool mSyncInProgress; + QMultiHash mPendingSyncRequests; }; } diff --git a/examples/imapresource/imapresource.cpp b/examples/imapresource/imapresource.cpp index 25d905b..06dc340 100644 --- a/examples/imapresource/imapresource.cpp +++ b/examples/imapresource/imapresource.cpp @@ -386,7 +386,8 @@ public: list << Synchronizer::SyncRequest{query}; } else { list << Synchronizer::SyncRequest{Sink::QueryBase(ApplicationDomain::getTypeName())}; - list << Synchronizer::SyncRequest{applyMailDefaults(Sink::QueryBase(ApplicationDomain::getTypeName()))}; + //This request depends on the previous one so we flush first. + list << Synchronizer::SyncRequest{applyMailDefaults(Sink::QueryBase(ApplicationDomain::getTypeName())), QByteArray{}, Synchronizer::SyncRequest::RequestFlush}; } return list; } -- cgit v1.2.3