From 6a072b2dcf23cbcdb210f2bd5c273ea0f425b188 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 29 Nov 2016 11:27:04 +0100 Subject: The synchronization call can be sync. ... because we really just enqueue the request and then wait for the notification. --- common/synchronizer.cpp | 43 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 36 insertions(+), 7 deletions(-) (limited to 'common/synchronizer.cpp') diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 11c7caf..6483cdf 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -38,7 +38,8 @@ Synchronizer::Synchronizer(const Sink::ResourceContext &context) : ChangeReplay(context), mResourceContext(context), mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)), - mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite) + mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), + mSyncInProgress(false) { SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); } @@ -254,15 +255,15 @@ void Synchronizer::modify(const DomainType &entity) QList Synchronizer::getSyncRequests(const Sink::QueryBase &query) { QList list; - list << Synchronizer::SyncRequest{query}; + list << Synchronizer::SyncRequest{query, "sync"}; return list; } -KAsync::Job Synchronizer::synchronize(const Sink::QueryBase &query) +void Synchronizer::synchronize(const Sink::QueryBase &query) { SinkTrace() << "Synchronizing"; mSyncRequestQueue << getSyncRequests(query); - return processSyncQueue(); + processSyncQueue().exec(); } void Synchronizer::flush(int commandId, const QByteArray &flushId) @@ -284,20 +285,48 @@ KAsync::Job Synchronizer::processSyncQueue() while (!mSyncRequestQueue.isEmpty()) { auto request = mSyncRequestQueue.takeFirst(); if (request.requestType == Synchronizer::SyncRequest::Synchronization) { - job = job.then(synchronizeWithSource(request.query)).syncThen([this] { + job = job.syncThen([this, request] { + Sink::Notification n; + n.id = request.requestId; + n.type = Notification::Status; + n.message = "Synchronization has started."; + n.code = ApplicationDomain::BusyStatus; + emit notify(n); + }).then(synchronizeWithSource(request.query)).syncThen([this] { //Commit after every request, so implementations only have to commit more if they add a lot of data. commit(); + }).then([this, request](const KAsync::Error &error) { + if (error) { + //Emit notification with error + SinkWarning() << "Synchronization failed: " << error.errorMessage; + Sink::Notification n; + n.id = request.requestId; + n.type = Notification::Status; + n.message = "Synchronization has ended."; + n.code = ApplicationDomain::ErrorStatus; + emit notify(n); + return KAsync::error(error); + } else { + SinkLog() << "Done Synchronizing"; + Sink::Notification n; + n.id = request.requestId; + n.type = Notification::Status; + n.message = "Synchronization has ended."; + n.code = ApplicationDomain::ConnectedStatus; + emit notify(n); + return KAsync::null(); + } }); } else if (request.requestType == Synchronizer::SyncRequest::Flush) { if (request.flushType == Flush::FlushReplayQueue) { SinkTrace() << "Emitting flush completion."; Sink::Notification n; n.type = Sink::Notification::FlushCompletion; - n.id = request.flushId; + n.id = request.requestId; emit notify(n); } else { flatbuffers::FlatBufferBuilder fbb; - auto flushId = fbb.CreateString(request.flushId); + auto flushId = fbb.CreateString(request.requestId); auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); Sink::Commands::FinishFlushBuffer(fbb, location); enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); -- cgit v1.2.3