summaryrefslogtreecommitdiffstats
path: root/common/synchronizer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r--common/synchronizer.cpp70
1 files changed, 55 insertions, 15 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 284f867..9451488 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -40,6 +40,7 @@ Synchronizer::Synchronizer(const Sink::ResourceContext &context)
40 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), 40 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite),
41 mSyncInProgress(false) 41 mSyncInProgress(false)
42{ 42{
43 mCurrentState.push(ApplicationDomain::Status::OfflineStatus);
43 SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); 44 SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId();
44} 45}
45 46
@@ -308,6 +309,23 @@ void Synchronizer::reportProgress(int progress, int total)
308 SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total; 309 SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total;
309} 310}
310 311
312void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId)
313{
314 if (error) {
315 if (error.errorCode == ApplicationDomain::ConnectionError) {
316 //Couldn't connect, so we assume we don't have a network connection.
317 setStatus(ApplicationDomain::OfflineStatus, s, requestId);
318 } else if (error.errorCode == ApplicationDomain::LoginError) {
319 //If we failed to login altough we could connect that indicates a problem with our setup.
320 setStatus(ApplicationDomain::ErrorStatus, s, requestId);
321 }
322 //We don't know what kind of error this was, so we assume it's transient and don't change ou status.
323 } else {
324 //An operation against the server worked, so we're probably online.
325 setStatus(ApplicationDomain::ConnectedStatus, s, requestId);
326 }
327}
328
311KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) 329KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
312{ 330{
313 if (request.options & SyncRequest::RequestFlush) { 331 if (request.options & SyncRequest::RequestFlush) {
@@ -333,26 +351,20 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
333 } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { 351 } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
334 return KAsync::start([this, request] { 352 return KAsync::start([this, request] {
335 SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; 353 SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query;
336 emitNotification(Notification::Status, ApplicationDomain::BusyStatus, "Synchronization has started.", request.requestId);
337 emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities); 354 emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities);
338 }).then(synchronizeWithSource(request.query)).then([this] { 355 }).then(synchronizeWithSource(request.query)).then([this] {
339 //Commit after every request, so implementations only have to commit more if they add a lot of data. 356 //Commit after every request, so implementations only have to commit more if they add a lot of data.
340 commit(); 357 commit();
341 }).then<void>([this, request](const KAsync::Error &error) { 358 }).then<void>([this, request](const KAsync::Error &error) {
359 setStatusFromResult(error, "Synchronization has ended.", request.requestId);
342 if (error) { 360 if (error) {
343 //Emit notification with error 361 //Emit notification with error
344 SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error; 362 SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error;
345 if (error.errorCode == ApplicationDomain::ConnectionError) {
346 emitNotification(Notification::Status, ApplicationDomain::OfflineStatus, "Synchronization has ended.", request.requestId);
347 } else {
348 emitNotification(Notification::Status, ApplicationDomain::ErrorStatus, "Synchronization has ended.", request.requestId);
349 }
350 emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntities); 363 emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntities);
351 return KAsync::error(error); 364 return KAsync::error(error);
352 } else { 365 } else {
353 SinkLogCtx(mLogCtx) << "Done Synchronizing"; 366 SinkLogCtx(mLogCtx) << "Done Synchronizing";
354 emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntities); 367 emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntities);
355 emitNotification(Notification::Status, ApplicationDomain::ConnectedStatus, "Synchronization has ended.", request.requestId);
356 return KAsync::null(); 368 return KAsync::null();
357 } 369 }
358 }); 370 });
@@ -377,21 +389,15 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
377 } else { 389 } else {
378 return KAsync::start([this, request] { 390 return KAsync::start([this, request] {
379 SinkLogCtx(mLogCtx) << "Replaying changes."; 391 SinkLogCtx(mLogCtx) << "Replaying changes.";
380 emitNotification(Notification::Status, ApplicationDomain::BusyStatus, "Changereplay has started.", "changereplay");
381 }) 392 })
382 .then(replayNextRevision()) 393 .then(replayNextRevision())
383 .then<void>([this, request](const KAsync::Error &error) { 394 .then<void>([this, request](const KAsync::Error &error) {
395 setStatusFromResult(error, "Changereplay has ended.", "changereplay");
384 if (error) { 396 if (error) {
385 SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage; 397 SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage;
386 if (error.errorCode == ApplicationDomain::ConnectionError) {
387 emitNotification(Notification::Status, ApplicationDomain::OfflineStatus, "Changereplay has ended.", "changereplay");
388 } else {
389 emitNotification(Notification::Status, ApplicationDomain::ErrorStatus, "Changereplay has ended.", "changereplay");
390 }
391 return KAsync::error(error); 398 return KAsync::error(error);
392 } else { 399 } else {
393 SinkLogCtx(mLogCtx) << "Done replaying changes"; 400 SinkLogCtx(mLogCtx) << "Done replaying changes";
394 emitNotification(Notification::Status, ApplicationDomain::ConnectedStatus, "All changes have been replayed.", "changereplay");
395 return KAsync::null(); 401 return KAsync::null();
396 } 402 }
397 }); 403 });
@@ -403,6 +409,34 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
403 409
404} 410}
405 411
412void Synchronizer::setStatus(ApplicationDomain::Status state, const QString &reason, const QByteArray requestId)
413{
414 if (state != mCurrentState.top()) {
415 if (mCurrentState.top() == ApplicationDomain::BusyStatus) {
416 mCurrentState.pop();
417 }
418 mCurrentState.push(state);
419 emitNotification(Notification::Status, state, reason, requestId);
420 }
421}
422
423void Synchronizer::resetStatus(const QByteArray requestId)
424{
425 mCurrentState.pop();
426 emitNotification(Notification::Status, mCurrentState.top(), {}, requestId);
427}
428
429void Synchronizer::setBusy(bool busy, const QString &reason, const QByteArray requestId)
430{
431 if (busy) {
432 setStatus(ApplicationDomain::BusyStatus, reason, requestId);
433 } else {
434 if (mCurrentState.top() == ApplicationDomain::BusyStatus) {
435 resetStatus(requestId);
436 }
437 }
438}
439
406KAsync::Job<void> Synchronizer::processSyncQueue() 440KAsync::Job<void> Synchronizer::processSyncQueue()
407{ 441{
408 if (mSyncRequestQueue.isEmpty()) { 442 if (mSyncRequestQueue.isEmpty()) {
@@ -421,14 +455,20 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
421 } 455 }
422 456
423 const auto request = mSyncRequestQueue.takeFirst(); 457 const auto request = mSyncRequestQueue.takeFirst();
424 return KAsync::start([this] { 458 return KAsync::start([=] {
425 mMessageQueue->startTransaction(); 459 mMessageQueue->startTransaction();
426 mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); 460 mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly);
427 mSyncInProgress = true; 461 mSyncInProgress = true;
462 if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
463 setBusy(true, "Synchronization has started.", request.requestId);
464 } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) {
465 setBusy(true, "ChangeReplay has started.", "changereplay");
466 }
428 }) 467 })
429 .then(processRequest(request)) 468 .then(processRequest(request))
430 .then<void>([this, request](const KAsync::Error &error) { 469 .then<void>([this, request](const KAsync::Error &error) {
431 SinkTraceCtx(mLogCtx) << "Sync request processed"; 470 SinkTraceCtx(mLogCtx) << "Sync request processed";
471 setBusy(false, {}, request.requestId);
432 mEntityStore->abortTransaction(); 472 mEntityStore->abortTransaction();
433 mSyncTransaction.abort(); 473 mSyncTransaction.abort();
434 mMessageQueue->commit(); 474 mMessageQueue->commit();