summaryrefslogtreecommitdiffstats
path: root/common/synchronizer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r--common/synchronizer.cpp168
1 files changed, 127 insertions, 41 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index b147615..3e7bd30 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -33,13 +33,14 @@
33using namespace Sink; 33using namespace Sink;
34 34
35Synchronizer::Synchronizer(const Sink::ResourceContext &context) 35Synchronizer::Synchronizer(const Sink::ResourceContext &context)
36 : ChangeReplay(context), 36 : ChangeReplay(context, {"synchronizer"}),
37 mLogCtx{"synchronizer"}, 37 mLogCtx{"synchronizer"},
38 mResourceContext(context), 38 mResourceContext(context),
39 mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext, mLogCtx)), 39 mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext, mLogCtx)),
40 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), 40 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite),
41 mSyncInProgress(false) 41 mSyncInProgress(false)
42{ 42{
43 mCurrentState.push(ApplicationDomain::Status::OfflineStatus);
43 SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); 44 SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId();
44} 45}
45 46
@@ -252,15 +253,21 @@ void Synchronizer::modify(const DomainType &entity, const QByteArray &newResourc
252 253
253QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::QueryBase &query) 254QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::QueryBase &query)
254{ 255{
255 QList<Synchronizer::SyncRequest> list; 256 return QList<Synchronizer::SyncRequest>() << Synchronizer::SyncRequest{query, "sync"};
256 list << Synchronizer::SyncRequest{query, "sync"}; 257}
257 return list; 258
259void Synchronizer::mergeIntoQueue(const Synchronizer::SyncRequest &request, QList<Synchronizer::SyncRequest> &queue)
260{
261 mSyncRequestQueue << request;
258} 262}
259 263
260void Synchronizer::synchronize(const Sink::QueryBase &query) 264void Synchronizer::synchronize(const Sink::QueryBase &query)
261{ 265{
262 SinkTraceCtx(mLogCtx) << "Synchronizing"; 266 SinkTraceCtx(mLogCtx) << "Synchronizing";
263 mSyncRequestQueue << getSyncRequests(query); 267 auto newRequests = getSyncRequests(query);
268 for (const auto &request: newRequests) {
269 mergeIntoQueue(request, mSyncRequestQueue);
270 }
264 processSyncQueue().exec(); 271 processSyncQueue().exec();
265} 272}
266 273
@@ -286,6 +293,42 @@ void Synchronizer::flushComplete(const QByteArray &flushId)
286 } 293 }
287} 294}
288 295
296void Synchronizer::emitNotification(Notification::NoticationType type, int code, const QString &message, const QByteArray &id, const QByteArrayList &entities)
297{
298 Sink::Notification n;
299 n.id = id;
300 n.type = type;
301 n.message = message;
302 n.code = code;
303 n.entities = entities;
304 emit notify(n);
305}
306
307void Synchronizer::reportProgress(int progress, int total)
308{
309 SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total;
310}
311
312void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId)
313{
314 if (error) {
315 if (error.errorCode == ApplicationDomain::ConnectionError) {
316 //Couldn't connect, so we assume we don't have a network connection.
317 setStatus(ApplicationDomain::OfflineStatus, s, requestId);
318 } else if (error.errorCode == ApplicationDomain::ConfigurationError) {
319 //There is an error with the configuration.
320 setStatus(ApplicationDomain::ErrorStatus, s, requestId);
321 } else if (error.errorCode == ApplicationDomain::LoginError) {
322 //If we failed to login altough we could connect that indicates a problem with our setup.
323 setStatus(ApplicationDomain::ErrorStatus, s, requestId);
324 }
325 //We don't know what kind of error this was, so we assume it's transient and don't change ou status.
326 } else {
327 //An operation against the server worked, so we're probably online.
328 setStatus(ApplicationDomain::ConnectedStatus, s, requestId);
329 }
330}
331
289KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) 332KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
290{ 333{
291 if (request.options & SyncRequest::RequestFlush) { 334 if (request.options & SyncRequest::RequestFlush) {
@@ -310,35 +353,21 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
310 }); 353 });
311 } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { 354 } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
312 return KAsync::start([this, request] { 355 return KAsync::start([this, request] {
313 Sink::Notification n;
314 n.id = request.requestId;
315 n.type = Notification::Status;
316 n.message = "Synchronization has started.";
317 n.code = ApplicationDomain::BusyStatus;
318 emit notify(n);
319 SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; 356 SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query;
357 emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities);
320 }).then(synchronizeWithSource(request.query)).then([this] { 358 }).then(synchronizeWithSource(request.query)).then([this] {
321 //Commit after every request, so implementations only have to commit more if they add a lot of data. 359 //Commit after every request, so implementations only have to commit more if they add a lot of data.
322 commit(); 360 commit();
323 }).then<void>([this, request](const KAsync::Error &error) { 361 }).then<void>([this, request](const KAsync::Error &error) {
362 setStatusFromResult(error, "Synchronization has ended.", request.requestId);
324 if (error) { 363 if (error) {
325 //Emit notification with error 364 //Emit notification with error
326 SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error.errorMessage; 365 SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error;
327 Sink::Notification n; 366 emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntities);
328 n.id = request.requestId;
329 n.type = Notification::Status;
330 n.message = "Synchronization has ended.";
331 n.code = ApplicationDomain::ErrorStatus;
332 emit notify(n);
333 return KAsync::error(error); 367 return KAsync::error(error);
334 } else { 368 } else {
335 SinkLogCtx(mLogCtx) << "Done Synchronizing"; 369 SinkLogCtx(mLogCtx) << "Done Synchronizing";
336 Sink::Notification n; 370 emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntities);
337 n.id = request.requestId;
338 n.type = Notification::Status;
339 n.message = "Synchronization has ended.";
340 n.code = ApplicationDomain::ConnectedStatus;
341 emit notify(n);
342 return KAsync::null(); 371 return KAsync::null();
343 } 372 }
344 }); 373 });
@@ -347,11 +376,8 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
347 Q_ASSERT(!request.requestId.isEmpty()); 376 Q_ASSERT(!request.requestId.isEmpty());
348 //FIXME it looks like this is emitted before the replay actually finishes 377 //FIXME it looks like this is emitted before the replay actually finishes
349 if (request.flushType == Flush::FlushReplayQueue) { 378 if (request.flushType == Flush::FlushReplayQueue) {
350 SinkTraceCtx(mLogCtx) << "Emitting flush completion."; 379 SinkTraceCtx(mLogCtx) << "Emitting flush completion: " << request.requestId;
351 Sink::Notification n; 380 emitNotification(Notification::FlushCompletion, 0, "", request.requestId);
352 n.type = Sink::Notification::FlushCompletion;
353 n.id = request.requestId;
354 emit notify(n);
355 } else { 381 } else {
356 flatbuffers::FlatBufferBuilder fbb; 382 flatbuffers::FlatBufferBuilder fbb;
357 auto flushId = fbb.CreateString(request.requestId.toStdString()); 383 auto flushId = fbb.CreateString(request.requestId.toStdString());
@@ -361,7 +387,24 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
361 } 387 }
362 }); 388 });
363 } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { 389 } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) {
364 return replayNextRevision(); 390 if (ChangeReplay::allChangesReplayed()) {
391 return KAsync::null();
392 } else {
393 return KAsync::start([this, request] {
394 SinkLogCtx(mLogCtx) << "Replaying changes.";
395 })
396 .then(replayNextRevision())
397 .then<void>([this, request](const KAsync::Error &error) {
398 setStatusFromResult(error, "Changereplay has ended.", "changereplay");
399 if (error) {
400 SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage;
401 return KAsync::error(error);
402 } else {
403 SinkLogCtx(mLogCtx) << "Done replaying changes";
404 return KAsync::null();
405 }
406 });
407 }
365 } else { 408 } else {
366 SinkWarningCtx(mLogCtx) << "Unknown request type: " << request.requestType; 409 SinkWarningCtx(mLogCtx) << "Unknown request type: " << request.requestType;
367 return KAsync::error(KAsync::Error{"Unknown request type."}); 410 return KAsync::error(KAsync::Error{"Unknown request type."});
@@ -369,6 +412,34 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
369 412
370} 413}
371 414
415void Synchronizer::setStatus(ApplicationDomain::Status state, const QString &reason, const QByteArray requestId)
416{
417 if (state != mCurrentState.top()) {
418 if (mCurrentState.top() == ApplicationDomain::BusyStatus) {
419 mCurrentState.pop();
420 }
421 mCurrentState.push(state);
422 emitNotification(Notification::Status, state, reason, requestId);
423 }
424}
425
426void Synchronizer::resetStatus(const QByteArray requestId)
427{
428 mCurrentState.pop();
429 emitNotification(Notification::Status, mCurrentState.top(), {}, requestId);
430}
431
432void Synchronizer::setBusy(bool busy, const QString &reason, const QByteArray requestId)
433{
434 if (busy) {
435 setStatus(ApplicationDomain::BusyStatus, reason, requestId);
436 } else {
437 if (mCurrentState.top() == ApplicationDomain::BusyStatus) {
438 resetStatus(requestId);
439 }
440 }
441}
442
372KAsync::Job<void> Synchronizer::processSyncQueue() 443KAsync::Job<void> Synchronizer::processSyncQueue()
373{ 444{
374 if (mSyncRequestQueue.isEmpty()) { 445 if (mSyncRequestQueue.isEmpty()) {
@@ -387,14 +458,20 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
387 } 458 }
388 459
389 const auto request = mSyncRequestQueue.takeFirst(); 460 const auto request = mSyncRequestQueue.takeFirst();
390 return KAsync::start([this] { 461 return KAsync::start([=] {
391 mMessageQueue->startTransaction(); 462 mMessageQueue->startTransaction();
392 mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); 463 mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly);
393 mSyncInProgress = true; 464 mSyncInProgress = true;
465 if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
466 setBusy(true, "Synchronization has started.", request.requestId);
467 } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) {
468 setBusy(true, "ChangeReplay has started.", "changereplay");
469 }
394 }) 470 })
395 .then(processRequest(request)) 471 .then(processRequest(request))
396 .then<void>([this](const KAsync::Error &error) { 472 .then<void>([this, request](const KAsync::Error &error) {
397 SinkTraceCtx(mLogCtx) << "Sync request processed"; 473 SinkTraceCtx(mLogCtx) << "Sync request processed";
474 setBusy(false, {}, request.requestId);
398 mEntityStore->abortTransaction(); 475 mEntityStore->abortTransaction();
399 mSyncTransaction.abort(); 476 mSyncTransaction.abort();
400 mMessageQueue->commit(); 477 mMessageQueue->commit();
@@ -404,8 +481,8 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
404 emit changesReplayed(); 481 emit changesReplayed();
405 } 482 }
406 if (error) { 483 if (error) {
407 SinkWarningCtx(mLogCtx) << "Error during sync: " << error.errorMessage; 484 SinkWarningCtx(mLogCtx) << "Error during sync: " << error;
408 return KAsync::error(error); 485 emitNotification(Notification::Error, error.errorCode, error.errorMessage, request.requestId);
409 } 486 }
410 //In case we got more requests meanwhile. 487 //In case we got more requests meanwhile.
411 return processSyncQueue(); 488 return processSyncQueue();
@@ -499,6 +576,12 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray
499 } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Mail>()) { 576 } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Mail>()) {
500 auto mail = store().readEntity<ApplicationDomain::Mail>(key); 577 auto mail = store().readEntity<ApplicationDomain::Mail>(key);
501 job = replay(mail, operation, oldRemoteId, modifiedProperties); 578 job = replay(mail, operation, oldRemoteId, modifiedProperties);
579 } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Contact>()) {
580 auto mail = store().readEntity<ApplicationDomain::Contact>(key);
581 job = replay(mail, operation, oldRemoteId, modifiedProperties);
582 } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Addressbook>()) {
583 auto mail = store().readEntity<ApplicationDomain::Addressbook>(key);
584 job = replay(mail, operation, oldRemoteId, modifiedProperties);
502 } else { 585 } else {
503 SinkErrorCtx(mLogCtx) << "Replayed unknown type: " << type; 586 SinkErrorCtx(mLogCtx) << "Replayed unknown type: " << type;
504 } 587 }
@@ -506,21 +589,19 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray
506 return job.then([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { 589 return job.then([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) {
507 if (operation == Sink::Operation_Creation) { 590 if (operation == Sink::Operation_Creation) {
508 SinkTraceCtx(mLogCtx) << "Replayed creation with remote id: " << remoteId; 591 SinkTraceCtx(mLogCtx) << "Replayed creation with remote id: " << remoteId;
509 if (remoteId.isEmpty()) { 592 if (!remoteId.isEmpty()) {
510 SinkWarningCtx(mLogCtx) << "Returned an empty remoteId from the creation";
511 } else {
512 syncStore().recordRemoteId(type, uid, remoteId); 593 syncStore().recordRemoteId(type, uid, remoteId);
513 } 594 }
514 } else if (operation == Sink::Operation_Modification) { 595 } else if (operation == Sink::Operation_Modification) {
515 SinkTraceCtx(mLogCtx) << "Replayed modification with remote id: " << remoteId; 596 SinkTraceCtx(mLogCtx) << "Replayed modification with remote id: " << remoteId;
516 if (remoteId.isEmpty()) { 597 if (!remoteId.isEmpty()) {
517 SinkWarningCtx(mLogCtx) << "Returned an empty remoteId from the modification";
518 } else {
519 syncStore().updateRemoteId(type, uid, remoteId); 598 syncStore().updateRemoteId(type, uid, remoteId);
520 } 599 }
521 } else if (operation == Sink::Operation_Removal) { 600 } else if (operation == Sink::Operation_Removal) {
522 SinkTraceCtx(mLogCtx) << "Replayed removal with remote id: " << oldRemoteId; 601 SinkTraceCtx(mLogCtx) << "Replayed removal with remote id: " << oldRemoteId;
523 syncStore().removeRemoteId(type, uid, oldRemoteId); 602 if (!oldRemoteId.isEmpty()) {
603 syncStore().removeRemoteId(type, uid, oldRemoteId);
604 }
524 } else { 605 } else {
525 SinkErrorCtx(mLogCtx) << "Unkown operation" << operation; 606 SinkErrorCtx(mLogCtx) << "Unkown operation" << operation;
526 } 607 }
@@ -539,6 +620,11 @@ KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Contact &,
539 return KAsync::null<QByteArray>(); 620 return KAsync::null<QByteArray>();
540} 621}
541 622
623KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Addressbook &, Sink::Operation, const QByteArray &, const QList<QByteArray> &)
624{
625 return KAsync::null<QByteArray>();
626}
627
542KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &, const QList<QByteArray> &) 628KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &, const QList<QByteArray> &)
543{ 629{
544 return KAsync::null<QByteArray>(); 630 return KAsync::null<QByteArray>();