From a908ea3ecb5ad78e4bdadf13d40ff76d0a038b76 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 19 Jul 2015 20:18:14 +0200 Subject: Modify/Delete actions --- common/pipeline.cpp | 65 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 44 insertions(+), 21 deletions(-) (limited to 'common/pipeline.cpp') diff --git a/common/pipeline.cpp b/common/pipeline.cpp index afb9e34..1197408 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -84,13 +84,16 @@ void Pipeline::setPreprocessors(const QString &entityType, Type pipelineType, co }; } -Storage &Pipeline::storage() const void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) { - return d->storage; d->adaptorFactory.insert(entityType, factory); } +Storage &Pipeline::storage() const +{ + return d->storage; +} + void Pipeline::null() { //TODO: is there really any need for the null pipeline? if so, it should be doing something ;) @@ -111,7 +114,7 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) { flatbuffers::Verifier verifyer(reinterpret_cast(command), size); if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { - qWarning() << "invalid buffer, not a create entity buffer"; + Warning() << "invalid buffer, not a create entity buffer"; return KAsync::error(); } } @@ -122,7 +125,7 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) { flatbuffers::Verifier verifyer(reinterpret_cast(createEntity->delta()->Data()), createEntity->delta()->size()); if (!Akonadi2::VerifyEntityBuffer(verifyer)) { - qWarning() << "invalid buffer, not an entity buffer"; + Warning() << "invalid buffer, not an entity buffer"; return KAsync::error(); } } @@ -142,7 +145,7 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); storage().setMaxRevision(newRevision); - Log() << "Pipeline: wrote entity: "<< newRevision; + Log() << "Pipeline: wrote entity: " << key << newRevision; return KAsync::start([this, key, entityType](KAsync::Future &future) { PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { @@ -162,51 +165,72 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) { flatbuffers::Verifier verifyer(reinterpret_cast(command), size); if (!Akonadi2::Commands::VerifyModifyEntityBuffer(verifyer)) { - qWarning() << "invalid buffer, not a modify entity buffer"; + Warning() << "invalid buffer, not a modify entity buffer"; return KAsync::error(); } } auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command); + Q_ASSERT(modifyEntity); //TODO rename modifyEntity->domainType to bufferType const QByteArray entityType = QByteArray(reinterpret_cast(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); const QByteArray key = QByteArray(reinterpret_cast(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); + if (entityType.isEmpty() || key.isEmpty()) { + Warning() << "entity type or key " << entityType << key; + return KAsync::error(); + } { flatbuffers::Verifier verifyer(reinterpret_cast(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); if (!Akonadi2::VerifyEntityBuffer(verifyer)) { - qWarning() << "invalid buffer, not an entity buffer"; + Warning() << "invalid buffer, not an entity buffer"; return KAsync::error(); } } auto adaptorFactory = d->adaptorFactory.value(entityType); - if (adaptorFactory) { - qWarning() << "no adaptor factory"; + if (!adaptorFactory) { + Warning() << "no adaptor factory for type " << entityType; return KAsync::error(); } auto diffEntity = Akonadi2::GetEntity(modifyEntity->delta()->Data()); + Q_ASSERT(diffEntity); auto diff = adaptorFactory->createAdaptor(*diffEntity); - Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr domainType; - storage().scan(QByteArray::fromRawData(key.data(), key.size()), [&domainType](const QByteArray &data) -> bool { - auto existingEntity = Akonadi2::GetEntity(data.data()); - domainType = getDomainType(*existingEntity); + QSharedPointer current; + storage().scan(QByteArray::fromRawData(key.data(), key.size()), [¤t, adaptorFactory](const QByteArray &data) -> bool { + Akonadi2::EntityBuffer buffer(const_cast(data.data()), data.size()); + if (!buffer.isValid()) { + Warning() << "Read invalid buffer from disk"; + } else { + current = adaptorFactory->createAdaptor(buffer.entity()); + } return false; }); //TODO error handler + if (!current) { + Warning() << "Failed to read local value "; + return KAsync::error(); + } + + //resource and uid don't matter at this point + const Akonadi2::ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current); + auto newObject = Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(existingObject); + //Apply diff //FIXME only apply the properties that are available in the buffer for (const auto &property : diff->availableProperties()) { - domainType->setProperty(property, diff->getProperty(property)); + newObject->setProperty(property, diff->getProperty(property)); } + //Remove deletions - for (const auto &property : *modifyEntity->deletions()) { - domainType->setProperty(QByteArray::fromRawData(property->data(), property->size()), QVariant()); + if (modifyEntity->deletions()) { + for (const auto &property : *modifyEntity->deletions()) { + newObject->setProperty(QByteArray::fromRawData(property->data(), property->size()), QVariant()); + } } - //Add metadata buffer flatbuffers::FlatBufferBuilder metadataFbb; auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); @@ -215,16 +239,15 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) auto metadataBuffer = metadataBuilder.Finish(); Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); - flatbuffers::FlatBufferBuilder fbb; - adaptorFactory->createBuffer(*domainType, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); + adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); //TODO don't overwrite the old entry, but instead store a new revision storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); storage().setMaxRevision(newRevision); return KAsync::start([this, key, entityType](KAsync::Future &future) { - PipelineState state(this, ModifiedPipeline, key, d->newPipeline[entityType], [&future]() { + PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [&future]() { future.setFinished(); }); d->activePipelines << state; @@ -241,7 +264,7 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) { flatbuffers::Verifier verifyer(reinterpret_cast(command), size); if (!Akonadi2::Commands::VerifyDeleteEntityBuffer(verifyer)) { - qWarning() << "invalid buffer, not a delete entity buffer"; + Warning() << "invalid buffer, not a delete entity buffer"; return KAsync::error(); } } -- cgit v1.2.3