summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-04-11 15:41:20 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-04-11 15:41:20 +0200
commit5bf7ded65ef517fac6b088342d195392bc09be4c (patch)
tree793eac32ddc018a3d2bb42cb088f4c919f22e0ed /common/pipeline.cpp
parentf6c3c144e60611d2da7ba7aa5b115affe92a57a4 (diff)
downloadsink-5bf7ded65ef517fac6b088342d195392bc09be4c.tar.gz
sink-5bf7ded65ef517fac6b088342d195392bc09be4c.zip
Moved all preprocessing back into the pipeline
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp25
1 files changed, 13 insertions, 12 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 15ed5fc..91437d4 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -175,13 +175,14 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
175 auto o = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, revision, memoryAdaptor}; 175 auto o = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, revision, memoryAdaptor};
176 o.setChangedProperties(o.availableProperties().toSet()); 176 o.setChangedProperties(o.availableProperties().toSet());
177 177
178 auto preprocess = [&, this](ApplicationDomain::ApplicationDomainType &newEntity) { 178 auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(o, o.availableProperties());
179 foreach (const auto &processor, d->processors[bufferType]) { 179 newEntity.setChangedProperties(newEntity.availableProperties().toSet());
180 processor->newEntity(newEntity); 180
181 } 181 foreach (const auto &processor, d->processors[bufferType]) {
182 }; 182 processor->newEntity(newEntity);
183 }
183 184
184 if (!d->entityStore.add(bufferType, o, replayToSource, preprocess)) { 185 if (!d->entityStore.add(bufferType, o, replayToSource)) {
185 return KAsync::error<qint64>(0); 186 return KAsync::error<qint64>(0);
186 } 187 }
187 188
@@ -323,14 +324,14 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
323 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 324 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
324 SinkTraceCtx(d->logCtx) << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 325 SinkTraceCtx(d->logCtx) << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
325 326
326 auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity) { 327 const auto current = d->entityStore.readLatest(bufferType, key);
327 foreach (const auto &processor, d->processors[bufferType]) { 328
328 processor->deletedEntity(oldEntity); 329 foreach (const auto &processor, d->processors[bufferType]) {
329 } 330 processor->deletedEntity(current);
330 }; 331 }
331 332
332 d->revisionChanged = true; 333 d->revisionChanged = true;
333 if (!d->entityStore.remove(bufferType, key, replayToSource, preprocess)) { 334 if (!d->entityStore.remove(bufferType, current, replayToSource)) {
334 return KAsync::error<qint64>(0); 335 return KAsync::error<qint64>(0);
335 } 336 }
336 337