summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-05-28 02:09:58 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-05-28 02:09:58 +0200
commitb441386c4e138d19bbd79d578e0a2ff1b3f54a93 (patch)
tree1110b6ec00ce29a8bcd7f6db0717f4a483f50587 /common
parentafb29c153daff23e491a350784ce6af5db5e28af (diff)
downloadsink-b441386c4e138d19bbd79d578e0a2ff1b3f54a93.tar.gz
sink-b441386c4e138d19bbd79d578e0a2ff1b3f54a93.zip
Moved the classes to individual files
Diffstat (limited to 'common')
-rw-r--r--common/CMakeLists.txt4
-rw-r--r--common/entitystore.cpp48
-rw-r--r--common/entitystore.h53
-rw-r--r--common/genericresource.cpp342
-rw-r--r--common/genericresource.h141
-rw-r--r--common/remoteidmap.cpp75
-rw-r--r--common/remoteidmap.h61
-rw-r--r--common/sourcewriteback.cpp124
-rw-r--r--common/sourcewriteback.h64
-rw-r--r--common/synchronizer.cpp176
-rw-r--r--common/synchronizer.h95
11 files changed, 701 insertions, 482 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 79b627a..3c6a083 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -68,6 +68,10 @@ set(command_SRCS
68 query.cpp 68 query.cpp
69 changereplay.cpp 69 changereplay.cpp
70 adaptorfactoryregistry.cpp 70 adaptorfactoryregistry.cpp
71 synchronizer.cpp
72 entitystore.cpp
73 remoteidmap.cpp
74 sourcewriteback.cpp
71 ${storage_SRCS}) 75 ${storage_SRCS})
72 76
73add_library(${PROJECT_NAME} SHARED ${command_SRCS}) 77add_library(${PROJECT_NAME} SHARED ${command_SRCS})
diff --git a/common/entitystore.cpp b/common/entitystore.cpp
new file mode 100644
index 0000000..2c15abf
--- /dev/null
+++ b/common/entitystore.cpp
@@ -0,0 +1,48 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#include "entitystore.h"
21
22using namespace Sink;
23
24EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction)
25 : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier),
26 mTransaction(transaction)
27{
28
29}
30
31QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory)
32{
33 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
34 db.findLatest(uid,
35 [&current, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
36 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
37 if (!buffer.isValid()) {
38 Warning() << "Read invalid buffer from disk";
39 } else {
40 Trace() << "Found value " << key;
41 current = adaptorFactory.createAdaptor(buffer.entity());
42 }
43 return false;
44 },
45 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; });
46 return current;
47}
48
diff --git a/common/entitystore.h b/common/entitystore.h
new file mode 100644
index 0000000..17156ec
--- /dev/null
+++ b/common/entitystore.h
@@ -0,0 +1,53 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#pragma once
21
22#include "sink_export.h"
23#include <domainadaptor.h>
24
25#include "storage.h"
26#include "adaptorfactoryregistry.h"
27
28namespace Sink {
29
30class SINK_EXPORT EntityStore
31{
32public:
33 EntityStore(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction);
34
35 template<typename T>
36 T read(const QByteArray &identifier) const
37 {
38 auto typeName = ApplicationDomain::getTypeName<T>();
39 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName);
40 auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory<T>(mResourceType));
41 Q_ASSERT(bufferAdaptor);
42 return T(mResourceInstanceIdentifier, identifier, 0, bufferAdaptor);
43 }
44
45
46 static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory);
47private:
48 QByteArray mResourceType;
49 QByteArray mResourceInstanceIdentifier;
50 Sink::Storage::Transaction &mTransaction;
51};
52
53}
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index cb2ef21..568e066 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -34,6 +34,7 @@
34#include "definitions.h" 34#include "definitions.h"
35#include "bufferutils.h" 35#include "bufferutils.h"
36#include "adaptorfactoryregistry.h" 36#include "adaptorfactoryregistry.h"
37#include "synchronizer.h"
37 38
38#include <QUuid> 39#include <QUuid>
39#include <QDataStream> 40#include <QDataStream>
@@ -461,347 +462,6 @@ void GenericResource::setLowerBoundRevision(qint64 revision)
461 updateLowerBoundRevision(); 462 updateLowerBoundRevision();
462} 463}
463 464
464
465
466
467EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction)
468 : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier),
469 mTransaction(transaction)
470{
471
472}
473
474template<typename T>
475T EntityStore::read(const QByteArray &identifier) const
476{
477 auto typeName = ApplicationDomain::getTypeName<T>();
478 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName);
479 auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory<T>(mResourceType));
480 Q_ASSERT(bufferAdaptor);
481 return T(mResourceInstanceIdentifier, identifier, 0, bufferAdaptor);
482}
483
484QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory)
485{
486 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
487 db.findLatest(uid,
488 [&current, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
489 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
490 if (!buffer.isValid()) {
491 Warning() << "Read invalid buffer from disk";
492 } else {
493 Trace() << "Found value " << key;
494 current = adaptorFactory.createAdaptor(buffer.entity());
495 }
496 return false;
497 },
498 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; });
499 return current;
500}
501
502
503
504SyncStore::SyncStore(Sink::Storage::Transaction &transaction)
505 : mTransaction(transaction)
506{
507
508}
509
510void SyncStore::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId)
511{
512 Index("rid.mapping." + bufferType, mTransaction).add(remoteId, localId);
513 Index("localid.mapping." + bufferType, mTransaction).add(localId, remoteId);
514}
515
516void SyncStore::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId)
517{
518 Index("rid.mapping." + bufferType, mTransaction).remove(remoteId, localId);
519 Index("localid.mapping." + bufferType, mTransaction).remove(localId, remoteId);
520}
521
522void SyncStore::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId)
523{
524 const auto oldRemoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId);
525 removeRemoteId(bufferType, localId, oldRemoteId);
526 recordRemoteId(bufferType, localId, remoteId);
527}
528
529QByteArray SyncStore::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId)
530{
531 // Lookup local id for remote id, or insert a new pair otherwise
532 Index index("rid.mapping." + bufferType, mTransaction);
533 QByteArray sinkId = index.lookup(remoteId);
534 if (sinkId.isEmpty()) {
535 sinkId = QUuid::createUuid().toString().toUtf8();
536 index.add(remoteId, sinkId);
537 Index("localid.mapping." + bufferType, mTransaction).add(sinkId, remoteId);
538 }
539 return sinkId;
540}
541
542QByteArray SyncStore::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId)
543{
544 QByteArray remoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId);
545 if (remoteId.isEmpty()) {
546 Warning() << "Couldn't find the remote id for " << localId;
547 return QByteArray();
548 }
549 return remoteId;
550}
551
552
553
554
555
556
557
558
559Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier)
560 : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly),
561 mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite),
562 mResourceType(resourceType),
563 mResourceInstanceIdentifier(resourceInstanceIdentifier)
564{
565 Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier;
566
567}
568
569void Synchronizer::setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback)
570{
571 mEnqueue = enqueueCommandCallback;
572}
573
574void Synchronizer::enqueueCommand(int commandId, const QByteArray &data)
575{
576 Q_ASSERT(mEnqueue);
577 mEnqueue(commandId, data);
578}
579
580EntityStore &Synchronizer::store()
581{
582 if (!mEntityStore) {
583 mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction);
584 }
585 return *mEntityStore;
586}
587
588SyncStore &Synchronizer::syncStore()
589{
590 if (!mSyncStore) {
591 mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction);
592 }
593 return *mSyncStore;
594}
595
596void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
597 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
598{
599 // These changes are coming from the source
600 const auto replayToSource = false;
601 flatbuffers::FlatBufferBuilder entityFbb;
602 adaptorFactory.createBuffer(domainObject, entityFbb);
603 flatbuffers::FlatBufferBuilder fbb;
604 // This is the resource type and not the domain type
605 auto entityId = fbb.CreateString(sinkId.toStdString());
606 auto type = fbb.CreateString(bufferType.toStdString());
607 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
608 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource);
609 Sink::Commands::FinishCreateEntityBuffer(fbb, location);
610 callback(BufferUtils::extractBuffer(fbb));
611}
612
613void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
614 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
615{
616 // These changes are coming from the source
617 const auto replayToSource = false;
618 flatbuffers::FlatBufferBuilder entityFbb;
619 adaptorFactory.createBuffer(domainObject, entityFbb);
620 flatbuffers::FlatBufferBuilder fbb;
621 auto entityId = fbb.CreateString(sinkId.toStdString());
622 // This is the resource type and not the domain type
623 auto type = fbb.CreateString(bufferType.toStdString());
624 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
625 // TODO removals
626 auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource);
627 Sink::Commands::FinishModifyEntityBuffer(fbb, location);
628 callback(BufferUtils::extractBuffer(fbb));
629}
630
631void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback)
632{
633 // These changes are coming from the source
634 const auto replayToSource = false;
635 flatbuffers::FlatBufferBuilder fbb;
636 auto entityId = fbb.CreateString(sinkId.toStdString());
637 // This is the resource type and not the domain type
638 auto type = fbb.CreateString(bufferType.toStdString());
639 auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource);
640 Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
641 callback(BufferUtils::extractBuffer(fbb));
642}
643
644void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists)
645{
646 entryGenerator([this, bufferType, &exists](const QByteArray &key) {
647 auto sinkId = Sink::Storage::uidFromKey(key);
648 Trace() << "Checking for removal " << key;
649 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId);
650 // If we have no remoteId, the entity hasn't been replayed to the source yet
651 if (!remoteId.isEmpty()) {
652 if (!exists(remoteId)) {
653 Trace() << "Found a removed entity: " << sinkId;
654 deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType,
655 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); });
656 }
657 }
658 });
659}
660
661void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
662{
663 Trace() << "Create or modify" << bufferType << remoteId;
664 auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType);
665 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
666 const auto found = mainDatabase.contains(sinkId);
667 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType);
668 if (!found) {
669 Trace() << "Found a new entity: " << remoteId;
670 createEntity(
671 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
672 } else { // modification
673 if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) {
674 bool changed = false;
675 for (const auto &property : entity.changedProperties()) {
676 if (entity.getProperty(property) != current->getProperty(property)) {
677 Trace() << "Property changed " << sinkId << property;
678 changed = true;
679 }
680 }
681 if (changed) {
682 Trace() << "Found a modified entity: " << remoteId;
683 modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory,
684 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); });
685 }
686 } else {
687 Warning() << "Failed to get current entity";
688 }
689 }
690}
691
692KAsync::Job<void> Synchronizer::synchronize()
693{
694 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly);
695 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite);
696 return synchronizeWithSource().then<void>([this]() {
697 mTransaction.abort();
698 mSyncTransaction.commit();
699 mSyncStore.clear();
700 mEntityStore.clear();
701 });
702}
703
704
705
706SourceWriteBack::SourceWriteBack(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier)
707 : ChangeReplay(resourceInstanceIdentifier),
708 mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite),
709 mResourceType(resourceType),
710 mResourceInstanceIdentifier(resourceInstanceIdentifier)
711{
712
713}
714
715EntityStore &SourceWriteBack::store()
716{
717 if (!mEntityStore) {
718 mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction);
719 }
720 return *mEntityStore;
721}
722
723SyncStore &SourceWriteBack::syncStore()
724{
725 if (!mSyncStore) {
726 mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction);
727 }
728 return *mSyncStore;
729}
730
731KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value)
732{
733 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly);
734 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite);
735
736 Sink::EntityBuffer buffer(value);
737 const Sink::Entity &entity = buffer.entity();
738 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
739 Q_ASSERT(metadataBuffer);
740 if (!metadataBuffer->replayToSource()) {
741 Trace() << "Change is coming from the source";
742 return KAsync::null<void>();
743 }
744 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
745 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
746 const auto uid = Sink::Storage::uidFromKey(key);
747 QByteArray oldRemoteId;
748
749 if (operation != Sink::Operation_Creation) {
750 oldRemoteId = syncStore().resolveLocalId(type, uid);
751 }
752 Trace() << "Replaying " << key << type;
753
754 KAsync::Job<QByteArray> job = KAsync::null<QByteArray>();
755 if (type == ENTITY_TYPE_FOLDER) {
756 auto folder = store().read<ApplicationDomain::Folder>(uid);
757 // const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity));
758 job = replay(folder, operation, oldRemoteId);
759 } else if (type == ENTITY_TYPE_MAIL) {
760 auto mail = store().read<ApplicationDomain::Mail>(uid);
761 // const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity));
762 job = replay(mail, operation, oldRemoteId);
763 }
764
765 return job.then<void, QByteArray>([this, operation, type, uid](const QByteArray &remoteId) {
766 Trace() << "Replayed change with remote id: " << remoteId;
767 if (operation == Sink::Operation_Creation) {
768 if (remoteId.isEmpty()) {
769 Warning() << "Returned an empty remoteId from the creation";
770 } else {
771 syncStore().recordRemoteId(type, uid, remoteId);
772 }
773 } else if (operation == Sink::Operation_Modification) {
774 if (remoteId.isEmpty()) {
775 Warning() << "Returned an empty remoteId from the creation";
776 } else {
777 syncStore().updateRemoteId(type, uid, remoteId);
778 }
779 } else if (operation == Sink::Operation_Removal) {
780 syncStore().removeRemoteId(type, uid, remoteId);
781 } else {
782 Warning() << "Unkown operation" << operation;
783 }
784
785 mTransaction.abort();
786 mSyncTransaction.commit();
787 mSyncStore.clear();
788 mEntityStore.clear();
789 }, [](int errorCode, const QString &errorMessage) {
790 Warning() << "Failed to replay change: " << errorMessage;
791 });
792}
793
794KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &)
795{
796 return KAsync::null<QByteArray>();
797}
798
799KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &)
800{
801 return KAsync::null<QByteArray>();
802}
803
804
805#pragma clang diagnostic push 465#pragma clang diagnostic push
806#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" 466#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"
807#include "genericresource.moc" 467#include "genericresource.moc"
diff --git a/common/genericresource.h b/common/genericresource.h
index 45d5d3a..4ed408d 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -82,147 +82,6 @@ private:
82 int mError; 82 int mError;
83 QTimer mCommitQueueTimer; 83 QTimer mCommitQueueTimer;
84 qint64 mClientLowerBoundRevision; 84 qint64 mClientLowerBoundRevision;
85 QHash<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> mAdaptorFactories;
86}; 85};
87 86
88class SINK_EXPORT SyncStore
89{
90public:
91 SyncStore(Sink::Storage::Transaction &);
92
93 /**
94 * Records a localId to remoteId mapping
95 */
96 void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId);
97 void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId);
98 void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId);
99
100 /**
101 * Tries to find a local id for the remote id, and creates a new local id otherwise.
102 *
103 * The new local id is recorded in the local to remote id mapping.
104 */
105 QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId);
106
107 /**
108 * Tries to find a remote id for a local id.
109 *
110 * This can fail if the entity hasn't been written back to the server yet.
111 */
112 QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId);
113
114private:
115 Sink::Storage::Transaction &mTransaction;
116};
117
118class SINK_EXPORT EntityStore
119{
120public:
121 EntityStore(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction);
122
123 template<typename T>
124 T read(const QByteArray &identifier) const;
125
126 static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory);
127private:
128 QByteArray mResourceType;
129 QByteArray mResourceInstanceIdentifier;
130 Sink::Storage::Transaction &mTransaction;
131};
132
133/**
134 * Synchronize and add what we don't already have to local queue
135 */
136class SINK_EXPORT Synchronizer
137{
138public:
139 Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier);
140
141 void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback);
142 KAsync::Job<void> synchronize();
143
144protected:
145 ///Calls the callback to enqueue the command
146 void enqueueCommand(int commandId, const QByteArray &data);
147
148 static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
149 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback);
150 static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
151 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback);
152 static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback);
153
154 /**
155 * A synchronous algorithm to remove entities that are no longer existing.
156 *
157 * A list of entities is generated by @param entryGenerator.
158 * The entiry Generator typically iterates over an index to produce all existing entries.
159 * This algorithm calls @param exists for every entity of type @param type, with its remoteId. For every entity where @param exists returns false,
160 * an entity delete command is enqueued.
161 *
162 * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous.
163 */
164 void scanForRemovals(const QByteArray &bufferType,
165 const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists);
166
167 /**
168 * An algorithm to create or modify the entity.
169 *
170 * Depending on whether the entity is locally available, or has changed.
171 */
172 void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity);
173
174 //Read only access to main storage
175 EntityStore &store();
176
177 //Read/Write access to sync storage
178 SyncStore &syncStore();
179
180 virtual KAsync::Job<void> synchronizeWithSource() = 0;
181
182private:
183 QSharedPointer<SyncStore> mSyncStore;
184 QSharedPointer<EntityStore> mEntityStore;
185 Sink::Storage mStorage;
186 Sink::Storage mSyncStorage;
187 QByteArray mResourceType;
188 QByteArray mResourceInstanceIdentifier;
189 Sink::Storage::Transaction mTransaction;
190 Sink::Storage::Transaction mSyncTransaction;
191 std::function<void(int commandId, const QByteArray &data)> mEnqueue;
192};
193
194/**
195 * Replay changes to the source
196 */
197class SINK_EXPORT SourceWriteBack : public ChangeReplay
198{
199public:
200 SourceWriteBack(const QByteArray &resourceType,const QByteArray &resourceInstanceIdentifier);
201
202protected:
203 ///Base implementation calls the replay$Type calls
204 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE;
205
206protected:
207 ///Implement to write back changes to the server
208 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId);
209 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId);
210
211 //Read only access to main storage
212 EntityStore &store();
213
214 //Read/Write access to sync storage
215 SyncStore &syncStore();
216
217private:
218 Sink::Storage mSyncStorage;
219 QSharedPointer<SyncStore> mSyncStore;
220 QSharedPointer<EntityStore> mEntityStore;
221 Sink::Storage::Transaction mTransaction;
222 Sink::Storage::Transaction mSyncTransaction;
223 QByteArray mResourceType;
224 QByteArray mResourceInstanceIdentifier;
225};
226
227
228} 87}
diff --git a/common/remoteidmap.cpp b/common/remoteidmap.cpp
new file mode 100644
index 0000000..f72369d
--- /dev/null
+++ b/common/remoteidmap.cpp
@@ -0,0 +1,75 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#include "remoteidmap.h"
21
22#include <QUuid>
23#include "index.h"
24#include "log.h"
25
26using namespace Sink;
27
28RemoteIdMap::RemoteIdMap(Sink::Storage::Transaction &transaction)
29 : mTransaction(transaction)
30{
31
32}
33
34void RemoteIdMap::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId)
35{
36 Index("rid.mapping." + bufferType, mTransaction).add(remoteId, localId);
37 Index("localid.mapping." + bufferType, mTransaction).add(localId, remoteId);
38}
39
40void RemoteIdMap::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId)
41{
42 Index("rid.mapping." + bufferType, mTransaction).remove(remoteId, localId);
43 Index("localid.mapping." + bufferType, mTransaction).remove(localId, remoteId);
44}
45
46void RemoteIdMap::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId)
47{
48 const auto oldRemoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId);
49 removeRemoteId(bufferType, localId, oldRemoteId);
50 recordRemoteId(bufferType, localId, remoteId);
51}
52
53QByteArray RemoteIdMap::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId)
54{
55 // Lookup local id for remote id, or insert a new pair otherwise
56 Index index("rid.mapping." + bufferType, mTransaction);
57 QByteArray sinkId = index.lookup(remoteId);
58 if (sinkId.isEmpty()) {
59 sinkId = QUuid::createUuid().toString().toUtf8();
60 index.add(remoteId, sinkId);
61 Index("localid.mapping." + bufferType, mTransaction).add(sinkId, remoteId);
62 }
63 return sinkId;
64}
65
66QByteArray RemoteIdMap::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId)
67{
68 QByteArray remoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId);
69 if (remoteId.isEmpty()) {
70 Warning() << "Couldn't find the remote id for " << localId;
71 return QByteArray();
72 }
73 return remoteId;
74}
75
diff --git a/common/remoteidmap.h b/common/remoteidmap.h
new file mode 100644
index 0000000..12891dc
--- /dev/null
+++ b/common/remoteidmap.h
@@ -0,0 +1,61 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#pragma once
21
22#include "sink_export.h"
23
24#include "storage.h"
25
26namespace Sink {
27
28/**
29 * A remoteId mapping
30 */
31class SINK_EXPORT RemoteIdMap
32{
33public:
34 RemoteIdMap(Sink::Storage::Transaction &);
35
36 /**
37 * Records a localId to remoteId mapping
38 */
39 void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId);
40 void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId);
41 void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId);
42
43 /**
44 * Tries to find a local id for the remote id, and creates a new local id otherwise.
45 *
46 * The new local id is recorded in the local to remote id mapping.
47 */
48 QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId);
49
50 /**
51 * Tries to find a remote id for a local id.
52 *
53 * This can fail if the entity hasn't been written back to the server yet.
54 */
55 QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId);
56
57private:
58 Sink::Storage::Transaction &mTransaction;
59};
60
61}
diff --git a/common/sourcewriteback.cpp b/common/sourcewriteback.cpp
new file mode 100644
index 0000000..1ef20d2
--- /dev/null
+++ b/common/sourcewriteback.cpp
@@ -0,0 +1,124 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#include "sourcewriteback.h"
21
22#include "definitions.h"
23#include "log.h"
24
25#define ENTITY_TYPE_MAIL "mail"
26#define ENTITY_TYPE_FOLDER "folder"
27
28using namespace Sink;
29
30SourceWriteBack::SourceWriteBack(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier)
31 : ChangeReplay(resourceInstanceIdentifier),
32 mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite),
33 mResourceType(resourceType),
34 mResourceInstanceIdentifier(resourceInstanceIdentifier)
35{
36
37}
38
39EntityStore &SourceWriteBack::store()
40{
41 if (!mEntityStore) {
42 mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction);
43 }
44 return *mEntityStore;
45}
46
47RemoteIdMap &SourceWriteBack::syncStore()
48{
49 if (!mSyncStore) {
50 mSyncStore = QSharedPointer<RemoteIdMap>::create(mSyncTransaction);
51 }
52 return *mSyncStore;
53}
54
55KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value)
56{
57 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly);
58 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite);
59
60 Sink::EntityBuffer buffer(value);
61 const Sink::Entity &entity = buffer.entity();
62 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
63 Q_ASSERT(metadataBuffer);
64 if (!metadataBuffer->replayToSource()) {
65 Trace() << "Change is coming from the source";
66 return KAsync::null<void>();
67 }
68 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
69 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
70 const auto uid = Sink::Storage::uidFromKey(key);
71 QByteArray oldRemoteId;
72
73 if (operation != Sink::Operation_Creation) {
74 oldRemoteId = syncStore().resolveLocalId(type, uid);
75 }
76 Trace() << "Replaying " << key << type;
77
78 KAsync::Job<QByteArray> job = KAsync::null<QByteArray>();
79 if (type == ENTITY_TYPE_FOLDER) {
80 auto folder = store().read<ApplicationDomain::Folder>(uid);
81 job = replay(folder, operation, oldRemoteId);
82 } else if (type == ENTITY_TYPE_MAIL) {
83 auto mail = store().read<ApplicationDomain::Mail>(uid);
84 job = replay(mail, operation, oldRemoteId);
85 }
86
87 return job.then<void, QByteArray>([this, operation, type, uid](const QByteArray &remoteId) {
88 Trace() << "Replayed change with remote id: " << remoteId;
89 if (operation == Sink::Operation_Creation) {
90 if (remoteId.isEmpty()) {
91 Warning() << "Returned an empty remoteId from the creation";
92 } else {
93 syncStore().recordRemoteId(type, uid, remoteId);
94 }
95 } else if (operation == Sink::Operation_Modification) {
96 if (remoteId.isEmpty()) {
97 Warning() << "Returned an empty remoteId from the creation";
98 } else {
99 syncStore().updateRemoteId(type, uid, remoteId);
100 }
101 } else if (operation == Sink::Operation_Removal) {
102 syncStore().removeRemoteId(type, uid, remoteId);
103 } else {
104 Warning() << "Unkown operation" << operation;
105 }
106
107 mTransaction.abort();
108 mSyncTransaction.commit();
109 mSyncStore.clear();
110 mEntityStore.clear();
111 }, [](int errorCode, const QString &errorMessage) {
112 Warning() << "Failed to replay change: " << errorMessage;
113 });
114}
115
116KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &)
117{
118 return KAsync::null<QByteArray>();
119}
120
121KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &)
122{
123 return KAsync::null<QByteArray>();
124}
diff --git a/common/sourcewriteback.h b/common/sourcewriteback.h
new file mode 100644
index 0000000..8470e00
--- /dev/null
+++ b/common/sourcewriteback.h
@@ -0,0 +1,64 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#pragma once
21
22#include "sink_export.h"
23
24#include "changereplay.h"
25#include "storage.h"
26#include "entitystore.h"
27#include "remoteidmap.h"
28
29namespace Sink {
30
31/**
32 * Replay changes to the source
33 */
34class SINK_EXPORT SourceWriteBack : public ChangeReplay
35{
36public:
37 SourceWriteBack(const QByteArray &resourceType,const QByteArray &resourceInstanceIdentifier);
38
39protected:
40 ///Base implementation calls the replay$Type calls
41 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE;
42
43protected:
44 ///Implement to write back changes to the server
45 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId);
46 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId);
47
48 //Read only access to main storage
49 EntityStore &store();
50
51 //Read/Write access to sync storage
52 RemoteIdMap &syncStore();
53
54private:
55 Sink::Storage mSyncStorage;
56 QSharedPointer<RemoteIdMap> mSyncStore;
57 QSharedPointer<EntityStore> mEntityStore;
58 Sink::Storage::Transaction mTransaction;
59 Sink::Storage::Transaction mSyncTransaction;
60 QByteArray mResourceType;
61 QByteArray mResourceInstanceIdentifier;
62};
63
64}
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
new file mode 100644
index 0000000..fb0baaa
--- /dev/null
+++ b/common/synchronizer.cpp
@@ -0,0 +1,176 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#include "synchronizer.h"
21
22#include "definitions.h"
23#include "commands.h"
24#include "bufferutils.h"
25#include "entitystore.h"
26#include "remoteidmap.h"
27#include "adaptorfactoryregistry.h"
28#include "createentity_generated.h"
29#include "modifyentity_generated.h"
30#include "deleteentity_generated.h"
31
32using namespace Sink;
33
34Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier)
35 : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly),
36 mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite),
37 mResourceType(resourceType),
38 mResourceInstanceIdentifier(resourceInstanceIdentifier)
39{
40 Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier;
41}
42
43void Synchronizer::setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback)
44{
45 mEnqueue = enqueueCommandCallback;
46}
47
48void Synchronizer::enqueueCommand(int commandId, const QByteArray &data)
49{
50 Q_ASSERT(mEnqueue);
51 mEnqueue(commandId, data);
52}
53
54EntityStore &Synchronizer::store()
55{
56 if (!mEntityStore) {
57 mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction);
58 }
59 return *mEntityStore;
60}
61
62RemoteIdMap &Synchronizer::syncStore()
63{
64 if (!mSyncStore) {
65 mSyncStore = QSharedPointer<RemoteIdMap>::create(mSyncTransaction);
66 }
67 return *mSyncStore;
68}
69
70void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
71 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
72{
73 // These changes are coming from the source
74 const auto replayToSource = false;
75 flatbuffers::FlatBufferBuilder entityFbb;
76 adaptorFactory.createBuffer(domainObject, entityFbb);
77 flatbuffers::FlatBufferBuilder fbb;
78 // This is the resource type and not the domain type
79 auto entityId = fbb.CreateString(sinkId.toStdString());
80 auto type = fbb.CreateString(bufferType.toStdString());
81 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
82 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource);
83 Sink::Commands::FinishCreateEntityBuffer(fbb, location);
84 callback(BufferUtils::extractBuffer(fbb));
85}
86
87void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
88 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
89{
90 // These changes are coming from the source
91 const auto replayToSource = false;
92 flatbuffers::FlatBufferBuilder entityFbb;
93 adaptorFactory.createBuffer(domainObject, entityFbb);
94 flatbuffers::FlatBufferBuilder fbb;
95 auto entityId = fbb.CreateString(sinkId.toStdString());
96 // This is the resource type and not the domain type
97 auto type = fbb.CreateString(bufferType.toStdString());
98 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
99 // FIXME removals
100 auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource);
101 Sink::Commands::FinishModifyEntityBuffer(fbb, location);
102 callback(BufferUtils::extractBuffer(fbb));
103}
104
105void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback)
106{
107 // These changes are coming from the source
108 const auto replayToSource = false;
109 flatbuffers::FlatBufferBuilder fbb;
110 auto entityId = fbb.CreateString(sinkId.toStdString());
111 // This is the resource type and not the domain type
112 auto type = fbb.CreateString(bufferType.toStdString());
113 auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource);
114 Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
115 callback(BufferUtils::extractBuffer(fbb));
116}
117
118void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists)
119{
120 entryGenerator([this, bufferType, &exists](const QByteArray &key) {
121 auto sinkId = Sink::Storage::uidFromKey(key);
122 Trace() << "Checking for removal " << key;
123 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId);
124 // If we have no remoteId, the entity hasn't been replayed to the source yet
125 if (!remoteId.isEmpty()) {
126 if (!exists(remoteId)) {
127 Trace() << "Found a removed entity: " << sinkId;
128 deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType,
129 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); });
130 }
131 }
132 });
133}
134
135void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
136{
137 Trace() << "Create or modify" << bufferType << remoteId;
138 auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType);
139 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
140 const auto found = mainDatabase.contains(sinkId);
141 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType);
142 if (!found) {
143 Trace() << "Found a new entity: " << remoteId;
144 createEntity(
145 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
146 } else { // modification
147 if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) {
148 bool changed = false;
149 for (const auto &property : entity.changedProperties()) {
150 if (entity.getProperty(property) != current->getProperty(property)) {
151 Trace() << "Property changed " << sinkId << property;
152 changed = true;
153 }
154 }
155 if (changed) {
156 Trace() << "Found a modified entity: " << remoteId;
157 modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory,
158 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); });
159 }
160 } else {
161 Warning() << "Failed to get current entity";
162 }
163 }
164}
165
166KAsync::Job<void> Synchronizer::synchronize()
167{
168 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly);
169 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite);
170 return synchronizeWithSource().then<void>([this]() {
171 mTransaction.abort();
172 mSyncTransaction.commit();
173 mSyncStore.clear();
174 mEntityStore.clear();
175 });
176}
diff --git a/common/synchronizer.h b/common/synchronizer.h
new file mode 100644
index 0000000..61bca7d
--- /dev/null
+++ b/common/synchronizer.h
@@ -0,0 +1,95 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#pragma once
21
22#include "sink_export.h"
23#include <QObject>
24#include <Async/Async>
25#include <domainadaptor.h>
26
27#include "storage.h"
28
29namespace Sink {
30class EntityStore;
31class RemoteIdMap;
32
33/**
34 * Synchronize and add what we don't already have to local queue
35 */
36class SINK_EXPORT Synchronizer
37{
38public:
39 Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier);
40
41 void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback);
42 KAsync::Job<void> synchronize();
43
44protected:
45 ///Calls the callback to enqueue the command
46 void enqueueCommand(int commandId, const QByteArray &data);
47
48 static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
49 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback);
50 static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
51 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback);
52 static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback);
53
54 /**
55 * A synchronous algorithm to remove entities that are no longer existing.
56 *
57 * A list of entities is generated by @param entryGenerator.
58 * The entiry Generator typically iterates over an index to produce all existing entries.
59 * This algorithm calls @param exists for every entity of type @param type, with its remoteId. For every entity where @param exists returns false,
60 * an entity delete command is enqueued.
61 *
62 * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous.
63 */
64 void scanForRemovals(const QByteArray &bufferType,
65 const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists);
66
67 /**
68 * An algorithm to create or modify the entity.
69 *
70 * Depending on whether the entity is locally available, or has changed.
71 */
72 void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity);
73
74 //Read only access to main storage
75 EntityStore &store();
76
77 //Read/Write access to sync storage
78 RemoteIdMap &syncStore();
79
80 virtual KAsync::Job<void> synchronizeWithSource() = 0;
81
82private:
83 QSharedPointer<RemoteIdMap> mSyncStore;
84 QSharedPointer<EntityStore> mEntityStore;
85 Sink::Storage mStorage;
86 Sink::Storage mSyncStorage;
87 QByteArray mResourceType;
88 QByteArray mResourceInstanceIdentifier;
89 Sink::Storage::Transaction mTransaction;
90 Sink::Storage::Transaction mSyncTransaction;
91 std::function<void(int commandId, const QByteArray &data)> mEnqueue;
92};
93
94}
95