From c3f6e72c2d46906a4699127b558ca248729ce577 Mon Sep 17 00:00:00 2001
From: Christian Mollekopf
Date: Tue, 29 Sep 2015 00:52:07 +0200
Subject: Revision cleanup

---
 common/genericresource.cpp | 31 ++++++++++++++-------
 common/metadata.fbs        |  3 +++
 common/pipeline.cpp        | 67 +++++++++++++++++++++++++++++++++-------------
 common/pipeline.h          | 12 ++++++---
 common/storage.h           |  5 ++++
 common/storage_common.cpp  | 42 +++++++++++++++++++++++++++++
 tests/pipelinetest.cpp     | 21 ++++++++++++++-
 7 files changed, 149 insertions(+), 32 deletions(-)

diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 3ffc56b..4abcecd 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -60,7 +60,7 @@ private slots:
         }).exec();
     }
 
-    KAsync::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand)
+    KAsync::Job<qint64> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand)
     {
         Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId());
         //Throw command into appropriate pipeline
@@ -72,25 +72,27 @@ private slots:
         case Akonadi2::Commands::CreateEntityCommand:
             return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
         default:
-            return KAsync::error<void>(-1, "Unhandled command");
+            return KAsync::error<qint64>(-1, "Unhandled command");
         }
-        return KAsync::null<void>();
+        return KAsync::null<qint64>();
     }
 
-    KAsync::Job<void> processQueuedCommand(const QByteArray &data)
+    KAsync::Job<qint64> processQueuedCommand(const QByteArray &data)
    {
         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size());
         if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) {
             Warning() << "invalid buffer";
-            return KAsync::error<void>(1, "Invalid Buffer");
+            // return KAsync::error<qint64>(1, "Invalid Buffer");
         }
         auto queuedCommand = Akonadi2::GetQueuedCommand(data.constData());
         const auto commandId = queuedCommand->commandId();
         Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId);
-        return processQueuedCommand(queuedCommand).then<void>(
-            [commandId]() {
+        return processQueuedCommand(queuedCommand).then<qint64, qint64>(
+            [commandId](qint64 createdRevision) -> qint64 {
                 Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId);
-            },
+                return createdRevision;
+            }
+            ,
             [](int errorCode, QString errorMessage) {
                 //FIXME propagate error, we didn't handle it
                 Warning() << "Error while processing queue command: " << errorMessage;
@@ -106,8 +108,17 @@ private slots:
         }).then<void>(KAsync::dowhile(
             [queue]() { return !queue->isEmpty(); },
             [this, queue](KAsync::Future<void> &future) {
-                queue->dequeueBatch(100, [this](const QByteArray &data) {
-                        return processQueuedCommand(data);
+                const int batchSize = 100;
+                queue->dequeueBatch(batchSize, [this](const QByteArray &data) {
+                        return KAsync::start<void>([this, data](KAsync::Future<void> &future) {
+                            processQueuedCommand(data).then<void, qint64>([&future, this](qint64 createdRevision) {
+                                Trace() << "Created revision " << createdRevision;
+                                //We don't have a writeback yet, so we cleanup revisions immediately
+                                //TODO: only cleanup once writeback is done
+                                mPipeline->cleanupRevision(createdRevision);
+                                future.setFinished();
+                            }).exec();
+                        });
                     }
                 ).then<void>([&future, queue](){
                     future.setFinished();
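The genericresource.cpp change makes the command queue report which revision each command created: processQueuedCommand() now yields a qint64, and the batch loop cleans that revision up as soon as the command has been processed (the TODO notes this is only until a real writeback stage exists). The control flow can be modelled with plain standard-library types instead of the real KAsync/MessageQueue API; in this sketch, Pipeline, commandQueue and batchSize are illustrative stand-ins:

    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <queue>

    struct Pipeline {
        void cleanupRevision(int64_t revision)
        {
            std::cout << "cleaning up revision " << revision << "\n";
        }
    };

    int main()
    {
        Pipeline pipeline;
        // Each queued command returns the revision it created, mirroring
        // the new KAsync::Job<qint64> return type in the patch.
        std::queue<std::function<int64_t()>> commandQueue;
        for (int64_t rev = 1; rev <= 3; ++rev) {
            commandQueue.push([rev] { return rev; });
        }

        const std::size_t batchSize = 100; // same batch size as in the patch
        while (!commandQueue.empty()) {    // stands in for the KAsync::dowhile loop
            for (std::size_t i = 0; i < batchSize && !commandQueue.empty(); ++i) {
                const int64_t createdRevision = commandQueue.front()();
                commandQueue.pop();
                // There is no writeback stage yet, so the revision is cleaned
                // up immediately after processing, as the TODO in the patch notes.
                pipeline.cleanupRevision(createdRevision);
            }
        }
    }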
diff --git a/common/metadata.fbs b/common/metadata.fbs
index bb1163d..1455238 100644
--- a/common/metadata.fbs
+++ b/common/metadata.fbs
@@ -1,9 +1,12 @@
 namespace Akonadi2;
 
+enum Operation : byte { Creation = 1, Modification, Removal }
+
 table Metadata {
     revision: ulong;
     processed: bool = true;
     processingProgress: [string];
+    operation: Operation = Modification;
 }
 
 root_type Metadata;
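The new Operation enum records how each revision came to be, with Modification as the default for existing buffers. The cleanup code in pipeline.cpp below keys off the Removal tag to decide that even the newest revision of an entity may be dropped. For reference, the C++ that flatc generates for such an enum looks roughly like this (the exact generated header may differ):

    #include <cstdint>
    #include <iostream>

    namespace Akonadi2 {
    // Implicit values continue from the first one, so
    // Operation_Modification == 2 and Operation_Removal == 3.
    enum Operation : int8_t {
        Operation_Creation = 1,
        Operation_Modification = 2,
        Operation_Removal = 3,
    };
    }

    int main()
    {
        // Cleanup treats a Removal revision as "drop everything",
        // including the latest revision of the entity.
        Akonadi2::Operation op = Akonadi2::Operation_Removal;
        if (op == Akonadi2::Operation_Removal) {
            std::cout << "entity removed: all revisions can be purged\n";
        }
    }

The pipeline code reads the tag back through the generated accessor, as in metadata->operation() below.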
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 4fed41f..c108540 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -124,7 +124,7 @@ void Pipeline::null()
     // state.step();
 }
 
-KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
+KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
 {
     Log() << "Pipeline: New Entity";
 
@@ -137,7 +137,7 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
         if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) {
             Warning() << "invalid buffer, not a create entity buffer";
-            return KAsync::error<void>();
+            return KAsync::error<qint64>(0);
         }
     }
     auto createEntity = Akonadi2::Commands::GetCreateEntity(command);
@@ -148,13 +148,13 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
         if (!Akonadi2::VerifyEntityBuffer(verifyer)) {
             Warning() << "invalid buffer, not an entity buffer";
-            return KAsync::error<void>();
+            return KAsync::error<qint64>(0);
         }
     }
     auto entity = Akonadi2::GetEntity(createEntity->delta()->Data());
     if (!entity->resource()->size() && !entity->local()->size()) {
         Warning() << "No local and no resource buffer while trying to create entity.";
-        return KAsync::error<void>();
+        return KAsync::error<qint64>(0);
     }
 
     //Add metadata buffer
@@ -175,10 +175,12 @@
         }
     );
     Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
+    Akonadi2::Storage::recordRevision(d->transaction, newRevision, key, bufferType);
     Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
 
-    return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) {
-        PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future]() {
+    return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) {
+        PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future, newRevision]() {
+            future.setValue(newRevision);
             future.setFinished();
         }, bufferType);
         d->activePipelines << state;
@@ -186,7 +188,7 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
     });
 }
 
-KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
+KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
 {
     Log() << "Pipeline: Modified Entity";
 
@@ -196,7 +198,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
     {
         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
         if (!Akonadi2::Commands::VerifyModifyEntityBuffer(verifyer)) {
             Warning() << "invalid buffer, not a modify entity buffer";
-            return KAsync::error<void>();
+            return KAsync::error<qint64>(0);
         }
     }
     auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command);
@@ -208,20 +210,20 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
     const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
     if (bufferType.isEmpty() || key.isEmpty()) {
         Warning() << "entity type or key " << bufferType << key;
-        return KAsync::error<void>();
+        return KAsync::error<qint64>(0);
     }
     {
         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size());
         if (!Akonadi2::VerifyEntityBuffer(verifyer)) {
             Warning() << "invalid buffer, not an entity buffer";
-            return KAsync::error<void>();
+            return KAsync::error<qint64>(0);
         }
     }
 
     auto adaptorFactory = d->adaptorFactory.value(bufferType);
     if (!adaptorFactory) {
         Warning() << "no adaptor factory for type " << bufferType;
-        return KAsync::error<void>();
+        return KAsync::error<qint64>(0);
     }
 
     auto diffEntity = Akonadi2::GetEntity(modifyEntity->delta()->Data());
@@ -244,7 +246,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
 
     if (!current) {
         Warning() << "Failed to read local value " << key;
-        return KAsync::error<void>();
+        return KAsync::error<qint64>(0);
     }
 
     //resource and uid don't matter at this point
@@ -278,10 +280,12 @@
     //TODO don't overwrite the old entry, but instead store a new revision
     d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
     Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
+    Akonadi2::Storage::recordRevision(d->transaction, newRevision, key, bufferType);
     Log() << "Pipeline: modified entity: " << key << newRevision << bufferType;
 
-    return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) {
-        PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future]() {
+    return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) {
+        PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future, newRevision]() {
+            future.setValue(newRevision);
             future.setFinished();
         }, bufferType);
         d->activePipelines << state;
@@ -289,7 +293,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
     });
 }
 
-KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
+KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
 {
     Log() << "Pipeline: Deleted Entity";
 
@@ -299,7 +303,7 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
     {
         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
         if (!Akonadi2::Commands::VerifyDeleteEntityBuffer(verifyer)) {
             Warning() << "invalid buffer, not a delete entity buffer";
-            return KAsync::error<void>();
+            return KAsync::error<qint64>(0);
         }
     }
     auto deleteEntity = Akonadi2::Commands::GetDeleteEntity(command);
@@ -312,10 +316,12 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
     //TODO remove all revisions?
     d->transaction.openDatabase(bufferType + ".main").remove(Akonadi2::Storage::assembleKey(key, baseRevision));
     Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
+    Akonadi2::Storage::recordRevision(d->transaction, newRevision, key, bufferType);
     Log() << "Pipeline: deleted entity: "<< newRevision;
 
-    return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) {
-        PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[bufferType], newRevision, [&future](){
+    return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) {
+        PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[bufferType], newRevision, [&future, newRevision](){
+            future.setValue(newRevision);
             future.setFinished();
         }, bufferType);
         d->activePipelines << state;
@@ -323,6 +329,31 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
     });
 }
 
+void Pipeline::cleanupRevision(qint64 revision)
+{
+    const auto uid = Akonadi2::Storage::getUidFromRevision(d->transaction, revision);
+    const auto bufferType = Akonadi2::Storage::getTypeFromRevision(d->transaction, revision);
+    Trace() << "Cleaning up revision " << revision << uid << bufferType;
+    d->transaction.openDatabase(bufferType + ".main").scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool {
+        Akonadi2::EntityBuffer buffer(const_cast<char *>(data.data()), data.size());
+        if (!buffer.isValid()) {
+            Warning() << "Read invalid buffer from disk";
+        } else {
+            const auto metadata = flatbuffers::GetRoot<Akonadi2::Metadata>(buffer.metadataBuffer());
+            const qint64 rev = metadata->revision();
+            //Remove old revisions, and the current if the entity has already been removed
+            if (rev < revision || metadata->operation() == Akonadi2::Operation_Removal) {
+                Akonadi2::Storage::removeRevision(d->transaction, rev);
+                d->transaction.openDatabase(bufferType + ".main").remove(key);
+            }
+        }
+
+        return true;
+    }, [](const Akonadi2::Storage::Error &error) {
+        Warning() << "Error while reading: " << error.message;
+    }, true);
+}
+
 void Pipeline::pipelineStepped(const PipelineState &state)
 {
     scheduleStep();
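Pipeline::cleanupRevision() implements a single rule: look up the entity recorded for the given revision, scan all of its stored revisions, drop every revision older than the given one, and drop the current one too when its metadata says the entity was removed. A self-contained model of that rule, with std::map standing in for the per-type ".main" LMDB database:

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>

    enum class Operation { Creation, Modification, Removal };

    // uid -> (revision -> operation); a stand-in for the "<type>.main" database.
    using RevisionStore = std::map<std::string, std::map<int64_t, Operation>>;

    void cleanupRevision(RevisionStore &store, const std::string &uid, int64_t revision)
    {
        auto &revisions = store[uid];
        for (auto it = revisions.begin(); it != revisions.end();) {
            // Remove old revisions, and the current one as well if the
            // entity has already been removed.
            if (it->first < revision || it->second == Operation::Removal) {
                it = revisions.erase(it);
            } else {
                ++it;
            }
        }
    }

    int main()
    {
        RevisionStore store;
        store["event1"] = {{1, Operation::Creation}, {2, Operation::Modification}};
        cleanupRevision(store, "event1", 2);
        std::cout << store["event1"].size() << "\n"; // prints 1: only revision 2 is left
    }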
diff --git a/common/pipeline.h b/common/pipeline.h
index 573af73..89232d0 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -58,9 +58,15 @@ public:
     void null();
     void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory);
 
-    KAsync::Job<void> newEntity(void const *command, size_t size);
-    KAsync::Job<void> modifiedEntity(void const *command, size_t size);
-    KAsync::Job<void> deletedEntity(void const *command, size_t size);
+    KAsync::Job<qint64> newEntity(void const *command, size_t size);
+    KAsync::Job<qint64> modifiedEntity(void const *command, size_t size);
+    KAsync::Job<qint64> deletedEntity(void const *command, size_t size);
+    /*
+     * Cleans up a single revision.
+     *
+     * This has to be called for every revision in consecutive order.
+     */
+    void cleanupRevision(qint64 revision);
 
 Q_SIGNALS:
     void revisionUpdated(qint64);
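The new header comment pins down the calling contract: cleanupRevision() must be invoked for every revision, in consecutive order, because each call only ever looks backwards from its argument. A hypothetical caller would therefore walk the revision range without gaps; cleanupUpTo, lastCleaned and maxRevision here are illustrative names, not part of the API:

    #include <cstdint>
    #include <iostream>

    struct Pipeline {
        void cleanupRevision(int64_t revision) { std::cout << "cleaned " << revision << "\n"; }
    };

    // `lastCleaned` and `maxRevision` are assumed bookkeeping values that the
    // resource would track itself.
    void cleanupUpTo(Pipeline &pipeline, int64_t lastCleaned, int64_t maxRevision)
    {
        for (int64_t rev = lastCleaned + 1; rev <= maxRevision; ++rev) {
            pipeline.cleanupRevision(rev); // no revision may be skipped
        }
    }

    int main()
    {
        Pipeline pipeline;
        cleanupUpTo(pipeline, 0, 3); // cleans revisions 1, 2, 3 in order
    }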
diff --git a/common/storage.h b/common/storage.h
index 98b12ed..9459f04 100644
--- a/common/storage.h
+++ b/common/storage.h
@@ -168,6 +168,11 @@ public:
     static qint64 maxRevision(const Akonadi2::Storage::Transaction &);
     static void setMaxRevision(Akonadi2::Storage::Transaction &, qint64 revision);
 
+    static QByteArray getUidFromRevision(const Akonadi2::Storage::Transaction &, qint64 revision);
+    static QByteArray getTypeFromRevision(const Akonadi2::Storage::Transaction &, qint64 revision);
+    static void recordRevision(Akonadi2::Storage::Transaction &, qint64 revision, const QByteArray &uid, const QByteArray &type);
+    static void removeRevision(Akonadi2::Storage::Transaction &, qint64 revision);
+
     bool exists() const;
 
     static bool isInternalKey(const char *key);

diff --git a/common/storage_common.cpp b/common/storage_common.cpp
index 28fb4c2..dc02aec 100644
--- a/common/storage_common.cpp
+++ b/common/storage_common.cpp
@@ -74,6 +74,48 @@ qint64 Storage::maxRevision(const Akonadi2::Storage::Transaction &transaction)
     return r;
 }
 
+QByteArray Storage::getUidFromRevision(const Akonadi2::Storage::Transaction &transaction, qint64 revision)
+{
+    QByteArray uid;
+    transaction.openDatabase("revisions").scan(QByteArray::number(revision), [&](const QByteArray &, const QByteArray &value) -> bool {
+        uid = value;
+        return false;
+    }, [](const Error &error){
+        if (error.code != ErrorCodes::NotFound) {
+            //FIXME
+            // defaultErrorHandler()(error);
+        }
+    });
+    return uid;
+}
+
+QByteArray Storage::getTypeFromRevision(const Akonadi2::Storage::Transaction &transaction, qint64 revision)
+{
+    QByteArray type;
+    transaction.openDatabase("revisionType").scan(QByteArray::number(revision), [&](const QByteArray &, const QByteArray &value) -> bool {
+        type = value;
+        return false;
+    }, [](const Error &error){
+        if (error.code != ErrorCodes::NotFound) {
+            //FIXME
+            // defaultErrorHandler()(error);
+        }
+    });
+    return type;
+}
+
+void Storage::recordRevision(Akonadi2::Storage::Transaction &transaction, qint64 revision, const QByteArray &uid, const QByteArray &type)
+{
+    //TODO use integerkeys
+    transaction.openDatabase("revisions").write(QByteArray::number(revision), uid);
+    transaction.openDatabase("revisionType").write(QByteArray::number(revision), type);
+}
+
+void Storage::removeRevision(Akonadi2::Storage::Transaction &transaction, qint64 revision)
+{
+    transaction.openDatabase("revisions").remove(QByteArray::number(revision));
+}
+
 bool Storage::isInternalKey(const char *key)
 {
     return key && strncmp(key, s_internalPrefix, s_internalPrefixSize) == 0;
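storage_common.cpp adds two lookup tables keyed by the decimal string of the revision number (the TODO suggests integer keys later): "revisions" maps a revision to the entity uid, and "revisionType" maps it to the buffer type. Note that removeRevision() only clears the uid entry; the type entry stays behind. A sketch of the same bookkeeping, with std::map standing in for the LMDB databases:

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>

    // std::map stands in for the two databases added by the patch.
    struct RevisionIndex {
        std::map<std::string, std::string> revisions;    // revision -> entity uid
        std::map<std::string, std::string> revisionType; // revision -> buffer type

        void recordRevision(int64_t revision, const std::string &uid, const std::string &type)
        {
            // Keys are decimal strings for now; the TODO suggests integer keys.
            revisions[std::to_string(revision)] = uid;
            revisionType[std::to_string(revision)] = type;
        }
        std::string getUidFromRevision(int64_t revision) { return revisions[std::to_string(revision)]; }
        std::string getTypeFromRevision(int64_t revision) { return revisionType[std::to_string(revision)]; }
        void removeRevision(int64_t revision)
        {
            // Mirrors the patch: only the uid table is cleared here,
            // the revisionType entry remains.
            revisions.erase(std::to_string(revision));
        }
    };

    int main()
    {
        RevisionIndex index;
        index.recordRevision(1, "event1", "event");
        std::cout << index.getUidFromRevision(1) << " / " << index.getTypeFromRevision(1) << "\n";
        index.removeRevision(1);
    }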
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp
index 96448e2..5dede64 100644
--- a/tests/pipelinetest.cpp
+++ b/tests/pipelinetest.cpp
@@ -210,6 +210,17 @@ private Q_SLOTS:
         Akonadi2::EntityBuffer entityBuffer(buffer.data(), buffer.size());
         auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity());
         QCOMPARE(adaptor->getProperty("summary").toString(), QString("summary2"));
+
+        //Both revisions are in the store at this point
+        QCOMPARE(getKeys("org.kde.pipelinetest.instance1", "event.main").size(), 2);
+
+        //Cleanup old revisions
+        pipeline.startTransaction();
+        pipeline.cleanupRevision(2);
+        pipeline.commit();
+
+        //And now only the latest revision is left
+        QCOMPARE(getKeys("org.kde.pipelinetest.instance1", "event.main").size(), 1);
     }
 
     void testDelete()
@@ -230,10 +241,18 @@ private Q_SLOTS:
         const auto uid = Akonadi2::Storage::uidFromKey(result.first());
 
         //Delete entity
-        auto deleteCommand = deleteEntityCommand(uid,1);
+        auto deleteCommand = deleteEntityCommand(uid, 1);
         pipeline.startTransaction();
         pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size());
         pipeline.commit();
+
+        //Cleanup old revisions
+        pipeline.startTransaction();
+        pipeline.cleanupRevision(2);
+        pipeline.commit();
+
+        //And all revisions are gone
+        QCOMPARE(getKeys("org.kde.pipelinetest.instance1", "event.main").size(), 0);
     }
 };
--
cgit v1.2.3