From b441386c4e138d19bbd79d578e0a2ff1b3f54a93 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sat, 28 May 2016 02:09:58 +0200 Subject: Moved the classes to individual files --- common/CMakeLists.txt | 4 + common/entitystore.cpp | 48 +++++++ common/entitystore.h | 53 +++++++ common/genericresource.cpp | 342 +-------------------------------------------- common/genericresource.h | 141 ------------------- common/remoteidmap.cpp | 75 ++++++++++ common/remoteidmap.h | 61 ++++++++ common/sourcewriteback.cpp | 124 ++++++++++++++++ common/sourcewriteback.h | 64 +++++++++ common/synchronizer.cpp | 176 +++++++++++++++++++++++ common/synchronizer.h | 95 +++++++++++++ 11 files changed, 701 insertions(+), 482 deletions(-) create mode 100644 common/entitystore.cpp create mode 100644 common/entitystore.h create mode 100644 common/remoteidmap.cpp create mode 100644 common/remoteidmap.h create mode 100644 common/sourcewriteback.cpp create mode 100644 common/sourcewriteback.h create mode 100644 common/synchronizer.cpp create mode 100644 common/synchronizer.h (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 79b627a..3c6a083 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -68,6 +68,10 @@ set(command_SRCS query.cpp changereplay.cpp adaptorfactoryregistry.cpp + synchronizer.cpp + entitystore.cpp + remoteidmap.cpp + sourcewriteback.cpp ${storage_SRCS}) add_library(${PROJECT_NAME} SHARED ${command_SRCS}) diff --git a/common/entitystore.cpp b/common/entitystore.cpp new file mode 100644 index 0000000..2c15abf --- /dev/null +++ b/common/entitystore.cpp @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#include "entitystore.h" + +using namespace Sink; + +EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) + : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier), + mTransaction(transaction) +{ + +} + +QSharedPointer EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) +{ + QSharedPointer current; + db.findLatest(uid, + [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { + Sink::EntityBuffer buffer(const_cast(data.data()), data.size()); + if (!buffer.isValid()) { + Warning() << "Read invalid buffer from disk"; + } else { + Trace() << "Found value " << key; + current = adaptorFactory.createAdaptor(buffer.entity()); + } + return false; + }, + [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); + return current; +} + diff --git a/common/entitystore.h b/common/entitystore.h new file mode 100644 index 0000000..17156ec --- /dev/null +++ b/common/entitystore.h @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#pragma once + +#include "sink_export.h" +#include + +#include "storage.h" +#include "adaptorfactoryregistry.h" + +namespace Sink { + +class SINK_EXPORT EntityStore +{ +public: + EntityStore(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction); + + template + T read(const QByteArray &identifier) const + { + auto typeName = ApplicationDomain::getTypeName(); + auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); + auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType)); + Q_ASSERT(bufferAdaptor); + return T(mResourceInstanceIdentifier, identifier, 0, bufferAdaptor); + } + + + static QSharedPointer getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory); +private: + QByteArray mResourceType; + QByteArray mResourceInstanceIdentifier; + Sink::Storage::Transaction &mTransaction; +}; + +} diff --git a/common/genericresource.cpp b/common/genericresource.cpp index cb2ef21..568e066 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -34,6 +34,7 @@ #include "definitions.h" #include "bufferutils.h" #include "adaptorfactoryregistry.h" +#include "synchronizer.h" #include #include @@ -461,347 +462,6 @@ void GenericResource::setLowerBoundRevision(qint64 revision) updateLowerBoundRevision(); } - - - -EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) - : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier), - mTransaction(transaction) -{ - -} - -template -T EntityStore::read(const QByteArray &identifier) const -{ - auto typeName = ApplicationDomain::getTypeName(); - auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); - auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType)); - Q_ASSERT(bufferAdaptor); - return T(mResourceInstanceIdentifier, identifier, 0, bufferAdaptor); -} - -QSharedPointer EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) -{ - QSharedPointer current; - db.findLatest(uid, - [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { - Sink::EntityBuffer buffer(const_cast(data.data()), data.size()); - if (!buffer.isValid()) { - Warning() << "Read invalid buffer from disk"; - } else { - Trace() << "Found value " << key; - current = adaptorFactory.createAdaptor(buffer.entity()); - } - return false; - }, - [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); - return current; -} - - - -SyncStore::SyncStore(Sink::Storage::Transaction &transaction) - : mTransaction(transaction) -{ - -} - -void SyncStore::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) -{ - Index("rid.mapping." + bufferType, mTransaction).add(remoteId, localId); - Index("localid.mapping." + bufferType, mTransaction).add(localId, remoteId); -} - -void SyncStore::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) -{ - Index("rid.mapping." + bufferType, mTransaction).remove(remoteId, localId); - Index("localid.mapping." + bufferType, mTransaction).remove(localId, remoteId); -} - -void SyncStore::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) -{ - const auto oldRemoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); - removeRemoteId(bufferType, localId, oldRemoteId); - recordRemoteId(bufferType, localId, remoteId); -} - -QByteArray SyncStore::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId) -{ - // Lookup local id for remote id, or insert a new pair otherwise - Index index("rid.mapping." + bufferType, mTransaction); - QByteArray sinkId = index.lookup(remoteId); - if (sinkId.isEmpty()) { - sinkId = QUuid::createUuid().toString().toUtf8(); - index.add(remoteId, sinkId); - Index("localid.mapping." + bufferType, mTransaction).add(sinkId, remoteId); - } - return sinkId; -} - -QByteArray SyncStore::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId) -{ - QByteArray remoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); - if (remoteId.isEmpty()) { - Warning() << "Couldn't find the remote id for " << localId; - return QByteArray(); - } - return remoteId; -} - - - - - - - - -Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) - : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly), - mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), - mResourceType(resourceType), - mResourceInstanceIdentifier(resourceInstanceIdentifier) -{ - Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; - -} - -void Synchronizer::setup(const std::function &enqueueCommandCallback) -{ - mEnqueue = enqueueCommandCallback; -} - -void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) -{ - Q_ASSERT(mEnqueue); - mEnqueue(commandId, data); -} - -EntityStore &Synchronizer::store() -{ - if (!mEntityStore) { - mEntityStore = QSharedPointer::create(mResourceType, mResourceInstanceIdentifier, mTransaction); - } - return *mEntityStore; -} - -SyncStore &Synchronizer::syncStore() -{ - if (!mSyncStore) { - mSyncStore = QSharedPointer::create(mSyncTransaction); - } - return *mSyncStore; -} - -void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, - DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) -{ - // These changes are coming from the source - const auto replayToSource = false; - flatbuffers::FlatBufferBuilder entityFbb; - adaptorFactory.createBuffer(domainObject, entityFbb); - flatbuffers::FlatBufferBuilder fbb; - // This is the resource type and not the domain type - auto entityId = fbb.CreateString(sinkId.toStdString()); - auto type = fbb.CreateString(bufferType.toStdString()); - auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); - auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); - Sink::Commands::FinishCreateEntityBuffer(fbb, location); - callback(BufferUtils::extractBuffer(fbb)); -} - -void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, - DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) -{ - // These changes are coming from the source - const auto replayToSource = false; - flatbuffers::FlatBufferBuilder entityFbb; - adaptorFactory.createBuffer(domainObject, entityFbb); - flatbuffers::FlatBufferBuilder fbb; - auto entityId = fbb.CreateString(sinkId.toStdString()); - // This is the resource type and not the domain type - auto type = fbb.CreateString(bufferType.toStdString()); - auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); - // TODO removals - auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); - Sink::Commands::FinishModifyEntityBuffer(fbb, location); - callback(BufferUtils::extractBuffer(fbb)); -} - -void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function callback) -{ - // These changes are coming from the source - const auto replayToSource = false; - flatbuffers::FlatBufferBuilder fbb; - auto entityId = fbb.CreateString(sinkId.toStdString()); - // This is the resource type and not the domain type - auto type = fbb.CreateString(bufferType.toStdString()); - auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); - Sink::Commands::FinishDeleteEntityBuffer(fbb, location); - callback(BufferUtils::extractBuffer(fbb)); -} - -void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function &callback)> &entryGenerator, std::function exists) -{ - entryGenerator([this, bufferType, &exists](const QByteArray &key) { - auto sinkId = Sink::Storage::uidFromKey(key); - Trace() << "Checking for removal " << key; - const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); - // If we have no remoteId, the entity hasn't been replayed to the source yet - if (!remoteId.isEmpty()) { - if (!exists(remoteId)) { - Trace() << "Found a removed entity: " << sinkId; - deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, - [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); - } - } - }); -} - -void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) -{ - Trace() << "Create or modify" << bufferType << remoteId; - auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType); - const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); - const auto found = mainDatabase.contains(sinkId); - auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); - if (!found) { - Trace() << "Found a new entity: " << remoteId; - createEntity( - sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); - } else { // modification - if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) { - bool changed = false; - for (const auto &property : entity.changedProperties()) { - if (entity.getProperty(property) != current->getProperty(property)) { - Trace() << "Property changed " << sinkId << property; - changed = true; - } - } - if (changed) { - Trace() << "Found a modified entity: " << remoteId; - modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory, - [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); - } - } else { - Warning() << "Failed to get current entity"; - } - } -} - -KAsync::Job Synchronizer::synchronize() -{ - mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); - mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); - return synchronizeWithSource().then([this]() { - mTransaction.abort(); - mSyncTransaction.commit(); - mSyncStore.clear(); - mEntityStore.clear(); - }); -} - - - -SourceWriteBack::SourceWriteBack(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) - : ChangeReplay(resourceInstanceIdentifier), - mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), - mResourceType(resourceType), - mResourceInstanceIdentifier(resourceInstanceIdentifier) -{ - -} - -EntityStore &SourceWriteBack::store() -{ - if (!mEntityStore) { - mEntityStore = QSharedPointer::create(mResourceType, mResourceInstanceIdentifier, mTransaction); - } - return *mEntityStore; -} - -SyncStore &SourceWriteBack::syncStore() -{ - if (!mSyncStore) { - mSyncStore = QSharedPointer::create(mSyncTransaction); - } - return *mSyncStore; -} - -KAsync::Job SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) -{ - mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); - mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); - - Sink::EntityBuffer buffer(value); - const Sink::Entity &entity = buffer.entity(); - const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); - Q_ASSERT(metadataBuffer); - if (!metadataBuffer->replayToSource()) { - Trace() << "Change is coming from the source"; - return KAsync::null(); - } - const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; - const auto uid = Sink::Storage::uidFromKey(key); - QByteArray oldRemoteId; - - if (operation != Sink::Operation_Creation) { - oldRemoteId = syncStore().resolveLocalId(type, uid); - } - Trace() << "Replaying " << key << type; - - KAsync::Job job = KAsync::null(); - if (type == ENTITY_TYPE_FOLDER) { - auto folder = store().read(uid); - // const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); - job = replay(folder, operation, oldRemoteId); - } else if (type == ENTITY_TYPE_MAIL) { - auto mail = store().read(uid); - // const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); - job = replay(mail, operation, oldRemoteId); - } - - return job.then([this, operation, type, uid](const QByteArray &remoteId) { - Trace() << "Replayed change with remote id: " << remoteId; - if (operation == Sink::Operation_Creation) { - if (remoteId.isEmpty()) { - Warning() << "Returned an empty remoteId from the creation"; - } else { - syncStore().recordRemoteId(type, uid, remoteId); - } - } else if (operation == Sink::Operation_Modification) { - if (remoteId.isEmpty()) { - Warning() << "Returned an empty remoteId from the creation"; - } else { - syncStore().updateRemoteId(type, uid, remoteId); - } - } else if (operation == Sink::Operation_Removal) { - syncStore().removeRemoteId(type, uid, remoteId); - } else { - Warning() << "Unkown operation" << operation; - } - - mTransaction.abort(); - mSyncTransaction.commit(); - mSyncStore.clear(); - mEntityStore.clear(); - }, [](int errorCode, const QString &errorMessage) { - Warning() << "Failed to replay change: " << errorMessage; - }); -} - -KAsync::Job SourceWriteBack::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &) -{ - return KAsync::null(); -} - -KAsync::Job SourceWriteBack::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &) -{ - return KAsync::null(); -} - - #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" #include "genericresource.moc" diff --git a/common/genericresource.h b/common/genericresource.h index 45d5d3a..4ed408d 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -82,147 +82,6 @@ private: int mError; QTimer mCommitQueueTimer; qint64 mClientLowerBoundRevision; - QHash mAdaptorFactories; }; -class SINK_EXPORT SyncStore -{ -public: - SyncStore(Sink::Storage::Transaction &); - - /** - * Records a localId to remoteId mapping - */ - void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); - void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); - void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); - - /** - * Tries to find a local id for the remote id, and creates a new local id otherwise. - * - * The new local id is recorded in the local to remote id mapping. - */ - QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId); - - /** - * Tries to find a remote id for a local id. - * - * This can fail if the entity hasn't been written back to the server yet. - */ - QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId); - -private: - Sink::Storage::Transaction &mTransaction; -}; - -class SINK_EXPORT EntityStore -{ -public: - EntityStore(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction); - - template - T read(const QByteArray &identifier) const; - - static QSharedPointer getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory); -private: - QByteArray mResourceType; - QByteArray mResourceInstanceIdentifier; - Sink::Storage::Transaction &mTransaction; -}; - -/** - * Synchronize and add what we don't already have to local queue - */ -class SINK_EXPORT Synchronizer -{ -public: - Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier); - - void setup(const std::function &enqueueCommandCallback); - KAsync::Job synchronize(); - -protected: - ///Calls the callback to enqueue the command - void enqueueCommand(int commandId, const QByteArray &data); - - static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, - DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback); - static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, - DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback); - static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function callback); - - /** - * A synchronous algorithm to remove entities that are no longer existing. - * - * A list of entities is generated by @param entryGenerator. - * The entiry Generator typically iterates over an index to produce all existing entries. - * This algorithm calls @param exists for every entity of type @param type, with its remoteId. For every entity where @param exists returns false, - * an entity delete command is enqueued. - * - * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. - */ - void scanForRemovals(const QByteArray &bufferType, - const std::function &callback)> &entryGenerator, std::function exists); - - /** - * An algorithm to create or modify the entity. - * - * Depending on whether the entity is locally available, or has changed. - */ - void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); - - //Read only access to main storage - EntityStore &store(); - - //Read/Write access to sync storage - SyncStore &syncStore(); - - virtual KAsync::Job synchronizeWithSource() = 0; - -private: - QSharedPointer mSyncStore; - QSharedPointer mEntityStore; - Sink::Storage mStorage; - Sink::Storage mSyncStorage; - QByteArray mResourceType; - QByteArray mResourceInstanceIdentifier; - Sink::Storage::Transaction mTransaction; - Sink::Storage::Transaction mSyncTransaction; - std::function mEnqueue; -}; - -/** - * Replay changes to the source - */ -class SINK_EXPORT SourceWriteBack : public ChangeReplay -{ -public: - SourceWriteBack(const QByteArray &resourceType,const QByteArray &resourceInstanceIdentifier); - -protected: - ///Base implementation calls the replay$Type calls - virtual KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; - -protected: - ///Implement to write back changes to the server - virtual KAsync::Job replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId); - virtual KAsync::Job replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId); - - //Read only access to main storage - EntityStore &store(); - - //Read/Write access to sync storage - SyncStore &syncStore(); - -private: - Sink::Storage mSyncStorage; - QSharedPointer mSyncStore; - QSharedPointer mEntityStore; - Sink::Storage::Transaction mTransaction; - Sink::Storage::Transaction mSyncTransaction; - QByteArray mResourceType; - QByteArray mResourceInstanceIdentifier; -}; - - } diff --git a/common/remoteidmap.cpp b/common/remoteidmap.cpp new file mode 100644 index 0000000..f72369d --- /dev/null +++ b/common/remoteidmap.cpp @@ -0,0 +1,75 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#include "remoteidmap.h" + +#include +#include "index.h" +#include "log.h" + +using namespace Sink; + +RemoteIdMap::RemoteIdMap(Sink::Storage::Transaction &transaction) + : mTransaction(transaction) +{ + +} + +void RemoteIdMap::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) +{ + Index("rid.mapping." + bufferType, mTransaction).add(remoteId, localId); + Index("localid.mapping." + bufferType, mTransaction).add(localId, remoteId); +} + +void RemoteIdMap::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) +{ + Index("rid.mapping." + bufferType, mTransaction).remove(remoteId, localId); + Index("localid.mapping." + bufferType, mTransaction).remove(localId, remoteId); +} + +void RemoteIdMap::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) +{ + const auto oldRemoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); + removeRemoteId(bufferType, localId, oldRemoteId); + recordRemoteId(bufferType, localId, remoteId); +} + +QByteArray RemoteIdMap::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId) +{ + // Lookup local id for remote id, or insert a new pair otherwise + Index index("rid.mapping." + bufferType, mTransaction); + QByteArray sinkId = index.lookup(remoteId); + if (sinkId.isEmpty()) { + sinkId = QUuid::createUuid().toString().toUtf8(); + index.add(remoteId, sinkId); + Index("localid.mapping." + bufferType, mTransaction).add(sinkId, remoteId); + } + return sinkId; +} + +QByteArray RemoteIdMap::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId) +{ + QByteArray remoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); + if (remoteId.isEmpty()) { + Warning() << "Couldn't find the remote id for " << localId; + return QByteArray(); + } + return remoteId; +} + diff --git a/common/remoteidmap.h b/common/remoteidmap.h new file mode 100644 index 0000000..12891dc --- /dev/null +++ b/common/remoteidmap.h @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#pragma once + +#include "sink_export.h" + +#include "storage.h" + +namespace Sink { + +/** + * A remoteId mapping + */ +class SINK_EXPORT RemoteIdMap +{ +public: + RemoteIdMap(Sink::Storage::Transaction &); + + /** + * Records a localId to remoteId mapping + */ + void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); + void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); + void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); + + /** + * Tries to find a local id for the remote id, and creates a new local id otherwise. + * + * The new local id is recorded in the local to remote id mapping. + */ + QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId); + + /** + * Tries to find a remote id for a local id. + * + * This can fail if the entity hasn't been written back to the server yet. + */ + QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId); + +private: + Sink::Storage::Transaction &mTransaction; +}; + +} diff --git a/common/sourcewriteback.cpp b/common/sourcewriteback.cpp new file mode 100644 index 0000000..1ef20d2 --- /dev/null +++ b/common/sourcewriteback.cpp @@ -0,0 +1,124 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#include "sourcewriteback.h" + +#include "definitions.h" +#include "log.h" + +#define ENTITY_TYPE_MAIL "mail" +#define ENTITY_TYPE_FOLDER "folder" + +using namespace Sink; + +SourceWriteBack::SourceWriteBack(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) + : ChangeReplay(resourceInstanceIdentifier), + mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), + mResourceType(resourceType), + mResourceInstanceIdentifier(resourceInstanceIdentifier) +{ + +} + +EntityStore &SourceWriteBack::store() +{ + if (!mEntityStore) { + mEntityStore = QSharedPointer::create(mResourceType, mResourceInstanceIdentifier, mTransaction); + } + return *mEntityStore; +} + +RemoteIdMap &SourceWriteBack::syncStore() +{ + if (!mSyncStore) { + mSyncStore = QSharedPointer::create(mSyncTransaction); + } + return *mSyncStore; +} + +KAsync::Job SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) +{ + mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); + mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); + + Sink::EntityBuffer buffer(value); + const Sink::Entity &entity = buffer.entity(); + const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); + Q_ASSERT(metadataBuffer); + if (!metadataBuffer->replayToSource()) { + Trace() << "Change is coming from the source"; + return KAsync::null(); + } + const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; + const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; + const auto uid = Sink::Storage::uidFromKey(key); + QByteArray oldRemoteId; + + if (operation != Sink::Operation_Creation) { + oldRemoteId = syncStore().resolveLocalId(type, uid); + } + Trace() << "Replaying " << key << type; + + KAsync::Job job = KAsync::null(); + if (type == ENTITY_TYPE_FOLDER) { + auto folder = store().read(uid); + job = replay(folder, operation, oldRemoteId); + } else if (type == ENTITY_TYPE_MAIL) { + auto mail = store().read(uid); + job = replay(mail, operation, oldRemoteId); + } + + return job.then([this, operation, type, uid](const QByteArray &remoteId) { + Trace() << "Replayed change with remote id: " << remoteId; + if (operation == Sink::Operation_Creation) { + if (remoteId.isEmpty()) { + Warning() << "Returned an empty remoteId from the creation"; + } else { + syncStore().recordRemoteId(type, uid, remoteId); + } + } else if (operation == Sink::Operation_Modification) { + if (remoteId.isEmpty()) { + Warning() << "Returned an empty remoteId from the creation"; + } else { + syncStore().updateRemoteId(type, uid, remoteId); + } + } else if (operation == Sink::Operation_Removal) { + syncStore().removeRemoteId(type, uid, remoteId); + } else { + Warning() << "Unkown operation" << operation; + } + + mTransaction.abort(); + mSyncTransaction.commit(); + mSyncStore.clear(); + mEntityStore.clear(); + }, [](int errorCode, const QString &errorMessage) { + Warning() << "Failed to replay change: " << errorMessage; + }); +} + +KAsync::Job SourceWriteBack::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &) +{ + return KAsync::null(); +} + +KAsync::Job SourceWriteBack::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &) +{ + return KAsync::null(); +} diff --git a/common/sourcewriteback.h b/common/sourcewriteback.h new file mode 100644 index 0000000..8470e00 --- /dev/null +++ b/common/sourcewriteback.h @@ -0,0 +1,64 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#pragma once + +#include "sink_export.h" + +#include "changereplay.h" +#include "storage.h" +#include "entitystore.h" +#include "remoteidmap.h" + +namespace Sink { + +/** + * Replay changes to the source + */ +class SINK_EXPORT SourceWriteBack : public ChangeReplay +{ +public: + SourceWriteBack(const QByteArray &resourceType,const QByteArray &resourceInstanceIdentifier); + +protected: + ///Base implementation calls the replay$Type calls + virtual KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; + +protected: + ///Implement to write back changes to the server + virtual KAsync::Job replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId); + virtual KAsync::Job replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId); + + //Read only access to main storage + EntityStore &store(); + + //Read/Write access to sync storage + RemoteIdMap &syncStore(); + +private: + Sink::Storage mSyncStorage; + QSharedPointer mSyncStore; + QSharedPointer mEntityStore; + Sink::Storage::Transaction mTransaction; + Sink::Storage::Transaction mSyncTransaction; + QByteArray mResourceType; + QByteArray mResourceInstanceIdentifier; +}; + +} diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp new file mode 100644 index 0000000..fb0baaa --- /dev/null +++ b/common/synchronizer.cpp @@ -0,0 +1,176 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#include "synchronizer.h" + +#include "definitions.h" +#include "commands.h" +#include "bufferutils.h" +#include "entitystore.h" +#include "remoteidmap.h" +#include "adaptorfactoryregistry.h" +#include "createentity_generated.h" +#include "modifyentity_generated.h" +#include "deleteentity_generated.h" + +using namespace Sink; + +Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) + : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly), + mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), + mResourceType(resourceType), + mResourceInstanceIdentifier(resourceInstanceIdentifier) +{ + Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; +} + +void Synchronizer::setup(const std::function &enqueueCommandCallback) +{ + mEnqueue = enqueueCommandCallback; +} + +void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) +{ + Q_ASSERT(mEnqueue); + mEnqueue(commandId, data); +} + +EntityStore &Synchronizer::store() +{ + if (!mEntityStore) { + mEntityStore = QSharedPointer::create(mResourceType, mResourceInstanceIdentifier, mTransaction); + } + return *mEntityStore; +} + +RemoteIdMap &Synchronizer::syncStore() +{ + if (!mSyncStore) { + mSyncStore = QSharedPointer::create(mSyncTransaction); + } + return *mSyncStore; +} + +void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, + DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) +{ + // These changes are coming from the source + const auto replayToSource = false; + flatbuffers::FlatBufferBuilder entityFbb; + adaptorFactory.createBuffer(domainObject, entityFbb); + flatbuffers::FlatBufferBuilder fbb; + // This is the resource type and not the domain type + auto entityId = fbb.CreateString(sinkId.toStdString()); + auto type = fbb.CreateString(bufferType.toStdString()); + auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); + auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); + Sink::Commands::FinishCreateEntityBuffer(fbb, location); + callback(BufferUtils::extractBuffer(fbb)); +} + +void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, + DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) +{ + // These changes are coming from the source + const auto replayToSource = false; + flatbuffers::FlatBufferBuilder entityFbb; + adaptorFactory.createBuffer(domainObject, entityFbb); + flatbuffers::FlatBufferBuilder fbb; + auto entityId = fbb.CreateString(sinkId.toStdString()); + // This is the resource type and not the domain type + auto type = fbb.CreateString(bufferType.toStdString()); + auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); + // FIXME removals + auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); + Sink::Commands::FinishModifyEntityBuffer(fbb, location); + callback(BufferUtils::extractBuffer(fbb)); +} + +void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function callback) +{ + // These changes are coming from the source + const auto replayToSource = false; + flatbuffers::FlatBufferBuilder fbb; + auto entityId = fbb.CreateString(sinkId.toStdString()); + // This is the resource type and not the domain type + auto type = fbb.CreateString(bufferType.toStdString()); + auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); + Sink::Commands::FinishDeleteEntityBuffer(fbb, location); + callback(BufferUtils::extractBuffer(fbb)); +} + +void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function &callback)> &entryGenerator, std::function exists) +{ + entryGenerator([this, bufferType, &exists](const QByteArray &key) { + auto sinkId = Sink::Storage::uidFromKey(key); + Trace() << "Checking for removal " << key; + const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); + // If we have no remoteId, the entity hasn't been replayed to the source yet + if (!remoteId.isEmpty()) { + if (!exists(remoteId)) { + Trace() << "Found a removed entity: " << sinkId; + deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, + [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); + } + } + }); +} + +void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) +{ + Trace() << "Create or modify" << bufferType << remoteId; + auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType); + const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); + const auto found = mainDatabase.contains(sinkId); + auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); + if (!found) { + Trace() << "Found a new entity: " << remoteId; + createEntity( + sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); + } else { // modification + if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) { + bool changed = false; + for (const auto &property : entity.changedProperties()) { + if (entity.getProperty(property) != current->getProperty(property)) { + Trace() << "Property changed " << sinkId << property; + changed = true; + } + } + if (changed) { + Trace() << "Found a modified entity: " << remoteId; + modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory, + [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); + } + } else { + Warning() << "Failed to get current entity"; + } + } +} + +KAsync::Job Synchronizer::synchronize() +{ + mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); + mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); + return synchronizeWithSource().then([this]() { + mTransaction.abort(); + mSyncTransaction.commit(); + mSyncStore.clear(); + mEntityStore.clear(); + }); +} diff --git a/common/synchronizer.h b/common/synchronizer.h new file mode 100644 index 0000000..61bca7d --- /dev/null +++ b/common/synchronizer.h @@ -0,0 +1,95 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#pragma once + +#include "sink_export.h" +#include +#include +#include + +#include "storage.h" + +namespace Sink { +class EntityStore; +class RemoteIdMap; + +/** + * Synchronize and add what we don't already have to local queue + */ +class SINK_EXPORT Synchronizer +{ +public: + Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier); + + void setup(const std::function &enqueueCommandCallback); + KAsync::Job synchronize(); + +protected: + ///Calls the callback to enqueue the command + void enqueueCommand(int commandId, const QByteArray &data); + + static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, + DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback); + static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, + DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback); + static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function callback); + + /** + * A synchronous algorithm to remove entities that are no longer existing. + * + * A list of entities is generated by @param entryGenerator. + * The entiry Generator typically iterates over an index to produce all existing entries. + * This algorithm calls @param exists for every entity of type @param type, with its remoteId. For every entity where @param exists returns false, + * an entity delete command is enqueued. + * + * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. + */ + void scanForRemovals(const QByteArray &bufferType, + const std::function &callback)> &entryGenerator, std::function exists); + + /** + * An algorithm to create or modify the entity. + * + * Depending on whether the entity is locally available, or has changed. + */ + void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); + + //Read only access to main storage + EntityStore &store(); + + //Read/Write access to sync storage + RemoteIdMap &syncStore(); + + virtual KAsync::Job synchronizeWithSource() = 0; + +private: + QSharedPointer mSyncStore; + QSharedPointer mEntityStore; + Sink::Storage mStorage; + Sink::Storage mSyncStorage; + QByteArray mResourceType; + QByteArray mResourceInstanceIdentifier; + Sink::Storage::Transaction mTransaction; + Sink::Storage::Transaction mSyncTransaction; + std::function mEnqueue; +}; + +} + -- cgit v1.2.3