From b43c422a2b1b899ce5ac27a0bc381e8a49f05d86 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 24 Sep 2015 16:45:06 +0200 Subject: Work with revisions in store + pipelinetest Cleanup of revisions, and revision for removed entity is yet missing. --- common/pipeline.cpp | 19 ++-- common/storage_lmdb.cpp | 8 +- tests/CMakeLists.txt | 1 + tests/pipelinetest.cpp | 241 ++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 260 insertions(+), 9 deletions(-) create mode 100644 tests/pipelinetest.cpp diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 14450aa..4fed41f 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -169,7 +169,11 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) flatbuffers::FlatBufferBuilder fbb; EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); - d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize())); + d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()), + [](const Akonadi2::Storage::Error &error) { + Warning() << "Failed to write entity"; + } + ); Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; @@ -198,6 +202,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command); Q_ASSERT(modifyEntity); + const qint64 baseRevision = modifyEntity->revision(); //TODO rename modifyEntity->domainType to bufferType const QByteArray bufferType = QByteArray(reinterpret_cast(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); const QByteArray key = QByteArray(reinterpret_cast(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); @@ -224,8 +229,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) auto diff = adaptorFactory->createAdaptor(*diffEntity); QSharedPointer current; - //FIXME: read the revision that this modification is based on, not just the latest one - storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").findLatest(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { + storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, baseRevision), [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { Akonadi2::EntityBuffer buffer(const_cast(data.data()), data.size()); if (!buffer.isValid()) { Warning() << "Read invalid buffer from disk"; @@ -234,10 +238,9 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) } return false; }, - [](const Storage::Error &error) { - Warning() << "Failed to read value from storage: " << error.message; + [baseRevision](const Storage::Error &error) { + Warning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); - //TODO error handler if (!current) { Warning() << "Failed to read local value " << key; @@ -275,6 +278,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) //TODO don't overwrite the old entry, but instead store a new revision d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize())); Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); + Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; return KAsync::start([this, key, bufferType, newRevision](KAsync::Future &future) { PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future]() { @@ -302,10 +306,11 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) const QByteArray bufferType = QByteArray(reinterpret_cast(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); const QByteArray key = QByteArray(reinterpret_cast(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); + const qint64 baseRevision = deleteEntity->revision(); //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted //TODO remove all revisions? - d->transaction.openDatabase(bufferType + ".main").remove(key); + d->transaction.openDatabase(bufferType + ".main").remove(Akonadi2::Storage::assembleKey(key, baseRevision)); Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); Log() << "Pipeline: deleted entity: "<< newRevision; diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp index 3073d37..be5a9da 100644 --- a/common/storage_lmdb.cpp +++ b/common/storage_lmdb.cpp @@ -114,7 +114,9 @@ bool Storage::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sVa { if (!d || !d->transaction) { Error error("", ErrorCodes::GenericError, "Not open"); - errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + if (d) { + errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + } return false; } const void *keyPtr = sKey.data(); @@ -149,7 +151,9 @@ void Storage::NamedDatabase::remove(const QByteArray &k, { if (!d || !d->transaction) { Error error(d->name.toLatin1(), ErrorCodes::GenericError, "Not open"); - errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + if (d) { + errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + } return; } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 251e780..b2201ff 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -44,6 +44,7 @@ auto_tests ( genericresourcetest genericfacadetest resourcecommunicationtest + pipelinetest ) target_link_libraries(dummyresourcetest akonadi2_resource_dummy) diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp new file mode 100644 index 0000000..96448e2 --- /dev/null +++ b/tests/pipelinetest.cpp @@ -0,0 +1,241 @@ +#include + +#include + +#include "event_generated.h" +#include "entity_generated.h" +#include "metadata_generated.h" +#include "createentity_generated.h" +#include "modifyentity_generated.h" +#include "deleteentity_generated.h" +#include "dummyresource/resourcefactory.h" +#include "clientapi.h" +#include "synclistresult.h" +#include "commands.h" +#include "entitybuffer.h" +#include "resourceconfig.h" +#include "pipeline.h" +#include "log.h" +#include "domainadaptor.h" + +class TestEventAdaptorFactory : public DomainTypeAdaptorFactory +{ +public: + TestEventAdaptorFactory() + : DomainTypeAdaptorFactory() + { + } + + virtual ~TestEventAdaptorFactory() {}; +}; + +static void removeFromDisk(const QString &name) +{ + Akonadi2::Storage store(Akonadi2::Store::storageLocation(), name, Akonadi2::Storage::ReadWrite); + store.removeFromDisk(); +} + +static QList getKeys(const QByteArray &dbEnv, const QByteArray &name) +{ + Akonadi2::Storage store(Akonadi2::storageLocation(), dbEnv, Akonadi2::Storage::ReadOnly); + auto transaction = store.createTransaction(Akonadi2::Storage::ReadOnly); + auto db = transaction.openDatabase(name, nullptr, false); + QList result; + db.scan("", [&](const QByteArray &key, const QByteArray &value) { + result << key; + return true; + }); + return result; +} + +static QByteArray getEntity(const QByteArray &dbEnv, const QByteArray &name, const QByteArray &uid) +{ + Akonadi2::Storage store(Akonadi2::storageLocation(), dbEnv, Akonadi2::Storage::ReadOnly); + auto transaction = store.createTransaction(Akonadi2::Storage::ReadOnly); + auto db = transaction.openDatabase(name, nullptr, false); + QByteArray result; + db.scan(uid, [&](const QByteArray &key, const QByteArray &value) { + result = value; + return true; + }); + return result; +} + +flatbuffers::FlatBufferBuilder &createEvent(flatbuffers::FlatBufferBuilder &entityFbb, const QString &s = QString("summary")) +{ + flatbuffers::FlatBufferBuilder eventFbb; + eventFbb.Clear(); + { + Akonadi2::ApplicationDomain::Buffer::EventBuilder eventBuilder(eventFbb); + auto eventLocation = eventBuilder.Finish(); + Akonadi2::ApplicationDomain::Buffer::FinishEventBuffer(eventFbb, eventLocation); + } + + flatbuffers::FlatBufferBuilder localFbb; + { + auto uid = localFbb.CreateString("testuid"); + auto summary = localFbb.CreateString(s.toStdString()); + auto localBuilder = Akonadi2::ApplicationDomain::Buffer::EventBuilder(localFbb); + localBuilder.add_uid(uid); + localBuilder.add_summary(summary); + auto location = localBuilder.Finish(); + Akonadi2::ApplicationDomain::Buffer::FinishEventBuffer(localFbb, location); + } + + Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, eventFbb.GetBufferPointer(), eventFbb.GetSize(), localFbb.GetBufferPointer(), localFbb.GetSize()); + return entityFbb; +} + +QByteArray createEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb) +{ + flatbuffers::FlatBufferBuilder fbb; + auto type = fbb.CreateString(Akonadi2::ApplicationDomain::getTypeName().toStdString().data()); + auto delta = fbb.CreateVector(entityFbb.GetBufferPointer(), entityFbb.GetSize()); + Akonadi2::Commands::CreateEntityBuilder builder(fbb); + builder.add_domainType(type); + builder.add_delta(delta); + auto location = builder.Finish(); + Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); + + const QByteArray command(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); + { + flatbuffers::Verifier verifyer(reinterpret_cast(command.data()), command.size()); + Q_ASSERT(Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)); + } + return command; +} + +QByteArray modifyEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb, const QByteArray &uid, qint64 revision) +{ + flatbuffers::FlatBufferBuilder fbb; + auto type = fbb.CreateString(Akonadi2::ApplicationDomain::getTypeName().toStdString().data()); + auto id = fbb.CreateString(std::string(uid.constData(), uid.size())); + auto delta = fbb.CreateVector(entityFbb.GetBufferPointer(), entityFbb.GetSize()); + // auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size()); + Akonadi2::Commands::ModifyEntityBuilder builder(fbb); + builder.add_domainType(type); + builder.add_delta(delta); + builder.add_revision(revision); + builder.add_entityId(id); + auto location = builder.Finish(); + Akonadi2::Commands::FinishModifyEntityBuffer(fbb, location); + + const QByteArray command(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); + { + flatbuffers::Verifier verifyer(reinterpret_cast(command.data()), command.size()); + Q_ASSERT(Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)); + } + return command; +} + +QByteArray deleteEntityCommand(const QByteArray &uid, qint64 revision) +{ + flatbuffers::FlatBufferBuilder fbb; + auto type = fbb.CreateString(Akonadi2::ApplicationDomain::getTypeName().toStdString().data()); + auto id = fbb.CreateString(std::string(uid.constData(), uid.size())); + Akonadi2::Commands::DeleteEntityBuilder builder(fbb); + builder.add_domainType(type); + builder.add_revision(revision); + builder.add_entityId(id); + auto location = builder.Finish(); + Akonadi2::Commands::FinishDeleteEntityBuffer(fbb, location); + + const QByteArray command(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); + { + flatbuffers::Verifier verifyer(reinterpret_cast(command.data()), command.size()); + Q_ASSERT(Akonadi2::Commands::VerifyDeleteEntityBuffer(verifyer)); + } + return command; +} + +class PipelineTest : public QObject +{ + Q_OBJECT +private Q_SLOTS: + void initTestCase() + { + Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Trace); + } + + void init() + { + removeFromDisk("org.kde.pipelinetest.instance1"); + } + + void testCreate() + { + flatbuffers::FlatBufferBuilder entityFbb; + auto command = createEntityCommand(createEvent(entityFbb)); + + Akonadi2::Pipeline pipeline("org.kde.pipelinetest.instance1"); + pipeline.startTransaction(); + pipeline.newEntity(command.constData(), command.size()); + pipeline.commit(); + + auto result = getKeys("org.kde.pipelinetest.instance1", "event.main"); + QCOMPARE(result.size(), 1); + } + + void testModify() + { + flatbuffers::FlatBufferBuilder entityFbb; + auto command = createEntityCommand(createEvent(entityFbb)); + + Akonadi2::Pipeline pipeline("org.kde.pipelinetest.instance1"); + + auto adaptorFactory = QSharedPointer::create(); + pipeline.setAdaptorFactory("event", adaptorFactory); + + //Create the initial revision + pipeline.startTransaction(); + pipeline.newEntity(command.constData(), command.size()); + pipeline.commit(); + + //Get uid of written entity + auto keys = getKeys("org.kde.pipelinetest.instance1", "event.main"); + QCOMPARE(keys.size(), 1); + const auto key = keys.first(); + const auto uid = Akonadi2::Storage::uidFromKey(key); + + //Execute the modification + entityFbb.Clear(); + auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summary2"), uid, 1); + pipeline.startTransaction(); + pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()); + pipeline.commit(); + + //Ensure we've got the new revision with the modification + auto buffer = getEntity("org.kde.pipelinetest.instance1", "event.main", Akonadi2::Storage::assembleKey(uid, 2)); + QVERIFY(!buffer.isEmpty()); + Akonadi2::EntityBuffer entityBuffer(buffer.data(), buffer.size()); + auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); + QCOMPARE(adaptor->getProperty("summary").toString(), QString("summary2")); + } + + void testDelete() + { + flatbuffers::FlatBufferBuilder entityFbb; + auto command = createEntityCommand(createEvent(entityFbb)); + + //Create the initial revision + Akonadi2::Pipeline pipeline("org.kde.pipelinetest.instance1"); + pipeline.startTransaction(); + pipeline.newEntity(command.constData(), command.size()); + pipeline.commit(); + + // const auto uid = Akonadi2::Storage::uidFromKey(key); + auto result = getKeys("org.kde.pipelinetest.instance1", "event.main"); + QCOMPARE(result.size(), 1); + + const auto uid = Akonadi2::Storage::uidFromKey(result.first()); + + //Delete entity + auto deleteCommand = deleteEntityCommand(uid,1); + pipeline.startTransaction(); + pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size()); + pipeline.commit(); + } +}; + +QTEST_MAIN(PipelineTest) +#include "pipelinetest.moc" -- cgit v1.2.3