summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp560
1 files changed, 303 insertions, 257 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index eae6ead..cb2ef21 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -1,3 +1,22 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
1#include "genericresource.h" 20#include "genericresource.h"
2 21
3#include "entitybuffer.h" 22#include "entitybuffer.h"
@@ -14,6 +33,7 @@
14#include "log.h" 33#include "log.h"
15#include "definitions.h" 34#include "definitions.h"
16#include "bufferutils.h" 35#include "bufferutils.h"
36#include "adaptorfactoryregistry.h"
17 37
18#include <QUuid> 38#include <QUuid>
19#include <QDataStream> 39#include <QDataStream>
@@ -30,96 +50,6 @@ static int sCommitInterval = 10;
30using namespace Sink; 50using namespace Sink;
31 51
32#undef DEBUG_AREA 52#undef DEBUG_AREA
33#define DEBUG_AREA "resource.changereplay"
34
35/**
36 * Replays changes from the storage one by one.
37 *
38 * Uses a local database to:
39 * * Remember what changes have been replayed already.
40 * * store a mapping of remote to local buffers
41 */
42class ChangeReplay : public QObject
43{
44 Q_OBJECT
45public:
46 typedef std::function<KAsync::Job<void>(const QByteArray &type, const QByteArray &key, const QByteArray &value)> ReplayFunction;
47
48 ChangeReplay(const QString &resourceName, const ReplayFunction &replayFunction)
49 : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayFunction(replayFunction)
50 {
51 }
52
53 qint64 getLastReplayedRevision()
54 {
55 qint64 lastReplayedRevision = 0;
56 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly);
57 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
58 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
59 lastReplayedRevision = value.toLongLong();
60 return false;
61 },
62 [](const Storage::Error &) {});
63 return lastReplayedRevision;
64 }
65
66 bool allChangesReplayed()
67 {
68 const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly));
69 const qint64 lastReplayedRevision = getLastReplayedRevision();
70 Trace() << "All changes replayed " << topRevision << lastReplayedRevision;
71 return (lastReplayedRevision >= topRevision);
72 }
73
74signals:
75 void changesReplayed();
76
77public slots:
78 void revisionChanged()
79 {
80 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly);
81 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite);
82 qint64 lastReplayedRevision = 1;
83 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
84 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
85 lastReplayedRevision = value.toLongLong();
86 return false;
87 },
88 [](const Storage::Error &) {});
89 const qint64 topRevision = Storage::maxRevision(mainStoreTransaction);
90
91 Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision;
92 if (lastReplayedRevision <= topRevision) {
93 qint64 revision = lastReplayedRevision;
94 for (; revision <= topRevision; revision++) {
95 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision);
96 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision);
97 const auto key = Storage::assembleKey(uid, revision);
98 Storage::mainDatabase(mainStoreTransaction, type)
99 .scan(key,
100 [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool {
101 mReplayFunction(type, key, value).exec();
102 // TODO make for loop async, and pass to async replay function together with type
103 Trace() << "Replaying " << key;
104 return false;
105 },
106 [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; });
107 }
108 revision--;
109 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision));
110 replayStoreTransaction.commit();
111 Trace() << "Replayed until " << revision;
112 }
113 emit changesReplayed();
114 }
115
116private:
117 Sink::Storage mStorage;
118 Sink::Storage mChangeReplayStore;
119 ReplayFunction mReplayFunction;
120};
121
122#undef DEBUG_AREA
123#define DEBUG_AREA "resource.commandprocessor" 53#define DEBUG_AREA "resource.commandprocessor"
124 54
125/** 55/**
@@ -133,10 +63,9 @@ class CommandProcessor : public QObject
133public: 63public:
134 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) 64 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false)
135 { 65 {
136 mPipeline->startTransaction(); 66 mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
137 // FIXME Should be initialized to the current value of the change replay queue 67 Warning() << error.message;
138 mLowerBoundRevision = Storage::maxRevision(mPipeline->transaction()); 68 }));
139 mPipeline->commit();
140 69
141 for (auto queue : mCommandQueues) { 70 for (auto queue : mCommandQueues) {
142 const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); 71 const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process);
@@ -303,15 +232,22 @@ private:
303#undef DEBUG_AREA 232#undef DEBUG_AREA
304#define DEBUG_AREA "resource" 233#define DEBUG_AREA "resource"
305 234
306GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline) 235GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer)
307 : Sink::Resource(), 236 : Sink::Resource(),
308 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), 237 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"),
309 mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), 238 mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"),
239 mResourceType(resourceType),
310 mResourceInstanceIdentifier(resourceInstanceIdentifier), 240 mResourceInstanceIdentifier(resourceInstanceIdentifier),
311 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), 241 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)),
242 mChangeReplay(changeReplay),
243 mSynchronizer(synchronizer),
312 mError(0), 244 mError(0),
313 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 245 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
314{ 246{
247 mPipeline->setResourceType(mResourceType);
248 mSynchronizer->setup([this](int commandId, const QByteArray &data) {
249 enqueueCommand(mSynchronizerQueue, commandId, data);
250 });
315 mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); 251 mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue);
316 mProcessor->setInspectionCommand([this](void const *command, size_t size) { 252 mProcessor->setInspectionCommand([this](void const *command, size_t size) {
317 flatbuffers::Verifier verifier((const uint8_t *)command, size); 253 flatbuffers::Verifier verifier((const uint8_t *)command, size);
@@ -353,14 +289,9 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
353 }); 289 });
354 QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 290 QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
355 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 291 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
356 mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) {
357 // This results in a deadlock when a sync is in progress and we try to create a second writing transaction (which is why we turn changereplay off during the sync)
358 auto synchronizationStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite);
359 return this->replay(*synchronizationStore, type, key, value).then<void>([synchronizationStore]() {});
360 });
361 enableChangeReplay(true); 292 enableChangeReplay(true);
362 mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); 293 mClientLowerBoundRevision = mPipeline->cleanedUpRevision();
363 mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); 294 mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision());
364 295
365 mCommitQueueTimer.setInterval(sCommitInterval); 296 mCommitQueueTimer.setInterval(sCommitInterval);
366 mCommitQueueTimer.setSingleShot(true); 297 mCommitQueueTimer.setSingleShot(true);
@@ -370,7 +301,6 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
370GenericResource::~GenericResource() 301GenericResource::~GenericResource()
371{ 302{
372 delete mProcessor; 303 delete mProcessor;
373 delete mSourceChangeReplay;
374} 304}
375 305
376KAsync::Job<void> GenericResource::inspect( 306KAsync::Job<void> GenericResource::inspect(
@@ -383,86 +313,20 @@ KAsync::Job<void> GenericResource::inspect(
383void GenericResource::enableChangeReplay(bool enable) 313void GenericResource::enableChangeReplay(bool enable)
384{ 314{
385 if (enable) { 315 if (enable) {
386 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged, Qt::QueuedConnection); 316 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection);
387 QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); 317 QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
388 mSourceChangeReplay->revisionChanged(); 318 mChangeReplay->revisionChanged();
389 } else { 319 } else {
390 QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); 320 QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged);
391 QObject::disconnect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); 321 QObject::disconnect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
392 } 322 }
393} 323}
394 324
395void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors) 325void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors)
396{ 326{
397 mPipeline->setPreprocessors(type, preprocessors); 327 mPipeline->setPreprocessors(type, preprocessors);
398 mPipeline->setAdaptorFactory(type, factory);
399 mAdaptorFactories.insert(type, factory);
400}
401
402KAsync::Job<void> GenericResource::replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value)
403{
404 Sink::EntityBuffer buffer(value);
405 const Sink::Entity &entity = buffer.entity();
406 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
407 Q_ASSERT(metadataBuffer);
408 if (!metadataBuffer->replayToSource()) {
409 Trace() << "Change is coming from the source";
410 return KAsync::null<void>();
411 }
412 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
413 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
414 const auto uid = Sink::Storage::uidFromKey(key);
415 QByteArray oldRemoteId;
416
417 if (operation != Sink::Operation_Creation) {
418 auto synchronizationTransaction = synchronizationStore.createTransaction(Sink::Storage::ReadOnly);
419 oldRemoteId = resolveLocalId(type, uid, synchronizationTransaction);
420 }
421 Trace() << "Replaying " << key << type;
422
423 KAsync::Job<QByteArray> job = KAsync::null<QByteArray>();
424 if (type == ENTITY_TYPE_FOLDER) {
425 const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity));
426 job = replay(folder, operation, oldRemoteId);
427 } else if (type == ENTITY_TYPE_MAIL) {
428 const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity));
429 job = replay(mail, operation, oldRemoteId);
430 }
431
432 return job.then<void, QByteArray>([=, &synchronizationStore](const QByteArray &remoteId) {
433 auto synchronizationTransaction = synchronizationStore.createTransaction(Sink::Storage::ReadWrite);
434 Trace() << "Replayed change with remote id: " << remoteId;
435 if (operation == Sink::Operation_Creation) {
436 if (remoteId.isEmpty()) {
437 Warning() << "Returned an empty remoteId from the creation";
438 } else {
439 recordRemoteId(type, uid, remoteId, synchronizationTransaction);
440 }
441 } else if (operation == Sink::Operation_Modification) {
442 if (remoteId.isEmpty()) {
443 Warning() << "Returned an empty remoteId from the creation";
444 } else {
445 updateRemoteId(type, uid, remoteId, synchronizationTransaction);
446 }
447 } else if (operation == Sink::Operation_Removal) {
448 removeRemoteId(type, uid, remoteId, synchronizationTransaction);
449 } else {
450 Warning() << "Unkown operation" << operation;
451 }
452 }, [](int errorCode, const QString &errorMessage) {
453 Warning() << "Failed to replay change: " << errorMessage;
454 });
455} 328}
456 329
457KAsync::Job<QByteArray> GenericResource::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &)
458{
459 return KAsync::null<QByteArray>();
460}
461
462KAsync::Job<QByteArray> GenericResource::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &)
463{
464 return KAsync::null<QByteArray>();
465}
466 330
467void GenericResource::removeDataFromDisk() 331void GenericResource::removeDataFromDisk()
468{ 332{
@@ -528,10 +392,8 @@ KAsync::Job<void> GenericResource::synchronizeWithSource()
528 Log() << " Synchronizing"; 392 Log() << " Synchronizing";
529 // Changereplay would deadlock otherwise when trying to open the synchronization store 393 // Changereplay would deadlock otherwise when trying to open the synchronization store
530 enableChangeReplay(false); 394 enableChangeReplay(false);
531 auto mainStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier, Sink::Storage::ReadOnly); 395 mSynchronizer->synchronize()
532 auto syncStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); 396 .then<void>([this, &future]() {
533 synchronizeWithSource(*mainStore, *syncStore)
534 .then<void>([this, mainStore, syncStore, &future]() {
535 Log() << "Done Synchronizing"; 397 Log() << "Done Synchronizing";
536 enableChangeReplay(true); 398 enableChangeReplay(true);
537 future.setFinished(); 399 future.setFinished();
@@ -576,11 +438,11 @@ KAsync::Job<void> GenericResource::processAllMessages()
576 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) 438 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); })
577 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) 439 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); })
578 .then<void>([this](KAsync::Future<void> &f) { 440 .then<void>([this](KAsync::Future<void> &f) {
579 if (mSourceChangeReplay->allChangesReplayed()) { 441 if (mChangeReplay->allChangesReplayed()) {
580 f.setFinished(); 442 f.setFinished();
581 } else { 443 } else {
582 auto context = new QObject; 444 auto context = new QObject;
583 QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, context, [&f, context]() { 445 QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, context, [&f, context]() {
584 delete context; 446 delete context;
585 f.setFinished(); 447 f.setFinished();
586 }); 448 });
@@ -590,7 +452,7 @@ KAsync::Job<void> GenericResource::processAllMessages()
590 452
591void GenericResource::updateLowerBoundRevision() 453void GenericResource::updateLowerBoundRevision()
592{ 454{
593 mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSourceChangeReplay->getLastReplayedRevision())); 455 mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mChangeReplay->getLastReplayedRevision()));
594} 456}
595 457
596void GenericResource::setLowerBoundRevision(qint64 revision) 458void GenericResource::setLowerBoundRevision(qint64 revision)
@@ -599,7 +461,139 @@ void GenericResource::setLowerBoundRevision(qint64 revision)
599 updateLowerBoundRevision(); 461 updateLowerBoundRevision();
600} 462}
601 463
602void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, 464
465
466
467EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction)
468 : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier),
469 mTransaction(transaction)
470{
471
472}
473
474template<typename T>
475T EntityStore::read(const QByteArray &identifier) const
476{
477 auto typeName = ApplicationDomain::getTypeName<T>();
478 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName);
479 auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory<T>(mResourceType));
480 Q_ASSERT(bufferAdaptor);
481 return T(mResourceInstanceIdentifier, identifier, 0, bufferAdaptor);
482}
483
484QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory)
485{
486 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
487 db.findLatest(uid,
488 [&current, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
489 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
490 if (!buffer.isValid()) {
491 Warning() << "Read invalid buffer from disk";
492 } else {
493 Trace() << "Found value " << key;
494 current = adaptorFactory.createAdaptor(buffer.entity());
495 }
496 return false;
497 },
498 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; });
499 return current;
500}
501
502
503
504SyncStore::SyncStore(Sink::Storage::Transaction &transaction)
505 : mTransaction(transaction)
506{
507
508}
509
510void SyncStore::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId)
511{
512 Index("rid.mapping." + bufferType, mTransaction).add(remoteId, localId);
513 Index("localid.mapping." + bufferType, mTransaction).add(localId, remoteId);
514}
515
516void SyncStore::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId)
517{
518 Index("rid.mapping." + bufferType, mTransaction).remove(remoteId, localId);
519 Index("localid.mapping." + bufferType, mTransaction).remove(localId, remoteId);
520}
521
522void SyncStore::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId)
523{
524 const auto oldRemoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId);
525 removeRemoteId(bufferType, localId, oldRemoteId);
526 recordRemoteId(bufferType, localId, remoteId);
527}
528
529QByteArray SyncStore::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId)
530{
531 // Lookup local id for remote id, or insert a new pair otherwise
532 Index index("rid.mapping." + bufferType, mTransaction);
533 QByteArray sinkId = index.lookup(remoteId);
534 if (sinkId.isEmpty()) {
535 sinkId = QUuid::createUuid().toString().toUtf8();
536 index.add(remoteId, sinkId);
537 Index("localid.mapping." + bufferType, mTransaction).add(sinkId, remoteId);
538 }
539 return sinkId;
540}
541
542QByteArray SyncStore::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId)
543{
544 QByteArray remoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId);
545 if (remoteId.isEmpty()) {
546 Warning() << "Couldn't find the remote id for " << localId;
547 return QByteArray();
548 }
549 return remoteId;
550}
551
552
553
554
555
556
557
558
559Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier)
560 : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly),
561 mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite),
562 mResourceType(resourceType),
563 mResourceInstanceIdentifier(resourceInstanceIdentifier)
564{
565 Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier;
566
567}
568
569void Synchronizer::setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback)
570{
571 mEnqueue = enqueueCommandCallback;
572}
573
574void Synchronizer::enqueueCommand(int commandId, const QByteArray &data)
575{
576 Q_ASSERT(mEnqueue);
577 mEnqueue(commandId, data);
578}
579
580EntityStore &Synchronizer::store()
581{
582 if (!mEntityStore) {
583 mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction);
584 }
585 return *mEntityStore;
586}
587
588SyncStore &Synchronizer::syncStore()
589{
590 if (!mSyncStore) {
591 mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction);
592 }
593 return *mSyncStore;
594}
595
596void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
603 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) 597 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
604{ 598{
605 // These changes are coming from the source 599 // These changes are coming from the source
@@ -616,7 +610,7 @@ void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &b
616 callback(BufferUtils::extractBuffer(fbb)); 610 callback(BufferUtils::extractBuffer(fbb));
617} 611}
618 612
619void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, 613void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
620 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) 614 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
621{ 615{
622 // These changes are coming from the source 616 // These changes are coming from the source
@@ -634,7 +628,7 @@ void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, co
634 callback(BufferUtils::extractBuffer(fbb)); 628 callback(BufferUtils::extractBuffer(fbb));
635} 629}
636 630
637void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) 631void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback)
638{ 632{
639 // These changes are coming from the source 633 // These changes are coming from the source
640 const auto replayToSource = false; 634 const auto replayToSource = false;
@@ -647,96 +641,36 @@ void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, co
647 callback(BufferUtils::extractBuffer(fbb)); 641 callback(BufferUtils::extractBuffer(fbb));
648} 642}
649 643
650void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) 644void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists)
651{
652 Index("rid.mapping." + bufferType, transaction).add(remoteId, localId);
653 ;
654 Index("localid.mapping." + bufferType, transaction).add(localId, remoteId);
655}
656
657void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
658{
659 Index("rid.mapping." + bufferType, transaction).remove(remoteId, localId);
660 Index("localid.mapping." + bufferType, transaction).remove(localId, remoteId);
661}
662
663void GenericResource::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
664{
665 const auto oldRemoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId);
666 removeRemoteId(bufferType, localId, oldRemoteId, transaction);
667 recordRemoteId(bufferType, localId, remoteId, transaction);
668}
669
670QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
671{
672 // Lookup local id for remote id, or insert a new pair otherwise
673 Index index("rid.mapping." + bufferType, transaction);
674 QByteArray sinkId = index.lookup(remoteId);
675 if (sinkId.isEmpty()) {
676 sinkId = QUuid::createUuid().toString().toUtf8();
677 index.add(remoteId, sinkId);
678 Index("localid.mapping." + bufferType, transaction).add(sinkId, remoteId);
679 }
680 return sinkId;
681}
682
683QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction)
684{
685 QByteArray remoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId);
686 if (remoteId.isEmpty()) {
687 Warning() << "Couldn't find the remote id for " << localId;
688 return QByteArray();
689 }
690 return remoteId;
691}
692
693void GenericResource::scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType,
694 const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists)
695{ 645{
696 entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) { 646 entryGenerator([this, bufferType, &exists](const QByteArray &key) {
697 auto sinkId = Sink::Storage::uidFromKey(key); 647 auto sinkId = Sink::Storage::uidFromKey(key);
698 Trace() << "Checking for removal " << key; 648 Trace() << "Checking for removal " << key;
699 const auto remoteId = resolveLocalId(bufferType, sinkId, synchronizationTransaction); 649 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId);
700 // If we have no remoteId, the entity hasn't been replayed to the source yet 650 // If we have no remoteId, the entity hasn't been replayed to the source yet
701 if (!remoteId.isEmpty()) { 651 if (!remoteId.isEmpty()) {
702 if (!exists(remoteId)) { 652 if (!exists(remoteId)) {
703 Trace() << "Found a removed entity: " << sinkId; 653 Trace() << "Found a removed entity: " << sinkId;
704 deleteEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, 654 deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType,
705 [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::DeleteEntityCommand, buffer); }); 655 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); });
706 } 656 }
707 } 657 }
708 }); 658 });
709} 659}
710 660
711QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> GenericResource::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) 661void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
712{
713 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
714 db.findLatest(uid,
715 [&current, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
716 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
717 if (!buffer.isValid()) {
718 Warning() << "Read invalid buffer from disk";
719 } else {
720 current = adaptorFactory.createAdaptor(buffer.entity());
721 }
722 return false;
723 },
724 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; });
725 return current;
726}
727
728void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction,
729 DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
730{ 662{
731 auto mainDatabase = Storage::mainDatabase(transaction, bufferType); 663 Trace() << "Create or modify" << bufferType << remoteId;
732 const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); 664 auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType);
665 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
733 const auto found = mainDatabase.contains(sinkId); 666 const auto found = mainDatabase.contains(sinkId);
667 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType);
734 if (!found) { 668 if (!found) {
735 Trace() << "Found a new entity: " << remoteId; 669 Trace() << "Found a new entity: " << remoteId;
736 createEntity( 670 createEntity(
737 sinkId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::CreateEntityCommand, buffer); }); 671 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
738 } else { // modification 672 } else { // modification
739 if (auto current = getLatest(mainDatabase, sinkId, adaptorFactory)) { 673 if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) {
740 bool changed = false; 674 bool changed = false;
741 for (const auto &property : entity.changedProperties()) { 675 for (const auto &property : entity.changedProperties()) {
742 if (entity.getProperty(property) != current->getProperty(property)) { 676 if (entity.getProperty(property) != current->getProperty(property)) {
@@ -746,8 +680,8 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si
746 } 680 }
747 if (changed) { 681 if (changed) {
748 Trace() << "Found a modified entity: " << remoteId; 682 Trace() << "Found a modified entity: " << remoteId;
749 modifyEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, 683 modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory,
750 [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::ModifyEntityCommand, buffer); }); 684 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); });
751 } 685 }
752 } else { 686 } else {
753 Warning() << "Failed to get current entity"; 687 Warning() << "Failed to get current entity";
@@ -755,6 +689,118 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si
755 } 689 }
756} 690}
757 691
692KAsync::Job<void> Synchronizer::synchronize()
693{
694 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly);
695 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite);
696 return synchronizeWithSource().then<void>([this]() {
697 mTransaction.abort();
698 mSyncTransaction.commit();
699 mSyncStore.clear();
700 mEntityStore.clear();
701 });
702}
703
704
705
706SourceWriteBack::SourceWriteBack(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier)
707 : ChangeReplay(resourceInstanceIdentifier),
708 mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite),
709 mResourceType(resourceType),
710 mResourceInstanceIdentifier(resourceInstanceIdentifier)
711{
712
713}
714
715EntityStore &SourceWriteBack::store()
716{
717 if (!mEntityStore) {
718 mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction);
719 }
720 return *mEntityStore;
721}
722
723SyncStore &SourceWriteBack::syncStore()
724{
725 if (!mSyncStore) {
726 mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction);
727 }
728 return *mSyncStore;
729}
730
731KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value)
732{
733 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly);
734 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite);
735
736 Sink::EntityBuffer buffer(value);
737 const Sink::Entity &entity = buffer.entity();
738 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
739 Q_ASSERT(metadataBuffer);
740 if (!metadataBuffer->replayToSource()) {
741 Trace() << "Change is coming from the source";
742 return KAsync::null<void>();
743 }
744 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
745 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
746 const auto uid = Sink::Storage::uidFromKey(key);
747 QByteArray oldRemoteId;
748
749 if (operation != Sink::Operation_Creation) {
750 oldRemoteId = syncStore().resolveLocalId(type, uid);
751 }
752 Trace() << "Replaying " << key << type;
753
754 KAsync::Job<QByteArray> job = KAsync::null<QByteArray>();
755 if (type == ENTITY_TYPE_FOLDER) {
756 auto folder = store().read<ApplicationDomain::Folder>(uid);
757 // const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity));
758 job = replay(folder, operation, oldRemoteId);
759 } else if (type == ENTITY_TYPE_MAIL) {
760 auto mail = store().read<ApplicationDomain::Mail>(uid);
761 // const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity));
762 job = replay(mail, operation, oldRemoteId);
763 }
764
765 return job.then<void, QByteArray>([this, operation, type, uid](const QByteArray &remoteId) {
766 Trace() << "Replayed change with remote id: " << remoteId;
767 if (operation == Sink::Operation_Creation) {
768 if (remoteId.isEmpty()) {
769 Warning() << "Returned an empty remoteId from the creation";
770 } else {
771 syncStore().recordRemoteId(type, uid, remoteId);
772 }
773 } else if (operation == Sink::Operation_Modification) {
774 if (remoteId.isEmpty()) {
775 Warning() << "Returned an empty remoteId from the creation";
776 } else {
777 syncStore().updateRemoteId(type, uid, remoteId);
778 }
779 } else if (operation == Sink::Operation_Removal) {
780 syncStore().removeRemoteId(type, uid, remoteId);
781 } else {
782 Warning() << "Unkown operation" << operation;
783 }
784
785 mTransaction.abort();
786 mSyncTransaction.commit();
787 mSyncStore.clear();
788 mEntityStore.clear();
789 }, [](int errorCode, const QString &errorMessage) {
790 Warning() << "Failed to replay change: " << errorMessage;
791 });
792}
793
794KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &)
795{
796 return KAsync::null<QByteArray>();
797}
798
799KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &)
800{
801 return KAsync::null<QByteArray>();
802}
803
758 804
759#pragma clang diagnostic push 805#pragma clang diagnostic push
760#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" 806#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"