summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/resourcefacade.cpp35
-rw-r--r--common/synchronizer.cpp70
-rw-r--r--common/synchronizer.h7
-rw-r--r--tests/notificationtest.cpp4
4 files changed, 78 insertions, 38 deletions
diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp
index c702777..dee0711 100644
--- a/common/resourcefacade.cpp
+++ b/common/resourcefacade.cpp
@@ -351,7 +351,7 @@ QPair<KAsync::Job<void>, typename Sink::ResultEmitter<typename ApplicationDomain
351 const auto resources = Store::read<ApplicationDomain::SinkResource>(query); 351 const auto resources = Store::read<ApplicationDomain::SinkResource>(query);
352 SinkTraceCtx(ctx) << "Found resource belonging to the account " << account.identifier() << " : " << resources; 352 SinkTraceCtx(ctx) << "Found resource belonging to the account " << account.identifier() << " : " << resources;
353 auto accountIdentifier = account.identifier(); 353 auto accountIdentifier = account.identifier();
354 ApplicationDomain::Status status = ApplicationDomain::ConnectedStatus; 354 QList<int> states;
355 for (const auto &resource : resources) { 355 for (const auto &resource : resources) {
356 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource.identifier(), ResourceConfig::getResourceType(resource.identifier())); 356 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource.identifier(), ResourceConfig::getResourceType(resource.identifier()));
357 if (!monitoredResources->contains(resource.identifier())) { 357 if (!monitoredResources->contains(resource.identifier())) {
@@ -364,27 +364,20 @@ QPair<KAsync::Job<void>, typename Sink::ResultEmitter<typename ApplicationDomain
364 Q_ASSERT(ret); 364 Q_ASSERT(ret);
365 monitoredResources->insert(resource.identifier()); 365 monitoredResources->insert(resource.identifier());
366 } 366 }
367 367 states << resourceAccess->getResourceStatus();
368 //Figure out overall status
369 auto s = resourceAccess->getResourceStatus();
370 switch (s) {
371 case ApplicationDomain::ErrorStatus:
372 status = ApplicationDomain::ErrorStatus;
373 break;
374 case ApplicationDomain::OfflineStatus:
375 if (status == ApplicationDomain::ConnectedStatus) {
376 status = ApplicationDomain::OfflineStatus;
377 }
378 break;
379 case ApplicationDomain::ConnectedStatus:
380 break;
381 case ApplicationDomain::BusyStatus:
382 if (status != ApplicationDomain::ErrorStatus) {
383 status = ApplicationDomain::BusyStatus;
384 }
385 break;
386 }
387 } 368 }
369 const auto status = [&] {
370 if (states.contains(ApplicationDomain::ErrorStatus)) {
371 return ApplicationDomain::ErrorStatus;
372 }
373 if (states.contains(ApplicationDomain::BusyStatus)) {
374 return ApplicationDomain::BusyStatus;
375 }
376 if (states.contains(ApplicationDomain::ConnectedStatus)) {
377 return ApplicationDomain::ConnectedStatus;
378 }
379 return ApplicationDomain::OfflineStatus;
380 }();
388 account.setStatusStatus(status); 381 account.setStatusStatus(status);
389 }); 382 });
390 return qMakePair(KAsync::null<void>(), runner->emitter()); 383 return qMakePair(KAsync::null<void>(), runner->emitter());
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 284f867..9451488 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -40,6 +40,7 @@ Synchronizer::Synchronizer(const Sink::ResourceContext &context)
40 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), 40 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite),
41 mSyncInProgress(false) 41 mSyncInProgress(false)
42{ 42{
43 mCurrentState.push(ApplicationDomain::Status::OfflineStatus);
43 SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); 44 SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId();
44} 45}
45 46
@@ -308,6 +309,23 @@ void Synchronizer::reportProgress(int progress, int total)
308 SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total; 309 SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total;
309} 310}
310 311
312void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId)
313{
314 if (error) {
315 if (error.errorCode == ApplicationDomain::ConnectionError) {
316 //Couldn't connect, so we assume we don't have a network connection.
317 setStatus(ApplicationDomain::OfflineStatus, s, requestId);
318 } else if (error.errorCode == ApplicationDomain::LoginError) {
319 //If we failed to login altough we could connect that indicates a problem with our setup.
320 setStatus(ApplicationDomain::ErrorStatus, s, requestId);
321 }
322 //We don't know what kind of error this was, so we assume it's transient and don't change ou status.
323 } else {
324 //An operation against the server worked, so we're probably online.
325 setStatus(ApplicationDomain::ConnectedStatus, s, requestId);
326 }
327}
328
311KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) 329KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
312{ 330{
313 if (request.options & SyncRequest::RequestFlush) { 331 if (request.options & SyncRequest::RequestFlush) {
@@ -333,26 +351,20 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
333 } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { 351 } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
334 return KAsync::start([this, request] { 352 return KAsync::start([this, request] {
335 SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; 353 SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query;
336 emitNotification(Notification::Status, ApplicationDomain::BusyStatus, "Synchronization has started.", request.requestId);
337 emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities); 354 emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities);
338 }).then(synchronizeWithSource(request.query)).then([this] { 355 }).then(synchronizeWithSource(request.query)).then([this] {
339 //Commit after every request, so implementations only have to commit more if they add a lot of data. 356 //Commit after every request, so implementations only have to commit more if they add a lot of data.
340 commit(); 357 commit();
341 }).then<void>([this, request](const KAsync::Error &error) { 358 }).then<void>([this, request](const KAsync::Error &error) {
359 setStatusFromResult(error, "Synchronization has ended.", request.requestId);
342 if (error) { 360 if (error) {
343 //Emit notification with error 361 //Emit notification with error
344 SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error; 362 SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error;
345 if (error.errorCode == ApplicationDomain::ConnectionError) {
346 emitNotification(Notification::Status, ApplicationDomain::OfflineStatus, "Synchronization has ended.", request.requestId);
347 } else {
348 emitNotification(Notification::Status, ApplicationDomain::ErrorStatus, "Synchronization has ended.", request.requestId);
349 }
350 emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntities); 363 emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntities);
351 return KAsync::error(error); 364 return KAsync::error(error);
352 } else { 365 } else {
353 SinkLogCtx(mLogCtx) << "Done Synchronizing"; 366 SinkLogCtx(mLogCtx) << "Done Synchronizing";
354 emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntities); 367 emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntities);
355 emitNotification(Notification::Status, ApplicationDomain::ConnectedStatus, "Synchronization has ended.", request.requestId);
356 return KAsync::null(); 368 return KAsync::null();
357 } 369 }
358 }); 370 });
@@ -377,21 +389,15 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
377 } else { 389 } else {
378 return KAsync::start([this, request] { 390 return KAsync::start([this, request] {
379 SinkLogCtx(mLogCtx) << "Replaying changes."; 391 SinkLogCtx(mLogCtx) << "Replaying changes.";
380 emitNotification(Notification::Status, ApplicationDomain::BusyStatus, "Changereplay has started.", "changereplay");
381 }) 392 })
382 .then(replayNextRevision()) 393 .then(replayNextRevision())
383 .then<void>([this, request](const KAsync::Error &error) { 394 .then<void>([this, request](const KAsync::Error &error) {
395 setStatusFromResult(error, "Changereplay has ended.", "changereplay");
384 if (error) { 396 if (error) {
385 SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage; 397 SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage;
386 if (error.errorCode == ApplicationDomain::ConnectionError) {
387 emitNotification(Notification::Status, ApplicationDomain::OfflineStatus, "Changereplay has ended.", "changereplay");
388 } else {
389 emitNotification(Notification::Status, ApplicationDomain::ErrorStatus, "Changereplay has ended.", "changereplay");
390 }
391 return KAsync::error(error); 398 return KAsync::error(error);
392 } else { 399 } else {
393 SinkLogCtx(mLogCtx) << "Done replaying changes"; 400 SinkLogCtx(mLogCtx) << "Done replaying changes";
394 emitNotification(Notification::Status, ApplicationDomain::ConnectedStatus, "All changes have been replayed.", "changereplay");
395 return KAsync::null(); 401 return KAsync::null();
396 } 402 }
397 }); 403 });
@@ -403,6 +409,34 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
403 409
404} 410}
405 411
412void Synchronizer::setStatus(ApplicationDomain::Status state, const QString &reason, const QByteArray requestId)
413{
414 if (state != mCurrentState.top()) {
415 if (mCurrentState.top() == ApplicationDomain::BusyStatus) {
416 mCurrentState.pop();
417 }
418 mCurrentState.push(state);
419 emitNotification(Notification::Status, state, reason, requestId);
420 }
421}
422
423void Synchronizer::resetStatus(const QByteArray requestId)
424{
425 mCurrentState.pop();
426 emitNotification(Notification::Status, mCurrentState.top(), {}, requestId);
427}
428
429void Synchronizer::setBusy(bool busy, const QString &reason, const QByteArray requestId)
430{
431 if (busy) {
432 setStatus(ApplicationDomain::BusyStatus, reason, requestId);
433 } else {
434 if (mCurrentState.top() == ApplicationDomain::BusyStatus) {
435 resetStatus(requestId);
436 }
437 }
438}
439
406KAsync::Job<void> Synchronizer::processSyncQueue() 440KAsync::Job<void> Synchronizer::processSyncQueue()
407{ 441{
408 if (mSyncRequestQueue.isEmpty()) { 442 if (mSyncRequestQueue.isEmpty()) {
@@ -421,14 +455,20 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
421 } 455 }
422 456
423 const auto request = mSyncRequestQueue.takeFirst(); 457 const auto request = mSyncRequestQueue.takeFirst();
424 return KAsync::start([this] { 458 return KAsync::start([=] {
425 mMessageQueue->startTransaction(); 459 mMessageQueue->startTransaction();
426 mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); 460 mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly);
427 mSyncInProgress = true; 461 mSyncInProgress = true;
462 if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
463 setBusy(true, "Synchronization has started.", request.requestId);
464 } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) {
465 setBusy(true, "ChangeReplay has started.", "changereplay");
466 }
428 }) 467 })
429 .then(processRequest(request)) 468 .then(processRequest(request))
430 .then<void>([this, request](const KAsync::Error &error) { 469 .then<void>([this, request](const KAsync::Error &error) {
431 SinkTraceCtx(mLogCtx) << "Sync request processed"; 470 SinkTraceCtx(mLogCtx) << "Sync request processed";
471 setBusy(false, {}, request.requestId);
432 mEntityStore->abortTransaction(); 472 mEntityStore->abortTransaction();
433 mSyncTransaction.abort(); 473 mSyncTransaction.abort();
434 mMessageQueue->commit(); 474 mMessageQueue->commit();
diff --git a/common/synchronizer.h b/common/synchronizer.h
index 32e93c4..b1ee122 100644
--- a/common/synchronizer.h
+++ b/common/synchronizer.h
@@ -21,6 +21,7 @@
21 21
22#include "sink_export.h" 22#include "sink_export.h"
23#include <QObject> 23#include <QObject>
24#include <QStack>
24#include <KAsync/Async> 25#include <KAsync/Async>
25#include <domainadaptor.h> 26#include <domainadaptor.h>
26#include <query.h> 27#include <query.h>
@@ -193,6 +194,12 @@ protected:
193 Sink::Log::Context mLogCtx; 194 Sink::Log::Context mLogCtx;
194 195
195private: 196private:
197 QStack<ApplicationDomain::Status> mCurrentState;
198 void setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId);
199 void setStatus(ApplicationDomain::Status busy, const QString &reason, const QByteArray requestId);
200 void resetStatus(const QByteArray requestId);
201 void setBusy(bool busy, const QString &reason, const QByteArray requestId);
202
196 void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity); 203 void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity);
197 KAsync::Job<void> processRequest(const SyncRequest &request); 204 KAsync::Job<void> processRequest(const SyncRequest &request);
198 KAsync::Job<void> processSyncQueue(); 205 KAsync::Job<void> processSyncQueue();
diff --git a/tests/notificationtest.cpp b/tests/notificationtest.cpp
index 375bfc4..a34d325 100644
--- a/tests/notificationtest.cpp
+++ b/tests/notificationtest.cpp
@@ -71,8 +71,8 @@ private slots:
71 QTRY_COMPARE(statusNotifications.size(), 3); 71 QTRY_COMPARE(statusNotifications.size(), 3);
72 //Sync 72 //Sync
73 QCOMPARE(statusNotifications.at(0).code, static_cast<int>(ApplicationDomain::Status::ConnectedStatus)); 73 QCOMPARE(statusNotifications.at(0).code, static_cast<int>(ApplicationDomain::Status::ConnectedStatus));
74 QCOMPARE(statusNotifications.at(1).code, static_cast<int>(Sink::ApplicationDomain::Status::BusyStatus)); 74 QCOMPARE(statusNotifications.at(1).code, static_cast<int>(ApplicationDomain::Status::BusyStatus));
75 QCOMPARE(statusNotifications.at(2).code, static_cast<int>(Sink::ApplicationDomain::Status::ConnectedStatus)); 75 QCOMPARE(statusNotifications.at(2).code, static_cast<int>(ApplicationDomain::Status::ConnectedStatus));
76 //Changereplay 76 //Changereplay
77 // It can happen that we get a changereplay notification pair first and then a second one at the end, 77 // It can happen that we get a changereplay notification pair first and then a second one at the end,
78 // we therefore currently filter all changereplay notifications (see above). 78 // we therefore currently filter all changereplay notifications (see above).