From 84957496800a862aa88bb2e88da0a9b2c4e19dc2 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 30 Dec 2015 10:34:57 +0100 Subject: Moved all generic synchronization code to the base class. --- common/genericresource.cpp | 160 +++++++++++++++++++++++++++++++++++++++++++++ common/genericresource.h | 44 +++++++++++++ 2 files changed, 204 insertions(+) (limited to 'common') diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 9fbcaaa..42153ec 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -4,12 +4,16 @@ #include "pipeline.h" #include "queuedcommand_generated.h" #include "createentity_generated.h" +#include "modifyentity_generated.h" +#include "deleteentity_generated.h" #include "domainadaptor.h" #include "commands.h" #include "index.h" #include "log.h" #include "definitions.h" +#include + static int sBatchSize = 100; using namespace Akonadi2; @@ -52,6 +56,7 @@ public: { const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly)); const qint64 lastReplayedRevision = getLastReplayedRevision(); + Trace() << "All changes replayed " << topRevision << lastReplayedRevision; return (lastReplayedRevision >= topRevision); } @@ -444,4 +449,159 @@ void GenericResource::setLowerBoundRevision(qint64 revision) updateLowerBoundRevision(); } +void GenericResource::createEntity(const QByteArray &akonadiId, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) +{ + //These changes are coming from the source + const auto replayToSource = false; + flatbuffers::FlatBufferBuilder entityFbb; + adaptorFactory.createBuffer(domainObject, entityFbb); + flatbuffers::FlatBufferBuilder fbb; + //This is the resource type and not the domain type + auto entityId = fbb.CreateString(akonadiId.toStdString()); + auto type = fbb.CreateString(bufferType.toStdString()); + auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); + auto location = Akonadi2::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); + Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); + callback(QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize())); +} + +void GenericResource::modifyEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) +{ + //These changes are coming from the source + const auto replayToSource = false; + flatbuffers::FlatBufferBuilder entityFbb; + adaptorFactory.createBuffer(domainObject, entityFbb); + flatbuffers::FlatBufferBuilder fbb; + auto entityId = fbb.CreateString(akonadiId.toStdString()); + //This is the resource type and not the domain type + auto type = fbb.CreateString(bufferType.toStdString()); + auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); + //TODO removals + auto location = Akonadi2::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); + Akonadi2::Commands::FinishModifyEntityBuffer(fbb, location); + callback(QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize())); +} + +void GenericResource::deleteEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, std::function callback) +{ + //These changes are coming from the source + const auto replayToSource = false; + flatbuffers::FlatBufferBuilder fbb; + auto entityId = fbb.CreateString(akonadiId.toStdString()); + //This is the resource type and not the domain type + auto type = fbb.CreateString(bufferType.toStdString()); + auto location = Akonadi2::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); + Akonadi2::Commands::FinishDeleteEntityBuffer(fbb, location); + callback(QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize())); +} + +void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) +{ + Index index("rid.mapping." + bufferType, transaction); + Index localIndex("localid.mapping." + bufferType, transaction); + index.add(remoteId, localId); + localIndex.add(localId, remoteId); +} + +void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) +{ + Index index("rid.mapping." + bufferType, transaction); + Index localIndex("localid.mapping." + bufferType, transaction); + index.remove(remoteId, localId); + localIndex.remove(localId, remoteId); +} + +QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) +{ + //Lookup local id for remote id, or insert a new pair otherwise + Index index("rid.mapping." + bufferType, transaction); + Index localIndex("localid.mapping." + bufferType, transaction); + QByteArray akonadiId = index.lookup(remoteId); + if (akonadiId.isEmpty()) { + akonadiId = QUuid::createUuid().toString().toUtf8(); + index.add(remoteId, akonadiId); + localIndex.add(akonadiId, remoteId); + } + return akonadiId; +} + +QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Akonadi2::Storage::Transaction &transaction) +{ + Index index("localid.mapping." + bufferType, transaction); + QByteArray remoteId = index.lookup(localId); + if (remoteId.isEmpty()) { + Warning() << "Couldn't find the remote id for " << localId; + return QByteArray(); + } + return remoteId; +} + +void GenericResource::scanForRemovals(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, const std::function &callback)> &entryGenerator, std::function exists) +{ + entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) { + auto akonadiId = Akonadi2::Storage::uidFromKey(key); + Trace() << "Checking for removal " << key; + const auto remoteId = resolveLocalId(bufferType, akonadiId, synchronizationTransaction); + //If we have no remoteId, the entity hasn't been replayed to the source yet + if (!remoteId.isEmpty()) { + if (!exists(remoteId)) { + Trace() << "Found a removed entity: " << akonadiId; + deleteEntity(akonadiId, Akonadi2::Storage::maxRevision(transaction), bufferType, [this](const QByteArray &buffer) { + enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::DeleteEntityCommand, buffer); + }); + } + } + }); +} + +static QSharedPointer getLatest(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) +{ + QSharedPointer current; + db.findLatest(uid, [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { + Akonadi2::EntityBuffer buffer(const_cast(data.data()), data.size()); + if (!buffer.isValid()) { + Warning() << "Read invalid buffer from disk"; + } else { + current = adaptorFactory.createAdaptor(buffer.entity()); + } + return false; + }, + [](const Akonadi2::Storage::Error &error) { + Warning() << "Failed to read current value from storage: " << error.message; + }); + return current; +} + +void GenericResource::createOrModify(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Akonadi2::ApplicationDomain::ApplicationDomainType &entity) +{ + auto mainDatabase = transaction.openDatabase(bufferType + ".main"); + const auto akonadiId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); + const auto found = mainDatabase.contains(akonadiId); + if (!found) { + Trace() << "Found a new entity: " << remoteId; + createEntity(akonadiId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { + enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, buffer); + }); + } else { //modification + if (auto current = getLatest(mainDatabase, akonadiId, adaptorFactory)) { + bool changed = false; + for (const auto &property : entity.changedProperties()) { + if (entity.getProperty(property) != current->getProperty(property)) { + Trace() << "Property changed " << akonadiId << property; + changed = true; + } + } + if (changed) { + Trace() << "Found a modified entity: " << remoteId; + modifyEntity(akonadiId, Akonadi2::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { + enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::ModifyEntityCommand, buffer); + }); + } + } else { + Warning() << "Failed to get current entity"; + } + } +} + + #include "genericresource.moc" diff --git a/common/genericresource.h b/common/genericresource.h index ea68a25..c12c631 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -63,6 +63,50 @@ protected: virtual KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value); void onProcessorError(int errorCode, const QString &errorMessage); void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); + + static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback); + static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback); + static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function callback); + + /** + * Records a localId to remoteId mapping + */ + void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction); + void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction); + + /** + * Tries to find a local id for the remote id, and creates a new local id otherwise. + * + * The new local id is recorded in the local to remote id mapping. + */ + QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction); + + /** + * Tries to find a remote id for a local id. + * + * This can fail if the entity hasn't been written back to the server yet. + */ + QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Akonadi2::Storage::Transaction &transaction); + + /** + * A synchronous algorithm to remove entities that are no longer existing. + * + * A list of entities is generated by @param entryGenerator. + * The entiry Generator typically iterates over an index to produce all existing entries. + * This algorithm calls @param exists for every entity of type @param type, with its remoteId. For every entity where @param exists returns false, + * an entity delete command is enqueued. + * + * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. + */ + void scanForRemovals(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, const std::function &callback)> &entryGenerator, std::function exists); + + /** + * An algorithm to create or modify the entity. + * + * Depending on whether the entity is locally available, or has changed. + */ + void createOrModify(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Akonadi2::ApplicationDomain::ApplicationDomainType &entity); + MessageQueue mUserQueue; MessageQueue mSynchronizerQueue; QByteArray mResourceInstanceIdentifier; -- cgit v1.2.3