diff options
-rw-r--r-- | common/resourcefacade.cpp | 35 | ||||
-rw-r--r-- | common/synchronizer.cpp | 70 | ||||
-rw-r--r-- | common/synchronizer.h | 7 | ||||
-rw-r--r-- | tests/notificationtest.cpp | 4 |
4 files changed, 78 insertions, 38 deletions
diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp index c702777..dee0711 100644 --- a/common/resourcefacade.cpp +++ b/common/resourcefacade.cpp | |||
@@ -351,7 +351,7 @@ QPair<KAsync::Job<void>, typename Sink::ResultEmitter<typename ApplicationDomain | |||
351 | const auto resources = Store::read<ApplicationDomain::SinkResource>(query); | 351 | const auto resources = Store::read<ApplicationDomain::SinkResource>(query); |
352 | SinkTraceCtx(ctx) << "Found resource belonging to the account " << account.identifier() << " : " << resources; | 352 | SinkTraceCtx(ctx) << "Found resource belonging to the account " << account.identifier() << " : " << resources; |
353 | auto accountIdentifier = account.identifier(); | 353 | auto accountIdentifier = account.identifier(); |
354 | ApplicationDomain::Status status = ApplicationDomain::ConnectedStatus; | 354 | QList<int> states; |
355 | for (const auto &resource : resources) { | 355 | for (const auto &resource : resources) { |
356 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource.identifier(), ResourceConfig::getResourceType(resource.identifier())); | 356 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource.identifier(), ResourceConfig::getResourceType(resource.identifier())); |
357 | if (!monitoredResources->contains(resource.identifier())) { | 357 | if (!monitoredResources->contains(resource.identifier())) { |
@@ -364,27 +364,20 @@ QPair<KAsync::Job<void>, typename Sink::ResultEmitter<typename ApplicationDomain | |||
364 | Q_ASSERT(ret); | 364 | Q_ASSERT(ret); |
365 | monitoredResources->insert(resource.identifier()); | 365 | monitoredResources->insert(resource.identifier()); |
366 | } | 366 | } |
367 | 367 | states << resourceAccess->getResourceStatus(); | |
368 | //Figure out overall status | ||
369 | auto s = resourceAccess->getResourceStatus(); | ||
370 | switch (s) { | ||
371 | case ApplicationDomain::ErrorStatus: | ||
372 | status = ApplicationDomain::ErrorStatus; | ||
373 | break; | ||
374 | case ApplicationDomain::OfflineStatus: | ||
375 | if (status == ApplicationDomain::ConnectedStatus) { | ||
376 | status = ApplicationDomain::OfflineStatus; | ||
377 | } | ||
378 | break; | ||
379 | case ApplicationDomain::ConnectedStatus: | ||
380 | break; | ||
381 | case ApplicationDomain::BusyStatus: | ||
382 | if (status != ApplicationDomain::ErrorStatus) { | ||
383 | status = ApplicationDomain::BusyStatus; | ||
384 | } | ||
385 | break; | ||
386 | } | ||
387 | } | 368 | } |
369 | const auto status = [&] { | ||
370 | if (states.contains(ApplicationDomain::ErrorStatus)) { | ||
371 | return ApplicationDomain::ErrorStatus; | ||
372 | } | ||
373 | if (states.contains(ApplicationDomain::BusyStatus)) { | ||
374 | return ApplicationDomain::BusyStatus; | ||
375 | } | ||
376 | if (states.contains(ApplicationDomain::ConnectedStatus)) { | ||
377 | return ApplicationDomain::ConnectedStatus; | ||
378 | } | ||
379 | return ApplicationDomain::OfflineStatus; | ||
380 | }(); | ||
388 | account.setStatusStatus(status); | 381 | account.setStatusStatus(status); |
389 | }); | 382 | }); |
390 | return qMakePair(KAsync::null<void>(), runner->emitter()); | 383 | return qMakePair(KAsync::null<void>(), runner->emitter()); |
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 284f867..9451488 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -40,6 +40,7 @@ Synchronizer::Synchronizer(const Sink::ResourceContext &context) | |||
40 | mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), | 40 | mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), |
41 | mSyncInProgress(false) | 41 | mSyncInProgress(false) |
42 | { | 42 | { |
43 | mCurrentState.push(ApplicationDomain::Status::OfflineStatus); | ||
43 | SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); | 44 | SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); |
44 | } | 45 | } |
45 | 46 | ||
@@ -308,6 +309,23 @@ void Synchronizer::reportProgress(int progress, int total) | |||
308 | SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total; | 309 | SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total; |
309 | } | 310 | } |
310 | 311 | ||
312 | void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId) | ||
313 | { | ||
314 | if (error) { | ||
315 | if (error.errorCode == ApplicationDomain::ConnectionError) { | ||
316 | //Couldn't connect, so we assume we don't have a network connection. | ||
317 | setStatus(ApplicationDomain::OfflineStatus, s, requestId); | ||
318 | } else if (error.errorCode == ApplicationDomain::LoginError) { | ||
319 | //If we failed to login altough we could connect that indicates a problem with our setup. | ||
320 | setStatus(ApplicationDomain::ErrorStatus, s, requestId); | ||
321 | } | ||
322 | //We don't know what kind of error this was, so we assume it's transient and don't change ou status. | ||
323 | } else { | ||
324 | //An operation against the server worked, so we're probably online. | ||
325 | setStatus(ApplicationDomain::ConnectedStatus, s, requestId); | ||
326 | } | ||
327 | } | ||
328 | |||
311 | KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | 329 | KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) |
312 | { | 330 | { |
313 | if (request.options & SyncRequest::RequestFlush) { | 331 | if (request.options & SyncRequest::RequestFlush) { |
@@ -333,26 +351,20 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | |||
333 | } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { | 351 | } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { |
334 | return KAsync::start([this, request] { | 352 | return KAsync::start([this, request] { |
335 | SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; | 353 | SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; |
336 | emitNotification(Notification::Status, ApplicationDomain::BusyStatus, "Synchronization has started.", request.requestId); | ||
337 | emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities); | 354 | emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities); |
338 | }).then(synchronizeWithSource(request.query)).then([this] { | 355 | }).then(synchronizeWithSource(request.query)).then([this] { |
339 | //Commit after every request, so implementations only have to commit more if they add a lot of data. | 356 | //Commit after every request, so implementations only have to commit more if they add a lot of data. |
340 | commit(); | 357 | commit(); |
341 | }).then<void>([this, request](const KAsync::Error &error) { | 358 | }).then<void>([this, request](const KAsync::Error &error) { |
359 | setStatusFromResult(error, "Synchronization has ended.", request.requestId); | ||
342 | if (error) { | 360 | if (error) { |
343 | //Emit notification with error | 361 | //Emit notification with error |
344 | SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error; | 362 | SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error; |
345 | if (error.errorCode == ApplicationDomain::ConnectionError) { | ||
346 | emitNotification(Notification::Status, ApplicationDomain::OfflineStatus, "Synchronization has ended.", request.requestId); | ||
347 | } else { | ||
348 | emitNotification(Notification::Status, ApplicationDomain::ErrorStatus, "Synchronization has ended.", request.requestId); | ||
349 | } | ||
350 | emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntities); | 363 | emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntities); |
351 | return KAsync::error(error); | 364 | return KAsync::error(error); |
352 | } else { | 365 | } else { |
353 | SinkLogCtx(mLogCtx) << "Done Synchronizing"; | 366 | SinkLogCtx(mLogCtx) << "Done Synchronizing"; |
354 | emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntities); | 367 | emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntities); |
355 | emitNotification(Notification::Status, ApplicationDomain::ConnectedStatus, "Synchronization has ended.", request.requestId); | ||
356 | return KAsync::null(); | 368 | return KAsync::null(); |
357 | } | 369 | } |
358 | }); | 370 | }); |
@@ -377,21 +389,15 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | |||
377 | } else { | 389 | } else { |
378 | return KAsync::start([this, request] { | 390 | return KAsync::start([this, request] { |
379 | SinkLogCtx(mLogCtx) << "Replaying changes."; | 391 | SinkLogCtx(mLogCtx) << "Replaying changes."; |
380 | emitNotification(Notification::Status, ApplicationDomain::BusyStatus, "Changereplay has started.", "changereplay"); | ||
381 | }) | 392 | }) |
382 | .then(replayNextRevision()) | 393 | .then(replayNextRevision()) |
383 | .then<void>([this, request](const KAsync::Error &error) { | 394 | .then<void>([this, request](const KAsync::Error &error) { |
395 | setStatusFromResult(error, "Changereplay has ended.", "changereplay"); | ||
384 | if (error) { | 396 | if (error) { |
385 | SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage; | 397 | SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage; |
386 | if (error.errorCode == ApplicationDomain::ConnectionError) { | ||
387 | emitNotification(Notification::Status, ApplicationDomain::OfflineStatus, "Changereplay has ended.", "changereplay"); | ||
388 | } else { | ||
389 | emitNotification(Notification::Status, ApplicationDomain::ErrorStatus, "Changereplay has ended.", "changereplay"); | ||
390 | } | ||
391 | return KAsync::error(error); | 398 | return KAsync::error(error); |
392 | } else { | 399 | } else { |
393 | SinkLogCtx(mLogCtx) << "Done replaying changes"; | 400 | SinkLogCtx(mLogCtx) << "Done replaying changes"; |
394 | emitNotification(Notification::Status, ApplicationDomain::ConnectedStatus, "All changes have been replayed.", "changereplay"); | ||
395 | return KAsync::null(); | 401 | return KAsync::null(); |
396 | } | 402 | } |
397 | }); | 403 | }); |
@@ -403,6 +409,34 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | |||
403 | 409 | ||
404 | } | 410 | } |
405 | 411 | ||
412 | void Synchronizer::setStatus(ApplicationDomain::Status state, const QString &reason, const QByteArray requestId) | ||
413 | { | ||
414 | if (state != mCurrentState.top()) { | ||
415 | if (mCurrentState.top() == ApplicationDomain::BusyStatus) { | ||
416 | mCurrentState.pop(); | ||
417 | } | ||
418 | mCurrentState.push(state); | ||
419 | emitNotification(Notification::Status, state, reason, requestId); | ||
420 | } | ||
421 | } | ||
422 | |||
423 | void Synchronizer::resetStatus(const QByteArray requestId) | ||
424 | { | ||
425 | mCurrentState.pop(); | ||
426 | emitNotification(Notification::Status, mCurrentState.top(), {}, requestId); | ||
427 | } | ||
428 | |||
429 | void Synchronizer::setBusy(bool busy, const QString &reason, const QByteArray requestId) | ||
430 | { | ||
431 | if (busy) { | ||
432 | setStatus(ApplicationDomain::BusyStatus, reason, requestId); | ||
433 | } else { | ||
434 | if (mCurrentState.top() == ApplicationDomain::BusyStatus) { | ||
435 | resetStatus(requestId); | ||
436 | } | ||
437 | } | ||
438 | } | ||
439 | |||
406 | KAsync::Job<void> Synchronizer::processSyncQueue() | 440 | KAsync::Job<void> Synchronizer::processSyncQueue() |
407 | { | 441 | { |
408 | if (mSyncRequestQueue.isEmpty()) { | 442 | if (mSyncRequestQueue.isEmpty()) { |
@@ -421,14 +455,20 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
421 | } | 455 | } |
422 | 456 | ||
423 | const auto request = mSyncRequestQueue.takeFirst(); | 457 | const auto request = mSyncRequestQueue.takeFirst(); |
424 | return KAsync::start([this] { | 458 | return KAsync::start([=] { |
425 | mMessageQueue->startTransaction(); | 459 | mMessageQueue->startTransaction(); |
426 | mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); | 460 | mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); |
427 | mSyncInProgress = true; | 461 | mSyncInProgress = true; |
462 | if (request.requestType == Synchronizer::SyncRequest::Synchronization) { | ||
463 | setBusy(true, "Synchronization has started.", request.requestId); | ||
464 | } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { | ||
465 | setBusy(true, "ChangeReplay has started.", "changereplay"); | ||
466 | } | ||
428 | }) | 467 | }) |
429 | .then(processRequest(request)) | 468 | .then(processRequest(request)) |
430 | .then<void>([this, request](const KAsync::Error &error) { | 469 | .then<void>([this, request](const KAsync::Error &error) { |
431 | SinkTraceCtx(mLogCtx) << "Sync request processed"; | 470 | SinkTraceCtx(mLogCtx) << "Sync request processed"; |
471 | setBusy(false, {}, request.requestId); | ||
432 | mEntityStore->abortTransaction(); | 472 | mEntityStore->abortTransaction(); |
433 | mSyncTransaction.abort(); | 473 | mSyncTransaction.abort(); |
434 | mMessageQueue->commit(); | 474 | mMessageQueue->commit(); |
diff --git a/common/synchronizer.h b/common/synchronizer.h index 32e93c4..b1ee122 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h | |||
@@ -21,6 +21,7 @@ | |||
21 | 21 | ||
22 | #include "sink_export.h" | 22 | #include "sink_export.h" |
23 | #include <QObject> | 23 | #include <QObject> |
24 | #include <QStack> | ||
24 | #include <KAsync/Async> | 25 | #include <KAsync/Async> |
25 | #include <domainadaptor.h> | 26 | #include <domainadaptor.h> |
26 | #include <query.h> | 27 | #include <query.h> |
@@ -193,6 +194,12 @@ protected: | |||
193 | Sink::Log::Context mLogCtx; | 194 | Sink::Log::Context mLogCtx; |
194 | 195 | ||
195 | private: | 196 | private: |
197 | QStack<ApplicationDomain::Status> mCurrentState; | ||
198 | void setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId); | ||
199 | void setStatus(ApplicationDomain::Status busy, const QString &reason, const QByteArray requestId); | ||
200 | void resetStatus(const QByteArray requestId); | ||
201 | void setBusy(bool busy, const QString &reason, const QByteArray requestId); | ||
202 | |||
196 | void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity); | 203 | void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity); |
197 | KAsync::Job<void> processRequest(const SyncRequest &request); | 204 | KAsync::Job<void> processRequest(const SyncRequest &request); |
198 | KAsync::Job<void> processSyncQueue(); | 205 | KAsync::Job<void> processSyncQueue(); |
diff --git a/tests/notificationtest.cpp b/tests/notificationtest.cpp index 375bfc4..a34d325 100644 --- a/tests/notificationtest.cpp +++ b/tests/notificationtest.cpp | |||
@@ -71,8 +71,8 @@ private slots: | |||
71 | QTRY_COMPARE(statusNotifications.size(), 3); | 71 | QTRY_COMPARE(statusNotifications.size(), 3); |
72 | //Sync | 72 | //Sync |
73 | QCOMPARE(statusNotifications.at(0).code, static_cast<int>(ApplicationDomain::Status::ConnectedStatus)); | 73 | QCOMPARE(statusNotifications.at(0).code, static_cast<int>(ApplicationDomain::Status::ConnectedStatus)); |
74 | QCOMPARE(statusNotifications.at(1).code, static_cast<int>(Sink::ApplicationDomain::Status::BusyStatus)); | 74 | QCOMPARE(statusNotifications.at(1).code, static_cast<int>(ApplicationDomain::Status::BusyStatus)); |
75 | QCOMPARE(statusNotifications.at(2).code, static_cast<int>(Sink::ApplicationDomain::Status::ConnectedStatus)); | 75 | QCOMPARE(statusNotifications.at(2).code, static_cast<int>(ApplicationDomain::Status::ConnectedStatus)); |
76 | //Changereplay | 76 | //Changereplay |
77 | // It can happen that we get a changereplay notification pair first and then a second one at the end, | 77 | // It can happen that we get a changereplay notification pair first and then a second one at the end, |
78 | // we therefore currently filter all changereplay notifications (see above). | 78 | // we therefore currently filter all changereplay notifications (see above). |