diff options
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r-- | common/synchronizer.cpp | 70 |
1 files changed, 55 insertions, 15 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 284f867..9451488 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -40,6 +40,7 @@ Synchronizer::Synchronizer(const Sink::ResourceContext &context) | |||
40 | mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), | 40 | mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), |
41 | mSyncInProgress(false) | 41 | mSyncInProgress(false) |
42 | { | 42 | { |
43 | mCurrentState.push(ApplicationDomain::Status::OfflineStatus); | ||
43 | SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); | 44 | SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); |
44 | } | 45 | } |
45 | 46 | ||
@@ -308,6 +309,23 @@ void Synchronizer::reportProgress(int progress, int total) | |||
308 | SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total; | 309 | SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total; |
309 | } | 310 | } |
310 | 311 | ||
312 | void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId) | ||
313 | { | ||
314 | if (error) { | ||
315 | if (error.errorCode == ApplicationDomain::ConnectionError) { | ||
316 | //Couldn't connect, so we assume we don't have a network connection. | ||
317 | setStatus(ApplicationDomain::OfflineStatus, s, requestId); | ||
318 | } else if (error.errorCode == ApplicationDomain::LoginError) { | ||
319 | //If we failed to login altough we could connect that indicates a problem with our setup. | ||
320 | setStatus(ApplicationDomain::ErrorStatus, s, requestId); | ||
321 | } | ||
322 | //We don't know what kind of error this was, so we assume it's transient and don't change ou status. | ||
323 | } else { | ||
324 | //An operation against the server worked, so we're probably online. | ||
325 | setStatus(ApplicationDomain::ConnectedStatus, s, requestId); | ||
326 | } | ||
327 | } | ||
328 | |||
311 | KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | 329 | KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) |
312 | { | 330 | { |
313 | if (request.options & SyncRequest::RequestFlush) { | 331 | if (request.options & SyncRequest::RequestFlush) { |
@@ -333,26 +351,20 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | |||
333 | } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { | 351 | } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { |
334 | return KAsync::start([this, request] { | 352 | return KAsync::start([this, request] { |
335 | SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; | 353 | SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; |
336 | emitNotification(Notification::Status, ApplicationDomain::BusyStatus, "Synchronization has started.", request.requestId); | ||
337 | emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities); | 354 | emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities); |
338 | }).then(synchronizeWithSource(request.query)).then([this] { | 355 | }).then(synchronizeWithSource(request.query)).then([this] { |
339 | //Commit after every request, so implementations only have to commit more if they add a lot of data. | 356 | //Commit after every request, so implementations only have to commit more if they add a lot of data. |
340 | commit(); | 357 | commit(); |
341 | }).then<void>([this, request](const KAsync::Error &error) { | 358 | }).then<void>([this, request](const KAsync::Error &error) { |
359 | setStatusFromResult(error, "Synchronization has ended.", request.requestId); | ||
342 | if (error) { | 360 | if (error) { |
343 | //Emit notification with error | 361 | //Emit notification with error |
344 | SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error; | 362 | SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error; |
345 | if (error.errorCode == ApplicationDomain::ConnectionError) { | ||
346 | emitNotification(Notification::Status, ApplicationDomain::OfflineStatus, "Synchronization has ended.", request.requestId); | ||
347 | } else { | ||
348 | emitNotification(Notification::Status, ApplicationDomain::ErrorStatus, "Synchronization has ended.", request.requestId); | ||
349 | } | ||
350 | emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntities); | 363 | emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntities); |
351 | return KAsync::error(error); | 364 | return KAsync::error(error); |
352 | } else { | 365 | } else { |
353 | SinkLogCtx(mLogCtx) << "Done Synchronizing"; | 366 | SinkLogCtx(mLogCtx) << "Done Synchronizing"; |
354 | emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntities); | 367 | emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntities); |
355 | emitNotification(Notification::Status, ApplicationDomain::ConnectedStatus, "Synchronization has ended.", request.requestId); | ||
356 | return KAsync::null(); | 368 | return KAsync::null(); |
357 | } | 369 | } |
358 | }); | 370 | }); |
@@ -377,21 +389,15 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | |||
377 | } else { | 389 | } else { |
378 | return KAsync::start([this, request] { | 390 | return KAsync::start([this, request] { |
379 | SinkLogCtx(mLogCtx) << "Replaying changes."; | 391 | SinkLogCtx(mLogCtx) << "Replaying changes."; |
380 | emitNotification(Notification::Status, ApplicationDomain::BusyStatus, "Changereplay has started.", "changereplay"); | ||
381 | }) | 392 | }) |
382 | .then(replayNextRevision()) | 393 | .then(replayNextRevision()) |
383 | .then<void>([this, request](const KAsync::Error &error) { | 394 | .then<void>([this, request](const KAsync::Error &error) { |
395 | setStatusFromResult(error, "Changereplay has ended.", "changereplay"); | ||
384 | if (error) { | 396 | if (error) { |
385 | SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage; | 397 | SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage; |
386 | if (error.errorCode == ApplicationDomain::ConnectionError) { | ||
387 | emitNotification(Notification::Status, ApplicationDomain::OfflineStatus, "Changereplay has ended.", "changereplay"); | ||
388 | } else { | ||
389 | emitNotification(Notification::Status, ApplicationDomain::ErrorStatus, "Changereplay has ended.", "changereplay"); | ||
390 | } | ||
391 | return KAsync::error(error); | 398 | return KAsync::error(error); |
392 | } else { | 399 | } else { |
393 | SinkLogCtx(mLogCtx) << "Done replaying changes"; | 400 | SinkLogCtx(mLogCtx) << "Done replaying changes"; |
394 | emitNotification(Notification::Status, ApplicationDomain::ConnectedStatus, "All changes have been replayed.", "changereplay"); | ||
395 | return KAsync::null(); | 401 | return KAsync::null(); |
396 | } | 402 | } |
397 | }); | 403 | }); |
@@ -403,6 +409,34 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | |||
403 | 409 | ||
404 | } | 410 | } |
405 | 411 | ||
412 | void Synchronizer::setStatus(ApplicationDomain::Status state, const QString &reason, const QByteArray requestId) | ||
413 | { | ||
414 | if (state != mCurrentState.top()) { | ||
415 | if (mCurrentState.top() == ApplicationDomain::BusyStatus) { | ||
416 | mCurrentState.pop(); | ||
417 | } | ||
418 | mCurrentState.push(state); | ||
419 | emitNotification(Notification::Status, state, reason, requestId); | ||
420 | } | ||
421 | } | ||
422 | |||
423 | void Synchronizer::resetStatus(const QByteArray requestId) | ||
424 | { | ||
425 | mCurrentState.pop(); | ||
426 | emitNotification(Notification::Status, mCurrentState.top(), {}, requestId); | ||
427 | } | ||
428 | |||
429 | void Synchronizer::setBusy(bool busy, const QString &reason, const QByteArray requestId) | ||
430 | { | ||
431 | if (busy) { | ||
432 | setStatus(ApplicationDomain::BusyStatus, reason, requestId); | ||
433 | } else { | ||
434 | if (mCurrentState.top() == ApplicationDomain::BusyStatus) { | ||
435 | resetStatus(requestId); | ||
436 | } | ||
437 | } | ||
438 | } | ||
439 | |||
406 | KAsync::Job<void> Synchronizer::processSyncQueue() | 440 | KAsync::Job<void> Synchronizer::processSyncQueue() |
407 | { | 441 | { |
408 | if (mSyncRequestQueue.isEmpty()) { | 442 | if (mSyncRequestQueue.isEmpty()) { |
@@ -421,14 +455,20 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
421 | } | 455 | } |
422 | 456 | ||
423 | const auto request = mSyncRequestQueue.takeFirst(); | 457 | const auto request = mSyncRequestQueue.takeFirst(); |
424 | return KAsync::start([this] { | 458 | return KAsync::start([=] { |
425 | mMessageQueue->startTransaction(); | 459 | mMessageQueue->startTransaction(); |
426 | mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); | 460 | mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); |
427 | mSyncInProgress = true; | 461 | mSyncInProgress = true; |
462 | if (request.requestType == Synchronizer::SyncRequest::Synchronization) { | ||
463 | setBusy(true, "Synchronization has started.", request.requestId); | ||
464 | } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { | ||
465 | setBusy(true, "ChangeReplay has started.", "changereplay"); | ||
466 | } | ||
428 | }) | 467 | }) |
429 | .then(processRequest(request)) | 468 | .then(processRequest(request)) |
430 | .then<void>([this, request](const KAsync::Error &error) { | 469 | .then<void>([this, request](const KAsync::Error &error) { |
431 | SinkTraceCtx(mLogCtx) << "Sync request processed"; | 470 | SinkTraceCtx(mLogCtx) << "Sync request processed"; |
471 | setBusy(false, {}, request.requestId); | ||
432 | mEntityStore->abortTransaction(); | 472 | mEntityStore->abortTransaction(); |
433 | mSyncTransaction.abort(); | 473 | mSyncTransaction.abort(); |
434 | mMessageQueue->commit(); | 474 | mMessageQueue->commit(); |