diff options
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r-- | common/synchronizer.cpp | 47 |
1 files changed, 36 insertions, 11 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 3e7bd30..3b32e68 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -304,9 +304,29 @@ void Synchronizer::emitNotification(Notification::NoticationType type, int code, | |||
304 | emit notify(n); | 304 | emit notify(n); |
305 | } | 305 | } |
306 | 306 | ||
307 | void Synchronizer::reportProgress(int progress, int total) | 307 | void Synchronizer::emitProgressNotification(Notification::NoticationType type, int progress, int total, const QByteArray &id, const QByteArrayList &entities) |
308 | { | 308 | { |
309 | SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total; | 309 | Sink::Notification n; |
310 | n.id = id; | ||
311 | n.type = type; | ||
312 | n.progress = progress; | ||
313 | n.total = total; | ||
314 | n.entities = entities; | ||
315 | emit notify(n); | ||
316 | } | ||
317 | |||
318 | void Synchronizer::reportProgress(int progress, int total, const QByteArrayList &entities) | ||
319 | { | ||
320 | if (progress > 0 && total > 0) { | ||
321 | SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total << mCurrentRequest.requestId << mCurrentRequest.applicableEntities; | ||
322 | const auto applicableEntities = [&] { | ||
323 | if (entities.isEmpty()) { | ||
324 | return mCurrentRequest.applicableEntities; | ||
325 | } | ||
326 | return entities; | ||
327 | }(); | ||
328 | emitProgressNotification(Notification::Progress, progress, total, mCurrentRequest.requestId, applicableEntities); | ||
329 | } | ||
310 | } | 330 | } |
311 | 331 | ||
312 | void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId) | 332 | void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId) |
@@ -315,6 +335,9 @@ void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString | |||
315 | if (error.errorCode == ApplicationDomain::ConnectionError) { | 335 | if (error.errorCode == ApplicationDomain::ConnectionError) { |
316 | //Couldn't connect, so we assume we don't have a network connection. | 336 | //Couldn't connect, so we assume we don't have a network connection. |
317 | setStatus(ApplicationDomain::OfflineStatus, s, requestId); | 337 | setStatus(ApplicationDomain::OfflineStatus, s, requestId); |
338 | } else if (error.errorCode == ApplicationDomain::NoServerError) { | ||
339 | //Failed to contact the server. | ||
340 | setStatus(ApplicationDomain::OfflineStatus, s, requestId); | ||
318 | } else if (error.errorCode == ApplicationDomain::ConfigurationError) { | 341 | } else if (error.errorCode == ApplicationDomain::ConfigurationError) { |
319 | //There is an error with the configuration. | 342 | //There is an error with the configuration. |
320 | setStatus(ApplicationDomain::ErrorStatus, s, requestId); | 343 | setStatus(ApplicationDomain::ErrorStatus, s, requestId); |
@@ -354,6 +377,7 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | |||
354 | } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { | 377 | } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { |
355 | return KAsync::start([this, request] { | 378 | return KAsync::start([this, request] { |
356 | SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; | 379 | SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; |
380 | setBusy(true, "Synchronization has started.", request.requestId); | ||
357 | emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities); | 381 | emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities); |
358 | }).then(synchronizeWithSource(request.query)).then([this] { | 382 | }).then(synchronizeWithSource(request.query)).then([this] { |
359 | //Commit after every request, so implementations only have to commit more if they add a lot of data. | 383 | //Commit after every request, so implementations only have to commit more if they add a lot of data. |
@@ -391,11 +415,12 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | |||
391 | return KAsync::null(); | 415 | return KAsync::null(); |
392 | } else { | 416 | } else { |
393 | return KAsync::start([this, request] { | 417 | return KAsync::start([this, request] { |
418 | setBusy(true, "ChangeReplay has started.", request.requestId); | ||
394 | SinkLogCtx(mLogCtx) << "Replaying changes."; | 419 | SinkLogCtx(mLogCtx) << "Replaying changes."; |
395 | }) | 420 | }) |
396 | .then(replayNextRevision()) | 421 | .then(replayNextRevision()) |
397 | .then<void>([this, request](const KAsync::Error &error) { | 422 | .then<void>([this, request](const KAsync::Error &error) { |
398 | setStatusFromResult(error, "Changereplay has ended.", "changereplay"); | 423 | setStatusFromResult(error, "Changereplay has ended.", request.requestId); |
399 | if (error) { | 424 | if (error) { |
400 | SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage; | 425 | SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage; |
401 | return KAsync::error(error); | 426 | return KAsync::error(error); |
@@ -462,16 +487,13 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
462 | mMessageQueue->startTransaction(); | 487 | mMessageQueue->startTransaction(); |
463 | mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); | 488 | mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); |
464 | mSyncInProgress = true; | 489 | mSyncInProgress = true; |
465 | if (request.requestType == Synchronizer::SyncRequest::Synchronization) { | 490 | mCurrentRequest = request; |
466 | setBusy(true, "Synchronization has started.", request.requestId); | ||
467 | } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { | ||
468 | setBusy(true, "ChangeReplay has started.", "changereplay"); | ||
469 | } | ||
470 | }) | 491 | }) |
471 | .then(processRequest(request)) | 492 | .then(processRequest(request)) |
472 | .then<void>([this, request](const KAsync::Error &error) { | 493 | .then<void>([this, request](const KAsync::Error &error) { |
473 | SinkTraceCtx(mLogCtx) << "Sync request processed"; | 494 | SinkTraceCtx(mLogCtx) << "Sync request processed"; |
474 | setBusy(false, {}, request.requestId); | 495 | setBusy(false, {}, request.requestId); |
496 | mCurrentRequest = {}; | ||
475 | mEntityStore->abortTransaction(); | 497 | mEntityStore->abortTransaction(); |
476 | mSyncTransaction.abort(); | 498 | mSyncTransaction.abort(); |
477 | mMessageQueue->commit(); | 499 | mMessageQueue->commit(); |
@@ -516,7 +538,7 @@ void Synchronizer::revisionChanged() | |||
516 | return; | 538 | return; |
517 | } | 539 | } |
518 | } | 540 | } |
519 | mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::ChangeReplay}; | 541 | mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::ChangeReplay, "changereplay"}; |
520 | processSyncQueue().exec(); | 542 | processSyncQueue().exec(); |
521 | } | 543 | } |
522 | 544 | ||
@@ -607,11 +629,14 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray | |||
607 | } | 629 | } |
608 | }) | 630 | }) |
609 | .then([this](const KAsync::Error &error) { | 631 | .then([this](const KAsync::Error &error) { |
632 | //We need to commit here otherwise the next change-replay step will abort the transaction | ||
633 | mSyncStore.clear(); | ||
634 | mSyncTransaction.commit(); | ||
610 | if (error) { | 635 | if (error) { |
611 | SinkWarningCtx(mLogCtx) << "Failed to replay change: " << error.errorMessage; | 636 | SinkWarningCtx(mLogCtx) << "Failed to replay change: " << error.errorMessage; |
637 | return KAsync::error(error); | ||
612 | } | 638 | } |
613 | mSyncStore.clear(); | 639 | return KAsync::null(); |
614 | mSyncTransaction.commit(); | ||
615 | }); | 640 | }); |
616 | } | 641 | } |
617 | 642 | ||