summaryrefslogtreecommitdiffstats
path: root/common/synchronizer.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-04-07 14:21:44 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-04-07 14:50:15 +0200
commit9fdcedf88e2fabedad73f0c74906318673f1ffa0 (patch)
treeeceba5dfb03e59c483ad83af8ae220894083473a /common/synchronizer.cpp
parent8c7924171942f5f0d25c8a02f66d82f5be6edb5b (diff)
downloadsink-9fdcedf88e2fabedad73f0c74906318673f1ffa0.tar.gz
sink-9fdcedf88e2fabedad73f0c74906318673f1ffa0.zip
Better account status aggregation.
Only ever enter error state on non-recoverable errors. Otherwise: * Busy state while busy, then go back to online/offline/error. * If we failed connect during replay/sync we assume we're offline. * If we failed to login but could connect we have a known error condition. * If we succeeded to replay/sync something we are apprently online. At the core we have the problem that we have no way of telling wether we can connect to the server until we actually try (network is not enough: vpns, firewalls, ....). Further the status always reflects the latest status, so even if we were in an error state, once we retry we go out of the error state and either end up back in the error state or not. When aggregating states we have to similarly adjust the state to the most relevant among the resources. The states are ordered like this: * Error * Busy * Connected * Offline
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r--common/synchronizer.cpp70
1 files changed, 55 insertions, 15 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 284f867..9451488 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -40,6 +40,7 @@ Synchronizer::Synchronizer(const Sink::ResourceContext &context)
40 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), 40 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite),
41 mSyncInProgress(false) 41 mSyncInProgress(false)
42{ 42{
43 mCurrentState.push(ApplicationDomain::Status::OfflineStatus);
43 SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); 44 SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId();
44} 45}
45 46
@@ -308,6 +309,23 @@ void Synchronizer::reportProgress(int progress, int total)
308 SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total; 309 SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total;
309} 310}
310 311
312void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId)
313{
314 if (error) {
315 if (error.errorCode == ApplicationDomain::ConnectionError) {
316 //Couldn't connect, so we assume we don't have a network connection.
317 setStatus(ApplicationDomain::OfflineStatus, s, requestId);
318 } else if (error.errorCode == ApplicationDomain::LoginError) {
319 //If we failed to login altough we could connect that indicates a problem with our setup.
320 setStatus(ApplicationDomain::ErrorStatus, s, requestId);
321 }
322 //We don't know what kind of error this was, so we assume it's transient and don't change ou status.
323 } else {
324 //An operation against the server worked, so we're probably online.
325 setStatus(ApplicationDomain::ConnectedStatus, s, requestId);
326 }
327}
328
311KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) 329KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
312{ 330{
313 if (request.options & SyncRequest::RequestFlush) { 331 if (request.options & SyncRequest::RequestFlush) {
@@ -333,26 +351,20 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
333 } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { 351 } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
334 return KAsync::start([this, request] { 352 return KAsync::start([this, request] {
335 SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; 353 SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query;
336 emitNotification(Notification::Status, ApplicationDomain::BusyStatus, "Synchronization has started.", request.requestId);
337 emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities); 354 emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities);
338 }).then(synchronizeWithSource(request.query)).then([this] { 355 }).then(synchronizeWithSource(request.query)).then([this] {
339 //Commit after every request, so implementations only have to commit more if they add a lot of data. 356 //Commit after every request, so implementations only have to commit more if they add a lot of data.
340 commit(); 357 commit();
341 }).then<void>([this, request](const KAsync::Error &error) { 358 }).then<void>([this, request](const KAsync::Error &error) {
359 setStatusFromResult(error, "Synchronization has ended.", request.requestId);
342 if (error) { 360 if (error) {
343 //Emit notification with error 361 //Emit notification with error
344 SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error; 362 SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error;
345 if (error.errorCode == ApplicationDomain::ConnectionError) {
346 emitNotification(Notification::Status, ApplicationDomain::OfflineStatus, "Synchronization has ended.", request.requestId);
347 } else {
348 emitNotification(Notification::Status, ApplicationDomain::ErrorStatus, "Synchronization has ended.", request.requestId);
349 }
350 emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntities); 363 emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntities);
351 return KAsync::error(error); 364 return KAsync::error(error);
352 } else { 365 } else {
353 SinkLogCtx(mLogCtx) << "Done Synchronizing"; 366 SinkLogCtx(mLogCtx) << "Done Synchronizing";
354 emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntities); 367 emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntities);
355 emitNotification(Notification::Status, ApplicationDomain::ConnectedStatus, "Synchronization has ended.", request.requestId);
356 return KAsync::null(); 368 return KAsync::null();
357 } 369 }
358 }); 370 });
@@ -377,21 +389,15 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
377 } else { 389 } else {
378 return KAsync::start([this, request] { 390 return KAsync::start([this, request] {
379 SinkLogCtx(mLogCtx) << "Replaying changes."; 391 SinkLogCtx(mLogCtx) << "Replaying changes.";
380 emitNotification(Notification::Status, ApplicationDomain::BusyStatus, "Changereplay has started.", "changereplay");
381 }) 392 })
382 .then(replayNextRevision()) 393 .then(replayNextRevision())
383 .then<void>([this, request](const KAsync::Error &error) { 394 .then<void>([this, request](const KAsync::Error &error) {
395 setStatusFromResult(error, "Changereplay has ended.", "changereplay");
384 if (error) { 396 if (error) {
385 SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage; 397 SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage;
386 if (error.errorCode == ApplicationDomain::ConnectionError) {
387 emitNotification(Notification::Status, ApplicationDomain::OfflineStatus, "Changereplay has ended.", "changereplay");
388 } else {
389 emitNotification(Notification::Status, ApplicationDomain::ErrorStatus, "Changereplay has ended.", "changereplay");
390 }
391 return KAsync::error(error); 398 return KAsync::error(error);
392 } else { 399 } else {
393 SinkLogCtx(mLogCtx) << "Done replaying changes"; 400 SinkLogCtx(mLogCtx) << "Done replaying changes";
394 emitNotification(Notification::Status, ApplicationDomain::ConnectedStatus, "All changes have been replayed.", "changereplay");
395 return KAsync::null(); 401 return KAsync::null();
396 } 402 }
397 }); 403 });
@@ -403,6 +409,34 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
403 409
404} 410}
405 411
412void Synchronizer::setStatus(ApplicationDomain::Status state, const QString &reason, const QByteArray requestId)
413{
414 if (state != mCurrentState.top()) {
415 if (mCurrentState.top() == ApplicationDomain::BusyStatus) {
416 mCurrentState.pop();
417 }
418 mCurrentState.push(state);
419 emitNotification(Notification::Status, state, reason, requestId);
420 }
421}
422
423void Synchronizer::resetStatus(const QByteArray requestId)
424{
425 mCurrentState.pop();
426 emitNotification(Notification::Status, mCurrentState.top(), {}, requestId);
427}
428
429void Synchronizer::setBusy(bool busy, const QString &reason, const QByteArray requestId)
430{
431 if (busy) {
432 setStatus(ApplicationDomain::BusyStatus, reason, requestId);
433 } else {
434 if (mCurrentState.top() == ApplicationDomain::BusyStatus) {
435 resetStatus(requestId);
436 }
437 }
438}
439
406KAsync::Job<void> Synchronizer::processSyncQueue() 440KAsync::Job<void> Synchronizer::processSyncQueue()
407{ 441{
408 if (mSyncRequestQueue.isEmpty()) { 442 if (mSyncRequestQueue.isEmpty()) {
@@ -421,14 +455,20 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
421 } 455 }
422 456
423 const auto request = mSyncRequestQueue.takeFirst(); 457 const auto request = mSyncRequestQueue.takeFirst();
424 return KAsync::start([this] { 458 return KAsync::start([=] {
425 mMessageQueue->startTransaction(); 459 mMessageQueue->startTransaction();
426 mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); 460 mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly);
427 mSyncInProgress = true; 461 mSyncInProgress = true;
462 if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
463 setBusy(true, "Synchronization has started.", request.requestId);
464 } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) {
465 setBusy(true, "ChangeReplay has started.", "changereplay");
466 }
428 }) 467 })
429 .then(processRequest(request)) 468 .then(processRequest(request))
430 .then<void>([this, request](const KAsync::Error &error) { 469 .then<void>([this, request](const KAsync::Error &error) {
431 SinkTraceCtx(mLogCtx) << "Sync request processed"; 470 SinkTraceCtx(mLogCtx) << "Sync request processed";
471 setBusy(false, {}, request.requestId);
432 mEntityStore->abortTransaction(); 472 mEntityStore->abortTransaction();
433 mSyncTransaction.abort(); 473 mSyncTransaction.abort();
434 mMessageQueue->commit(); 474 mMessageQueue->commit();