summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp33
1 files changed, 20 insertions, 13 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index ae4cc3d..0ce478b 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -233,7 +233,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
233 auto diff = adaptorFactory->createAdaptor(*diffEntity); 233 auto diff = adaptorFactory->createAdaptor(*diffEntity);
234 234
235 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; 235 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current;
236 d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, baseRevision), [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { 236 d->transaction.openDatabase(bufferType + ".main").findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
237 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 237 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
238 if (!buffer.isValid()) { 238 if (!buffer.isValid()) {
239 Warning() << "Read invalid buffer from disk"; 239 Warning() << "Read invalid buffer from disk";
@@ -328,25 +328,32 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
328 flatbuffers::FlatBufferBuilder fbb; 328 flatbuffers::FlatBufferBuilder fbb;
329 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); 329 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0);
330 330
331 storeNewRevision(newRevision, fbb, bufferType, key);
332 Log() << "Pipeline: deleted entity: "<< newRevision;
333
334 auto adaptorFactory = d->adaptorFactory.value(bufferType); 331 auto adaptorFactory = d->adaptorFactory.value(bufferType);
335 if (!adaptorFactory) { 332 if (!adaptorFactory) {
336 Warning() << "no adaptor factory for type " << bufferType; 333 Warning() << "no adaptor factory for type " << bufferType;
337 return KAsync::error<qint64>(0); 334 return KAsync::error<qint64>(0);
338 } 335 }
339 336
340 // d->transaction.openDatabase(bufferType + ".main").scan(key, [this, bufferType, newRevision, adaptorFactory](const QByteArray &, const QByteArray &value) -> bool { 337 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current;
341 // auto entity = Akonadi2::GetEntity(value); 338 d->transaction.openDatabase(bufferType + ".main").findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool {
342 // auto newEntity = adaptorFactory->createAdaptor(*entity); 339 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
343 for (auto processor : d->processors[bufferType]) { 340 if (!buffer.isValid()) {
344 processor->deletedEntity(key, newRevision, Akonadi2::ApplicationDomain::BufferAdaptor(), d->transaction); 341 Warning() << "Read invalid buffer from disk";
342 } else {
343 current = adaptorFactory->createAdaptor(buffer.entity());
345 } 344 }
346 // return false; 345 return false;
347 // }, [this](const Akonadi2::Storage::Error &error) { 346 }, [this](const Akonadi2::Storage::Error &error) {
348 // ErrorMsg() << "Failed to find value in pipeline: " << error.message; 347 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
349 // }); 348 });
349
350 storeNewRevision(newRevision, fbb, bufferType, key);
351 Log() << "Pipeline: deleted entity: "<< newRevision;
352
353 for (auto processor : d->processors[bufferType]) {
354 processor->deletedEntity(key, newRevision, *current, d->transaction);
355 }
356
350 return KAsync::start<qint64>([newRevision](){ 357 return KAsync::start<qint64>([newRevision](){
351 return newRevision; 358 return newRevision;
352 }); 359 });