From 043cd5c9e1c90ba04659b67000b974cf8c35f7ba Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 28 Oct 2015 17:21:47 +0100 Subject: Correctly execute modifications and removals ... also if there are intermediate revisions. --- common/pipeline.cpp | 33 ++++++++++++++++++------------- tests/pipelinetest.cpp | 53 ++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 71 insertions(+), 15 deletions(-) diff --git a/common/pipeline.cpp b/common/pipeline.cpp index ae4cc3d..0ce478b 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -233,7 +233,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) auto diff = adaptorFactory->createAdaptor(*diffEntity); QSharedPointer current; - d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, baseRevision), [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { + d->transaction.openDatabase(bufferType + ".main").findLatest(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { Akonadi2::EntityBuffer buffer(const_cast(data.data()), data.size()); if (!buffer.isValid()) { Warning() << "Read invalid buffer from disk"; @@ -328,25 +328,32 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) flatbuffers::FlatBufferBuilder fbb; EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); - storeNewRevision(newRevision, fbb, bufferType, key); - Log() << "Pipeline: deleted entity: "<< newRevision; - auto adaptorFactory = d->adaptorFactory.value(bufferType); if (!adaptorFactory) { Warning() << "no adaptor factory for type " << bufferType; return KAsync::error(0); } - // d->transaction.openDatabase(bufferType + ".main").scan(key, [this, bufferType, newRevision, adaptorFactory](const QByteArray &, const QByteArray &value) -> bool { - // auto entity = Akonadi2::GetEntity(value); - // auto newEntity = adaptorFactory->createAdaptor(*entity); - for (auto processor : d->processors[bufferType]) { - processor->deletedEntity(key, newRevision, Akonadi2::ApplicationDomain::BufferAdaptor(), d->transaction); + QSharedPointer current; + d->transaction.openDatabase(bufferType + ".main").findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { + Akonadi2::EntityBuffer buffer(const_cast(data.data()), data.size()); + if (!buffer.isValid()) { + Warning() << "Read invalid buffer from disk"; + } else { + current = adaptorFactory->createAdaptor(buffer.entity()); } - // return false; - // }, [this](const Akonadi2::Storage::Error &error) { - // ErrorMsg() << "Failed to find value in pipeline: " << error.message; - // }); + return false; + }, [this](const Akonadi2::Storage::Error &error) { + ErrorMsg() << "Failed to find value in pipeline: " << error.message; + }); + + storeNewRevision(newRevision, fbb, bufferType, key); + Log() << "Pipeline: deleted entity: "<< newRevision; + + for (auto processor : d->processors[bufferType]) { + processor->deletedEntity(key, newRevision, *current, d->transaction); + } + return KAsync::start([newRevision](){ return newRevision; }); diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp index 0b4c13e..47090a8 100644 --- a/tests/pipelinetest.cpp +++ b/tests/pipelinetest.cpp @@ -157,6 +157,7 @@ public: { deletedUids << uid; deletedRevisions << revision; + deletedSummaries << oldEntity.getProperty("summary").toByteArray(); } QList newUids; @@ -165,6 +166,7 @@ public: QList modifiedRevisions; QList deletedUids; QList deletedRevisions; + QList deletedSummaries; }; /** @@ -245,18 +247,63 @@ private Q_SLOTS: QCOMPARE(getKeys("org.kde.pipelinetest.instance1", "event.main").size(), 1); } - void testDelete() + void testModifyWithUnrelatedOperationInbetween() { flatbuffers::FlatBufferBuilder entityFbb; auto command = createEntityCommand(createEvent(entityFbb)); + Akonadi2::Pipeline pipeline("org.kde.pipelinetest.instance1"); + + auto adaptorFactory = QSharedPointer::create(); + pipeline.setAdaptorFactory("event", adaptorFactory); + //Create the initial revision + pipeline.startTransaction(); + pipeline.newEntity(command.constData(), command.size()); + pipeline.commit(); + + //Get uid of written entity + auto keys = getKeys("org.kde.pipelinetest.instance1", "event.main"); + QCOMPARE(keys.size(), 1); + const auto uid = Akonadi2::Storage::uidFromKey(keys.first()); + + + //Create another operation inbetween + { + entityFbb.Clear(); + auto command = createEntityCommand(createEvent(entityFbb)); + pipeline.startTransaction(); + pipeline.newEntity(command.constData(), command.size()); + pipeline.commit(); + } + + //Execute the modification on revision 2 + entityFbb.Clear(); + auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summary2"), uid, 2); + pipeline.startTransaction(); + pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()); + pipeline.commit(); + + //Ensure we've got the new revision with the modification + auto buffer = getEntity("org.kde.pipelinetest.instance1", "event.main", Akonadi2::Storage::assembleKey(uid, 3)); + QVERIFY(!buffer.isEmpty()); + Akonadi2::EntityBuffer entityBuffer(buffer.data(), buffer.size()); + auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); + QCOMPARE(adaptor->getProperty("summary").toString(), QString("summary2")); + } + + void testDelete() + { + flatbuffers::FlatBufferBuilder entityFbb; + auto command = createEntityCommand(createEvent(entityFbb)); Akonadi2::Pipeline pipeline("org.kde.pipelinetest.instance1"); + pipeline.setAdaptorFactory("event", QSharedPointer::create()); + + //Create the initial revision pipeline.startTransaction(); pipeline.newEntity(command.constData(), command.size()); pipeline.commit(); - // const auto uid = Akonadi2::Storage::uidFromKey(key); auto result = getKeys("org.kde.pipelinetest.instance1", "event.main"); QCOMPARE(result.size(), 1); @@ -322,8 +369,10 @@ private Q_SLOTS: pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size()); QCOMPARE(testProcessor.deletedUids.size(), 1); QCOMPARE(testProcessor.deletedUids.size(), 1); + QCOMPARE(testProcessor.deletedSummaries.size(), 1); //Key doesn't contain revision and is just the uid QCOMPARE(testProcessor.deletedUids.at(0), Akonadi2::Storage::uidFromKey(testProcessor.deletedUids.at(0))); + QCOMPARE(testProcessor.deletedSummaries.at(0), QByteArray("summary2")); } } }; -- cgit v1.2.3