summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-05-08 10:32:01 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-05-08 10:32:01 +0200
commitfdeb92ca128e6eb51bbcace0b47519b12a08ce93 (patch)
treef0b4162ca74d5ee1fe6209069c910e41c92de570 /common/pipeline.cpp
parent2df41a3a919f1a131d311112e6fc444eff12c229 (diff)
downloadsink-fdeb92ca128e6eb51bbcace0b47519b12a08ce93.tar.gz
sink-fdeb92ca128e6eb51bbcace0b47519b12a08ce93.zip
Run preprocessors before persising the value.
And allow preprocessors to modify the result.
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp58
1 files changed, 18 insertions, 40 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 65a2f5b..637a1b8 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -189,31 +189,23 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
189 auto metadataBuffer = metadataBuilder.Finish(); 189 auto metadataBuffer = metadataBuilder.Finish();
190 FinishMetadataBuffer(metadataFbb, metadataBuffer); 190 FinishMetadataBuffer(metadataFbb, metadataBuffer);
191 191
192 flatbuffers::FlatBufferBuilder fbb;
193 EntityBuffer::assembleEntityBuffer(
194 fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
195
196 d->storeNewRevision(newRevision, fbb, bufferType, key);
197
198 auto adaptorFactory = d->adaptorFactory.value(bufferType); 192 auto adaptorFactory = d->adaptorFactory.value(bufferType);
199 if (!adaptorFactory) { 193 if (!adaptorFactory) {
200 Warning() << "no adaptor factory for type " << bufferType; 194 Warning() << "no adaptor factory for type " << bufferType;
201 return KAsync::error<qint64>(0); 195 return KAsync::error<qint64>(0);
202 } 196 }
203 197
198 auto adaptor = adaptorFactory->createAdaptor(*entity);
199 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties());
200 for (auto processor : d->processors[bufferType]) {
201 processor->newEntity(key, newRevision, *memoryAdaptor, d->transaction);
202 }
203 flatbuffers::FlatBufferBuilder fbb;
204 adaptorFactory->createBuffer(memoryAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
205
206 d->storeNewRevision(newRevision, fbb, bufferType, key);
207
204 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; 208 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
205 Storage::mainDatabase(d->transaction, bufferType)
206 .scan(Storage::assembleKey(key, newRevision),
207 [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool {
208 auto entity = GetEntity(value);
209 Q_ASSERT(entity->resource() || entity->local());
210 auto adaptor = adaptorFactory->createAdaptor(*entity);
211 for (auto processor : d->processors[bufferType]) {
212 processor->newEntity(key, newRevision, *adaptor, d->transaction);
213 }
214 return false;
215 },
216 [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; });
217 return KAsync::start<qint64>([newRevision]() { return newRevision; }); 209 return KAsync::start<qint64>([newRevision]() { return newRevision; });
218} 210}
219 211
@@ -281,9 +273,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
281 return KAsync::error<qint64>(0); 273 return KAsync::error<qint64>(0);
282 } 274 }
283 275
284 // resource and uid don't matter at this point 276 auto newAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(current), current->availableProperties());
285 const ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current);
286 auto newObject = ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(existingObject);
287 277
288 // Apply diff 278 // Apply diff
289 // FIXME only apply the properties that are available in the buffer 279 // FIXME only apply the properties that are available in the buffer
@@ -293,19 +283,21 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
293 changeset << property; 283 changeset << property;
294 const auto value = diff->getProperty(property); 284 const auto value = diff->getProperty(property);
295 if (value.isValid()) { 285 if (value.isValid()) {
296 newObject->setProperty(property, value); 286 newAdaptor->setProperty(property, value);
297 } 287 }
298 } 288 }
299 // Altough we only set some properties, we want all to be serialized
300 newObject->setChangedProperties(changeset);
301 289
302 // Remove deletions 290 // Remove deletions
303 if (modifyEntity->deletions()) { 291 if (modifyEntity->deletions()) {
304 for (const auto &property : *modifyEntity->deletions()) { 292 for (const auto &property : *modifyEntity->deletions()) {
305 newObject->setProperty(BufferUtils::extractBuffer(property), QVariant()); 293 newAdaptor->setProperty(BufferUtils::extractBuffer(property), QVariant());
306 } 294 }
307 } 295 }
308 296
297 for (auto processor : d->processors[bufferType]) {
298 processor->modifiedEntity(key, newRevision, *current, *newAdaptor, d->transaction);
299 }
300
309 // Add metadata buffer 301 // Add metadata buffer
310 flatbuffers::FlatBufferBuilder metadataFbb; 302 flatbuffers::FlatBufferBuilder metadataFbb;
311 auto metadataBuilder = MetadataBuilder(metadataFbb); 303 auto metadataBuilder = MetadataBuilder(metadataFbb);
@@ -316,24 +308,10 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
316 FinishMetadataBuffer(metadataFbb, metadataBuffer); 308 FinishMetadataBuffer(metadataFbb, metadataBuffer);
317 309
318 flatbuffers::FlatBufferBuilder fbb; 310 flatbuffers::FlatBufferBuilder fbb;
319 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 311 adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
320 312
321 d->storeNewRevision(newRevision, fbb, bufferType, key); 313 d->storeNewRevision(newRevision, fbb, bufferType, key);
322 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; 314 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType;
323 Storage::mainDatabase(d->transaction, bufferType)
324 .scan(Storage::assembleKey(key, newRevision),
325 [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool {
326 if (value.isEmpty()) {
327 ErrorMsg() << "Read buffer is empty.";
328 }
329 auto entity = GetEntity(value.data());
330 auto newEntity = adaptorFactory->createAdaptor(*entity);
331 for (auto processor : d->processors[bufferType]) {
332 processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction);
333 }
334 return false;
335 },
336 [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; });
337 return KAsync::start<qint64>([newRevision]() { return newRevision; }); 315 return KAsync::start<qint64>([newRevision]() { return newRevision; });
338} 316}
339 317