From 664fb0fbfd21a25d1f86938a186f6ec9cea6d882 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 29 Dec 2015 10:01:42 +0100 Subject: Mark commands with whether they need to be replayed This way we don't have to try to figure out whether a change is coming from the source already. --- common/commands/createentity.fbs | 1 + common/commands/deleteentity.fbs | 1 + common/commands/modifyentity.fbs | 1 + common/metadata.fbs | 3 +-- common/pipeline.cpp | 9 ++++++--- examples/maildirresource/maildirresource.cpp | 24 +++++++++++++----------- tests/domainadaptortest.cpp | 1 - 7 files changed, 23 insertions(+), 17 deletions(-) diff --git a/common/commands/createentity.fbs b/common/commands/createentity.fbs index a5bc95c..5358dea 100644 --- a/common/commands/createentity.fbs +++ b/common/commands/createentity.fbs @@ -4,6 +4,7 @@ table CreateEntity { entityId: string; domainType: string; delta: [ubyte]; + replayToSource: bool = true; } root_type CreateEntity; diff --git a/common/commands/deleteentity.fbs b/common/commands/deleteentity.fbs index 4f32b54..9f865be 100644 --- a/common/commands/deleteentity.fbs +++ b/common/commands/deleteentity.fbs @@ -4,6 +4,7 @@ table DeleteEntity { revision: ulong; entityId: string; domainType: string; + replayToSource: bool = true; } root_type DeleteEntity; diff --git a/common/commands/modifyentity.fbs b/common/commands/modifyentity.fbs index a59eb9b..03b543f 100644 --- a/common/commands/modifyentity.fbs +++ b/common/commands/modifyentity.fbs @@ -6,6 +6,7 @@ table ModifyEntity { deletions: [string]; //A list of deleted properties domainType: string; delta: [ubyte]; //Contains an entity buffer with all changed properties set + replayToSource: bool = true; } root_type ModifyEntity; diff --git a/common/metadata.fbs b/common/metadata.fbs index 1455238..0a709fe 100644 --- a/common/metadata.fbs +++ b/common/metadata.fbs @@ -4,8 +4,7 @@ enum Operation : byte { Creation = 1, Modification, Removal } table Metadata { revision: ulong; - processed: bool = true; - processingProgress: [string]; + replayToSource: bool = true; operation: Operation = Modification; } diff --git a/common/pipeline.cpp b/common/pipeline.cpp index f8b1fb1..06d8114 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -142,6 +142,7 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) } auto createEntity = Akonadi2::Commands::GetCreateEntity(command); + const bool replayToSource = createEntity->replayToSource(); const QByteArray bufferType = QByteArray(reinterpret_cast(createEntity->domainType()->Data()), createEntity->domainType()->size()); { flatbuffers::Verifier verifyer(reinterpret_cast(createEntity->delta()->Data()), createEntity->delta()->size()); @@ -175,8 +176,8 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) flatbuffers::FlatBufferBuilder metadataFbb; auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); metadataBuilder.add_revision(newRevision); - metadataBuilder.add_processed(false); metadataBuilder.add_operation(Akonadi2::Operation_Creation); + metadataBuilder.add_replayToSource(replayToSource); auto metadataBuffer = metadataBuilder.Finish(); Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); @@ -224,6 +225,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) Q_ASSERT(modifyEntity); const qint64 baseRevision = modifyEntity->revision(); + const bool replayToSource = modifyEntity->replayToSource(); //TODO rename modifyEntity->domainType to bufferType const QByteArray bufferType = QByteArray(reinterpret_cast(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); const QByteArray key = QByteArray(reinterpret_cast(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); @@ -291,8 +293,8 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) flatbuffers::FlatBufferBuilder metadataFbb; auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); metadataBuilder.add_revision(newRevision); - metadataBuilder.add_processed(false); metadataBuilder.add_operation(Akonadi2::Operation_Modification); + metadataBuilder.add_replayToSource(replayToSource); auto metadataBuffer = metadataBuilder.Finish(); Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); @@ -329,6 +331,7 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) } auto deleteEntity = Akonadi2::Commands::GetDeleteEntity(command); + const bool replayToSource = deleteEntity->replayToSource(); const QByteArray bufferType = QByteArray(reinterpret_cast(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); const QByteArray key = QByteArray(reinterpret_cast(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); @@ -366,8 +369,8 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) flatbuffers::FlatBufferBuilder metadataFbb; auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); metadataBuilder.add_revision(newRevision); - metadataBuilder.add_processed(false); metadataBuilder.add_operation(Akonadi2::Operation_Removal); + metadataBuilder.add_replayToSource(replayToSource); auto metadataBuffer = metadataBuilder.Finish(); Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); diff --git a/examples/maildirresource/maildirresource.cpp b/examples/maildirresource/maildirresource.cpp index 8333f76..7fcf12e 100644 --- a/examples/maildirresource/maildirresource.cpp +++ b/examples/maildirresource/maildirresource.cpp @@ -132,6 +132,8 @@ QStringList MaildirResource::listAvailableFolders() static void createEntity(const QByteArray &akonadiId, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) { + //These changes are coming from the source + const auto replayToSource = false; flatbuffers::FlatBufferBuilder entityFbb; adaptorFactory.createBuffer(domainObject, entityFbb); flatbuffers::FlatBufferBuilder fbb; @@ -139,13 +141,15 @@ static void createEntity(const QByteArray &akonadiId, const QByteArray &bufferTy auto entityId = fbb.CreateString(akonadiId.toStdString()); auto type = fbb.CreateString(bufferType.toStdString()); auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); - auto location = Akonadi2::Commands::CreateCreateEntity(fbb, entityId, type, delta); + auto location = Akonadi2::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); callback(QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize())); } static void modifyEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) { + //These changes are coming from the source + const auto replayToSource = false; flatbuffers::FlatBufferBuilder entityFbb; adaptorFactory.createBuffer(domainObject, entityFbb); flatbuffers::FlatBufferBuilder fbb; @@ -154,18 +158,20 @@ static void modifyEntity(const QByteArray &akonadiId, qint64 revision, const QBy auto type = fbb.CreateString(bufferType.toStdString()); auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); //TODO removals - auto location = Akonadi2::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta); + auto location = Akonadi2::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); Akonadi2::Commands::FinishModifyEntityBuffer(fbb, location); callback(QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize())); } static void deleteEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, std::function callback) { + //These changes are coming from the source + const auto replayToSource = false; flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(akonadiId.toStdString()); //This is the resource type and not the domain type auto type = fbb.CreateString(bufferType.toStdString()); - auto location = Akonadi2::Commands::CreateDeleteEntity(fbb, revision, entityId, type); + auto location = Akonadi2::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); Akonadi2::Commands::FinishDeleteEntityBuffer(fbb, location); callback(QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize())); } @@ -365,23 +371,19 @@ KAsync::Job MaildirResource::replay(const QByteArray &type, const QByteArr //This results in a deadlock during sync Akonadi2::Storage store(Akonadi2::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Akonadi2::Storage::ReadWrite); auto synchronizationTransaction = store.createTransaction(Akonadi2::Storage::ReadWrite); - const auto uid = Akonadi2::Storage::uidFromKey(key); - const auto remoteId = resolveLocalId(type, uid, synchronizationTransaction); Trace() << "Replaying " << key << type; if (type == ENTITY_TYPE_FOLDER) { Akonadi2::EntityBuffer buffer(value.data(), value.size()); const Akonadi2::Entity &entity = buffer.entity(); const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); + if (metadataBuffer && !metadataBuffer->replayToSource()) { + Trace() << "Change is coming from the source"; + return KAsync::null(); + } const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; const auto operation = metadataBuffer ? metadataBuffer->operation() : Akonadi2::Operation_Creation; if (operation == Akonadi2::Operation_Creation) { - //FIXME: This check only works for new entities - //Figure out wether we have replayed that revision already to the source - if (!remoteId.isEmpty()) { - Trace() << "Change is coming from the source"; - return KAsync::null(); - } const Akonadi2::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mFolderAdaptorFactory->createAdaptor(entity)); auto folderName = folder.getProperty("name").toString(); //TODO handle non toplevel folders diff --git a/tests/domainadaptortest.cpp b/tests/domainadaptortest.cpp index f2300cb..b3d2a00 100644 --- a/tests/domainadaptortest.cpp +++ b/tests/domainadaptortest.cpp @@ -58,7 +58,6 @@ private Q_SLOTS: flatbuffers::FlatBufferBuilder metadataFbb; auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); metadataBuilder.add_revision(1); - metadataBuilder.add_processed(false); auto metadataBuffer = metadataBuilder.Finish(); Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); -- cgit v1.2.3