summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-05-28 00:24:53 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-05-28 00:24:53 +0200
commite9c75177590d8546ebd9425f16c4269a9c92f517 (patch)
tree8a953631e467d9df50657e22bd90954b7b71c990 /common/genericresource.cpp
parent8f01eb530262d1442fc4fa0782a41e052412d43b (diff)
downloadsink-e9c75177590d8546ebd9425f16c4269a9c92f517.tar.gz
sink-e9c75177590d8546ebd9425f16c4269a9c92f517.zip
Refactored the generic resource to use separate classes for
changereplay and synchronization. This cleans up the API and avoids the excessive passing around of transactions. It also provides more flexibility in eventually using different synchronization strategies for different resources.
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp560
1 files changed, 303 insertions, 257 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index eae6ead..cb2ef21 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -1,3 +1,22 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
1#include "genericresource.h" 20#include "genericresource.h"
2 21
3#include "entitybuffer.h" 22#include "entitybuffer.h"
@@ -14,6 +33,7 @@
14#include "log.h" 33#include "log.h"
15#include "definitions.h" 34#include "definitions.h"
16#include "bufferutils.h" 35#include "bufferutils.h"
36#include "adaptorfactoryregistry.h"
17 37
18#include <QUuid> 38#include <QUuid>
19#include <QDataStream> 39#include <QDataStream>
@@ -30,96 +50,6 @@ static int sCommitInterval = 10;
30using namespace Sink; 50using namespace Sink;
31 51
32#undef DEBUG_AREA 52#undef DEBUG_AREA
33#define DEBUG_AREA "resource.changereplay"
34
35/**
36 * Replays changes from the storage one by one.
37 *
38 * Uses a local database to:
39 * * Remember what changes have been replayed already.
40 * * store a mapping of remote to local buffers
41 */
42class ChangeReplay : public QObject
43{
44 Q_OBJECT
45public:
46 typedef std::function<KAsync::Job<void>(const QByteArray &type, const QByteArray &key, const QByteArray &value)> ReplayFunction;
47
48 ChangeReplay(const QString &resourceName, const ReplayFunction &replayFunction)
49 : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayFunction(replayFunction)
50 {
51 }
52
53 qint64 getLastReplayedRevision()
54 {
55 qint64 lastReplayedRevision = 0;
56 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly);
57 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
58 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
59 lastReplayedRevision = value.toLongLong();
60 return false;
61 },
62 [](const Storage::Error &) {});
63 return lastReplayedRevision;
64 }
65
66 bool allChangesReplayed()
67 {
68 const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly));
69 const qint64 lastReplayedRevision = getLastReplayedRevision();
70 Trace() << "All changes replayed " << topRevision << lastReplayedRevision;
71 return (lastReplayedRevision >= topRevision);
72 }
73
74signals:
75 void changesReplayed();
76
77public slots:
78 void revisionChanged()
79 {
80 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly);
81 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite);
82 qint64 lastReplayedRevision = 1;
83 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
84 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
85 lastReplayedRevision = value.toLongLong();
86 return false;
87 },
88 [](const Storage::Error &) {});
89 const qint64 topRevision = Storage::maxRevision(mainStoreTransaction);
90
91 Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision;
92 if (lastReplayedRevision <= topRevision) {
93 qint64 revision = lastReplayedRevision;
94 for (; revision <= topRevision; revision++) {
95 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision);
96 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision);
97 const auto key = Storage::assembleKey(uid, revision);
98 Storage::mainDatabase(mainStoreTransaction, type)
99 .scan(key,
100 [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool {
101 mReplayFunction(type, key, value).exec();
102 // TODO make for loop async, and pass to async replay function together with type
103 Trace() << "Replaying " << key;
104 return false;
105 },
106 [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; });
107 }
108 revision--;
109 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision));
110 replayStoreTransaction.commit();
111 Trace() << "Replayed until " << revision;
112 }
113 emit changesReplayed();
114 }
115
116private:
117 Sink::Storage mStorage;
118 Sink::Storage mChangeReplayStore;
119 ReplayFunction mReplayFunction;
120};
121
122#undef DEBUG_AREA
123#define DEBUG_AREA "resource.commandprocessor" 53#define DEBUG_AREA "resource.commandprocessor"
124 54
125/** 55/**
@@ -133,10 +63,9 @@ class CommandProcessor : public QObject
133public: 63public:
134 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) 64 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false)
135 { 65 {
136 mPipeline->startTransaction(); 66 mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
137 // FIXME Should be initialized to the current value of the change replay queue 67 Warning() << error.message;
138 mLowerBoundRevision = Storage::maxRevision(mPipeline->transaction()); 68 }));
139 mPipeline->commit();
140 69
141 for (auto queue : mCommandQueues) { 70 for (auto queue : mCommandQueues) {
142 const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); 71 const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process);
@@ -303,15 +232,22 @@ private:
303#undef DEBUG_AREA 232#undef DEBUG_AREA
304#define DEBUG_AREA "resource" 233#define DEBUG_AREA "resource"
305 234
306GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline) 235GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer)
307 : Sink::Resource(), 236 : Sink::Resource(),
308 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), 237 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"),
309 mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), 238 mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"),
239 mResourceType(resourceType),
310 mResourceInstanceIdentifier(resourceInstanceIdentifier), 240 mResourceInstanceIdentifier(resourceInstanceIdentifier),
311 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), 241 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)),
242 mChangeReplay(changeReplay),
243 mSynchronizer(synchronizer),
312 mError(0), 244 mError(0),
313 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 245 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
314{ 246{
247 mPipeline->setResourceType(mResourceType);
248 mSynchronizer->setup([this](int commandId, const QByteArray &data) {
249 enqueueCommand(mSynchronizerQueue, commandId, data);
250 });
315 mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); 251 mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue);
316 mProcessor->setInspectionCommand([this](void const *command, size_t size) { 252 mProcessor->setInspectionCommand([this](void const *command, size_t size) {
317 flatbuffers::Verifier verifier((const uint8_t *)command, size); 253 flatbuffers::Verifier verifier((const uint8_t *)command, size);
@@ -353,14 +289,9 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
353 }); 289 });
354 QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 290 QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
355 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 291 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
356 mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) {
357 // This results in a deadlock when a sync is in progress and we try to create a second writing transaction (which is why we turn changereplay off during the sync)
358 auto synchronizationStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite);
359 return this->replay(*synchronizationStore, type, key, value).then<void>([synchronizationStore]() {});
360 });
361 enableChangeReplay(true); 292 enableChangeReplay(true);
362 mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); 293 mClientLowerBoundRevision = mPipeline->cleanedUpRevision();
363 mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); 294 mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision());
364 295
365 mCommitQueueTimer.setInterval(sCommitInterval); 296 mCommitQueueTimer.setInterval(sCommitInterval);
366 mCommitQueueTimer.setSingleShot(true); 297 mCommitQueueTimer.setSingleShot(true);
@@ -370,7 +301,6 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
370GenericResource::~GenericResource() 301GenericResource::~GenericResource()
371{ 302{
372 delete mProcessor; 303 delete mProcessor;
373 delete mSourceChangeReplay;
374} 304}
375 305
376KAsync::Job<void> GenericResource::inspect( 306KAsync::Job<void> GenericResource::inspect(
@@ -383,86 +313,20 @@ KAsync::Job<void> GenericResource::inspect(
383void GenericResource::enableChangeReplay(bool enable) 313void GenericResource::enableChangeReplay(bool enable)
384{ 314{
385 if (enable) { 315 if (enable) {
386 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged, Qt::QueuedConnection); 316 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection);
387 QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); 317 QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
388 mSourceChangeReplay->revisionChanged(); 318 mChangeReplay->revisionChanged();
389 } else { 319 } else {
390 QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); 320 QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged);
391 QObject::disconnect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); 321 QObject::disconnect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
392 } 322 }
393} 323}
394 324
395void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors) 325void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors)
396{ 326{
397 mPipeline->setPreprocessors(type, preprocessors); 327 mPipeline->setPreprocessors(type, preprocessors);
398 mPipeline->setAdaptorFactory(type, factory);
399 mAdaptorFactories.insert(type, factory);
400}
401
402KAsync::Job<void> GenericResource::replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value)
403{
404 Sink::EntityBuffer buffer(value);
405 const Sink::Entity &entity = buffer.entity();
406 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
407 Q_ASSERT(metadataBuffer);
408 if (!metadataBuffer->replayToSource()) {
409 Trace() << "Change is coming from the source";
410 return KAsync::null<void>();
411 }
412 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
413 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
414 const auto uid = Sink::Storage::uidFromKey(key);
415 QByteArray oldRemoteId;
416
417 if (operation != Sink::Operation_Creation) {
418 auto synchronizationTransaction = synchronizationStore.createTransaction(Sink::Storage::ReadOnly);
419 oldRemoteId = resolveLocalId(type, uid, synchronizationTransaction);
420 }
421 Trace() << "Replaying " << key << type;
422
423 KAsync::Job<QByteArray> job = KAsync::null<QByteArray>();
424 if (type == ENTITY_TYPE_FOLDER) {
425 const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity));
426 job = replay(folder, operation, oldRemoteId);
427 } else if (type == ENTITY_TYPE_MAIL) {
428 const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity));
429 job = replay(mail, operation, oldRemoteId);
430 }
431
432 return job.then<void, QByteArray>([=, &synchronizationStore](const QByteArray &remoteId) {
433 auto synchronizationTransaction = synchronizationStore.createTransaction(Sink::Storage::ReadWrite);
434 Trace() << "Replayed change with remote id: " << remoteId;
435 if (operation == Sink::Operation_Creation) {
436 if (remoteId.isEmpty()) {
437 Warning() << "Returned an empty remoteId from the creation";
438 } else {
439 recordRemoteId(type, uid, remoteId, synchronizationTransaction);
440 }
441 } else if (operation == Sink::Operation_Modification) {
442 if (remoteId.isEmpty()) {
443 Warning() << "Returned an empty remoteId from the creation";
444 } else {
445 updateRemoteId(type, uid, remoteId, synchronizationTransaction);
446 }
447 } else if (operation == Sink::Operation_Removal) {
448 removeRemoteId(type, uid, remoteId, synchronizationTransaction);
449 } else {
450 Warning() << "Unkown operation" << operation;
451 }
452 }, [](int errorCode, const QString &errorMessage) {
453 Warning() << "Failed to replay change: " << errorMessage;
454 });
455} 328}
456 329
457KAsync::Job<QByteArray> GenericResource::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &)
458{
459 return KAsync::null<QByteArray>();
460}
461
462KAsync::Job<QByteArray> GenericResource::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &)
463{
464 return KAsync::null<QByteArray>();
465}
466 330
467void GenericResource::removeDataFromDisk() 331void GenericResource::removeDataFromDisk()
468{ 332{
@@ -528,10 +392,8 @@ KAsync::Job<void> GenericResource::synchronizeWithSource()
528 Log() << " Synchronizing"; 392 Log() << " Synchronizing";
529 // Changereplay would deadlock otherwise when trying to open the synchronization store 393 // Changereplay would deadlock otherwise when trying to open the synchronization store
530 enableChangeReplay(false); 394 enableChangeReplay(false);
531 auto mainStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier, Sink::Storage::ReadOnly); 395 mSynchronizer->synchronize()
532 auto syncStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); 396 .then<void>([this, &future]() {
533 synchronizeWithSource(*mainStore, *syncStore)
534 .then<void>([this, mainStore, syncStore, &future]() {
535 Log() << "Done Synchronizing"; 397 Log() << "Done Synchronizing";
536 enableChangeReplay(true); 398 enableChangeReplay(true);
537 future.setFinished(); 399 future.setFinished();
@@ -576,11 +438,11 @@ KAsync::Job<void> GenericResource::processAllMessages()
576 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) 438 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); })
577 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) 439 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); })
578 .then<void>([this](KAsync::Future<void> &f) { 440 .then<void>([this](KAsync::Future<void> &f) {
579 if (mSourceChangeReplay->allChangesReplayed()) { 441 if (mChangeReplay->allChangesReplayed()) {
580 f.setFinished(); 442 f.setFinished();
581 } else { 443 } else {
582 auto context = new QObject; 444 auto context = new QObject;
583 QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, context, [&f, context]() { 445 QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, context, [&f, context]() {
584 delete context; 446 delete context;
585 f.setFinished(); 447 f.setFinished();
586 }); 448 });
@@ -590,7 +452,7 @@ KAsync::Job<void> GenericResource::processAllMessages()
590 452
591void GenericResource::updateLowerBoundRevision() 453void GenericResource::updateLowerBoundRevision()
592{ 454{
593 mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSourceChangeReplay->getLastReplayedRevision())); 455 mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mChangeReplay->getLastReplayedRevision()));
594} 456}
595 457
596void GenericResource::setLowerBoundRevision(qint64 revision) 458void GenericResource::setLowerBoundRevision(qint64 revision)
@@ -599,7 +461,139 @@ void GenericResource::setLowerBoundRevision(qint64 revision)
599 updateLowerBoundRevision(); 461 updateLowerBoundRevision();
600} 462}
601 463
602void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, 464
465
466
467EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction)
468 : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier),
469 mTransaction(transaction)
470{
471
472}
473
474template<typename T>
475T EntityStore::read(const QByteArray &identifier) const
476{
477 auto typeName = ApplicationDomain::getTypeName<T>();
478 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName);
479 auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory<T>(mResourceType));
480 Q_ASSERT(bufferAdaptor);
481 return T(mResourceInstanceIdentifier, identifier, 0, bufferAdaptor);
482}
483
484QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory)
485{
486 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
487 db.findLatest(uid,
488 [&current, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
489 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
490 if (!buffer.isValid()) {
491 Warning() << "Read invalid buffer from disk";
492 } else {
493 Trace() << "Found value " << key;
494 current = adaptorFactory.createAdaptor(buffer.entity());
495 }
496 return false;
497 },
498 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; });
499 return current;
500}
501
502
503
504SyncStore::SyncStore(Sink::Storage::Transaction &transaction)
505 : mTransaction(transaction)
506{
507
508}
509
510void SyncStore::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId)
511{
512 Index("rid.mapping." + bufferType, mTransaction).add(remoteId, localId);
513 Index("localid.mapping." + bufferType, mTransaction).add(localId, remoteId);
514}
515
516void SyncStore::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId)
517{
518 Index("rid.mapping." + bufferType, mTransaction).remove(remoteId, localId);
519 Index("localid.mapping." + bufferType, mTransaction).remove(localId, remoteId);
520}
521
522void SyncStore::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId)
523{
524 const auto oldRemoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId);
525 removeRemoteId(bufferType, localId, oldRemoteId);
526 recordRemoteId(bufferType, localId, remoteId);
527}
528
529QByteArray SyncStore::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId)
530{
531 // Lookup local id for remote id, or insert a new pair otherwise
532 Index index("rid.mapping." + bufferType, mTransaction);
533 QByteArray sinkId = index.lookup(remoteId);
534 if (sinkId.isEmpty()) {
535 sinkId = QUuid::createUuid().toString().toUtf8();
536 index.add(remoteId, sinkId);
537 Index("localid.mapping." + bufferType, mTransaction).add(sinkId, remoteId);
538 }
539 return sinkId;
540}
541
542QByteArray SyncStore::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId)
543{
544 QByteArray remoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId);
545 if (remoteId.isEmpty()) {
546 Warning() << "Couldn't find the remote id for " << localId;
547 return QByteArray();
548 }
549 return remoteId;
550}
551
552
553
554
555
556
557
558
559Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier)
560 : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly),
561 mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite),
562 mResourceType(resourceType),
563 mResourceInstanceIdentifier(resourceInstanceIdentifier)
564{
565 Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier;
566
567}
568
569void Synchronizer::setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback)
570{
571 mEnqueue = enqueueCommandCallback;
572}
573
574void Synchronizer::enqueueCommand(int commandId, const QByteArray &data)
575{
576 Q_ASSERT(mEnqueue);
577 mEnqueue(commandId, data);
578}
579
580EntityStore &Synchronizer::store()
581{
582 if (!mEntityStore) {
583 mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction);
584 }
585 return *mEntityStore;
586}
587
588SyncStore &Synchronizer::syncStore()
589{
590 if (!mSyncStore) {
591 mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction);
592 }
593 return *mSyncStore;
594}
595
596void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
603 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) 597 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
604{ 598{
605 // These changes are coming from the source 599 // These changes are coming from the source
@@ -616,7 +610,7 @@ void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &b
616 callback(BufferUtils::extractBuffer(fbb)); 610 callback(BufferUtils::extractBuffer(fbb));
617} 611}
618 612
619void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, 613void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
620 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) 614 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
621{ 615{
622 // These changes are coming from the source 616 // These changes are coming from the source
@@ -634,7 +628,7 @@ void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, co
634 callback(BufferUtils::extractBuffer(fbb)); 628 callback(BufferUtils::extractBuffer(fbb));
635} 629}
636 630
637void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) 631void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback)
638{ 632{
639 // These changes are coming from the source 633 // These changes are coming from the source
640 const auto replayToSource = false; 634 const auto replayToSource = false;
@@ -647,96 +641,36 @@ void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, co
647 callback(BufferUtils::extractBuffer(fbb)); 641 callback(BufferUtils::extractBuffer(fbb));
648} 642}
649 643
650void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) 644void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists)
651{
652 Index("rid.mapping." + bufferType, transaction).add(remoteId, localId);
653 ;
654 Index("localid.mapping." + bufferType, transaction).add(localId, remoteId);
655}
656
657void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
658{
659 Index("rid.mapping." + bufferType, transaction).remove(remoteId, localId);
660 Index("localid.mapping." + bufferType, transaction).remove(localId, remoteId);
661}
662
663void GenericResource::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
664{
665 const auto oldRemoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId);
666 removeRemoteId(bufferType, localId, oldRemoteId, transaction);
667 recordRemoteId(bufferType, localId, remoteId, transaction);
668}
669
670QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
671{
672 // Lookup local id for remote id, or insert a new pair otherwise
673 Index index("rid.mapping." + bufferType, transaction);
674 QByteArray sinkId = index.lookup(remoteId);
675 if (sinkId.isEmpty()) {
676 sinkId = QUuid::createUuid().toString().toUtf8();
677 index.add(remoteId, sinkId);
678 Index("localid.mapping." + bufferType, transaction).add(sinkId, remoteId);
679 }
680 return sinkId;
681}
682
683QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction)
684{
685 QByteArray remoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId);
686 if (remoteId.isEmpty()) {
687 Warning() << "Couldn't find the remote id for " << localId;
688 return QByteArray();
689 }
690 return remoteId;
691}
692
693void GenericResource::scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType,
694 const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists)
695{ 645{
696 entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) { 646 entryGenerator([this, bufferType, &exists](const QByteArray &key) {
697 auto sinkId = Sink::Storage::uidFromKey(key); 647 auto sinkId = Sink::Storage::uidFromKey(key);
698 Trace() << "Checking for removal " << key; 648 Trace() << "Checking for removal " << key;
699 const auto remoteId = resolveLocalId(bufferType, sinkId, synchronizationTransaction); 649 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId);
700 // If we have no remoteId, the entity hasn't been replayed to the source yet 650 // If we have no remoteId, the entity hasn't been replayed to the source yet
701 if (!remoteId.isEmpty()) { 651 if (!remoteId.isEmpty()) {
702 if (!exists(remoteId)) { 652 if (!exists(remoteId)) {
703 Trace() << "Found a removed entity: " << sinkId; 653 Trace() << "Found a removed entity: " << sinkId;
704 deleteEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, 654 deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType,
705 [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::DeleteEntityCommand, buffer); }); 655 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); });
706 } 656 }
707 } 657 }
708 }); 658 });
709} 659}
710 660
711QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> GenericResource::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) 661void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
712{
713 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
714 db.findLatest(uid,
715 [&current, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
716 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
717 if (!buffer.isValid()) {
718 Warning() << "Read invalid buffer from disk";
719 } else {
720 current = adaptorFactory.createAdaptor(buffer.entity());
721 }
722 return false;
723 },
724 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; });
725 return current;
726}
727
728void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction,
729 DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
730{ 662{
731 auto mainDatabase = Storage::mainDatabase(transaction, bufferType); 663 Trace() << "Create or modify" << bufferType << remoteId;
732 const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); 664 auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType);
665 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
733 const auto found = mainDatabase.contains(sinkId); 666 const auto found = mainDatabase.contains(sinkId);
667 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType);
734 if (!found) { 668 if (!found) {
735 Trace() << "Found a new entity: " << remoteId; 669 Trace() << "Found a new entity: " << remoteId;
736 createEntity( 670 createEntity(
737 sinkId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::CreateEntityCommand, buffer); }); 671 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
738 } else { // modification 672 } else { // modification
739 if (auto current = getLatest(mainDatabase, sinkId, adaptorFactory)) { 673 if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) {
740 bool changed = false; 674 bool changed = false;
741 for (const auto &property : entity.changedProperties()) { 675 for (const auto &property : entity.changedProperties()) {
742 if (entity.getProperty(property) != current->getProperty(property)) { 676 if (entity.getProperty(property) != current->getProperty(property)) {
@@ -746,8 +680,8 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si
746 } 680 }
747 if (changed) { 681 if (changed) {
748 Trace() << "Found a modified entity: " << remoteId; 682 Trace() << "Found a modified entity: " << remoteId;
749 modifyEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, 683 modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory,
750 [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::ModifyEntityCommand, buffer); }); 684 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); });
751 } 685 }
752 } else { 686 } else {
753 Warning() << "Failed to get current entity"; 687 Warning() << "Failed to get current entity";
@@ -755,6 +689,118 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si
755 } 689 }
756} 690}
757 691
692KAsync::Job<void> Synchronizer::synchronize()
693{
694 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly);
695 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite);
696 return synchronizeWithSource().then<void>([this]() {
697 mTransaction.abort();
698 mSyncTransaction.commit();
699 mSyncStore.clear();
700 mEntityStore.clear();
701 });
702}
703
704
705
706SourceWriteBack::SourceWriteBack(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier)
707 : ChangeReplay(resourceInstanceIdentifier),
708 mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite),
709 mResourceType(resourceType),
710 mResourceInstanceIdentifier(resourceInstanceIdentifier)
711{
712
713}
714
715EntityStore &SourceWriteBack::store()
716{
717 if (!mEntityStore) {
718 mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction);
719 }
720 return *mEntityStore;
721}
722
723SyncStore &SourceWriteBack::syncStore()
724{
725 if (!mSyncStore) {
726 mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction);
727 }
728 return *mSyncStore;
729}
730
731KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value)
732{
733 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly);
734 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite);
735
736 Sink::EntityBuffer buffer(value);
737 const Sink::Entity &entity = buffer.entity();
738 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
739 Q_ASSERT(metadataBuffer);
740 if (!metadataBuffer->replayToSource()) {
741 Trace() << "Change is coming from the source";
742 return KAsync::null<void>();
743 }
744 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
745 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
746 const auto uid = Sink::Storage::uidFromKey(key);
747 QByteArray oldRemoteId;
748
749 if (operation != Sink::Operation_Creation) {
750 oldRemoteId = syncStore().resolveLocalId(type, uid);
751 }
752 Trace() << "Replaying " << key << type;
753
754 KAsync::Job<QByteArray> job = KAsync::null<QByteArray>();
755 if (type == ENTITY_TYPE_FOLDER) {
756 auto folder = store().read<ApplicationDomain::Folder>(uid);
757 // const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity));
758 job = replay(folder, operation, oldRemoteId);
759 } else if (type == ENTITY_TYPE_MAIL) {
760 auto mail = store().read<ApplicationDomain::Mail>(uid);
761 // const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity));
762 job = replay(mail, operation, oldRemoteId);
763 }
764
765 return job.then<void, QByteArray>([this, operation, type, uid](const QByteArray &remoteId) {
766 Trace() << "Replayed change with remote id: " << remoteId;
767 if (operation == Sink::Operation_Creation) {
768 if (remoteId.isEmpty()) {
769 Warning() << "Returned an empty remoteId from the creation";
770 } else {
771 syncStore().recordRemoteId(type, uid, remoteId);
772 }
773 } else if (operation == Sink::Operation_Modification) {
774 if (remoteId.isEmpty()) {
775 Warning() << "Returned an empty remoteId from the creation";
776 } else {
777 syncStore().updateRemoteId(type, uid, remoteId);
778 }
779 } else if (operation == Sink::Operation_Removal) {
780 syncStore().removeRemoteId(type, uid, remoteId);
781 } else {
782 Warning() << "Unkown operation" << operation;
783 }
784
785 mTransaction.abort();
786 mSyncTransaction.commit();
787 mSyncStore.clear();
788 mEntityStore.clear();
789 }, [](int errorCode, const QString &errorMessage) {
790 Warning() << "Failed to replay change: " << errorMessage;
791 });
792}
793
794KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &)
795{
796 return KAsync::null<QByteArray>();
797}
798
799KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &)
800{
801 return KAsync::null<QByteArray>();
802}
803
758 804
759#pragma clang diagnostic push 805#pragma clang diagnostic push
760#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" 806#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"