From 752f0907574debe9d7d139a117b2efac80636e93 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 23 Jan 2017 10:48:37 +0100 Subject: Process sync requests one by one --- common/synchronizer.cpp | 172 +++++++++++++++++++++++++----------------------- 1 file changed, 89 insertions(+), 83 deletions(-) (limited to 'common/synchronizer.cpp') diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 3863cc4..57e994e 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -288,6 +288,89 @@ void Synchronizer::flushComplete(const QByteArray &flushId) } } +KAsync::Job Synchronizer::processRequest(const SyncRequest &request) +{ + if (request.options & SyncRequest::RequestFlush) { + return KAsync::syncStart([=] { + //Trigger a flush and record original request without flush option + auto modifiedRequest = request; + modifiedRequest.options = SyncRequest::NoOptions; + //Normally we won't have a requestId here + if (modifiedRequest.requestId.isEmpty()) { + modifiedRequest.requestId = QUuid::createUuid().toRfc4122(); + } + SinkWarning() << "Enquing flush request " << modifiedRequest.requestId; + + //The sync request will be executed once the flush has completed + mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest); + + flatbuffers::FlatBufferBuilder fbb; + auto flushId = fbb.CreateString(modifiedRequest.requestId.toStdString()); + auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); + Sink::Commands::FinishFlushBuffer(fbb, location); + enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); + }); + } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { + return KAsync::syncStart([this, request] { + Sink::Notification n; + n.id = request.requestId; + n.type = Notification::Status; + n.message = "Synchronization has started."; + n.code = ApplicationDomain::BusyStatus; + emit notify(n); + SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; + }).then(synchronizeWithSource(request.query)).then([this] { + //Commit after every request, so implementations only have to commit more if they add a lot of data. + commit(); + }).then([this, request](const KAsync::Error &error) { + if (error) { + //Emit notification with error + SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error.errorMessage; + Sink::Notification n; + n.id = request.requestId; + n.type = Notification::Status; + n.message = "Synchronization has ended."; + n.code = ApplicationDomain::ErrorStatus; + emit notify(n); + return KAsync::error(error); + } else { + SinkLogCtx(mLogCtx) << "Done Synchronizing"; + Sink::Notification n; + n.id = request.requestId; + n.type = Notification::Status; + n.message = "Synchronization has ended."; + n.code = ApplicationDomain::ConnectedStatus; + emit notify(n); + return KAsync::null(); + } + }); + } else if (request.requestType == Synchronizer::SyncRequest::Flush) { + return KAsync::syncStart([=] { + Q_ASSERT(!request.requestId.isEmpty()); + //FIXME it looks like this is emitted before the replay actually finishes + if (request.flushType == Flush::FlushReplayQueue) { + SinkTrace() << "Emitting flush completion."; + Sink::Notification n; + n.type = Sink::Notification::FlushCompletion; + n.id = request.requestId; + emit notify(n); + } else { + flatbuffers::FlatBufferBuilder fbb; + auto flushId = fbb.CreateString(request.requestId.toStdString()); + auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); + Sink::Commands::FinishFlushBuffer(fbb, location); + enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); + } + }); + } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { + return replayNextRevision(); + } else { + SinkWarning() << "Unknown request type: " << request.requestType; + return KAsync::error(KAsync::Error{"Unknown request type."}); + } + +} + KAsync::Job Synchronizer::processSyncQueue() { if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { @@ -301,90 +384,13 @@ KAsync::Job Synchronizer::processSyncQueue() return KAsync::null(); } - auto job = KAsync::syncStart([this] { + const auto request = mSyncRequestQueue.takeFirst(); + return KAsync::syncStart([this] { mMessageQueue->startTransaction(); mSyncInProgress = true; - }); - while (!mSyncRequestQueue.isEmpty()) { - const auto request = mSyncRequestQueue.takeFirst(); - if (request.options & SyncRequest::RequestFlush) { - job = job.then([=] { - //Trigger a flush and record original request without flush option - auto modifiedRequest = request; - modifiedRequest.options = SyncRequest::NoOptions; - //Normally we won't have a requestId here - if (modifiedRequest.requestId.isEmpty()) { - modifiedRequest.requestId = QUuid::createUuid().toRfc4122(); - } - - //The sync request will be executed once the flush has completed - mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest); - - flatbuffers::FlatBufferBuilder fbb; - auto flushId = fbb.CreateString(modifiedRequest.requestId.toStdString()); - auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); - Sink::Commands::FinishFlushBuffer(fbb, location); - enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); - }); - } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { - job = job.then([this, request] { - Sink::Notification n; - n.id = request.requestId; - n.type = Notification::Status; - n.message = "Synchronization has started."; - n.code = ApplicationDomain::BusyStatus; - emit notify(n); - SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; - }).then(synchronizeWithSource(request.query)).then([this] { - //Commit after every request, so implementations only have to commit more if they add a lot of data. - commit(); - }).then([this, request](const KAsync::Error &error) { - if (error) { - //Emit notification with error - SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error.errorMessage; - Sink::Notification n; - n.id = request.requestId; - n.type = Notification::Status; - n.message = "Synchronization has ended."; - n.code = ApplicationDomain::ErrorStatus; - emit notify(n); - return KAsync::error(error); - } else { - SinkLogCtx(mLogCtx) << "Done Synchronizing"; - Sink::Notification n; - n.id = request.requestId; - n.type = Notification::Status; - n.message = "Synchronization has ended."; - n.code = ApplicationDomain::ConnectedStatus; - emit notify(n); - return KAsync::null(); - } - }); - } else if (request.requestType == Synchronizer::SyncRequest::Flush) { - job = job.then([=] { - Q_ASSERT(!request.requestId.isEmpty()); - if (request.flushType == Flush::FlushReplayQueue) { - SinkTrace() << "Emitting flush completion."; - Sink::Notification n; - n.type = Sink::Notification::FlushCompletion; - n.id = request.requestId; - emit notify(n); - } else { - flatbuffers::FlatBufferBuilder fbb; - auto flushId = fbb.CreateString(request.requestId.toStdString()); - auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); - Sink::Commands::FinishFlushBuffer(fbb, location); - enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); - } - }); - } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { - job = job.then(replayNextRevision()); - } else { - SinkWarning() << "Unknown request type: " << request.requestType; - return KAsync::error(KAsync::Error{"Unknown request type."}); - } - } - return job.then([this](const KAsync::Error &error) { + }) + .then(processRequest(request)) + .then([this](const KAsync::Error &error) { SinkTrace() << "Sync request processed"; mSyncTransaction.abort(); mMessageQueue->commit(); @@ -504,7 +510,7 @@ KAsync::Job Synchronizer::replay(const QByteArray &type, const QByteArray } else if (operation == Sink::Operation_Modification) { SinkTrace() << "Replayed modification with remote id: " << remoteId; if (remoteId.isEmpty()) { - SinkWarning() << "Returned an empty remoteId from the creation"; + SinkWarning() << "Returned an empty remoteId from the modification"; } else { syncStore().updateRemoteId(type, uid, remoteId); } -- cgit v1.2.3