From 64f4244f5b5a2e830a240b6962c6708bd12a6d35 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 14 Jul 2015 01:54:17 +0200 Subject: Modify/Delete commands --- common/commands/deleteentity.fbs | 3 +- common/commands/modifyentity.fbs | 2 +- common/genericresource.cpp | 7 +-- common/pipeline.cpp | 119 ++++++++++++++++++++++++++++++++++++--- common/pipeline.h | 6 +- 5 files changed, 120 insertions(+), 17 deletions(-) (limited to 'common') diff --git a/common/commands/deleteentity.fbs b/common/commands/deleteentity.fbs index c9b7850..4f32b54 100644 --- a/common/commands/deleteentity.fbs +++ b/common/commands/deleteentity.fbs @@ -1,8 +1,9 @@ -namespace Akonadi2; +namespace Akonadi2.Commands; table DeleteEntity { revision: ulong; entityId: string; + domainType: string; } root_type DeleteEntity; diff --git a/common/commands/modifyentity.fbs b/common/commands/modifyentity.fbs index d26051e..a59eb9b 100644 --- a/common/commands/modifyentity.fbs +++ b/common/commands/modifyentity.fbs @@ -1,4 +1,4 @@ -namespace Akonadi2; +namespace Akonadi2.Commands; table ModifyEntity { revision: ulong; diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 139ae98..a500aed 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -9,7 +9,6 @@ #include "clientapi.h" #include "index.h" #include "log.h" -#include using namespace Akonadi2; @@ -67,11 +66,9 @@ private slots: //Throw command into appropriate pipeline switch (queuedCommand->commandId()) { case Akonadi2::Commands::DeleteEntityCommand: - //mPipeline->removedEntity - return KAsync::null(); + return mPipeline->deletedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); case Akonadi2::Commands::ModifyEntityCommand: - //mPipeline->modifiedEntity - return KAsync::null(); + return mPipeline->modifiedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); case Akonadi2::Commands::CreateEntityCommand: return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); default: diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 21cf1c5..afb9e34 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -28,8 +28,11 @@ #include "entity_generated.h" #include "metadata_generated.h" #include "createentity_generated.h" +#include "modifyentity_generated.h" +#include "deleteentity_generated.h" #include "entitybuffer.h" #include "log.h" +#include "domain/applicationdomaintype.h" namespace Akonadi2 { @@ -50,6 +53,7 @@ public: QHash > deletedPipeline; QVector activePipelines; bool stepScheduled; + QHash adaptorFactory; }; Pipeline::Pipeline(const QString &resourceName, QObject *parent) @@ -81,8 +85,10 @@ void Pipeline::setPreprocessors(const QString &entityType, Type pipelineType, co } Storage &Pipeline::storage() const +void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) { return d->storage; + d->adaptorFactory.insert(entityType, factory); } void Pipeline::null() @@ -147,18 +153,115 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) }); } -void Pipeline::modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size) +KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) { - PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [](){}); - d->activePipelines << state; - state.step(); + Log() << "Pipeline: Modified Entity"; + + const qint64 newRevision = storage().maxRevision() + 1; + + { + flatbuffers::Verifier verifyer(reinterpret_cast(command), size); + if (!Akonadi2::Commands::VerifyModifyEntityBuffer(verifyer)) { + qWarning() << "invalid buffer, not a modify entity buffer"; + return KAsync::error(); + } + } + auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command); + + //TODO rename modifyEntity->domainType to bufferType + const QByteArray entityType = QByteArray(reinterpret_cast(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); + const QByteArray key = QByteArray(reinterpret_cast(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); + { + flatbuffers::Verifier verifyer(reinterpret_cast(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); + if (!Akonadi2::VerifyEntityBuffer(verifyer)) { + qWarning() << "invalid buffer, not an entity buffer"; + return KAsync::error(); + } + } + + auto adaptorFactory = d->adaptorFactory.value(entityType); + if (adaptorFactory) { + qWarning() << "no adaptor factory"; + return KAsync::error(); + } + + auto diffEntity = Akonadi2::GetEntity(modifyEntity->delta()->Data()); + auto diff = adaptorFactory->createAdaptor(*diffEntity); + + Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr domainType; + storage().scan(QByteArray::fromRawData(key.data(), key.size()), [&domainType](const QByteArray &data) -> bool { + auto existingEntity = Akonadi2::GetEntity(data.data()); + domainType = getDomainType(*existingEntity); + return false; + }); + //TODO error handler + + //Apply diff + //FIXME only apply the properties that are available in the buffer + for (const auto &property : diff->availableProperties()) { + domainType->setProperty(property, diff->getProperty(property)); + } + //Remove deletions + for (const auto &property : *modifyEntity->deletions()) { + domainType->setProperty(QByteArray::fromRawData(property->data(), property->size()), QVariant()); + } + + + //Add metadata buffer + flatbuffers::FlatBufferBuilder metadataFbb; + auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); + metadataBuilder.add_revision(newRevision); + metadataBuilder.add_processed(false); + auto metadataBuffer = metadataBuilder.Finish(); + Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); + + + flatbuffers::FlatBufferBuilder fbb; + adaptorFactory->createBuffer(*domainType, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); + + //TODO don't overwrite the old entry, but instead store a new revision + storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); + storage().setMaxRevision(newRevision); + + return KAsync::start([this, key, entityType](KAsync::Future &future) { + PipelineState state(this, ModifiedPipeline, key, d->newPipeline[entityType], [&future]() { + future.setFinished(); + }); + d->activePipelines << state; + state.step(); + }); } -void Pipeline::deletedEntity(const QString &entityType, const QByteArray &key) +KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) { - PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType], [](){}); - d->activePipelines << state; - state.step(); + Log() << "Pipeline: Deleted Entity"; + + const qint64 newRevision = storage().maxRevision() + 1; + + { + flatbuffers::Verifier verifyer(reinterpret_cast(command), size); + if (!Akonadi2::Commands::VerifyDeleteEntityBuffer(verifyer)) { + qWarning() << "invalid buffer, not a delete entity buffer"; + return KAsync::error(); + } + } + auto deleteEntity = Akonadi2::Commands::GetDeleteEntity(command); + + const QByteArray entityType = QByteArray(reinterpret_cast(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); + const QByteArray key = QByteArray(reinterpret_cast(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); + + //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted + storage().remove(key.data(), key.size()); + storage().setMaxRevision(newRevision); + Log() << "Pipeline: deleted entity: "<< newRevision; + + return KAsync::start([this, key, entityType](KAsync::Future &future) { + PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType], [&future](){ + future.setFinished(); + }); + d->activePipelines << state; + state.step(); + }); } void Pipeline::pipelineStepped(const PipelineState &state) diff --git a/common/pipeline.h b/common/pipeline.h index 6df2d76..a6696ec 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -31,6 +31,7 @@ #include #include "entity_generated.h" +#include "domainadaptor.h" namespace Akonadi2 { @@ -53,10 +54,11 @@ public: void setPreprocessors(const QString &entityType, Type pipelineType, const QVector &preprocessors); void null(); + void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); KAsync::Job newEntity(void const *command, size_t size); - void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size); - void deletedEntity(const QString &entityType, const QByteArray &key); + KAsync::Job modifiedEntity(void const *command, size_t size); + KAsync::Job deletedEntity(void const *command, size_t size); Q_SIGNALS: void revisionUpdated(); -- cgit v1.2.3