diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-05-28 00:24:53 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-05-28 00:24:53 +0200 |
commit | e9c75177590d8546ebd9425f16c4269a9c92f517 (patch) | |
tree | 8a953631e467d9df50657e22bd90954b7b71c990 /common/genericresource.cpp | |
parent | 8f01eb530262d1442fc4fa0782a41e052412d43b (diff) | |
download | sink-e9c75177590d8546ebd9425f16c4269a9c92f517.tar.gz sink-e9c75177590d8546ebd9425f16c4269a9c92f517.zip |
Refactored the generic resource to use separate classes for
changereplay and synchronization.
This cleans up the API and avoids the excessive passing around of
transactions. It also provides more flexibility in eventually using
different synchronization strategies for different resources.
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 560 |
1 files changed, 303 insertions, 257 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index eae6ead..cb2ef21 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -1,3 +1,22 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
1 | #include "genericresource.h" | 20 | #include "genericresource.h" |
2 | 21 | ||
3 | #include "entitybuffer.h" | 22 | #include "entitybuffer.h" |
@@ -14,6 +33,7 @@ | |||
14 | #include "log.h" | 33 | #include "log.h" |
15 | #include "definitions.h" | 34 | #include "definitions.h" |
16 | #include "bufferutils.h" | 35 | #include "bufferutils.h" |
36 | #include "adaptorfactoryregistry.h" | ||
17 | 37 | ||
18 | #include <QUuid> | 38 | #include <QUuid> |
19 | #include <QDataStream> | 39 | #include <QDataStream> |
@@ -30,96 +50,6 @@ static int sCommitInterval = 10; | |||
30 | using namespace Sink; | 50 | using namespace Sink; |
31 | 51 | ||
32 | #undef DEBUG_AREA | 52 | #undef DEBUG_AREA |
33 | #define DEBUG_AREA "resource.changereplay" | ||
34 | |||
35 | /** | ||
36 | * Replays changes from the storage one by one. | ||
37 | * | ||
38 | * Uses a local database to: | ||
39 | * * Remember what changes have been replayed already. | ||
40 | * * store a mapping of remote to local buffers | ||
41 | */ | ||
42 | class ChangeReplay : public QObject | ||
43 | { | ||
44 | Q_OBJECT | ||
45 | public: | ||
46 | typedef std::function<KAsync::Job<void>(const QByteArray &type, const QByteArray &key, const QByteArray &value)> ReplayFunction; | ||
47 | |||
48 | ChangeReplay(const QString &resourceName, const ReplayFunction &replayFunction) | ||
49 | : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayFunction(replayFunction) | ||
50 | { | ||
51 | } | ||
52 | |||
53 | qint64 getLastReplayedRevision() | ||
54 | { | ||
55 | qint64 lastReplayedRevision = 0; | ||
56 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); | ||
57 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | ||
58 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | ||
59 | lastReplayedRevision = value.toLongLong(); | ||
60 | return false; | ||
61 | }, | ||
62 | [](const Storage::Error &) {}); | ||
63 | return lastReplayedRevision; | ||
64 | } | ||
65 | |||
66 | bool allChangesReplayed() | ||
67 | { | ||
68 | const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly)); | ||
69 | const qint64 lastReplayedRevision = getLastReplayedRevision(); | ||
70 | Trace() << "All changes replayed " << topRevision << lastReplayedRevision; | ||
71 | return (lastReplayedRevision >= topRevision); | ||
72 | } | ||
73 | |||
74 | signals: | ||
75 | void changesReplayed(); | ||
76 | |||
77 | public slots: | ||
78 | void revisionChanged() | ||
79 | { | ||
80 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly); | ||
81 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite); | ||
82 | qint64 lastReplayedRevision = 1; | ||
83 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | ||
84 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | ||
85 | lastReplayedRevision = value.toLongLong(); | ||
86 | return false; | ||
87 | }, | ||
88 | [](const Storage::Error &) {}); | ||
89 | const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); | ||
90 | |||
91 | Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; | ||
92 | if (lastReplayedRevision <= topRevision) { | ||
93 | qint64 revision = lastReplayedRevision; | ||
94 | for (; revision <= topRevision; revision++) { | ||
95 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); | ||
96 | const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); | ||
97 | const auto key = Storage::assembleKey(uid, revision); | ||
98 | Storage::mainDatabase(mainStoreTransaction, type) | ||
99 | .scan(key, | ||
100 | [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { | ||
101 | mReplayFunction(type, key, value).exec(); | ||
102 | // TODO make for loop async, and pass to async replay function together with type | ||
103 | Trace() << "Replaying " << key; | ||
104 | return false; | ||
105 | }, | ||
106 | [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); | ||
107 | } | ||
108 | revision--; | ||
109 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); | ||
110 | replayStoreTransaction.commit(); | ||
111 | Trace() << "Replayed until " << revision; | ||
112 | } | ||
113 | emit changesReplayed(); | ||
114 | } | ||
115 | |||
116 | private: | ||
117 | Sink::Storage mStorage; | ||
118 | Sink::Storage mChangeReplayStore; | ||
119 | ReplayFunction mReplayFunction; | ||
120 | }; | ||
121 | |||
122 | #undef DEBUG_AREA | ||
123 | #define DEBUG_AREA "resource.commandprocessor" | 53 | #define DEBUG_AREA "resource.commandprocessor" |
124 | 54 | ||
125 | /** | 55 | /** |
@@ -133,10 +63,9 @@ class CommandProcessor : public QObject | |||
133 | public: | 63 | public: |
134 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) | 64 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) |
135 | { | 65 | { |
136 | mPipeline->startTransaction(); | 66 | mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
137 | // FIXME Should be initialized to the current value of the change replay queue | 67 | Warning() << error.message; |
138 | mLowerBoundRevision = Storage::maxRevision(mPipeline->transaction()); | 68 | })); |
139 | mPipeline->commit(); | ||
140 | 69 | ||
141 | for (auto queue : mCommandQueues) { | 70 | for (auto queue : mCommandQueues) { |
142 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); | 71 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); |
@@ -303,15 +232,22 @@ private: | |||
303 | #undef DEBUG_AREA | 232 | #undef DEBUG_AREA |
304 | #define DEBUG_AREA "resource" | 233 | #define DEBUG_AREA "resource" |
305 | 234 | ||
306 | GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline) | 235 | GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer) |
307 | : Sink::Resource(), | 236 | : Sink::Resource(), |
308 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), | 237 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), |
309 | mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), | 238 | mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), |
239 | mResourceType(resourceType), | ||
310 | mResourceInstanceIdentifier(resourceInstanceIdentifier), | 240 | mResourceInstanceIdentifier(resourceInstanceIdentifier), |
311 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), | 241 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), |
242 | mChangeReplay(changeReplay), | ||
243 | mSynchronizer(synchronizer), | ||
312 | mError(0), | 244 | mError(0), |
313 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | 245 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) |
314 | { | 246 | { |
247 | mPipeline->setResourceType(mResourceType); | ||
248 | mSynchronizer->setup([this](int commandId, const QByteArray &data) { | ||
249 | enqueueCommand(mSynchronizerQueue, commandId, data); | ||
250 | }); | ||
315 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); | 251 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); |
316 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { | 252 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { |
317 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | 253 | flatbuffers::Verifier verifier((const uint8_t *)command, size); |
@@ -353,14 +289,9 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
353 | }); | 289 | }); |
354 | QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 290 | QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
355 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 291 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
356 | mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { | ||
357 | // This results in a deadlock when a sync is in progress and we try to create a second writing transaction (which is why we turn changereplay off during the sync) | ||
358 | auto synchronizationStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); | ||
359 | return this->replay(*synchronizationStore, type, key, value).then<void>([synchronizationStore]() {}); | ||
360 | }); | ||
361 | enableChangeReplay(true); | 292 | enableChangeReplay(true); |
362 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); | 293 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); |
363 | mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); | 294 | mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision()); |
364 | 295 | ||
365 | mCommitQueueTimer.setInterval(sCommitInterval); | 296 | mCommitQueueTimer.setInterval(sCommitInterval); |
366 | mCommitQueueTimer.setSingleShot(true); | 297 | mCommitQueueTimer.setSingleShot(true); |
@@ -370,7 +301,6 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
370 | GenericResource::~GenericResource() | 301 | GenericResource::~GenericResource() |
371 | { | 302 | { |
372 | delete mProcessor; | 303 | delete mProcessor; |
373 | delete mSourceChangeReplay; | ||
374 | } | 304 | } |
375 | 305 | ||
376 | KAsync::Job<void> GenericResource::inspect( | 306 | KAsync::Job<void> GenericResource::inspect( |
@@ -383,86 +313,20 @@ KAsync::Job<void> GenericResource::inspect( | |||
383 | void GenericResource::enableChangeReplay(bool enable) | 313 | void GenericResource::enableChangeReplay(bool enable) |
384 | { | 314 | { |
385 | if (enable) { | 315 | if (enable) { |
386 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged, Qt::QueuedConnection); | 316 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); |
387 | QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | 317 | QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); |
388 | mSourceChangeReplay->revisionChanged(); | 318 | mChangeReplay->revisionChanged(); |
389 | } else { | 319 | } else { |
390 | QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); | 320 | QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged); |
391 | QObject::disconnect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | 321 | QObject::disconnect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); |
392 | } | 322 | } |
393 | } | 323 | } |
394 | 324 | ||
395 | void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors) | 325 | void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors) |
396 | { | 326 | { |
397 | mPipeline->setPreprocessors(type, preprocessors); | 327 | mPipeline->setPreprocessors(type, preprocessors); |
398 | mPipeline->setAdaptorFactory(type, factory); | ||
399 | mAdaptorFactories.insert(type, factory); | ||
400 | } | ||
401 | |||
402 | KAsync::Job<void> GenericResource::replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value) | ||
403 | { | ||
404 | Sink::EntityBuffer buffer(value); | ||
405 | const Sink::Entity &entity = buffer.entity(); | ||
406 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | ||
407 | Q_ASSERT(metadataBuffer); | ||
408 | if (!metadataBuffer->replayToSource()) { | ||
409 | Trace() << "Change is coming from the source"; | ||
410 | return KAsync::null<void>(); | ||
411 | } | ||
412 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | ||
413 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | ||
414 | const auto uid = Sink::Storage::uidFromKey(key); | ||
415 | QByteArray oldRemoteId; | ||
416 | |||
417 | if (operation != Sink::Operation_Creation) { | ||
418 | auto synchronizationTransaction = synchronizationStore.createTransaction(Sink::Storage::ReadOnly); | ||
419 | oldRemoteId = resolveLocalId(type, uid, synchronizationTransaction); | ||
420 | } | ||
421 | Trace() << "Replaying " << key << type; | ||
422 | |||
423 | KAsync::Job<QByteArray> job = KAsync::null<QByteArray>(); | ||
424 | if (type == ENTITY_TYPE_FOLDER) { | ||
425 | const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
426 | job = replay(folder, operation, oldRemoteId); | ||
427 | } else if (type == ENTITY_TYPE_MAIL) { | ||
428 | const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
429 | job = replay(mail, operation, oldRemoteId); | ||
430 | } | ||
431 | |||
432 | return job.then<void, QByteArray>([=, &synchronizationStore](const QByteArray &remoteId) { | ||
433 | auto synchronizationTransaction = synchronizationStore.createTransaction(Sink::Storage::ReadWrite); | ||
434 | Trace() << "Replayed change with remote id: " << remoteId; | ||
435 | if (operation == Sink::Operation_Creation) { | ||
436 | if (remoteId.isEmpty()) { | ||
437 | Warning() << "Returned an empty remoteId from the creation"; | ||
438 | } else { | ||
439 | recordRemoteId(type, uid, remoteId, synchronizationTransaction); | ||
440 | } | ||
441 | } else if (operation == Sink::Operation_Modification) { | ||
442 | if (remoteId.isEmpty()) { | ||
443 | Warning() << "Returned an empty remoteId from the creation"; | ||
444 | } else { | ||
445 | updateRemoteId(type, uid, remoteId, synchronizationTransaction); | ||
446 | } | ||
447 | } else if (operation == Sink::Operation_Removal) { | ||
448 | removeRemoteId(type, uid, remoteId, synchronizationTransaction); | ||
449 | } else { | ||
450 | Warning() << "Unkown operation" << operation; | ||
451 | } | ||
452 | }, [](int errorCode, const QString &errorMessage) { | ||
453 | Warning() << "Failed to replay change: " << errorMessage; | ||
454 | }); | ||
455 | } | 328 | } |
456 | 329 | ||
457 | KAsync::Job<QByteArray> GenericResource::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &) | ||
458 | { | ||
459 | return KAsync::null<QByteArray>(); | ||
460 | } | ||
461 | |||
462 | KAsync::Job<QByteArray> GenericResource::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &) | ||
463 | { | ||
464 | return KAsync::null<QByteArray>(); | ||
465 | } | ||
466 | 330 | ||
467 | void GenericResource::removeDataFromDisk() | 331 | void GenericResource::removeDataFromDisk() |
468 | { | 332 | { |
@@ -528,10 +392,8 @@ KAsync::Job<void> GenericResource::synchronizeWithSource() | |||
528 | Log() << " Synchronizing"; | 392 | Log() << " Synchronizing"; |
529 | // Changereplay would deadlock otherwise when trying to open the synchronization store | 393 | // Changereplay would deadlock otherwise when trying to open the synchronization store |
530 | enableChangeReplay(false); | 394 | enableChangeReplay(false); |
531 | auto mainStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier, Sink::Storage::ReadOnly); | 395 | mSynchronizer->synchronize() |
532 | auto syncStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); | 396 | .then<void>([this, &future]() { |
533 | synchronizeWithSource(*mainStore, *syncStore) | ||
534 | .then<void>([this, mainStore, syncStore, &future]() { | ||
535 | Log() << "Done Synchronizing"; | 397 | Log() << "Done Synchronizing"; |
536 | enableChangeReplay(true); | 398 | enableChangeReplay(true); |
537 | future.setFinished(); | 399 | future.setFinished(); |
@@ -576,11 +438,11 @@ KAsync::Job<void> GenericResource::processAllMessages() | |||
576 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) | 438 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) |
577 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) | 439 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) |
578 | .then<void>([this](KAsync::Future<void> &f) { | 440 | .then<void>([this](KAsync::Future<void> &f) { |
579 | if (mSourceChangeReplay->allChangesReplayed()) { | 441 | if (mChangeReplay->allChangesReplayed()) { |
580 | f.setFinished(); | 442 | f.setFinished(); |
581 | } else { | 443 | } else { |
582 | auto context = new QObject; | 444 | auto context = new QObject; |
583 | QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, context, [&f, context]() { | 445 | QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { |
584 | delete context; | 446 | delete context; |
585 | f.setFinished(); | 447 | f.setFinished(); |
586 | }); | 448 | }); |
@@ -590,7 +452,7 @@ KAsync::Job<void> GenericResource::processAllMessages() | |||
590 | 452 | ||
591 | void GenericResource::updateLowerBoundRevision() | 453 | void GenericResource::updateLowerBoundRevision() |
592 | { | 454 | { |
593 | mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSourceChangeReplay->getLastReplayedRevision())); | 455 | mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mChangeReplay->getLastReplayedRevision())); |
594 | } | 456 | } |
595 | 457 | ||
596 | void GenericResource::setLowerBoundRevision(qint64 revision) | 458 | void GenericResource::setLowerBoundRevision(qint64 revision) |
@@ -599,7 +461,139 @@ void GenericResource::setLowerBoundRevision(qint64 revision) | |||
599 | updateLowerBoundRevision(); | 461 | updateLowerBoundRevision(); |
600 | } | 462 | } |
601 | 463 | ||
602 | void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 464 | |
465 | |||
466 | |||
467 | EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) | ||
468 | : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier), | ||
469 | mTransaction(transaction) | ||
470 | { | ||
471 | |||
472 | } | ||
473 | |||
474 | template<typename T> | ||
475 | T EntityStore::read(const QByteArray &identifier) const | ||
476 | { | ||
477 | auto typeName = ApplicationDomain::getTypeName<T>(); | ||
478 | auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); | ||
479 | auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory<T>(mResourceType)); | ||
480 | Q_ASSERT(bufferAdaptor); | ||
481 | return T(mResourceInstanceIdentifier, identifier, 0, bufferAdaptor); | ||
482 | } | ||
483 | |||
484 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) | ||
485 | { | ||
486 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | ||
487 | db.findLatest(uid, | ||
488 | [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | ||
489 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
490 | if (!buffer.isValid()) { | ||
491 | Warning() << "Read invalid buffer from disk"; | ||
492 | } else { | ||
493 | Trace() << "Found value " << key; | ||
494 | current = adaptorFactory.createAdaptor(buffer.entity()); | ||
495 | } | ||
496 | return false; | ||
497 | }, | ||
498 | [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); | ||
499 | return current; | ||
500 | } | ||
501 | |||
502 | |||
503 | |||
504 | SyncStore::SyncStore(Sink::Storage::Transaction &transaction) | ||
505 | : mTransaction(transaction) | ||
506 | { | ||
507 | |||
508 | } | ||
509 | |||
510 | void SyncStore::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) | ||
511 | { | ||
512 | Index("rid.mapping." + bufferType, mTransaction).add(remoteId, localId); | ||
513 | Index("localid.mapping." + bufferType, mTransaction).add(localId, remoteId); | ||
514 | } | ||
515 | |||
516 | void SyncStore::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) | ||
517 | { | ||
518 | Index("rid.mapping." + bufferType, mTransaction).remove(remoteId, localId); | ||
519 | Index("localid.mapping." + bufferType, mTransaction).remove(localId, remoteId); | ||
520 | } | ||
521 | |||
522 | void SyncStore::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) | ||
523 | { | ||
524 | const auto oldRemoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); | ||
525 | removeRemoteId(bufferType, localId, oldRemoteId); | ||
526 | recordRemoteId(bufferType, localId, remoteId); | ||
527 | } | ||
528 | |||
529 | QByteArray SyncStore::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId) | ||
530 | { | ||
531 | // Lookup local id for remote id, or insert a new pair otherwise | ||
532 | Index index("rid.mapping." + bufferType, mTransaction); | ||
533 | QByteArray sinkId = index.lookup(remoteId); | ||
534 | if (sinkId.isEmpty()) { | ||
535 | sinkId = QUuid::createUuid().toString().toUtf8(); | ||
536 | index.add(remoteId, sinkId); | ||
537 | Index("localid.mapping." + bufferType, mTransaction).add(sinkId, remoteId); | ||
538 | } | ||
539 | return sinkId; | ||
540 | } | ||
541 | |||
542 | QByteArray SyncStore::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId) | ||
543 | { | ||
544 | QByteArray remoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); | ||
545 | if (remoteId.isEmpty()) { | ||
546 | Warning() << "Couldn't find the remote id for " << localId; | ||
547 | return QByteArray(); | ||
548 | } | ||
549 | return remoteId; | ||
550 | } | ||
551 | |||
552 | |||
553 | |||
554 | |||
555 | |||
556 | |||
557 | |||
558 | |||
559 | Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) | ||
560 | : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly), | ||
561 | mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), | ||
562 | mResourceType(resourceType), | ||
563 | mResourceInstanceIdentifier(resourceInstanceIdentifier) | ||
564 | { | ||
565 | Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; | ||
566 | |||
567 | } | ||
568 | |||
569 | void Synchronizer::setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback) | ||
570 | { | ||
571 | mEnqueue = enqueueCommandCallback; | ||
572 | } | ||
573 | |||
574 | void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) | ||
575 | { | ||
576 | Q_ASSERT(mEnqueue); | ||
577 | mEnqueue(commandId, data); | ||
578 | } | ||
579 | |||
580 | EntityStore &Synchronizer::store() | ||
581 | { | ||
582 | if (!mEntityStore) { | ||
583 | mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction); | ||
584 | } | ||
585 | return *mEntityStore; | ||
586 | } | ||
587 | |||
588 | SyncStore &Synchronizer::syncStore() | ||
589 | { | ||
590 | if (!mSyncStore) { | ||
591 | mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction); | ||
592 | } | ||
593 | return *mSyncStore; | ||
594 | } | ||
595 | |||
596 | void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
603 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | 597 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) |
604 | { | 598 | { |
605 | // These changes are coming from the source | 599 | // These changes are coming from the source |
@@ -616,7 +610,7 @@ void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &b | |||
616 | callback(BufferUtils::extractBuffer(fbb)); | 610 | callback(BufferUtils::extractBuffer(fbb)); |
617 | } | 611 | } |
618 | 612 | ||
619 | void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 613 | void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, |
620 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | 614 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) |
621 | { | 615 | { |
622 | // These changes are coming from the source | 616 | // These changes are coming from the source |
@@ -634,7 +628,7 @@ void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, co | |||
634 | callback(BufferUtils::extractBuffer(fbb)); | 628 | callback(BufferUtils::extractBuffer(fbb)); |
635 | } | 629 | } |
636 | 630 | ||
637 | void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) | 631 | void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) |
638 | { | 632 | { |
639 | // These changes are coming from the source | 633 | // These changes are coming from the source |
640 | const auto replayToSource = false; | 634 | const auto replayToSource = false; |
@@ -647,96 +641,36 @@ void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, co | |||
647 | callback(BufferUtils::extractBuffer(fbb)); | 641 | callback(BufferUtils::extractBuffer(fbb)); |
648 | } | 642 | } |
649 | 643 | ||
650 | void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | 644 | void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) |
651 | { | ||
652 | Index("rid.mapping." + bufferType, transaction).add(remoteId, localId); | ||
653 | ; | ||
654 | Index("localid.mapping." + bufferType, transaction).add(localId, remoteId); | ||
655 | } | ||
656 | |||
657 | void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | ||
658 | { | ||
659 | Index("rid.mapping." + bufferType, transaction).remove(remoteId, localId); | ||
660 | Index("localid.mapping." + bufferType, transaction).remove(localId, remoteId); | ||
661 | } | ||
662 | |||
663 | void GenericResource::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | ||
664 | { | ||
665 | const auto oldRemoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId); | ||
666 | removeRemoteId(bufferType, localId, oldRemoteId, transaction); | ||
667 | recordRemoteId(bufferType, localId, remoteId, transaction); | ||
668 | } | ||
669 | |||
670 | QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | ||
671 | { | ||
672 | // Lookup local id for remote id, or insert a new pair otherwise | ||
673 | Index index("rid.mapping." + bufferType, transaction); | ||
674 | QByteArray sinkId = index.lookup(remoteId); | ||
675 | if (sinkId.isEmpty()) { | ||
676 | sinkId = QUuid::createUuid().toString().toUtf8(); | ||
677 | index.add(remoteId, sinkId); | ||
678 | Index("localid.mapping." + bufferType, transaction).add(sinkId, remoteId); | ||
679 | } | ||
680 | return sinkId; | ||
681 | } | ||
682 | |||
683 | QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction) | ||
684 | { | ||
685 | QByteArray remoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId); | ||
686 | if (remoteId.isEmpty()) { | ||
687 | Warning() << "Couldn't find the remote id for " << localId; | ||
688 | return QByteArray(); | ||
689 | } | ||
690 | return remoteId; | ||
691 | } | ||
692 | |||
693 | void GenericResource::scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, | ||
694 | const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) | ||
695 | { | 645 | { |
696 | entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) { | 646 | entryGenerator([this, bufferType, &exists](const QByteArray &key) { |
697 | auto sinkId = Sink::Storage::uidFromKey(key); | 647 | auto sinkId = Sink::Storage::uidFromKey(key); |
698 | Trace() << "Checking for removal " << key; | 648 | Trace() << "Checking for removal " << key; |
699 | const auto remoteId = resolveLocalId(bufferType, sinkId, synchronizationTransaction); | 649 | const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); |
700 | // If we have no remoteId, the entity hasn't been replayed to the source yet | 650 | // If we have no remoteId, the entity hasn't been replayed to the source yet |
701 | if (!remoteId.isEmpty()) { | 651 | if (!remoteId.isEmpty()) { |
702 | if (!exists(remoteId)) { | 652 | if (!exists(remoteId)) { |
703 | Trace() << "Found a removed entity: " << sinkId; | 653 | Trace() << "Found a removed entity: " << sinkId; |
704 | deleteEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, | 654 | deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, |
705 | [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::DeleteEntityCommand, buffer); }); | 655 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); |
706 | } | 656 | } |
707 | } | 657 | } |
708 | }); | 658 | }); |
709 | } | 659 | } |
710 | 660 | ||
711 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> GenericResource::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) | 661 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) |
712 | { | ||
713 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | ||
714 | db.findLatest(uid, | ||
715 | [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | ||
716 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
717 | if (!buffer.isValid()) { | ||
718 | Warning() << "Read invalid buffer from disk"; | ||
719 | } else { | ||
720 | current = adaptorFactory.createAdaptor(buffer.entity()); | ||
721 | } | ||
722 | return false; | ||
723 | }, | ||
724 | [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); | ||
725 | return current; | ||
726 | } | ||
727 | |||
728 | void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, | ||
729 | DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) | ||
730 | { | 662 | { |
731 | auto mainDatabase = Storage::mainDatabase(transaction, bufferType); | 663 | Trace() << "Create or modify" << bufferType << remoteId; |
732 | const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); | 664 | auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType); |
665 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); | ||
733 | const auto found = mainDatabase.contains(sinkId); | 666 | const auto found = mainDatabase.contains(sinkId); |
667 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); | ||
734 | if (!found) { | 668 | if (!found) { |
735 | Trace() << "Found a new entity: " << remoteId; | 669 | Trace() << "Found a new entity: " << remoteId; |
736 | createEntity( | 670 | createEntity( |
737 | sinkId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::CreateEntityCommand, buffer); }); | 671 | sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); |
738 | } else { // modification | 672 | } else { // modification |
739 | if (auto current = getLatest(mainDatabase, sinkId, adaptorFactory)) { | 673 | if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) { |
740 | bool changed = false; | 674 | bool changed = false; |
741 | for (const auto &property : entity.changedProperties()) { | 675 | for (const auto &property : entity.changedProperties()) { |
742 | if (entity.getProperty(property) != current->getProperty(property)) { | 676 | if (entity.getProperty(property) != current->getProperty(property)) { |
@@ -746,8 +680,8 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si | |||
746 | } | 680 | } |
747 | if (changed) { | 681 | if (changed) { |
748 | Trace() << "Found a modified entity: " << remoteId; | 682 | Trace() << "Found a modified entity: " << remoteId; |
749 | modifyEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, | 683 | modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory, |
750 | [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::ModifyEntityCommand, buffer); }); | 684 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); |
751 | } | 685 | } |
752 | } else { | 686 | } else { |
753 | Warning() << "Failed to get current entity"; | 687 | Warning() << "Failed to get current entity"; |
@@ -755,6 +689,118 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si | |||
755 | } | 689 | } |
756 | } | 690 | } |
757 | 691 | ||
692 | KAsync::Job<void> Synchronizer::synchronize() | ||
693 | { | ||
694 | mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); | ||
695 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); | ||
696 | return synchronizeWithSource().then<void>([this]() { | ||
697 | mTransaction.abort(); | ||
698 | mSyncTransaction.commit(); | ||
699 | mSyncStore.clear(); | ||
700 | mEntityStore.clear(); | ||
701 | }); | ||
702 | } | ||
703 | |||
704 | |||
705 | |||
706 | SourceWriteBack::SourceWriteBack(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) | ||
707 | : ChangeReplay(resourceInstanceIdentifier), | ||
708 | mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), | ||
709 | mResourceType(resourceType), | ||
710 | mResourceInstanceIdentifier(resourceInstanceIdentifier) | ||
711 | { | ||
712 | |||
713 | } | ||
714 | |||
715 | EntityStore &SourceWriteBack::store() | ||
716 | { | ||
717 | if (!mEntityStore) { | ||
718 | mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction); | ||
719 | } | ||
720 | return *mEntityStore; | ||
721 | } | ||
722 | |||
723 | SyncStore &SourceWriteBack::syncStore() | ||
724 | { | ||
725 | if (!mSyncStore) { | ||
726 | mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction); | ||
727 | } | ||
728 | return *mSyncStore; | ||
729 | } | ||
730 | |||
731 | KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) | ||
732 | { | ||
733 | mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); | ||
734 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); | ||
735 | |||
736 | Sink::EntityBuffer buffer(value); | ||
737 | const Sink::Entity &entity = buffer.entity(); | ||
738 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | ||
739 | Q_ASSERT(metadataBuffer); | ||
740 | if (!metadataBuffer->replayToSource()) { | ||
741 | Trace() << "Change is coming from the source"; | ||
742 | return KAsync::null<void>(); | ||
743 | } | ||
744 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | ||
745 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | ||
746 | const auto uid = Sink::Storage::uidFromKey(key); | ||
747 | QByteArray oldRemoteId; | ||
748 | |||
749 | if (operation != Sink::Operation_Creation) { | ||
750 | oldRemoteId = syncStore().resolveLocalId(type, uid); | ||
751 | } | ||
752 | Trace() << "Replaying " << key << type; | ||
753 | |||
754 | KAsync::Job<QByteArray> job = KAsync::null<QByteArray>(); | ||
755 | if (type == ENTITY_TYPE_FOLDER) { | ||
756 | auto folder = store().read<ApplicationDomain::Folder>(uid); | ||
757 | // const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
758 | job = replay(folder, operation, oldRemoteId); | ||
759 | } else if (type == ENTITY_TYPE_MAIL) { | ||
760 | auto mail = store().read<ApplicationDomain::Mail>(uid); | ||
761 | // const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
762 | job = replay(mail, operation, oldRemoteId); | ||
763 | } | ||
764 | |||
765 | return job.then<void, QByteArray>([this, operation, type, uid](const QByteArray &remoteId) { | ||
766 | Trace() << "Replayed change with remote id: " << remoteId; | ||
767 | if (operation == Sink::Operation_Creation) { | ||
768 | if (remoteId.isEmpty()) { | ||
769 | Warning() << "Returned an empty remoteId from the creation"; | ||
770 | } else { | ||
771 | syncStore().recordRemoteId(type, uid, remoteId); | ||
772 | } | ||
773 | } else if (operation == Sink::Operation_Modification) { | ||
774 | if (remoteId.isEmpty()) { | ||
775 | Warning() << "Returned an empty remoteId from the creation"; | ||
776 | } else { | ||
777 | syncStore().updateRemoteId(type, uid, remoteId); | ||
778 | } | ||
779 | } else if (operation == Sink::Operation_Removal) { | ||
780 | syncStore().removeRemoteId(type, uid, remoteId); | ||
781 | } else { | ||
782 | Warning() << "Unkown operation" << operation; | ||
783 | } | ||
784 | |||
785 | mTransaction.abort(); | ||
786 | mSyncTransaction.commit(); | ||
787 | mSyncStore.clear(); | ||
788 | mEntityStore.clear(); | ||
789 | }, [](int errorCode, const QString &errorMessage) { | ||
790 | Warning() << "Failed to replay change: " << errorMessage; | ||
791 | }); | ||
792 | } | ||
793 | |||
794 | KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &) | ||
795 | { | ||
796 | return KAsync::null<QByteArray>(); | ||
797 | } | ||
798 | |||
799 | KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &) | ||
800 | { | ||
801 | return KAsync::null<QByteArray>(); | ||
802 | } | ||
803 | |||
758 | 804 | ||
759 | #pragma clang diagnostic push | 805 | #pragma clang diagnostic push |
760 | #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" | 806 | #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" |