summaryrefslogtreecommitdiffstats
path: root/common/synchronizer.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-05-28 02:09:58 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-05-28 02:09:58 +0200
commitb441386c4e138d19bbd79d578e0a2ff1b3f54a93 (patch)
tree1110b6ec00ce29a8bcd7f6db0717f4a483f50587 /common/synchronizer.cpp
parentafb29c153daff23e491a350784ce6af5db5e28af (diff)
downloadsink-b441386c4e138d19bbd79d578e0a2ff1b3f54a93.tar.gz
sink-b441386c4e138d19bbd79d578e0a2ff1b3f54a93.zip
Moved the classes to individual files
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r--common/synchronizer.cpp176
1 files changed, 176 insertions, 0 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
new file mode 100644
index 0000000..fb0baaa
--- /dev/null
+++ b/common/synchronizer.cpp
@@ -0,0 +1,176 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#include "synchronizer.h"
21
22#include "definitions.h"
23#include "commands.h"
24#include "bufferutils.h"
25#include "entitystore.h"
26#include "remoteidmap.h"
27#include "adaptorfactoryregistry.h"
28#include "createentity_generated.h"
29#include "modifyentity_generated.h"
30#include "deleteentity_generated.h"
31
32using namespace Sink;
33
34Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier)
35 : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly),
36 mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite),
37 mResourceType(resourceType),
38 mResourceInstanceIdentifier(resourceInstanceIdentifier)
39{
40 Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier;
41}
42
43void Synchronizer::setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback)
44{
45 mEnqueue = enqueueCommandCallback;
46}
47
48void Synchronizer::enqueueCommand(int commandId, const QByteArray &data)
49{
50 Q_ASSERT(mEnqueue);
51 mEnqueue(commandId, data);
52}
53
54EntityStore &Synchronizer::store()
55{
56 if (!mEntityStore) {
57 mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction);
58 }
59 return *mEntityStore;
60}
61
62RemoteIdMap &Synchronizer::syncStore()
63{
64 if (!mSyncStore) {
65 mSyncStore = QSharedPointer<RemoteIdMap>::create(mSyncTransaction);
66 }
67 return *mSyncStore;
68}
69
70void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
71 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
72{
73 // These changes are coming from the source
74 const auto replayToSource = false;
75 flatbuffers::FlatBufferBuilder entityFbb;
76 adaptorFactory.createBuffer(domainObject, entityFbb);
77 flatbuffers::FlatBufferBuilder fbb;
78 // This is the resource type and not the domain type
79 auto entityId = fbb.CreateString(sinkId.toStdString());
80 auto type = fbb.CreateString(bufferType.toStdString());
81 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
82 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource);
83 Sink::Commands::FinishCreateEntityBuffer(fbb, location);
84 callback(BufferUtils::extractBuffer(fbb));
85}
86
87void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
88 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
89{
90 // These changes are coming from the source
91 const auto replayToSource = false;
92 flatbuffers::FlatBufferBuilder entityFbb;
93 adaptorFactory.createBuffer(domainObject, entityFbb);
94 flatbuffers::FlatBufferBuilder fbb;
95 auto entityId = fbb.CreateString(sinkId.toStdString());
96 // This is the resource type and not the domain type
97 auto type = fbb.CreateString(bufferType.toStdString());
98 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
99 // FIXME removals
100 auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource);
101 Sink::Commands::FinishModifyEntityBuffer(fbb, location);
102 callback(BufferUtils::extractBuffer(fbb));
103}
104
105void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback)
106{
107 // These changes are coming from the source
108 const auto replayToSource = false;
109 flatbuffers::FlatBufferBuilder fbb;
110 auto entityId = fbb.CreateString(sinkId.toStdString());
111 // This is the resource type and not the domain type
112 auto type = fbb.CreateString(bufferType.toStdString());
113 auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource);
114 Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
115 callback(BufferUtils::extractBuffer(fbb));
116}
117
118void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists)
119{
120 entryGenerator([this, bufferType, &exists](const QByteArray &key) {
121 auto sinkId = Sink::Storage::uidFromKey(key);
122 Trace() << "Checking for removal " << key;
123 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId);
124 // If we have no remoteId, the entity hasn't been replayed to the source yet
125 if (!remoteId.isEmpty()) {
126 if (!exists(remoteId)) {
127 Trace() << "Found a removed entity: " << sinkId;
128 deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType,
129 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); });
130 }
131 }
132 });
133}
134
135void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
136{
137 Trace() << "Create or modify" << bufferType << remoteId;
138 auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType);
139 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
140 const auto found = mainDatabase.contains(sinkId);
141 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType);
142 if (!found) {
143 Trace() << "Found a new entity: " << remoteId;
144 createEntity(
145 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
146 } else { // modification
147 if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) {
148 bool changed = false;
149 for (const auto &property : entity.changedProperties()) {
150 if (entity.getProperty(property) != current->getProperty(property)) {
151 Trace() << "Property changed " << sinkId << property;
152 changed = true;
153 }
154 }
155 if (changed) {
156 Trace() << "Found a modified entity: " << remoteId;
157 modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory,
158 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); });
159 }
160 } else {
161 Warning() << "Failed to get current entity";
162 }
163 }
164}
165
166KAsync::Job<void> Synchronizer::synchronize()
167{
168 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly);
169 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite);
170 return synchronizeWithSource().then<void>([this]() {
171 mTransaction.abort();
172 mSyncTransaction.commit();
173 mSyncStore.clear();
174 mEntityStore.clear();
175 });
176}