From 9fdcedf88e2fabedad73f0c74906318673f1ffa0 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 7 Apr 2017 14:21:44 +0200 Subject: Better account status aggregation. Only ever enter error state on non-recoverable errors. Otherwise: * Busy state while busy, then go back to online/offline/error. * If we failed connect during replay/sync we assume we're offline. * If we failed to login but could connect we have a known error condition. * If we succeeded to replay/sync something we are apprently online. At the core we have the problem that we have no way of telling wether we can connect to the server until we actually try (network is not enough: vpns, firewalls, ....). Further the status always reflects the latest status, so even if we were in an error state, once we retry we go out of the error state and either end up back in the error state or not. When aggregating states we have to similarly adjust the state to the most relevant among the resources. The states are ordered like this: * Error * Busy * Connected * Offline --- common/synchronizer.cpp | 70 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 55 insertions(+), 15 deletions(-) (limited to 'common/synchronizer.cpp') diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 284f867..9451488 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -40,6 +40,7 @@ Synchronizer::Synchronizer(const Sink::ResourceContext &context) mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), mSyncInProgress(false) { + mCurrentState.push(ApplicationDomain::Status::OfflineStatus); SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); } @@ -308,6 +309,23 @@ void Synchronizer::reportProgress(int progress, int total) SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total; } +void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId) +{ + if (error) { + if (error.errorCode == ApplicationDomain::ConnectionError) { + //Couldn't connect, so we assume we don't have a network connection. + setStatus(ApplicationDomain::OfflineStatus, s, requestId); + } else if (error.errorCode == ApplicationDomain::LoginError) { + //If we failed to login altough we could connect that indicates a problem with our setup. + setStatus(ApplicationDomain::ErrorStatus, s, requestId); + } + //We don't know what kind of error this was, so we assume it's transient and don't change ou status. + } else { + //An operation against the server worked, so we're probably online. + setStatus(ApplicationDomain::ConnectedStatus, s, requestId); + } +} + KAsync::Job Synchronizer::processRequest(const SyncRequest &request) { if (request.options & SyncRequest::RequestFlush) { @@ -333,26 +351,20 @@ KAsync::Job Synchronizer::processRequest(const SyncRequest &request) } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { return KAsync::start([this, request] { SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; - emitNotification(Notification::Status, ApplicationDomain::BusyStatus, "Synchronization has started.", request.requestId); emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities); }).then(synchronizeWithSource(request.query)).then([this] { //Commit after every request, so implementations only have to commit more if they add a lot of data. commit(); }).then([this, request](const KAsync::Error &error) { + setStatusFromResult(error, "Synchronization has ended.", request.requestId); if (error) { //Emit notification with error SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error; - if (error.errorCode == ApplicationDomain::ConnectionError) { - emitNotification(Notification::Status, ApplicationDomain::OfflineStatus, "Synchronization has ended.", request.requestId); - } else { - emitNotification(Notification::Status, ApplicationDomain::ErrorStatus, "Synchronization has ended.", request.requestId); - } emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntities); return KAsync::error(error); } else { SinkLogCtx(mLogCtx) << "Done Synchronizing"; emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntities); - emitNotification(Notification::Status, ApplicationDomain::ConnectedStatus, "Synchronization has ended.", request.requestId); return KAsync::null(); } }); @@ -377,21 +389,15 @@ KAsync::Job Synchronizer::processRequest(const SyncRequest &request) } else { return KAsync::start([this, request] { SinkLogCtx(mLogCtx) << "Replaying changes."; - emitNotification(Notification::Status, ApplicationDomain::BusyStatus, "Changereplay has started.", "changereplay"); }) .then(replayNextRevision()) .then([this, request](const KAsync::Error &error) { + setStatusFromResult(error, "Changereplay has ended.", "changereplay"); if (error) { SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage; - if (error.errorCode == ApplicationDomain::ConnectionError) { - emitNotification(Notification::Status, ApplicationDomain::OfflineStatus, "Changereplay has ended.", "changereplay"); - } else { - emitNotification(Notification::Status, ApplicationDomain::ErrorStatus, "Changereplay has ended.", "changereplay"); - } return KAsync::error(error); } else { SinkLogCtx(mLogCtx) << "Done replaying changes"; - emitNotification(Notification::Status, ApplicationDomain::ConnectedStatus, "All changes have been replayed.", "changereplay"); return KAsync::null(); } }); @@ -403,6 +409,34 @@ KAsync::Job Synchronizer::processRequest(const SyncRequest &request) } +void Synchronizer::setStatus(ApplicationDomain::Status state, const QString &reason, const QByteArray requestId) +{ + if (state != mCurrentState.top()) { + if (mCurrentState.top() == ApplicationDomain::BusyStatus) { + mCurrentState.pop(); + } + mCurrentState.push(state); + emitNotification(Notification::Status, state, reason, requestId); + } +} + +void Synchronizer::resetStatus(const QByteArray requestId) +{ + mCurrentState.pop(); + emitNotification(Notification::Status, mCurrentState.top(), {}, requestId); +} + +void Synchronizer::setBusy(bool busy, const QString &reason, const QByteArray requestId) +{ + if (busy) { + setStatus(ApplicationDomain::BusyStatus, reason, requestId); + } else { + if (mCurrentState.top() == ApplicationDomain::BusyStatus) { + resetStatus(requestId); + } + } +} + KAsync::Job Synchronizer::processSyncQueue() { if (mSyncRequestQueue.isEmpty()) { @@ -421,14 +455,20 @@ KAsync::Job Synchronizer::processSyncQueue() } const auto request = mSyncRequestQueue.takeFirst(); - return KAsync::start([this] { + return KAsync::start([=] { mMessageQueue->startTransaction(); mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); mSyncInProgress = true; + if (request.requestType == Synchronizer::SyncRequest::Synchronization) { + setBusy(true, "Synchronization has started.", request.requestId); + } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { + setBusy(true, "ChangeReplay has started.", "changereplay"); + } }) .then(processRequest(request)) .then([this, request](const KAsync::Error &error) { SinkTraceCtx(mLogCtx) << "Sync request processed"; + setBusy(false, {}, request.requestId); mEntityStore->abortTransaction(); mSyncTransaction.abort(); mMessageQueue->commit(); -- cgit v1.2.3