summaryrefslogtreecommitdiffstats
path: root/common/synchronizer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r--common/synchronizer.cpp47
1 files changed, 36 insertions, 11 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 3e7bd30..3b32e68 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -304,9 +304,29 @@ void Synchronizer::emitNotification(Notification::NoticationType type, int code,
304 emit notify(n); 304 emit notify(n);
305} 305}
306 306
307void Synchronizer::reportProgress(int progress, int total) 307void Synchronizer::emitProgressNotification(Notification::NoticationType type, int progress, int total, const QByteArray &id, const QByteArrayList &entities)
308{ 308{
309 SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total; 309 Sink::Notification n;
310 n.id = id;
311 n.type = type;
312 n.progress = progress;
313 n.total = total;
314 n.entities = entities;
315 emit notify(n);
316}
317
318void Synchronizer::reportProgress(int progress, int total, const QByteArrayList &entities)
319{
320 if (progress > 0 && total > 0) {
321 SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total << mCurrentRequest.requestId << mCurrentRequest.applicableEntities;
322 const auto applicableEntities = [&] {
323 if (entities.isEmpty()) {
324 return mCurrentRequest.applicableEntities;
325 }
326 return entities;
327 }();
328 emitProgressNotification(Notification::Progress, progress, total, mCurrentRequest.requestId, applicableEntities);
329 }
310} 330}
311 331
312void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId) 332void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId)
@@ -315,6 +335,9 @@ void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString
315 if (error.errorCode == ApplicationDomain::ConnectionError) { 335 if (error.errorCode == ApplicationDomain::ConnectionError) {
316 //Couldn't connect, so we assume we don't have a network connection. 336 //Couldn't connect, so we assume we don't have a network connection.
317 setStatus(ApplicationDomain::OfflineStatus, s, requestId); 337 setStatus(ApplicationDomain::OfflineStatus, s, requestId);
338 } else if (error.errorCode == ApplicationDomain::NoServerError) {
339 //Failed to contact the server.
340 setStatus(ApplicationDomain::OfflineStatus, s, requestId);
318 } else if (error.errorCode == ApplicationDomain::ConfigurationError) { 341 } else if (error.errorCode == ApplicationDomain::ConfigurationError) {
319 //There is an error with the configuration. 342 //There is an error with the configuration.
320 setStatus(ApplicationDomain::ErrorStatus, s, requestId); 343 setStatus(ApplicationDomain::ErrorStatus, s, requestId);
@@ -354,6 +377,7 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
354 } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { 377 } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
355 return KAsync::start([this, request] { 378 return KAsync::start([this, request] {
356 SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; 379 SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query;
380 setBusy(true, "Synchronization has started.", request.requestId);
357 emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities); 381 emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities);
358 }).then(synchronizeWithSource(request.query)).then([this] { 382 }).then(synchronizeWithSource(request.query)).then([this] {
359 //Commit after every request, so implementations only have to commit more if they add a lot of data. 383 //Commit after every request, so implementations only have to commit more if they add a lot of data.
@@ -391,11 +415,12 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
391 return KAsync::null(); 415 return KAsync::null();
392 } else { 416 } else {
393 return KAsync::start([this, request] { 417 return KAsync::start([this, request] {
418 setBusy(true, "ChangeReplay has started.", request.requestId);
394 SinkLogCtx(mLogCtx) << "Replaying changes."; 419 SinkLogCtx(mLogCtx) << "Replaying changes.";
395 }) 420 })
396 .then(replayNextRevision()) 421 .then(replayNextRevision())
397 .then<void>([this, request](const KAsync::Error &error) { 422 .then<void>([this, request](const KAsync::Error &error) {
398 setStatusFromResult(error, "Changereplay has ended.", "changereplay"); 423 setStatusFromResult(error, "Changereplay has ended.", request.requestId);
399 if (error) { 424 if (error) {
400 SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage; 425 SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage;
401 return KAsync::error(error); 426 return KAsync::error(error);
@@ -462,16 +487,13 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
462 mMessageQueue->startTransaction(); 487 mMessageQueue->startTransaction();
463 mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); 488 mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly);
464 mSyncInProgress = true; 489 mSyncInProgress = true;
465 if (request.requestType == Synchronizer::SyncRequest::Synchronization) { 490 mCurrentRequest = request;
466 setBusy(true, "Synchronization has started.", request.requestId);
467 } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) {
468 setBusy(true, "ChangeReplay has started.", "changereplay");
469 }
470 }) 491 })
471 .then(processRequest(request)) 492 .then(processRequest(request))
472 .then<void>([this, request](const KAsync::Error &error) { 493 .then<void>([this, request](const KAsync::Error &error) {
473 SinkTraceCtx(mLogCtx) << "Sync request processed"; 494 SinkTraceCtx(mLogCtx) << "Sync request processed";
474 setBusy(false, {}, request.requestId); 495 setBusy(false, {}, request.requestId);
496 mCurrentRequest = {};
475 mEntityStore->abortTransaction(); 497 mEntityStore->abortTransaction();
476 mSyncTransaction.abort(); 498 mSyncTransaction.abort();
477 mMessageQueue->commit(); 499 mMessageQueue->commit();
@@ -516,7 +538,7 @@ void Synchronizer::revisionChanged()
516 return; 538 return;
517 } 539 }
518 } 540 }
519 mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::ChangeReplay}; 541 mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::ChangeReplay, "changereplay"};
520 processSyncQueue().exec(); 542 processSyncQueue().exec();
521} 543}
522 544
@@ -607,11 +629,14 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray
607 } 629 }
608 }) 630 })
609 .then([this](const KAsync::Error &error) { 631 .then([this](const KAsync::Error &error) {
632 //We need to commit here otherwise the next change-replay step will abort the transaction
633 mSyncStore.clear();
634 mSyncTransaction.commit();
610 if (error) { 635 if (error) {
611 SinkWarningCtx(mLogCtx) << "Failed to replay change: " << error.errorMessage; 636 SinkWarningCtx(mLogCtx) << "Failed to replay change: " << error.errorMessage;
637 return KAsync::error(error);
612 } 638 }
613 mSyncStore.clear(); 639 return KAsync::null();
614 mSyncTransaction.commit();
615 }); 640 });
616} 641}
617 642