diff options
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r-- | common/synchronizer.cpp | 168 |
1 files changed, 127 insertions, 41 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index b147615..3e7bd30 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -33,13 +33,14 @@ | |||
33 | using namespace Sink; | 33 | using namespace Sink; |
34 | 34 | ||
35 | Synchronizer::Synchronizer(const Sink::ResourceContext &context) | 35 | Synchronizer::Synchronizer(const Sink::ResourceContext &context) |
36 | : ChangeReplay(context), | 36 | : ChangeReplay(context, {"synchronizer"}), |
37 | mLogCtx{"synchronizer"}, | 37 | mLogCtx{"synchronizer"}, |
38 | mResourceContext(context), | 38 | mResourceContext(context), |
39 | mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext, mLogCtx)), | 39 | mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext, mLogCtx)), |
40 | mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), | 40 | mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), |
41 | mSyncInProgress(false) | 41 | mSyncInProgress(false) |
42 | { | 42 | { |
43 | mCurrentState.push(ApplicationDomain::Status::OfflineStatus); | ||
43 | SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); | 44 | SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); |
44 | } | 45 | } |
45 | 46 | ||
@@ -252,15 +253,21 @@ void Synchronizer::modify(const DomainType &entity, const QByteArray &newResourc | |||
252 | 253 | ||
253 | QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::QueryBase &query) | 254 | QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::QueryBase &query) |
254 | { | 255 | { |
255 | QList<Synchronizer::SyncRequest> list; | 256 | return QList<Synchronizer::SyncRequest>() << Synchronizer::SyncRequest{query, "sync"}; |
256 | list << Synchronizer::SyncRequest{query, "sync"}; | 257 | } |
257 | return list; | 258 | |
259 | void Synchronizer::mergeIntoQueue(const Synchronizer::SyncRequest &request, QList<Synchronizer::SyncRequest> &queue) | ||
260 | { | ||
261 | mSyncRequestQueue << request; | ||
258 | } | 262 | } |
259 | 263 | ||
260 | void Synchronizer::synchronize(const Sink::QueryBase &query) | 264 | void Synchronizer::synchronize(const Sink::QueryBase &query) |
261 | { | 265 | { |
262 | SinkTraceCtx(mLogCtx) << "Synchronizing"; | 266 | SinkTraceCtx(mLogCtx) << "Synchronizing"; |
263 | mSyncRequestQueue << getSyncRequests(query); | 267 | auto newRequests = getSyncRequests(query); |
268 | for (const auto &request: newRequests) { | ||
269 | mergeIntoQueue(request, mSyncRequestQueue); | ||
270 | } | ||
264 | processSyncQueue().exec(); | 271 | processSyncQueue().exec(); |
265 | } | 272 | } |
266 | 273 | ||
@@ -286,6 +293,42 @@ void Synchronizer::flushComplete(const QByteArray &flushId) | |||
286 | } | 293 | } |
287 | } | 294 | } |
288 | 295 | ||
296 | void Synchronizer::emitNotification(Notification::NoticationType type, int code, const QString &message, const QByteArray &id, const QByteArrayList &entities) | ||
297 | { | ||
298 | Sink::Notification n; | ||
299 | n.id = id; | ||
300 | n.type = type; | ||
301 | n.message = message; | ||
302 | n.code = code; | ||
303 | n.entities = entities; | ||
304 | emit notify(n); | ||
305 | } | ||
306 | |||
307 | void Synchronizer::reportProgress(int progress, int total) | ||
308 | { | ||
309 | SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total; | ||
310 | } | ||
311 | |||
312 | void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId) | ||
313 | { | ||
314 | if (error) { | ||
315 | if (error.errorCode == ApplicationDomain::ConnectionError) { | ||
316 | //Couldn't connect, so we assume we don't have a network connection. | ||
317 | setStatus(ApplicationDomain::OfflineStatus, s, requestId); | ||
318 | } else if (error.errorCode == ApplicationDomain::ConfigurationError) { | ||
319 | //There is an error with the configuration. | ||
320 | setStatus(ApplicationDomain::ErrorStatus, s, requestId); | ||
321 | } else if (error.errorCode == ApplicationDomain::LoginError) { | ||
322 | //If we failed to login altough we could connect that indicates a problem with our setup. | ||
323 | setStatus(ApplicationDomain::ErrorStatus, s, requestId); | ||
324 | } | ||
325 | //We don't know what kind of error this was, so we assume it's transient and don't change ou status. | ||
326 | } else { | ||
327 | //An operation against the server worked, so we're probably online. | ||
328 | setStatus(ApplicationDomain::ConnectedStatus, s, requestId); | ||
329 | } | ||
330 | } | ||
331 | |||
289 | KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | 332 | KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) |
290 | { | 333 | { |
291 | if (request.options & SyncRequest::RequestFlush) { | 334 | if (request.options & SyncRequest::RequestFlush) { |
@@ -310,35 +353,21 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | |||
310 | }); | 353 | }); |
311 | } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { | 354 | } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { |
312 | return KAsync::start([this, request] { | 355 | return KAsync::start([this, request] { |
313 | Sink::Notification n; | ||
314 | n.id = request.requestId; | ||
315 | n.type = Notification::Status; | ||
316 | n.message = "Synchronization has started."; | ||
317 | n.code = ApplicationDomain::BusyStatus; | ||
318 | emit notify(n); | ||
319 | SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; | 356 | SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; |
357 | emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities); | ||
320 | }).then(synchronizeWithSource(request.query)).then([this] { | 358 | }).then(synchronizeWithSource(request.query)).then([this] { |
321 | //Commit after every request, so implementations only have to commit more if they add a lot of data. | 359 | //Commit after every request, so implementations only have to commit more if they add a lot of data. |
322 | commit(); | 360 | commit(); |
323 | }).then<void>([this, request](const KAsync::Error &error) { | 361 | }).then<void>([this, request](const KAsync::Error &error) { |
362 | setStatusFromResult(error, "Synchronization has ended.", request.requestId); | ||
324 | if (error) { | 363 | if (error) { |
325 | //Emit notification with error | 364 | //Emit notification with error |
326 | SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error.errorMessage; | 365 | SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error; |
327 | Sink::Notification n; | 366 | emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntities); |
328 | n.id = request.requestId; | ||
329 | n.type = Notification::Status; | ||
330 | n.message = "Synchronization has ended."; | ||
331 | n.code = ApplicationDomain::ErrorStatus; | ||
332 | emit notify(n); | ||
333 | return KAsync::error(error); | 367 | return KAsync::error(error); |
334 | } else { | 368 | } else { |
335 | SinkLogCtx(mLogCtx) << "Done Synchronizing"; | 369 | SinkLogCtx(mLogCtx) << "Done Synchronizing"; |
336 | Sink::Notification n; | 370 | emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntities); |
337 | n.id = request.requestId; | ||
338 | n.type = Notification::Status; | ||
339 | n.message = "Synchronization has ended."; | ||
340 | n.code = ApplicationDomain::ConnectedStatus; | ||
341 | emit notify(n); | ||
342 | return KAsync::null(); | 371 | return KAsync::null(); |
343 | } | 372 | } |
344 | }); | 373 | }); |
@@ -347,11 +376,8 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | |||
347 | Q_ASSERT(!request.requestId.isEmpty()); | 376 | Q_ASSERT(!request.requestId.isEmpty()); |
348 | //FIXME it looks like this is emitted before the replay actually finishes | 377 | //FIXME it looks like this is emitted before the replay actually finishes |
349 | if (request.flushType == Flush::FlushReplayQueue) { | 378 | if (request.flushType == Flush::FlushReplayQueue) { |
350 | SinkTraceCtx(mLogCtx) << "Emitting flush completion."; | 379 | SinkTraceCtx(mLogCtx) << "Emitting flush completion: " << request.requestId; |
351 | Sink::Notification n; | 380 | emitNotification(Notification::FlushCompletion, 0, "", request.requestId); |
352 | n.type = Sink::Notification::FlushCompletion; | ||
353 | n.id = request.requestId; | ||
354 | emit notify(n); | ||
355 | } else { | 381 | } else { |
356 | flatbuffers::FlatBufferBuilder fbb; | 382 | flatbuffers::FlatBufferBuilder fbb; |
357 | auto flushId = fbb.CreateString(request.requestId.toStdString()); | 383 | auto flushId = fbb.CreateString(request.requestId.toStdString()); |
@@ -361,7 +387,24 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | |||
361 | } | 387 | } |
362 | }); | 388 | }); |
363 | } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { | 389 | } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { |
364 | return replayNextRevision(); | 390 | if (ChangeReplay::allChangesReplayed()) { |
391 | return KAsync::null(); | ||
392 | } else { | ||
393 | return KAsync::start([this, request] { | ||
394 | SinkLogCtx(mLogCtx) << "Replaying changes."; | ||
395 | }) | ||
396 | .then(replayNextRevision()) | ||
397 | .then<void>([this, request](const KAsync::Error &error) { | ||
398 | setStatusFromResult(error, "Changereplay has ended.", "changereplay"); | ||
399 | if (error) { | ||
400 | SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage; | ||
401 | return KAsync::error(error); | ||
402 | } else { | ||
403 | SinkLogCtx(mLogCtx) << "Done replaying changes"; | ||
404 | return KAsync::null(); | ||
405 | } | ||
406 | }); | ||
407 | } | ||
365 | } else { | 408 | } else { |
366 | SinkWarningCtx(mLogCtx) << "Unknown request type: " << request.requestType; | 409 | SinkWarningCtx(mLogCtx) << "Unknown request type: " << request.requestType; |
367 | return KAsync::error(KAsync::Error{"Unknown request type."}); | 410 | return KAsync::error(KAsync::Error{"Unknown request type."}); |
@@ -369,6 +412,34 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | |||
369 | 412 | ||
370 | } | 413 | } |
371 | 414 | ||
415 | void Synchronizer::setStatus(ApplicationDomain::Status state, const QString &reason, const QByteArray requestId) | ||
416 | { | ||
417 | if (state != mCurrentState.top()) { | ||
418 | if (mCurrentState.top() == ApplicationDomain::BusyStatus) { | ||
419 | mCurrentState.pop(); | ||
420 | } | ||
421 | mCurrentState.push(state); | ||
422 | emitNotification(Notification::Status, state, reason, requestId); | ||
423 | } | ||
424 | } | ||
425 | |||
426 | void Synchronizer::resetStatus(const QByteArray requestId) | ||
427 | { | ||
428 | mCurrentState.pop(); | ||
429 | emitNotification(Notification::Status, mCurrentState.top(), {}, requestId); | ||
430 | } | ||
431 | |||
432 | void Synchronizer::setBusy(bool busy, const QString &reason, const QByteArray requestId) | ||
433 | { | ||
434 | if (busy) { | ||
435 | setStatus(ApplicationDomain::BusyStatus, reason, requestId); | ||
436 | } else { | ||
437 | if (mCurrentState.top() == ApplicationDomain::BusyStatus) { | ||
438 | resetStatus(requestId); | ||
439 | } | ||
440 | } | ||
441 | } | ||
442 | |||
372 | KAsync::Job<void> Synchronizer::processSyncQueue() | 443 | KAsync::Job<void> Synchronizer::processSyncQueue() |
373 | { | 444 | { |
374 | if (mSyncRequestQueue.isEmpty()) { | 445 | if (mSyncRequestQueue.isEmpty()) { |
@@ -387,14 +458,20 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
387 | } | 458 | } |
388 | 459 | ||
389 | const auto request = mSyncRequestQueue.takeFirst(); | 460 | const auto request = mSyncRequestQueue.takeFirst(); |
390 | return KAsync::start([this] { | 461 | return KAsync::start([=] { |
391 | mMessageQueue->startTransaction(); | 462 | mMessageQueue->startTransaction(); |
392 | mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); | 463 | mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); |
393 | mSyncInProgress = true; | 464 | mSyncInProgress = true; |
465 | if (request.requestType == Synchronizer::SyncRequest::Synchronization) { | ||
466 | setBusy(true, "Synchronization has started.", request.requestId); | ||
467 | } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { | ||
468 | setBusy(true, "ChangeReplay has started.", "changereplay"); | ||
469 | } | ||
394 | }) | 470 | }) |
395 | .then(processRequest(request)) | 471 | .then(processRequest(request)) |
396 | .then<void>([this](const KAsync::Error &error) { | 472 | .then<void>([this, request](const KAsync::Error &error) { |
397 | SinkTraceCtx(mLogCtx) << "Sync request processed"; | 473 | SinkTraceCtx(mLogCtx) << "Sync request processed"; |
474 | setBusy(false, {}, request.requestId); | ||
398 | mEntityStore->abortTransaction(); | 475 | mEntityStore->abortTransaction(); |
399 | mSyncTransaction.abort(); | 476 | mSyncTransaction.abort(); |
400 | mMessageQueue->commit(); | 477 | mMessageQueue->commit(); |
@@ -404,8 +481,8 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
404 | emit changesReplayed(); | 481 | emit changesReplayed(); |
405 | } | 482 | } |
406 | if (error) { | 483 | if (error) { |
407 | SinkWarningCtx(mLogCtx) << "Error during sync: " << error.errorMessage; | 484 | SinkWarningCtx(mLogCtx) << "Error during sync: " << error; |
408 | return KAsync::error(error); | 485 | emitNotification(Notification::Error, error.errorCode, error.errorMessage, request.requestId); |
409 | } | 486 | } |
410 | //In case we got more requests meanwhile. | 487 | //In case we got more requests meanwhile. |
411 | return processSyncQueue(); | 488 | return processSyncQueue(); |
@@ -499,6 +576,12 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray | |||
499 | } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Mail>()) { | 576 | } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Mail>()) { |
500 | auto mail = store().readEntity<ApplicationDomain::Mail>(key); | 577 | auto mail = store().readEntity<ApplicationDomain::Mail>(key); |
501 | job = replay(mail, operation, oldRemoteId, modifiedProperties); | 578 | job = replay(mail, operation, oldRemoteId, modifiedProperties); |
579 | } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Contact>()) { | ||
580 | auto mail = store().readEntity<ApplicationDomain::Contact>(key); | ||
581 | job = replay(mail, operation, oldRemoteId, modifiedProperties); | ||
582 | } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Addressbook>()) { | ||
583 | auto mail = store().readEntity<ApplicationDomain::Addressbook>(key); | ||
584 | job = replay(mail, operation, oldRemoteId, modifiedProperties); | ||
502 | } else { | 585 | } else { |
503 | SinkErrorCtx(mLogCtx) << "Replayed unknown type: " << type; | 586 | SinkErrorCtx(mLogCtx) << "Replayed unknown type: " << type; |
504 | } | 587 | } |
@@ -506,21 +589,19 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray | |||
506 | return job.then([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { | 589 | return job.then([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { |
507 | if (operation == Sink::Operation_Creation) { | 590 | if (operation == Sink::Operation_Creation) { |
508 | SinkTraceCtx(mLogCtx) << "Replayed creation with remote id: " << remoteId; | 591 | SinkTraceCtx(mLogCtx) << "Replayed creation with remote id: " << remoteId; |
509 | if (remoteId.isEmpty()) { | 592 | if (!remoteId.isEmpty()) { |
510 | SinkWarningCtx(mLogCtx) << "Returned an empty remoteId from the creation"; | ||
511 | } else { | ||
512 | syncStore().recordRemoteId(type, uid, remoteId); | 593 | syncStore().recordRemoteId(type, uid, remoteId); |
513 | } | 594 | } |
514 | } else if (operation == Sink::Operation_Modification) { | 595 | } else if (operation == Sink::Operation_Modification) { |
515 | SinkTraceCtx(mLogCtx) << "Replayed modification with remote id: " << remoteId; | 596 | SinkTraceCtx(mLogCtx) << "Replayed modification with remote id: " << remoteId; |
516 | if (remoteId.isEmpty()) { | 597 | if (!remoteId.isEmpty()) { |
517 | SinkWarningCtx(mLogCtx) << "Returned an empty remoteId from the modification"; | ||
518 | } else { | ||
519 | syncStore().updateRemoteId(type, uid, remoteId); | 598 | syncStore().updateRemoteId(type, uid, remoteId); |
520 | } | 599 | } |
521 | } else if (operation == Sink::Operation_Removal) { | 600 | } else if (operation == Sink::Operation_Removal) { |
522 | SinkTraceCtx(mLogCtx) << "Replayed removal with remote id: " << oldRemoteId; | 601 | SinkTraceCtx(mLogCtx) << "Replayed removal with remote id: " << oldRemoteId; |
523 | syncStore().removeRemoteId(type, uid, oldRemoteId); | 602 | if (!oldRemoteId.isEmpty()) { |
603 | syncStore().removeRemoteId(type, uid, oldRemoteId); | ||
604 | } | ||
524 | } else { | 605 | } else { |
525 | SinkErrorCtx(mLogCtx) << "Unkown operation" << operation; | 606 | SinkErrorCtx(mLogCtx) << "Unkown operation" << operation; |
526 | } | 607 | } |
@@ -539,6 +620,11 @@ KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Contact &, | |||
539 | return KAsync::null<QByteArray>(); | 620 | return KAsync::null<QByteArray>(); |
540 | } | 621 | } |
541 | 622 | ||
623 | KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Addressbook &, Sink::Operation, const QByteArray &, const QList<QByteArray> &) | ||
624 | { | ||
625 | return KAsync::null<QByteArray>(); | ||
626 | } | ||
627 | |||
542 | KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &, const QList<QByteArray> &) | 628 | KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &, const QList<QByteArray> &) |
543 | { | 629 | { |
544 | return KAsync::null<QByteArray>(); | 630 | return KAsync::null<QByteArray>(); |