commit    9fdcedf88e2fabedad73f0c74906318673f1ffa0 (patch)
author    Christian Mollekopf <chrigi_1@fastmail.fm>  2017-04-07 14:21:44 +0200
committer Christian Mollekopf <chrigi_1@fastmail.fm>  2017-04-07 14:50:15 +0200
tree      eceba5dfb03e59c483ad83af8ae220894083473a /common/synchronizer.cpp
parent    8c7924171942f5f0d25c8a02f66d82f5be6edb5b (diff)
Better account status aggregation.
Only ever enter the error state on non-recoverable errors.
Otherwise:
* Busy state while busy, then go back to online/offline/error (see the sketch after this list).
* If we failed to connect during replay/sync we assume we're offline.
* If we failed to log in but could connect, we have a known error condition.
* If we succeeded in replaying/syncing something we are apparently online.
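In the patch below this maps onto a small set of helpers (setBusy, setStatus, setStatusFromResult). Roughly, a sync request now goes through the following lifecycle (simplified sketch; runSynchronization is a hypothetical stand-in for the KAsync job chain, and transactions/notifications are omitted):

    setBusy(true, "Synchronization has started.", request.requestId);  // pushes BusyStatus
    const auto error = runSynchronization(request);                     // hypothetical stand-in for the job chain
    setStatusFromResult(error, "Synchronization has ended.", request.requestId);
    //   ConnectionError -> OfflineStatus
    //   LoginError      -> ErrorStatus
    //   other errors    -> status left unchanged (assumed transient)
    //   no error        -> ConnectedStatus
    setBusy(false, {}, request.requestId);                              // pops BusyStatus again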
At the core we have the problem that we have no way of telling whether
we can connect to the server until we actually try (a working network is not
enough: VPNs, firewalls, ...). Further, the status always reflects the
latest attempt, so even if we were in an error state, once we retry we leave
the error state and either end up back in it or not.
When aggregating states we similarly have to pick the most relevant state
among the resources (see the sketch after this list). The states are ordered from most to least relevant:
* Error
* Busy
* Connected
* Offline
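The aggregation itself is not part of this patch; the following is a minimal sketch of how a client-side aggregator could pick the most relevant state, assuming the ordering above and the ApplicationDomain status values used in the code below (aggregateStatus is a hypothetical helper, not code from this commit):

    // Hypothetical helper: pick the most relevant status among all resources
    // of an account, using the ordering Error > Busy > Connected > Offline.
    ApplicationDomain::Status aggregateStatus(const QVector<ApplicationDomain::Status> &statuses)
    {
        auto rank = [](ApplicationDomain::Status s) {
            switch (s) {
            case ApplicationDomain::ErrorStatus:     return 3;
            case ApplicationDomain::BusyStatus:      return 2;
            case ApplicationDomain::ConnectedStatus: return 1;
            default:                                 return 0; // OfflineStatus
            }
        };
        auto result = ApplicationDomain::OfflineStatus;
        for (const auto status : statuses) {
            if (rank(status) > rank(result)) {
                result = status;
            }
        }
        return result;
    }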
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r--  common/synchronizer.cpp | 70
1 file changed, 55 insertions(+), 15 deletions(-)
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 284f867..9451488 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -40,6 +40,7 @@ Synchronizer::Synchronizer(const Sink::ResourceContext &context)
     mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite),
     mSyncInProgress(false)
 {
+    mCurrentState.push(ApplicationDomain::Status::OfflineStatus);
     SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId();
 }
 
@@ -308,6 +309,23 @@ void Synchronizer::reportProgress(int progress, int total)
     SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total;
 }
 
+void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId)
+{
+    if (error) {
+        if (error.errorCode == ApplicationDomain::ConnectionError) {
+            //Couldn't connect, so we assume we don't have a network connection.
+            setStatus(ApplicationDomain::OfflineStatus, s, requestId);
+        } else if (error.errorCode == ApplicationDomain::LoginError) {
+            //If we failed to login although we could connect, that indicates a problem with our setup.
+            setStatus(ApplicationDomain::ErrorStatus, s, requestId);
+        }
+        //We don't know what kind of error this was, so we assume it's transient and don't change our status.
+    } else {
+        //An operation against the server worked, so we're probably online.
+        setStatus(ApplicationDomain::ConnectedStatus, s, requestId);
+    }
+}
+
 KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
 {
     if (request.options & SyncRequest::RequestFlush) {
@@ -333,26 +351,20 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
     } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
         return KAsync::start([this, request] {
             SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query;
-            emitNotification(Notification::Status, ApplicationDomain::BusyStatus, "Synchronization has started.", request.requestId);
             emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities);
         }).then(synchronizeWithSource(request.query)).then([this] {
             //Commit after every request, so implementations only have to commit more if they add a lot of data.
             commit();
         }).then<void>([this, request](const KAsync::Error &error) {
+            setStatusFromResult(error, "Synchronization has ended.", request.requestId);
             if (error) {
                 //Emit notification with error
                 SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error;
-                if (error.errorCode == ApplicationDomain::ConnectionError) {
-                    emitNotification(Notification::Status, ApplicationDomain::OfflineStatus, "Synchronization has ended.", request.requestId);
-                } else {
-                    emitNotification(Notification::Status, ApplicationDomain::ErrorStatus, "Synchronization has ended.", request.requestId);
-                }
                 emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntities);
                 return KAsync::error(error);
             } else {
                 SinkLogCtx(mLogCtx) << "Done Synchronizing";
                 emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntities);
-                emitNotification(Notification::Status, ApplicationDomain::ConnectedStatus, "Synchronization has ended.", request.requestId);
                 return KAsync::null();
             }
         });
@@ -377,21 +389,15 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
     } else {
         return KAsync::start([this, request] {
             SinkLogCtx(mLogCtx) << "Replaying changes.";
-            emitNotification(Notification::Status, ApplicationDomain::BusyStatus, "Changereplay has started.", "changereplay");
         })
         .then(replayNextRevision())
         .then<void>([this, request](const KAsync::Error &error) {
+            setStatusFromResult(error, "Changereplay has ended.", "changereplay");
             if (error) {
                 SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage;
-                if (error.errorCode == ApplicationDomain::ConnectionError) {
-                    emitNotification(Notification::Status, ApplicationDomain::OfflineStatus, "Changereplay has ended.", "changereplay");
-                } else {
-                    emitNotification(Notification::Status, ApplicationDomain::ErrorStatus, "Changereplay has ended.", "changereplay");
-                }
                 return KAsync::error(error);
             } else {
                 SinkLogCtx(mLogCtx) << "Done replaying changes";
-                emitNotification(Notification::Status, ApplicationDomain::ConnectedStatus, "All changes have been replayed.", "changereplay");
                 return KAsync::null();
             }
         });
@@ -403,6 +409,34 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
 
 }
 
+void Synchronizer::setStatus(ApplicationDomain::Status state, const QString &reason, const QByteArray requestId)
+{
+    if (state != mCurrentState.top()) {
+        if (mCurrentState.top() == ApplicationDomain::BusyStatus) {
+            mCurrentState.pop();
+        }
+        mCurrentState.push(state);
+        emitNotification(Notification::Status, state, reason, requestId);
+    }
+}
+
+void Synchronizer::resetStatus(const QByteArray requestId)
+{
+    mCurrentState.pop();
+    emitNotification(Notification::Status, mCurrentState.top(), {}, requestId);
+}
+
+void Synchronizer::setBusy(bool busy, const QString &reason, const QByteArray requestId)
+{
+    if (busy) {
+        setStatus(ApplicationDomain::BusyStatus, reason, requestId);
+    } else {
+        if (mCurrentState.top() == ApplicationDomain::BusyStatus) {
+            resetStatus(requestId);
+        }
+    }
+}
+
 KAsync::Job<void> Synchronizer::processSyncQueue()
 {
     if (mSyncRequestQueue.isEmpty()) {
@@ -421,14 +455,20 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
     }
 
     const auto request = mSyncRequestQueue.takeFirst();
-    return KAsync::start([this] {
+    return KAsync::start([=] {
         mMessageQueue->startTransaction();
         mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly);
         mSyncInProgress = true;
+        if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
+            setBusy(true, "Synchronization has started.", request.requestId);
+        } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) {
+            setBusy(true, "ChangeReplay has started.", "changereplay");
+        }
     })
     .then(processRequest(request))
     .then<void>([this, request](const KAsync::Error &error) {
         SinkTraceCtx(mLogCtx) << "Sync request processed";
+        setBusy(false, {}, request.requestId);
         mEntityStore->abortTransaction();
         mSyncTransaction.abort();
         mMessageQueue->commit();