From 6a072b2dcf23cbcdb210f2bd5c273ea0f425b188 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 29 Nov 2016 11:27:04 +0100 Subject: The synchronization call can be sync. ... because we really just enqueue the request and then wait for the notification. --- common/commandprocessor.cpp | 48 +++++---------------------------------------- common/commandprocessor.h | 1 - common/genericresource.cpp | 3 ++- common/notifier.cpp | 5 +++++ common/notifier.h | 1 + common/synchronizer.cpp | 43 +++++++++++++++++++++++++++++++++------- common/synchronizer.h | 13 ++++++------ 7 files changed, 56 insertions(+), 58 deletions(-) (limited to 'common') diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index fccff22..8eb0ef1 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp @@ -128,17 +128,7 @@ void CommandProcessor::processSynchronizeCommand(const QByteArray &data) QDataStream stream(&data, QIODevice::ReadOnly); stream >> query; } - synchronizeWithSource(query) - .then([timer](const KAsync::Error &error) { - if (error) { - SinkWarning() << "Sync failed: " << error.errorMessage; - return KAsync::error(error); - } else { - SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); - return KAsync::null(); - } - }) - .exec(); + mSynchronizer->synchronize(query); } else { SinkWarning() << "received invalid command"; } @@ -156,34 +146,6 @@ void CommandProcessor::processSynchronizeCommand(const QByteArray &data) // loadResource().setLowerBoundRevision(lowerBoundRevision()); // } -KAsync::Job CommandProcessor::synchronizeWithSource(const Sink::QueryBase &query) -{ - return KAsync::start([this, query] { - Sink::Notification n; - n.id = "sync"; - n.type = Sink::Notification::Status; - n.message = "Synchronization has started."; - n.code = Sink::ApplicationDomain::BusyStatus; - emit notify(n); - - SinkLog() << " Synchronizing"; - return mSynchronizer->synchronize(query) - .then([this](const KAsync::Error &error) { - if (!error) { - SinkLog() << "Done Synchronizing"; - Sink::Notification n; - n.id = "sync"; - n.type = Sink::Notification::Status; - n.message = "Synchronization has ended."; - n.code = Sink::ApplicationDomain::ConnectedStatus; - emit notify(n); - return KAsync::null(); - } - return KAsync::error(error); - }); - }); -} - void CommandProcessor::setOldestUsedRevision(qint64 revision) { mLowerBoundRevision = revision; @@ -337,17 +299,17 @@ void CommandProcessor::setSynchronizer(const QSharedPointer &synch QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() { Sink::Notification n; n.id = "changereplay"; - n.type = Sink::Notification::Status; + n.type = Notification::Status; n.message = "Replaying changes."; - n.code = Sink::ApplicationDomain::BusyStatus; + n.code = ApplicationDomain::BusyStatus; emit notify(n); }); QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() { Sink::Notification n; n.id = "changereplay"; - n.type = Sink::Notification::Status; + n.type = Notification::Status; n.message = "All changes have been replayed."; - n.code = Sink::ApplicationDomain::ConnectedStatus; + n.code = ApplicationDomain::ConnectedStatus; emit notify(n); }); diff --git a/common/commandprocessor.h b/common/commandprocessor.h index a807f46..81f93e5 100644 --- a/common/commandprocessor.h +++ b/common/commandprocessor.h @@ -78,7 +78,6 @@ private: // void processRevisionReplayedCommand(const QByteArray &data); KAsync::Job flush(void const *command, size_t size); - KAsync::Job synchronizeWithSource(const Sink::QueryBase &query); Sink::Pipeline *mPipeline; MessageQueue mUserQueue; diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 5819a07..c11e899 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -100,7 +100,8 @@ void GenericResource::processCommand(int commandId, const QByteArray &data) KAsync::Job GenericResource::synchronizeWithSource(const Sink::QueryBase &query) { - return mSynchronizer->synchronize(query); + mSynchronizer->synchronize(query); + return KAsync::null(); } KAsync::Job GenericResource::processAllMessages() diff --git a/common/notifier.cpp b/common/notifier.cpp index 94ac84e..53db5be 100644 --- a/common/notifier.cpp +++ b/common/notifier.cpp @@ -23,6 +23,7 @@ #include #include "resourceaccess.h" +#include "resourceconfig.h" #include "log.h" using namespace Sink; @@ -60,6 +61,10 @@ Notifier::Notifier(const QByteArray &instanceIdentifier, const QByteArray &resou d->resourceAccess << resourceAccess; } +Notifier::Notifier(const QByteArray &instanceIdentifier) : Notifier(instanceIdentifier, ResourceConfig::getResourceType(instanceIdentifier)) +{ +} + void Notifier::registerHandler(std::function handler) { d->handler << handler; diff --git a/common/notifier.h b/common/notifier.h index 3d61e95..df8f34b 100644 --- a/common/notifier.h +++ b/common/notifier.h @@ -36,6 +36,7 @@ class SINK_EXPORT Notifier { public: Notifier(const QSharedPointer &resourceAccess); + Notifier(const QByteArray &resourceInstanceIdentifier); Notifier(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType); void registerHandler(std::function); diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 11c7caf..6483cdf 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -38,7 +38,8 @@ Synchronizer::Synchronizer(const Sink::ResourceContext &context) : ChangeReplay(context), mResourceContext(context), mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)), - mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite) + mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), + mSyncInProgress(false) { SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); } @@ -254,15 +255,15 @@ void Synchronizer::modify(const DomainType &entity) QList Synchronizer::getSyncRequests(const Sink::QueryBase &query) { QList list; - list << Synchronizer::SyncRequest{query}; + list << Synchronizer::SyncRequest{query, "sync"}; return list; } -KAsync::Job Synchronizer::synchronize(const Sink::QueryBase &query) +void Synchronizer::synchronize(const Sink::QueryBase &query) { SinkTrace() << "Synchronizing"; mSyncRequestQueue << getSyncRequests(query); - return processSyncQueue(); + processSyncQueue().exec(); } void Synchronizer::flush(int commandId, const QByteArray &flushId) @@ -284,20 +285,48 @@ KAsync::Job Synchronizer::processSyncQueue() while (!mSyncRequestQueue.isEmpty()) { auto request = mSyncRequestQueue.takeFirst(); if (request.requestType == Synchronizer::SyncRequest::Synchronization) { - job = job.then(synchronizeWithSource(request.query)).syncThen([this] { + job = job.syncThen([this, request] { + Sink::Notification n; + n.id = request.requestId; + n.type = Notification::Status; + n.message = "Synchronization has started."; + n.code = ApplicationDomain::BusyStatus; + emit notify(n); + }).then(synchronizeWithSource(request.query)).syncThen([this] { //Commit after every request, so implementations only have to commit more if they add a lot of data. commit(); + }).then([this, request](const KAsync::Error &error) { + if (error) { + //Emit notification with error + SinkWarning() << "Synchronization failed: " << error.errorMessage; + Sink::Notification n; + n.id = request.requestId; + n.type = Notification::Status; + n.message = "Synchronization has ended."; + n.code = ApplicationDomain::ErrorStatus; + emit notify(n); + return KAsync::error(error); + } else { + SinkLog() << "Done Synchronizing"; + Sink::Notification n; + n.id = request.requestId; + n.type = Notification::Status; + n.message = "Synchronization has ended."; + n.code = ApplicationDomain::ConnectedStatus; + emit notify(n); + return KAsync::null(); + } }); } else if (request.requestType == Synchronizer::SyncRequest::Flush) { if (request.flushType == Flush::FlushReplayQueue) { SinkTrace() << "Emitting flush completion."; Sink::Notification n; n.type = Sink::Notification::FlushCompletion; - n.id = request.flushId; + n.id = request.requestId; emit notify(n); } else { flatbuffers::FlatBufferBuilder fbb; - auto flushId = fbb.CreateString(request.flushId); + auto flushId = fbb.CreateString(request.requestId); auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); Sink::Commands::FinishFlushBuffer(fbb, location); enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); diff --git a/common/synchronizer.h b/common/synchronizer.h index 989f902..f9b834e 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h @@ -44,7 +44,7 @@ public: virtual ~Synchronizer(); void setup(const std::function &enqueueCommandCallback, MessageQueue &messageQueue); - KAsync::Job synchronize(const Sink::QueryBase &query); + void synchronize(const Sink::QueryBase &query); void flush(int commandId, const QByteArray &flushId); //Read only access to main storage @@ -123,8 +123,9 @@ protected: Flush }; - SyncRequest(const Sink::QueryBase &q) - : requestType(Synchronization), + SyncRequest(const Sink::QueryBase &q, const QByteArray &requestId_ = QByteArray()) + : requestId(requestId_), + requestType(Synchronization), query(q) { } @@ -134,15 +135,15 @@ protected: { } - SyncRequest(RequestType type, int flushType_, const QByteArray &flushId_) + SyncRequest(RequestType type, int flushType_, const QByteArray &requestId_) : flushType(flushType_), - flushId(flushId_), + requestId(requestId_), requestType(type) { } int flushType = 0; - QByteArray flushId; + QByteArray requestId; RequestType requestType; Sink::QueryBase query; }; -- cgit v1.2.3