diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-10-28 17:21:47 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-10-28 17:21:47 +0100 |
commit | 043cd5c9e1c90ba04659b67000b974cf8c35f7ba (patch) | |
tree | 87fa328a404ba7385d8535c6dad8b0f86dc0ab46 /common/pipeline.cpp | |
parent | 81859328bf30c2aeecdf3ee48e5939e0496552fd (diff) | |
download | sink-043cd5c9e1c90ba04659b67000b974cf8c35f7ba.tar.gz sink-043cd5c9e1c90ba04659b67000b974cf8c35f7ba.zip |
Correctly execute modifications and removals
... also if there are intermediate revisions.
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 33 |
1 files changed, 20 insertions, 13 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index ae4cc3d..0ce478b 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -233,7 +233,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
233 | auto diff = adaptorFactory->createAdaptor(*diffEntity); | 233 | auto diff = adaptorFactory->createAdaptor(*diffEntity); |
234 | 234 | ||
235 | QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; | 235 | QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; |
236 | d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, baseRevision), [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | 236 | d->transaction.openDatabase(bufferType + ".main").findLatest(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { |
237 | Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 237 | Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
238 | if (!buffer.isValid()) { | 238 | if (!buffer.isValid()) { |
239 | Warning() << "Read invalid buffer from disk"; | 239 | Warning() << "Read invalid buffer from disk"; |
@@ -328,25 +328,32 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
328 | flatbuffers::FlatBufferBuilder fbb; | 328 | flatbuffers::FlatBufferBuilder fbb; |
329 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); | 329 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); |
330 | 330 | ||
331 | storeNewRevision(newRevision, fbb, bufferType, key); | ||
332 | Log() << "Pipeline: deleted entity: "<< newRevision; | ||
333 | |||
334 | auto adaptorFactory = d->adaptorFactory.value(bufferType); | 331 | auto adaptorFactory = d->adaptorFactory.value(bufferType); |
335 | if (!adaptorFactory) { | 332 | if (!adaptorFactory) { |
336 | Warning() << "no adaptor factory for type " << bufferType; | 333 | Warning() << "no adaptor factory for type " << bufferType; |
337 | return KAsync::error<qint64>(0); | 334 | return KAsync::error<qint64>(0); |
338 | } | 335 | } |
339 | 336 | ||
340 | // d->transaction.openDatabase(bufferType + ".main").scan(key, [this, bufferType, newRevision, adaptorFactory](const QByteArray &, const QByteArray &value) -> bool { | 337 | QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; |
341 | // auto entity = Akonadi2::GetEntity(value); | 338 | d->transaction.openDatabase(bufferType + ".main").findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { |
342 | // auto newEntity = adaptorFactory->createAdaptor(*entity); | 339 | Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
343 | for (auto processor : d->processors[bufferType]) { | 340 | if (!buffer.isValid()) { |
344 | processor->deletedEntity(key, newRevision, Akonadi2::ApplicationDomain::BufferAdaptor(), d->transaction); | 341 | Warning() << "Read invalid buffer from disk"; |
342 | } else { | ||
343 | current = adaptorFactory->createAdaptor(buffer.entity()); | ||
345 | } | 344 | } |
346 | // return false; | 345 | return false; |
347 | // }, [this](const Akonadi2::Storage::Error &error) { | 346 | }, [this](const Akonadi2::Storage::Error &error) { |
348 | // ErrorMsg() << "Failed to find value in pipeline: " << error.message; | 347 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; |
349 | // }); | 348 | }); |
349 | |||
350 | storeNewRevision(newRevision, fbb, bufferType, key); | ||
351 | Log() << "Pipeline: deleted entity: "<< newRevision; | ||
352 | |||
353 | for (auto processor : d->processors[bufferType]) { | ||
354 | processor->deletedEntity(key, newRevision, *current, d->transaction); | ||
355 | } | ||
356 | |||
350 | return KAsync::start<qint64>([newRevision](){ | 357 | return KAsync::start<qint64>([newRevision](){ |
351 | return newRevision; | 358 | return newRevision; |
352 | }); | 359 | }); |