summaryrefslogtreecommitdiffstats
path: root/common/synchronizer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r--common/synchronizer.cpp148
1 files changed, 58 insertions, 90 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 53db82f..5ddd77c 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -24,7 +24,7 @@
24#include "bufferutils.h" 24#include "bufferutils.h"
25#include "entitystore.h" 25#include "entitystore.h"
26#include "remoteidmap.h" 26#include "remoteidmap.h"
27#include "adaptorfactoryregistry.h" 27#include "entityreader.h"
28#include "createentity_generated.h" 28#include "createentity_generated.h"
29#include "modifyentity_generated.h" 29#include "modifyentity_generated.h"
30#include "deleteentity_generated.h" 30#include "deleteentity_generated.h"
@@ -33,13 +33,12 @@ SINK_DEBUG_AREA("synchronizer")
33 33
34using namespace Sink; 34using namespace Sink;
35 35
36Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) 36Synchronizer::Synchronizer(const Sink::ResourceContext &context)
37 : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly), 37 : mResourceContext(context),
38 mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), 38 mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)),
39 mResourceType(resourceType), 39 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite)
40 mResourceInstanceIdentifier(resourceInstanceIdentifier)
41{ 40{
42 SinkTrace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; 41 SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId();
43} 42}
44 43
45Synchronizer::~Synchronizer() 44Synchronizer::~Synchronizer()
@@ -59,11 +58,9 @@ void Synchronizer::enqueueCommand(int commandId, const QByteArray &data)
59 mEnqueue(commandId, data); 58 mEnqueue(commandId, data);
60} 59}
61 60
62EntityStore &Synchronizer::store() 61Storage::EntityStore &Synchronizer::store()
63{ 62{
64 if (!mEntityStore) { 63 mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly);
65 mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, transaction());
66 }
67 return *mEntityStore; 64 return *mEntityStore;
68} 65}
69 66
@@ -75,13 +72,12 @@ RemoteIdMap &Synchronizer::syncStore()
75 return *mSyncStore; 72 return *mSyncStore;
76} 73}
77 74
78void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, 75void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject)
79 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
80{ 76{
81 // These changes are coming from the source 77 // These changes are coming from the source
82 const auto replayToSource = false; 78 const auto replayToSource = false;
83 flatbuffers::FlatBufferBuilder entityFbb; 79 flatbuffers::FlatBufferBuilder entityFbb;
84 adaptorFactory.createBuffer(domainObject, entityFbb); 80 mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb);
85 flatbuffers::FlatBufferBuilder fbb; 81 flatbuffers::FlatBufferBuilder fbb;
86 // This is the resource type and not the domain type 82 // This is the resource type and not the domain type
87 auto entityId = fbb.CreateString(sinkId.toStdString()); 83 auto entityId = fbb.CreateString(sinkId.toStdString());
@@ -89,18 +85,17 @@ void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &buff
89 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); 85 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
90 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); 86 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource);
91 Sink::Commands::FinishCreateEntityBuffer(fbb, location); 87 Sink::Commands::FinishCreateEntityBuffer(fbb, location);
92 callback(BufferUtils::extractBuffer(fbb)); 88 enqueueCommand(Sink::Commands::CreateEntityCommand, BufferUtils::extractBuffer(fbb));
93} 89}
94 90
95void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, 91void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject)
96 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
97{ 92{
98 // FIXME removals 93 // FIXME removals
99 QByteArrayList deletedProperties; 94 QByteArrayList deletedProperties;
100 // These changes are coming from the source 95 // These changes are coming from the source
101 const auto replayToSource = false; 96 const auto replayToSource = false;
102 flatbuffers::FlatBufferBuilder entityFbb; 97 flatbuffers::FlatBufferBuilder entityFbb;
103 adaptorFactory.createBuffer(domainObject, entityFbb); 98 mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb);
104 flatbuffers::FlatBufferBuilder fbb; 99 flatbuffers::FlatBufferBuilder fbb;
105 auto entityId = fbb.CreateString(sinkId.toStdString()); 100 auto entityId = fbb.CreateString(sinkId.toStdString());
106 auto modifiedProperties = BufferUtils::toVector(fbb, domainObject.changedProperties()); 101 auto modifiedProperties = BufferUtils::toVector(fbb, domainObject.changedProperties());
@@ -110,10 +105,10 @@ void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const
110 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); 105 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
111 auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, replayToSource, modifiedProperties); 106 auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, replayToSource, modifiedProperties);
112 Sink::Commands::FinishModifyEntityBuffer(fbb, location); 107 Sink::Commands::FinishModifyEntityBuffer(fbb, location);
113 callback(BufferUtils::extractBuffer(fbb)); 108 enqueueCommand(Sink::Commands::ModifyEntityCommand, BufferUtils::extractBuffer(fbb));
114} 109}
115 110
116void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) 111void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType)
117{ 112{
118 // These changes are coming from the source 113 // These changes are coming from the source
119 const auto replayToSource = false; 114 const auto replayToSource = false;
@@ -123,63 +118,69 @@ void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const
123 auto type = fbb.CreateString(bufferType.toStdString()); 118 auto type = fbb.CreateString(bufferType.toStdString());
124 auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); 119 auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource);
125 Sink::Commands::FinishDeleteEntityBuffer(fbb, location); 120 Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
126 callback(BufferUtils::extractBuffer(fbb)); 121 enqueueCommand(Sink::Commands::DeleteEntityCommand, BufferUtils::extractBuffer(fbb));
127} 122}
128 123
129void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) 124void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists)
130{ 125{
131 entryGenerator([this, bufferType, &exists](const QByteArray &key) { 126 entryGenerator([this, bufferType, &exists](const QByteArray &sinkId) {
132 auto sinkId = Sink::Storage::uidFromKey(key);
133 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); 127 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId);
134 SinkTrace() << "Checking for removal " << key << remoteId; 128 SinkTrace() << "Checking for removal " << sinkId << remoteId;
135 // If we have no remoteId, the entity hasn't been replayed to the source yet 129 // If we have no remoteId, the entity hasn't been replayed to the source yet
136 if (!remoteId.isEmpty()) { 130 if (!remoteId.isEmpty()) {
137 if (!exists(remoteId)) { 131 if (!exists(remoteId)) {
138 SinkTrace() << "Found a removed entity: " << sinkId; 132 SinkTrace() << "Found a removed entity: " << sinkId;
139 deleteEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, 133 deleteEntity(sinkId, mEntityStore->maxRevision(), bufferType);
140 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); });
141 } 134 }
142 } 135 }
143 }); 136 });
144} 137}
145 138
146void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) 139void Synchronizer::scanForRemovals(const QByteArray &bufferType, std::function<bool(const QByteArray &remoteId)> exists)
147{ 140{
148 auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); 141 scanForRemovals(bufferType,
149 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); 142 [this, &bufferType](const std::function<void(const QByteArray &)> &callback) {
150 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); 143 store().readAllUids(bufferType, [callback](const QByteArray &uid) {
151 Q_ASSERT(adaptorFactory); 144 callback(uid);
152 qint64 retrievedRevision = 0; 145 });
153 if (auto current = EntityReaderUtils::getLatest(mainDatabase, sinkId, *adaptorFactory, retrievedRevision)) { 146 },
147 exists
148 );
149}
150
151void Synchronizer::modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
152{
153 store.readLatest(bufferType, sinkId, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &current) {
154 bool changed = false; 154 bool changed = false;
155 for (const auto &property : entity.changedProperties()) { 155 for (const auto &property : entity.changedProperties()) {
156 if (entity.getProperty(property) != current->getProperty(property)) { 156 if (entity.getProperty(property) != current.getProperty(property)) {
157 SinkTrace() << "Property changed " << sinkId << property; 157 SinkTrace() << "Property changed " << sinkId << property;
158 changed = true; 158 changed = true;
159 } 159 }
160 } 160 }
161 if (changed) { 161 if (changed) {
162 SinkTrace() << "Found a modified entity: " << remoteId; 162 SinkTrace() << "Found a modified entity: " << sinkId;
163 modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory, 163 modifyEntity(sinkId, store.maxRevision(), bufferType, entity);
164 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); });
165 } 164 }
166 } else { 165 });
167 SinkWarning() << "Failed to get current entity"; 166}
168 } 167
168void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
169{
170 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
171 Storage::EntityStore store(mResourceContext);
172 modifyIfChanged(store, bufferType, sinkId, entity);
169} 173}
170 174
171void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) 175void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
172{ 176{
173 SinkTrace() << "Create or modify" << bufferType << remoteId; 177 SinkTrace() << "Create or modify" << bufferType << remoteId;
174 auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); 178 Storage::EntityStore store(mResourceContext);
175 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); 179 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
176 const auto found = mainDatabase.contains(sinkId); 180 const auto found = store.contains(bufferType, sinkId);
177 if (!found) { 181 if (!found) {
178 SinkTrace() << "Found a new entity: " << remoteId; 182 SinkTrace() << "Found a new entity: " << remoteId;
179 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); 183 createEntity(sinkId, bufferType, entity);
180 Q_ASSERT(adaptorFactory);
181 createEntity(
182 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
183 } else { // modification 184 } else { // modification
184 modify(bufferType, remoteId, entity); 185 modify(bufferType, remoteId, entity);
185 } 186 }
@@ -190,10 +191,9 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
190{ 191{
191 192
192 SinkTrace() << "Create or modify" << bufferType << remoteId; 193 SinkTrace() << "Create or modify" << bufferType << remoteId;
193 auto mainDatabase = Storage::mainDatabase(transaction(), bufferType);
194 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); 194 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
195 const auto found = mainDatabase.contains(sinkId); 195 Storage::EntityStore store(mResourceContext);
196 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); 196 const auto found = store.contains(bufferType, sinkId);
197 if (!found) { 197 if (!found) {
198 if (!mergeCriteria.isEmpty()) { 198 if (!mergeCriteria.isEmpty()) {
199 Sink::Query query; 199 Sink::Query query;
@@ -201,7 +201,8 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
201 query.filter(it.key(), it.value()); 201 query.filter(it.key(), it.value());
202 } 202 }
203 bool merge = false; 203 bool merge = false;
204 Sink::EntityReader<DomainType> reader(mResourceType, mResourceInstanceIdentifier, transaction()); 204 Storage::EntityStore store(mResourceContext);
205 Sink::EntityReader<DomainType> reader(store);
205 reader.query(query, 206 reader.query(query,
206 [this, bufferType, remoteId, &merge](const DomainType &o) -> bool{ 207 [this, bufferType, remoteId, &merge](const DomainType &o) -> bool{
207 merge = true; 208 merge = true;
@@ -211,43 +212,21 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
211 }); 212 });
212 if (!merge) { 213 if (!merge) {
213 SinkTrace() << "Found a new entity: " << remoteId; 214 SinkTrace() << "Found a new entity: " << remoteId;
214 createEntity( 215 createEntity(sinkId, bufferType, entity);
215 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
216 } 216 }
217 } else { 217 } else {
218 SinkTrace() << "Found a new entity: " << remoteId; 218 SinkTrace() << "Found a new entity: " << remoteId;
219 createEntity( 219 createEntity(sinkId, bufferType, entity);
220 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
221 } 220 }
222 } else { // modification 221 } else { // modification
223 qint64 retrievedRevision = 0; 222 modifyIfChanged(store, bufferType, sinkId, entity);
224 if (auto current = EntityReaderUtils::getLatest(mainDatabase, sinkId, *adaptorFactory, retrievedRevision)) {
225 bool changed = false;
226 for (const auto &property : entity.changedProperties()) {
227 if (entity.getProperty(property) != current->getProperty(property)) {
228 SinkTrace() << "Property changed " << sinkId << property;
229 changed = true;
230 }
231 }
232 if (changed) {
233 SinkTrace() << "Found a modified entity: " << remoteId;
234 modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory,
235 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); });
236 }
237 } else {
238 SinkWarning() << "Failed to get current entity";
239 }
240 } 223 }
241} 224}
242 225
243template<typename DomainType> 226template<typename DomainType>
244void Synchronizer::modify(const DomainType &entity) 227void Synchronizer::modify(const DomainType &entity)
245{ 228{
246 const auto bufferType = ApplicationDomain::getTypeName<DomainType>(); 229 modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName<DomainType>(), entity);
247 const auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType);
248 Q_ASSERT(adaptorFactory);
249 modifyEntity(entity.identifier(), entity.revision(), bufferType, entity, *adaptorFactory,
250 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); });
251} 230}
252 231
253KAsync::Job<void> Synchronizer::synchronize() 232KAsync::Job<void> Synchronizer::synchronize()
@@ -257,7 +236,6 @@ KAsync::Job<void> Synchronizer::synchronize()
257 mMessageQueue->startTransaction(); 236 mMessageQueue->startTransaction();
258 return synchronizeWithSource().syncThen<void>([this]() { 237 return synchronizeWithSource().syncThen<void>([this]() {
259 mSyncStore.clear(); 238 mSyncStore.clear();
260 mEntityStore.clear();
261 mMessageQueue->commit(); 239 mMessageQueue->commit();
262 mSyncInProgress = false; 240 mSyncInProgress = false;
263 }); 241 });
@@ -266,8 +244,7 @@ KAsync::Job<void> Synchronizer::synchronize()
266void Synchronizer::commit() 244void Synchronizer::commit()
267{ 245{
268 mMessageQueue->commit(); 246 mMessageQueue->commit();
269 mTransaction.abort(); 247 mEntityStore->abortTransaction();
270 mEntityStore.clear();
271 mSyncTransaction.commit(); 248 mSyncTransaction.commit();
272 mSyncStore.clear(); 249 mSyncStore.clear();
273 if (mSyncInProgress) { 250 if (mSyncInProgress) {
@@ -275,20 +252,11 @@ void Synchronizer::commit()
275 } 252 }
276} 253}
277 254
278Sink::Storage::Transaction &Synchronizer::transaction() 255Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction()
279{
280 if (!mTransaction) {
281 SinkTrace() << "Starting transaction";
282 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly);
283 }
284 return mTransaction;
285}
286
287Sink::Storage::Transaction &Synchronizer::syncTransaction()
288{ 256{
289 if (!mSyncTransaction) { 257 if (!mSyncTransaction) {
290 SinkTrace() << "Starting transaction"; 258 SinkTrace() << "Starting transaction";
291 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); 259 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::DataStore::ReadWrite);
292 } 260 }
293 return mSyncTransaction; 261 return mSyncTransaction;
294} 262}