summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-04-11 15:16:26 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-04-11 15:16:26 +0200
commitf6c3c144e60611d2da7ba7aa5b115affe92a57a4 (patch)
tree4e4b01382d7c2893d4d1d14489506e3b0066fce9
parentadb11fd81404b9ab3b01975ed93babe12a22dee4 (diff)
downloadsink-f6c3c144e60611d2da7ba7aa5b115affe92a57a4.tar.gz
sink-f6c3c144e60611d2da7ba7aa5b115affe92a57a4.zip
Move the preprocssing back out of entitystore into the pipeline.
This is where this really belongs, only the indexing is part of storage. This is necessary so preprocessors can move entities as well.
-rw-r--r--common/pipeline.cpp93
-rw-r--r--common/storage/entitystore.cpp34
-rw-r--r--common/storage/entitystore.h4
3 files changed, 68 insertions, 63 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 7f836c4..15ed5fc 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -195,6 +195,11 @@ struct CreateHelper {
195 } 195 }
196}; 196};
197 197
198static KAsync::Job<void> create(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &newEntity)
199{
200 return TypeHelper<CreateHelper>{type}.operator()<KAsync::Job<void>, const ApplicationDomain::ApplicationDomainType&>(newEntity);
201}
202
198KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) 203KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
199{ 204{
200 d->transactionItemCount++; 205 d->transactionItemCount++;
@@ -248,64 +253,52 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
248 deletions = BufferUtils::fromVector(*modifyEntity->deletions()); 253 deletions = BufferUtils::fromVector(*modifyEntity->deletions());
249 } 254 }
250 255
251 if (modifyEntity->targetResource()) { 256 const auto current = d->entityStore.readLatest(bufferType, diff.identifier());
252 auto isMove = modifyEntity->removeEntity(); 257 if (current.identifier().isEmpty()) {
253 auto targetResource = BufferUtils::extractBuffer(modifyEntity->targetResource()); 258 SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier();
254 auto changeset = diff.changedProperties(); 259 return KAsync::error<qint64>(0);
255 const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); 260 }
256 if (current.identifier().isEmpty()) {
257 SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier();
258 return KAsync::error<qint64>(0);
259 }
260 261
261 auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryCopy<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties()); 262 auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions);
262 263
263 // Apply diff 264 bool isMove = false;
264 for (const auto &property : changeset) { 265 if (modifyEntity->targetResource()) {
265 const auto value = diff.getProperty(property); 266 isMove = modifyEntity->removeEntity();
266 if (value.isValid()) { 267 newEntity.setResource(BufferUtils::extractBuffer(modifyEntity->targetResource()));
267 newEntity.setProperty(property, value); 268 }
268 }
269 }
270 269
271 // Remove deletions 270 foreach (const auto &processor, d->processors[bufferType]) {
272 for (const auto &property : deletions) { 271 processor->modifiedEntity(current, newEntity);
273 newEntity.setProperty(property, QVariant()); 272 }
274 }
275 newEntity.setResource(targetResource);
276 newEntity.setChangedProperties(newEntity.availableProperties().toSet());
277 273
278 SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource; 274 //The entity is either being copied or moved
279 auto job = TypeHelper<CreateHelper>{bufferType}.operator()<KAsync::Job<void>, ApplicationDomain::ApplicationDomainType&>(newEntity); 275 if (newEntity.resourceInstanceIdentifier() != d->resourceContext.resourceInstanceIdentifier) {
280 job = job.then([this, current, isMove, targetResource, bufferType](const KAsync::Error &error) { 276 SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier();
281 if (!error) { 277 newEntity.setChangedProperties(newEntity.availableProperties().toSet());
282 SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; 278 return create(bufferType, newEntity)
283 if (isMove) { 279 .then([=](const KAsync::Error &error) {
284 flatbuffers::FlatBufferBuilder fbb; 280 if (!error) {
285 auto entityId = fbb.CreateString(current.identifier()); 281 SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull";
286 auto type = fbb.CreateString(bufferType); 282 if (isMove) {
287 auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true); 283 flatbuffers::FlatBufferBuilder fbb;
288 Sink::Commands::FinishDeleteEntityBuffer(fbb, location); 284 auto entityId = fbb.CreateString(current.identifier());
289 const auto data = BufferUtils::extractBuffer(fbb); 285 auto type = fbb.CreateString(bufferType);
290 deletedEntity(data, data.size()).exec(); 286 auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true);
287 Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
288 const auto data = BufferUtils::extractBuffer(fbb);
289 deletedEntity(data, data.size()).exec();
290 }
291 } else {
292 SinkErrorCtx(d->logCtx) << "Failed to move entity " << newEntity.identifier() << " to resource " << newEntity.resourceInstanceIdentifier();
291 } 293 }
292 } else { 294 })
293 SinkErrorCtx(d->logCtx) << "Failed to move entity " << targetResource << " to resource " << current.identifier(); 295 .then([this] {
294 } 296 return d->entityStore.maxRevision();
295 }); 297 });
296 return job.then([this] {
297 return d->entityStore.maxRevision();
298 });
299 } 298 }
300 299
301 auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) {
302 foreach (const auto &processor, d->processors[bufferType]) {
303 processor->modifiedEntity(oldEntity, newEntity);
304 }
305 };
306
307 d->revisionChanged = true; 300 d->revisionChanged = true;
308 if (!d->entityStore.modify(bufferType, diff, deletions, replayToSource, preprocess)) { 301 if (!d->entityStore.modify(bufferType, current, newEntity, replayToSource)) {
309 return KAsync::error<qint64>(0); 302 return KAsync::error<qint64>(0);
310 } 303 }
311 304
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp
index 4afb407..3ef8784 100644
--- a/common/storage/entitystore.cpp
+++ b/common/storage/entitystore.cpp
@@ -209,22 +209,15 @@ bool EntityStore::add(const QByteArray &type, const ApplicationDomain::Applicati
209 return true; 209 return true;
210} 210}
211 211
212bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &preprocess) 212ApplicationDomain::ApplicationDomainType EntityStore::applyDiff(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &current, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions) const
213{ 213{
214 auto changeset = diff.changedProperties();
215 const auto current = readLatest(type, diff.identifier());
216 if (current.identifier().isEmpty()) {
217 SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier();
218 return false;
219 }
220
221 auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties()); 214 auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties());
222 215
223 SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity; 216 SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity;
224 217
225 // Apply diff 218 // Apply diff
226 //SinkTrace() << "Applying changed properties: " << changeset; 219 //SinkTrace() << "Applying changed properties: " << changeset;
227 for (const auto &property : changeset) { 220 for (const auto &property : diff.changedProperties()) {
228 const auto value = diff.getProperty(property); 221 const auto value = diff.getProperty(property);
229 if (value.isValid()) { 222 if (value.isValid()) {
230 //SinkTrace() << "Setting property: " << property; 223 //SinkTrace() << "Setting property: " << property;
@@ -237,8 +230,25 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic
237 //SinkTrace() << "Removing property: " << property; 230 //SinkTrace() << "Removing property: " << property;
238 newEntity.setProperty(property, QVariant()); 231 newEntity.setProperty(property, QVariant());
239 } 232 }
233 return newEntity;
234}
235
236bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource)
237{
238 const auto current = readLatest(type, diff.identifier());
239 if (current.identifier().isEmpty()) {
240 SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier();
241 return false;
242 }
243
244 auto newEntity = applyDiff(type, current, diff, deletions);
245 return modify(type, current, newEntity, replayToSource);
246}
247
248bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &current, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource)
249{
250 SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity;
240 251
241 preprocess(current, newEntity);
242 d->typeIndex(type).remove(current.identifier(), current, d->transaction); 252 d->typeIndex(type).remove(current.identifier(), current, d->transaction);
243 d->typeIndex(type).add(newEntity.identifier(), newEntity, d->transaction); 253 d->typeIndex(type).add(newEntity.identifier(), newEntity, d->transaction);
244 254
@@ -250,7 +260,7 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic
250 flatbuffers::FlatBufferBuilder metadataFbb; 260 flatbuffers::FlatBufferBuilder metadataFbb;
251 { 261 {
252 //We add availableProperties to account for the properties that have been changed by the preprocessors 262 //We add availableProperties to account for the properties that have been changed by the preprocessors
253 auto modifiedProperties = BufferUtils::toVector(metadataFbb, changeset + newEntity.changedProperties()); 263 auto modifiedProperties = BufferUtils::toVector(metadataFbb, newEntity.changedProperties());
254 auto metadataBuilder = MetadataBuilder(metadataFbb); 264 auto metadataBuilder = MetadataBuilder(metadataFbb);
255 metadataBuilder.add_revision(newRevision); 265 metadataBuilder.add_revision(newRevision);
256 metadataBuilder.add_operation(Operation_Modification); 266 metadataBuilder.add_operation(Operation_Modification);
@@ -259,7 +269,7 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic
259 auto metadataBuffer = metadataBuilder.Finish(); 269 auto metadataBuffer = metadataBuilder.Finish();
260 FinishMetadataBuffer(metadataFbb, metadataBuffer); 270 FinishMetadataBuffer(metadataFbb, metadataBuffer);
261 } 271 }
262 SinkTraceCtx(d->logCtx) << "Changed properties: " << changeset + newEntity.changedProperties(); 272 SinkTraceCtx(d->logCtx) << "Changed properties: " << newEntity.changedProperties();
263 273
264 newEntity.setChangedProperties(newEntity.availableProperties().toSet()); 274 newEntity.setChangedProperties(newEntity.availableProperties().toSet());
265 275
diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h
index 46410cd..ddb4ef9 100644
--- a/common/storage/entitystore.h
+++ b/common/storage/entitystore.h
@@ -44,9 +44,11 @@ public:
44 44
45 //Only the pipeline may call the following functions outside of tests 45 //Only the pipeline may call the following functions outside of tests
46 bool add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, bool replayToSource, const PreprocessCreation &); 46 bool add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, bool replayToSource, const PreprocessCreation &);
47 bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &); 47 bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource);
48 bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &current, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource);
48 bool remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &); 49 bool remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &);
49 bool cleanupRevisions(qint64 revision); 50 bool cleanupRevisions(qint64 revision);
51 ApplicationDomain::ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &current, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions) const;
50 52
51 void startTransaction(Sink::Storage::DataStore::AccessMode); 53 void startTransaction(Sink::Storage::DataStore::AccessMode);
52 void commitTransaction(); 54 void commitTransaction();