summaryrefslogtreecommitdiffstats
path: root/common/synchronizer.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-16 14:55:20 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-21 09:02:21 +0200
commit237b9ae4113e7a9f489632296941becb71afdb45 (patch)
tree01cde58f495944f01cad9d282391d4efd2897141 /common/synchronizer.cpp
parent95d11bf0be98a4e3c08502fe23417b800233ce14 (diff)
downloadsink-237b9ae4113e7a9f489632296941becb71afdb45.tar.gz
sink-237b9ae4113e7a9f489632296941becb71afdb45.zip
Refactor how the storage is used.
This is the initial refactoring to improve how we deal with the storage. It does a couple of things: * Rename Sink::Storage to Sink::Storage::DataStore to free up the Sink::Storage namespace * Introduce a Sink::ResourceContext to have a single object that can be passed around containing everything that is necessary to operate on a resource. This is a lot better than the multiple separate parameters that we used to pass around all over the place, while still allowing for dependency injection for tests. * Tie storage access together using the new EntityStore that directly works with ApplicationDomainTypes. This gives us a central place where main storage, indexes and buffer adaptors are tied together, which will also give us a place to implement external indexes, such as a fulltextindex using xapian. * Use ApplicationDomainTypes as the default way to pass around entities. Instead of using various ways to pass around entities (buffers, buffer adaptors, ApplicationDomainTypes), only use a single way. The old approach was confusing, and was only done as: * optimization; really shouldn't be necessary and otherwise I'm sure we can find better ways to optimize ApplicationDomainType itself. * a way to account for entities that have multiple buffers, a concept that I no longer deem relevant. While this commit does the bulk of the work to get there, the following commits will refactor more stuff to get things back to normal.
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r--common/synchronizer.cpp148
1 files changed, 58 insertions, 90 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 53db82f..5ddd77c 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -24,7 +24,7 @@
24#include "bufferutils.h" 24#include "bufferutils.h"
25#include "entitystore.h" 25#include "entitystore.h"
26#include "remoteidmap.h" 26#include "remoteidmap.h"
27#include "adaptorfactoryregistry.h" 27#include "entityreader.h"
28#include "createentity_generated.h" 28#include "createentity_generated.h"
29#include "modifyentity_generated.h" 29#include "modifyentity_generated.h"
30#include "deleteentity_generated.h" 30#include "deleteentity_generated.h"
@@ -33,13 +33,12 @@ SINK_DEBUG_AREA("synchronizer")
33 33
34using namespace Sink; 34using namespace Sink;
35 35
36Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) 36Synchronizer::Synchronizer(const Sink::ResourceContext &context)
37 : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly), 37 : mResourceContext(context),
38 mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), 38 mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)),
39 mResourceType(resourceType), 39 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite)
40 mResourceInstanceIdentifier(resourceInstanceIdentifier)
41{ 40{
42 SinkTrace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; 41 SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId();
43} 42}
44 43
45Synchronizer::~Synchronizer() 44Synchronizer::~Synchronizer()
@@ -59,11 +58,9 @@ void Synchronizer::enqueueCommand(int commandId, const QByteArray &data)
59 mEnqueue(commandId, data); 58 mEnqueue(commandId, data);
60} 59}
61 60
62EntityStore &Synchronizer::store() 61Storage::EntityStore &Synchronizer::store()
63{ 62{
64 if (!mEntityStore) { 63 mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly);
65 mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, transaction());
66 }
67 return *mEntityStore; 64 return *mEntityStore;
68} 65}
69 66
@@ -75,13 +72,12 @@ RemoteIdMap &Synchronizer::syncStore()
75 return *mSyncStore; 72 return *mSyncStore;
76} 73}
77 74
78void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, 75void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject)
79 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
80{ 76{
81 // These changes are coming from the source 77 // These changes are coming from the source
82 const auto replayToSource = false; 78 const auto replayToSource = false;
83 flatbuffers::FlatBufferBuilder entityFbb; 79 flatbuffers::FlatBufferBuilder entityFbb;
84 adaptorFactory.createBuffer(domainObject, entityFbb); 80 mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb);
85 flatbuffers::FlatBufferBuilder fbb; 81 flatbuffers::FlatBufferBuilder fbb;
86 // This is the resource type and not the domain type 82 // This is the resource type and not the domain type
87 auto entityId = fbb.CreateString(sinkId.toStdString()); 83 auto entityId = fbb.CreateString(sinkId.toStdString());
@@ -89,18 +85,17 @@ void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &buff
89 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); 85 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
90 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); 86 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource);
91 Sink::Commands::FinishCreateEntityBuffer(fbb, location); 87 Sink::Commands::FinishCreateEntityBuffer(fbb, location);
92 callback(BufferUtils::extractBuffer(fbb)); 88 enqueueCommand(Sink::Commands::CreateEntityCommand, BufferUtils::extractBuffer(fbb));
93} 89}
94 90
95void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, 91void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject)
96 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
97{ 92{
98 // FIXME removals 93 // FIXME removals
99 QByteArrayList deletedProperties; 94 QByteArrayList deletedProperties;
100 // These changes are coming from the source 95 // These changes are coming from the source
101 const auto replayToSource = false; 96 const auto replayToSource = false;
102 flatbuffers::FlatBufferBuilder entityFbb; 97 flatbuffers::FlatBufferBuilder entityFbb;
103 adaptorFactory.createBuffer(domainObject, entityFbb); 98 mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb);
104 flatbuffers::FlatBufferBuilder fbb; 99 flatbuffers::FlatBufferBuilder fbb;
105 auto entityId = fbb.CreateString(sinkId.toStdString()); 100 auto entityId = fbb.CreateString(sinkId.toStdString());
106 auto modifiedProperties = BufferUtils::toVector(fbb, domainObject.changedProperties()); 101 auto modifiedProperties = BufferUtils::toVector(fbb, domainObject.changedProperties());
@@ -110,10 +105,10 @@ void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const
110 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); 105 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
111 auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, replayToSource, modifiedProperties); 106 auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, replayToSource, modifiedProperties);
112 Sink::Commands::FinishModifyEntityBuffer(fbb, location); 107 Sink::Commands::FinishModifyEntityBuffer(fbb, location);
113 callback(BufferUtils::extractBuffer(fbb)); 108 enqueueCommand(Sink::Commands::ModifyEntityCommand, BufferUtils::extractBuffer(fbb));
114} 109}
115 110
116void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) 111void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType)
117{ 112{
118 // These changes are coming from the source 113 // These changes are coming from the source
119 const auto replayToSource = false; 114 const auto replayToSource = false;
@@ -123,63 +118,69 @@ void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const
123 auto type = fbb.CreateString(bufferType.toStdString()); 118 auto type = fbb.CreateString(bufferType.toStdString());
124 auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); 119 auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource);
125 Sink::Commands::FinishDeleteEntityBuffer(fbb, location); 120 Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
126 callback(BufferUtils::extractBuffer(fbb)); 121 enqueueCommand(Sink::Commands::DeleteEntityCommand, BufferUtils::extractBuffer(fbb));
127} 122}
128 123
129void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) 124void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists)
130{ 125{
131 entryGenerator([this, bufferType, &exists](const QByteArray &key) { 126 entryGenerator([this, bufferType, &exists](const QByteArray &sinkId) {
132 auto sinkId = Sink::Storage::uidFromKey(key);
133 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); 127 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId);
134 SinkTrace() << "Checking for removal " << key << remoteId; 128 SinkTrace() << "Checking for removal " << sinkId << remoteId;
135 // If we have no remoteId, the entity hasn't been replayed to the source yet 129 // If we have no remoteId, the entity hasn't been replayed to the source yet
136 if (!remoteId.isEmpty()) { 130 if (!remoteId.isEmpty()) {
137 if (!exists(remoteId)) { 131 if (!exists(remoteId)) {
138 SinkTrace() << "Found a removed entity: " << sinkId; 132 SinkTrace() << "Found a removed entity: " << sinkId;
139 deleteEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, 133 deleteEntity(sinkId, mEntityStore->maxRevision(), bufferType);
140 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); });
141 } 134 }
142 } 135 }
143 }); 136 });
144} 137}
145 138
146void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) 139void Synchronizer::scanForRemovals(const QByteArray &bufferType, std::function<bool(const QByteArray &remoteId)> exists)
147{ 140{
148 auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); 141 scanForRemovals(bufferType,
149 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); 142 [this, &bufferType](const std::function<void(const QByteArray &)> &callback) {
150 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); 143 store().readAllUids(bufferType, [callback](const QByteArray &uid) {
151 Q_ASSERT(adaptorFactory); 144 callback(uid);
152 qint64 retrievedRevision = 0; 145 });
153 if (auto current = EntityReaderUtils::getLatest(mainDatabase, sinkId, *adaptorFactory, retrievedRevision)) { 146 },
147 exists
148 );
149}
150
151void Synchronizer::modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
152{
153 store.readLatest(bufferType, sinkId, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &current) {
154 bool changed = false; 154 bool changed = false;
155 for (const auto &property : entity.changedProperties()) { 155 for (const auto &property : entity.changedProperties()) {
156 if (entity.getProperty(property) != current->getProperty(property)) { 156 if (entity.getProperty(property) != current.getProperty(property)) {
157 SinkTrace() << "Property changed " << sinkId << property; 157 SinkTrace() << "Property changed " << sinkId << property;
158 changed = true; 158 changed = true;
159 } 159 }
160 } 160 }
161 if (changed) { 161 if (changed) {
162 SinkTrace() << "Found a modified entity: " << remoteId; 162 SinkTrace() << "Found a modified entity: " << sinkId;
163 modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory, 163 modifyEntity(sinkId, store.maxRevision(), bufferType, entity);
164 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); });
165 } 164 }
166 } else { 165 });
167 SinkWarning() << "Failed to get current entity"; 166}
168 } 167
168void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
169{
170 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
171 Storage::EntityStore store(mResourceContext);
172 modifyIfChanged(store, bufferType, sinkId, entity);
169} 173}
170 174
171void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) 175void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
172{ 176{
173 SinkTrace() << "Create or modify" << bufferType << remoteId; 177 SinkTrace() << "Create or modify" << bufferType << remoteId;
174 auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); 178 Storage::EntityStore store(mResourceContext);
175 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); 179 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
176 const auto found = mainDatabase.contains(sinkId); 180 const auto found = store.contains(bufferType, sinkId);
177 if (!found) { 181 if (!found) {
178 SinkTrace() << "Found a new entity: " << remoteId; 182 SinkTrace() << "Found a new entity: " << remoteId;
179 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); 183 createEntity(sinkId, bufferType, entity);
180 Q_ASSERT(adaptorFactory);
181 createEntity(
182 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
183 } else { // modification 184 } else { // modification
184 modify(bufferType, remoteId, entity); 185 modify(bufferType, remoteId, entity);
185 } 186 }
@@ -190,10 +191,9 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
190{ 191{
191 192
192 SinkTrace() << "Create or modify" << bufferType << remoteId; 193 SinkTrace() << "Create or modify" << bufferType << remoteId;
193 auto mainDatabase = Storage::mainDatabase(transaction(), bufferType);
194 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); 194 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
195 const auto found = mainDatabase.contains(sinkId); 195 Storage::EntityStore store(mResourceContext);
196 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); 196 const auto found = store.contains(bufferType, sinkId);
197 if (!found) { 197 if (!found) {
198 if (!mergeCriteria.isEmpty()) { 198 if (!mergeCriteria.isEmpty()) {
199 Sink::Query query; 199 Sink::Query query;
@@ -201,7 +201,8 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
201 query.filter(it.key(), it.value()); 201 query.filter(it.key(), it.value());
202 } 202 }
203 bool merge = false; 203 bool merge = false;
204 Sink::EntityReader<DomainType> reader(mResourceType, mResourceInstanceIdentifier, transaction()); 204 Storage::EntityStore store(mResourceContext);
205 Sink::EntityReader<DomainType> reader(store);
205 reader.query(query, 206 reader.query(query,
206 [this, bufferType, remoteId, &merge](const DomainType &o) -> bool{ 207 [this, bufferType, remoteId, &merge](const DomainType &o) -> bool{
207 merge = true; 208 merge = true;
@@ -211,43 +212,21 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
211 }); 212 });
212 if (!merge) { 213 if (!merge) {
213 SinkTrace() << "Found a new entity: " << remoteId; 214 SinkTrace() << "Found a new entity: " << remoteId;
214 createEntity( 215 createEntity(sinkId, bufferType, entity);
215 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
216 } 216 }
217 } else { 217 } else {
218 SinkTrace() << "Found a new entity: " << remoteId; 218 SinkTrace() << "Found a new entity: " << remoteId;
219 createEntity( 219 createEntity(sinkId, bufferType, entity);
220 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
221 } 220 }
222 } else { // modification 221 } else { // modification
223 qint64 retrievedRevision = 0; 222 modifyIfChanged(store, bufferType, sinkId, entity);
224 if (auto current = EntityReaderUtils::getLatest(mainDatabase, sinkId, *adaptorFactory, retrievedRevision)) {
225 bool changed = false;
226 for (const auto &property : entity.changedProperties()) {
227 if (entity.getProperty(property) != current->getProperty(property)) {
228 SinkTrace() << "Property changed " << sinkId << property;
229 changed = true;
230 }
231 }
232 if (changed) {
233 SinkTrace() << "Found a modified entity: " << remoteId;
234 modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory,
235 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); });
236 }
237 } else {
238 SinkWarning() << "Failed to get current entity";
239 }
240 } 223 }
241} 224}
242 225
243template<typename DomainType> 226template<typename DomainType>
244void Synchronizer::modify(const DomainType &entity) 227void Synchronizer::modify(const DomainType &entity)
245{ 228{
246 const auto bufferType = ApplicationDomain::getTypeName<DomainType>(); 229 modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName<DomainType>(), entity);
247 const auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType);
248 Q_ASSERT(adaptorFactory);
249 modifyEntity(entity.identifier(), entity.revision(), bufferType, entity, *adaptorFactory,
250 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); });
251} 230}
252 231
253KAsync::Job<void> Synchronizer::synchronize() 232KAsync::Job<void> Synchronizer::synchronize()
@@ -257,7 +236,6 @@ KAsync::Job<void> Synchronizer::synchronize()
257 mMessageQueue->startTransaction(); 236 mMessageQueue->startTransaction();
258 return synchronizeWithSource().syncThen<void>([this]() { 237 return synchronizeWithSource().syncThen<void>([this]() {
259 mSyncStore.clear(); 238 mSyncStore.clear();
260 mEntityStore.clear();
261 mMessageQueue->commit(); 239 mMessageQueue->commit();
262 mSyncInProgress = false; 240 mSyncInProgress = false;
263 }); 241 });
@@ -266,8 +244,7 @@ KAsync::Job<void> Synchronizer::synchronize()
266void Synchronizer::commit() 244void Synchronizer::commit()
267{ 245{
268 mMessageQueue->commit(); 246 mMessageQueue->commit();
269 mTransaction.abort(); 247 mEntityStore->abortTransaction();
270 mEntityStore.clear();
271 mSyncTransaction.commit(); 248 mSyncTransaction.commit();
272 mSyncStore.clear(); 249 mSyncStore.clear();
273 if (mSyncInProgress) { 250 if (mSyncInProgress) {
@@ -275,20 +252,11 @@ void Synchronizer::commit()
275 } 252 }
276} 253}
277 254
278Sink::Storage::Transaction &Synchronizer::transaction() 255Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction()
279{
280 if (!mTransaction) {
281 SinkTrace() << "Starting transaction";
282 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly);
283 }
284 return mTransaction;
285}
286
287Sink::Storage::Transaction &Synchronizer::syncTransaction()
288{ 256{
289 if (!mSyncTransaction) { 257 if (!mSyncTransaction) {
290 SinkTrace() << "Starting transaction"; 258 SinkTrace() << "Starting transaction";
291 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); 259 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::DataStore::ReadWrite);
292 } 260 }
293 return mSyncTransaction; 261 return mSyncTransaction;
294} 262}