summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-04-07 14:21:44 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-04-07 14:50:15 +0200
commit9fdcedf88e2fabedad73f0c74906318673f1ffa0 (patch)
treeeceba5dfb03e59c483ad83af8ae220894083473a
parent8c7924171942f5f0d25c8a02f66d82f5be6edb5b (diff)
downloadsink-9fdcedf88e2fabedad73f0c74906318673f1ffa0.tar.gz
sink-9fdcedf88e2fabedad73f0c74906318673f1ffa0.zip
Better account status aggregation.
Only ever enter error state on non-recoverable errors. Otherwise: * Busy state while busy, then go back to online/offline/error. * If we failed connect during replay/sync we assume we're offline. * If we failed to login but could connect we have a known error condition. * If we succeeded to replay/sync something we are apprently online. At the core we have the problem that we have no way of telling wether we can connect to the server until we actually try (network is not enough: vpns, firewalls, ....). Further the status always reflects the latest status, so even if we were in an error state, once we retry we go out of the error state and either end up back in the error state or not. When aggregating states we have to similarly adjust the state to the most relevant among the resources. The states are ordered like this: * Error * Busy * Connected * Offline
-rw-r--r--common/resourcefacade.cpp35
-rw-r--r--common/synchronizer.cpp70
-rw-r--r--common/synchronizer.h7
-rw-r--r--tests/notificationtest.cpp4
4 files changed, 78 insertions, 38 deletions
diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp
index c702777..dee0711 100644
--- a/common/resourcefacade.cpp
+++ b/common/resourcefacade.cpp
@@ -351,7 +351,7 @@ QPair<KAsync::Job<void>, typename Sink::ResultEmitter<typename ApplicationDomain
351 const auto resources = Store::read<ApplicationDomain::SinkResource>(query); 351 const auto resources = Store::read<ApplicationDomain::SinkResource>(query);
352 SinkTraceCtx(ctx) << "Found resource belonging to the account " << account.identifier() << " : " << resources; 352 SinkTraceCtx(ctx) << "Found resource belonging to the account " << account.identifier() << " : " << resources;
353 auto accountIdentifier = account.identifier(); 353 auto accountIdentifier = account.identifier();
354 ApplicationDomain::Status status = ApplicationDomain::ConnectedStatus; 354 QList<int> states;
355 for (const auto &resource : resources) { 355 for (const auto &resource : resources) {
356 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource.identifier(), ResourceConfig::getResourceType(resource.identifier())); 356 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource.identifier(), ResourceConfig::getResourceType(resource.identifier()));
357 if (!monitoredResources->contains(resource.identifier())) { 357 if (!monitoredResources->contains(resource.identifier())) {
@@ -364,27 +364,20 @@ QPair<KAsync::Job<void>, typename Sink::ResultEmitter<typename ApplicationDomain
364 Q_ASSERT(ret); 364 Q_ASSERT(ret);
365 monitoredResources->insert(resource.identifier()); 365 monitoredResources->insert(resource.identifier());
366 } 366 }
367 367 states << resourceAccess->getResourceStatus();
368 //Figure out overall status
369 auto s = resourceAccess->getResourceStatus();
370 switch (s) {
371 case ApplicationDomain::ErrorStatus:
372 status = ApplicationDomain::ErrorStatus;
373 break;
374 case ApplicationDomain::OfflineStatus:
375 if (status == ApplicationDomain::ConnectedStatus) {
376 status = ApplicationDomain::OfflineStatus;
377 }
378 break;
379 case ApplicationDomain::ConnectedStatus:
380 break;
381 case ApplicationDomain::BusyStatus:
382 if (status != ApplicationDomain::ErrorStatus) {
383 status = ApplicationDomain::BusyStatus;
384 }
385 break;
386 }
387 } 368 }
369 const auto status = [&] {
370 if (states.contains(ApplicationDomain::ErrorStatus)) {
371 return ApplicationDomain::ErrorStatus;
372 }
373 if (states.contains(ApplicationDomain::BusyStatus)) {
374 return ApplicationDomain::BusyStatus;
375 }
376 if (states.contains(ApplicationDomain::ConnectedStatus)) {
377 return ApplicationDomain::ConnectedStatus;
378 }
379 return ApplicationDomain::OfflineStatus;
380 }();
388 account.setStatusStatus(status); 381 account.setStatusStatus(status);
389 }); 382 });
390 return qMakePair(KAsync::null<void>(), runner->emitter()); 383 return qMakePair(KAsync::null<void>(), runner->emitter());
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 284f867..9451488 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -40,6 +40,7 @@ Synchronizer::Synchronizer(const Sink::ResourceContext &context)
40 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), 40 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite),
41 mSyncInProgress(false) 41 mSyncInProgress(false)
42{ 42{
43 mCurrentState.push(ApplicationDomain::Status::OfflineStatus);
43 SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); 44 SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId();
44} 45}
45 46
@@ -308,6 +309,23 @@ void Synchronizer::reportProgress(int progress, int total)
308 SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total; 309 SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total;
309} 310}
310 311
312void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId)
313{
314 if (error) {
315 if (error.errorCode == ApplicationDomain::ConnectionError) {
316 //Couldn't connect, so we assume we don't have a network connection.
317 setStatus(ApplicationDomain::OfflineStatus, s, requestId);
318 } else if (error.errorCode == ApplicationDomain::LoginError) {
319 //If we failed to login altough we could connect that indicates a problem with our setup.
320 setStatus(ApplicationDomain::ErrorStatus, s, requestId);
321 }
322 //We don't know what kind of error this was, so we assume it's transient and don't change ou status.
323 } else {
324 //An operation against the server worked, so we're probably online.
325 setStatus(ApplicationDomain::ConnectedStatus, s, requestId);
326 }
327}
328
311KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) 329KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
312{ 330{
313 if (request.options & SyncRequest::RequestFlush) { 331 if (request.options & SyncRequest::RequestFlush) {
@@ -333,26 +351,20 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
333 } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { 351 } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
334 return KAsync::start([this, request] { 352 return KAsync::start([this, request] {
335 SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; 353 SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query;
336 emitNotification(Notification::Status, ApplicationDomain::BusyStatus, "Synchronization has started.", request.requestId);
337 emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities); 354 emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities);
338 }).then(synchronizeWithSource(request.query)).then([this] { 355 }).then(synchronizeWithSource(request.query)).then([this] {
339 //Commit after every request, so implementations only have to commit more if they add a lot of data. 356 //Commit after every request, so implementations only have to commit more if they add a lot of data.
340 commit(); 357 commit();
341 }).then<void>([this, request](const KAsync::Error &error) { 358 }).then<void>([this, request](const KAsync::Error &error) {
359 setStatusFromResult(error, "Synchronization has ended.", request.requestId);
342 if (error) { 360 if (error) {
343 //Emit notification with error 361 //Emit notification with error
344 SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error; 362 SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error;
345 if (error.errorCode == ApplicationDomain::ConnectionError) {
346 emitNotification(Notification::Status, ApplicationDomain::OfflineStatus, "Synchronization has ended.", request.requestId);
347 } else {
348 emitNotification(Notification::Status, ApplicationDomain::ErrorStatus, "Synchronization has ended.", request.requestId);
349 }
350 emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntities); 363 emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntities);
351 return KAsync::error(error); 364 return KAsync::error(error);
352 } else { 365 } else {
353 SinkLogCtx(mLogCtx) << "Done Synchronizing"; 366 SinkLogCtx(mLogCtx) << "Done Synchronizing";
354 emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntities); 367 emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntities);
355 emitNotification(Notification::Status, ApplicationDomain::ConnectedStatus, "Synchronization has ended.", request.requestId);
356 return KAsync::null(); 368 return KAsync::null();
357 } 369 }
358 }); 370 });
@@ -377,21 +389,15 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
377 } else { 389 } else {
378 return KAsync::start([this, request] { 390 return KAsync::start([this, request] {
379 SinkLogCtx(mLogCtx) << "Replaying changes."; 391 SinkLogCtx(mLogCtx) << "Replaying changes.";
380 emitNotification(Notification::Status, ApplicationDomain::BusyStatus, "Changereplay has started.", "changereplay");
381 }) 392 })
382 .then(replayNextRevision()) 393 .then(replayNextRevision())
383 .then<void>([this, request](const KAsync::Error &error) { 394 .then<void>([this, request](const KAsync::Error &error) {
395 setStatusFromResult(error, "Changereplay has ended.", "changereplay");
384 if (error) { 396 if (error) {
385 SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage; 397 SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage;
386 if (error.errorCode == ApplicationDomain::ConnectionError) {
387 emitNotification(Notification::Status, ApplicationDomain::OfflineStatus, "Changereplay has ended.", "changereplay");
388 } else {
389 emitNotification(Notification::Status, ApplicationDomain::ErrorStatus, "Changereplay has ended.", "changereplay");
390 }
391 return KAsync::error(error); 398 return KAsync::error(error);
392 } else { 399 } else {
393 SinkLogCtx(mLogCtx) << "Done replaying changes"; 400 SinkLogCtx(mLogCtx) << "Done replaying changes";
394 emitNotification(Notification::Status, ApplicationDomain::ConnectedStatus, "All changes have been replayed.", "changereplay");
395 return KAsync::null(); 401 return KAsync::null();
396 } 402 }
397 }); 403 });
@@ -403,6 +409,34 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
403 409
404} 410}
405 411
412void Synchronizer::setStatus(ApplicationDomain::Status state, const QString &reason, const QByteArray requestId)
413{
414 if (state != mCurrentState.top()) {
415 if (mCurrentState.top() == ApplicationDomain::BusyStatus) {
416 mCurrentState.pop();
417 }
418 mCurrentState.push(state);
419 emitNotification(Notification::Status, state, reason, requestId);
420 }
421}
422
423void Synchronizer::resetStatus(const QByteArray requestId)
424{
425 mCurrentState.pop();
426 emitNotification(Notification::Status, mCurrentState.top(), {}, requestId);
427}
428
429void Synchronizer::setBusy(bool busy, const QString &reason, const QByteArray requestId)
430{
431 if (busy) {
432 setStatus(ApplicationDomain::BusyStatus, reason, requestId);
433 } else {
434 if (mCurrentState.top() == ApplicationDomain::BusyStatus) {
435 resetStatus(requestId);
436 }
437 }
438}
439
406KAsync::Job<void> Synchronizer::processSyncQueue() 440KAsync::Job<void> Synchronizer::processSyncQueue()
407{ 441{
408 if (mSyncRequestQueue.isEmpty()) { 442 if (mSyncRequestQueue.isEmpty()) {
@@ -421,14 +455,20 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
421 } 455 }
422 456
423 const auto request = mSyncRequestQueue.takeFirst(); 457 const auto request = mSyncRequestQueue.takeFirst();
424 return KAsync::start([this] { 458 return KAsync::start([=] {
425 mMessageQueue->startTransaction(); 459 mMessageQueue->startTransaction();
426 mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); 460 mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly);
427 mSyncInProgress = true; 461 mSyncInProgress = true;
462 if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
463 setBusy(true, "Synchronization has started.", request.requestId);
464 } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) {
465 setBusy(true, "ChangeReplay has started.", "changereplay");
466 }
428 }) 467 })
429 .then(processRequest(request)) 468 .then(processRequest(request))
430 .then<void>([this, request](const KAsync::Error &error) { 469 .then<void>([this, request](const KAsync::Error &error) {
431 SinkTraceCtx(mLogCtx) << "Sync request processed"; 470 SinkTraceCtx(mLogCtx) << "Sync request processed";
471 setBusy(false, {}, request.requestId);
432 mEntityStore->abortTransaction(); 472 mEntityStore->abortTransaction();
433 mSyncTransaction.abort(); 473 mSyncTransaction.abort();
434 mMessageQueue->commit(); 474 mMessageQueue->commit();
diff --git a/common/synchronizer.h b/common/synchronizer.h
index 32e93c4..b1ee122 100644
--- a/common/synchronizer.h
+++ b/common/synchronizer.h
@@ -21,6 +21,7 @@
21 21
22#include "sink_export.h" 22#include "sink_export.h"
23#include <QObject> 23#include <QObject>
24#include <QStack>
24#include <KAsync/Async> 25#include <KAsync/Async>
25#include <domainadaptor.h> 26#include <domainadaptor.h>
26#include <query.h> 27#include <query.h>
@@ -193,6 +194,12 @@ protected:
193 Sink::Log::Context mLogCtx; 194 Sink::Log::Context mLogCtx;
194 195
195private: 196private:
197 QStack<ApplicationDomain::Status> mCurrentState;
198 void setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId);
199 void setStatus(ApplicationDomain::Status busy, const QString &reason, const QByteArray requestId);
200 void resetStatus(const QByteArray requestId);
201 void setBusy(bool busy, const QString &reason, const QByteArray requestId);
202
196 void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity); 203 void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity);
197 KAsync::Job<void> processRequest(const SyncRequest &request); 204 KAsync::Job<void> processRequest(const SyncRequest &request);
198 KAsync::Job<void> processSyncQueue(); 205 KAsync::Job<void> processSyncQueue();
diff --git a/tests/notificationtest.cpp b/tests/notificationtest.cpp
index 375bfc4..a34d325 100644
--- a/tests/notificationtest.cpp
+++ b/tests/notificationtest.cpp
@@ -71,8 +71,8 @@ private slots:
71 QTRY_COMPARE(statusNotifications.size(), 3); 71 QTRY_COMPARE(statusNotifications.size(), 3);
72 //Sync 72 //Sync
73 QCOMPARE(statusNotifications.at(0).code, static_cast<int>(ApplicationDomain::Status::ConnectedStatus)); 73 QCOMPARE(statusNotifications.at(0).code, static_cast<int>(ApplicationDomain::Status::ConnectedStatus));
74 QCOMPARE(statusNotifications.at(1).code, static_cast<int>(Sink::ApplicationDomain::Status::BusyStatus)); 74 QCOMPARE(statusNotifications.at(1).code, static_cast<int>(ApplicationDomain::Status::BusyStatus));
75 QCOMPARE(statusNotifications.at(2).code, static_cast<int>(Sink::ApplicationDomain::Status::ConnectedStatus)); 75 QCOMPARE(statusNotifications.at(2).code, static_cast<int>(ApplicationDomain::Status::ConnectedStatus));
76 //Changereplay 76 //Changereplay
77 // It can happen that we get a changereplay notification pair first and then a second one at the end, 77 // It can happen that we get a changereplay notification pair first and then a second one at the end,
78 // we therefore currently filter all changereplay notifications (see above). 78 // we therefore currently filter all changereplay notifications (see above).