From dabd408dcd372f16c7934597db30346869cd8ad8 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 29 May 2016 15:19:21 +0200 Subject: Fixed genericresource so it works with the maildirresourcetest --- common/changereplay.cpp | 2 +- common/genericresource.cpp | 31 +++++++++++++++++-------------- common/genericresource.h | 7 ++++--- common/sourcewriteback.cpp | 30 +++++++++++++++++++++--------- common/storage.h | 5 +---- common/storage_lmdb.cpp | 21 +++++++++++++++++---- common/synchronizer.cpp | 43 ++++++++++++++++++++++++++++++++++--------- common/synchronizer.h | 17 +++++++++++------ 8 files changed, 106 insertions(+), 50 deletions(-) (limited to 'common') diff --git a/common/changereplay.cpp b/common/changereplay.cpp index 2447b6e..63c41c8 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp @@ -85,9 +85,9 @@ void ChangeReplay::revisionChanged() Storage::mainDatabase(mainStoreTransaction, type) .scan(key, [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { + Trace() << "Replaying " << key; replay(type, key, value).exec(); // TODO make for loop async, and pass to async replay function together with type - Trace() << "Replaying " << key; return false; }, [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 568e066..cd3ea02 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -233,22 +233,17 @@ private: #undef DEBUG_AREA #define DEBUG_AREA "resource" -GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer &pipeline, const QSharedPointer &changeReplay, const QSharedPointer &synchronizer) +GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer &pipeline ) : Sink::Resource(), mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier), mPipeline(pipeline ? pipeline : QSharedPointer::create(resourceInstanceIdentifier)), - mChangeReplay(changeReplay), - mSynchronizer(synchronizer), mError(0), mClientLowerBoundRevision(std::numeric_limits::max()) { mPipeline->setResourceType(mResourceType); - mSynchronizer->setup([this](int commandId, const QByteArray &data) { - enqueueCommand(mSynchronizerQueue, commandId, data); - }); mProcessor = new CommandProcessor(mPipeline.data(), QList() << &mUserQueue << &mSynchronizerQueue); mProcessor->setInspectionCommand([this](void const *command, size_t size) { flatbuffers::Verifier verifier((const uint8_t *)command, size); @@ -290,9 +285,7 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra }); QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); - enableChangeReplay(true); mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); - mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision()); mCommitQueueTimer.setInterval(sCommitInterval); mCommitQueueTimer.setSingleShot(true); @@ -313,6 +306,7 @@ KAsync::Job GenericResource::inspect( void GenericResource::enableChangeReplay(bool enable) { + Q_ASSERT(mChangeReplay); if (enable) { QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); @@ -323,11 +317,25 @@ void GenericResource::enableChangeReplay(bool enable) } } -void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector &preprocessors) +void GenericResource::setupPreprocessors(const QByteArray &type, const QVector &preprocessors) { mPipeline->setPreprocessors(type, preprocessors); } +void GenericResource::setupSynchronizer(const QSharedPointer &synchronizer) +{ + mSynchronizer = synchronizer; + mSynchronizer->setup([this](int commandId, const QByteArray &data) { + enqueueCommand(mSynchronizerQueue, commandId, data); + }); +} + +void GenericResource::setupChangereplay(const QSharedPointer &changeReplay) +{ + mChangeReplay = changeReplay; + mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision()); + enableChangeReplay(true); +} void GenericResource::removeDataFromDisk() { @@ -406,11 +414,6 @@ KAsync::Job GenericResource::synchronizeWithSource() }); } -KAsync::Job GenericResource::synchronizeWithSource(Sink::Storage &mainStore, Sink::Storage &synchronizationStore) -{ - return KAsync::null(); -} - static void waitForDrained(KAsync::Future &f, MessageQueue &queue) { if (queue.isEmpty()) { diff --git a/common/genericresource.h b/common/genericresource.h index 4ed408d..0878968 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -41,12 +41,11 @@ class Synchronizer; class SINK_EXPORT GenericResource : public Resource { public: - GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer &pipeline, const QSharedPointer &changeReplay, const QSharedPointer &synchronizer); + GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer &pipeline); virtual ~GenericResource(); virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; virtual KAsync::Job synchronizeWithSource() Q_DECL_OVERRIDE; - virtual KAsync::Job synchronizeWithSource(Sink::Storage &mainStore, Sink::Storage &synchronizationStore); virtual KAsync::Job processAllMessages() Q_DECL_OVERRIDE; virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; virtual KAsync::Job @@ -64,7 +63,9 @@ private slots: protected: void enableChangeReplay(bool); - void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector &preprocessors); + void setupPreprocessors(const QByteArray &type, const QVector &preprocessors); + void setupSynchronizer(const QSharedPointer &synchronizer); + void setupChangereplay(const QSharedPointer &changeReplay); void onProcessorError(int errorCode, const QString &errorMessage); void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); diff --git a/common/sourcewriteback.cpp b/common/sourcewriteback.cpp index 1ef20d2..1c07577 100644 --- a/common/sourcewriteback.cpp +++ b/common/sourcewriteback.cpp @@ -54,8 +54,7 @@ RemoteIdMap &SourceWriteBack::syncStore() KAsync::Job SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) { - mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); - mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); + Trace() << "Replaying" << type << key; Sink::EntityBuffer buffer(value); const Sink::Entity &entity = buffer.entity(); @@ -65,7 +64,14 @@ KAsync::Job SourceWriteBack::replay(const QByteArray &type, const QByteArr Trace() << "Change is coming from the source"; return KAsync::null(); } - const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; + Q_ASSERT(!mSyncStore); + Q_ASSERT(!mEntityStore); + Q_ASSERT(!mTransaction); + Q_ASSERT(!mSyncTransaction); + mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); + mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); + + // const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; const auto uid = Sink::Storage::uidFromKey(key); QByteArray oldRemoteId; @@ -84,32 +90,38 @@ KAsync::Job SourceWriteBack::replay(const QByteArray &type, const QByteArr job = replay(mail, operation, oldRemoteId); } - return job.then([this, operation, type, uid](const QByteArray &remoteId) { - Trace() << "Replayed change with remote id: " << remoteId; + return job.then([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { if (operation == Sink::Operation_Creation) { + Trace() << "Replayed creation with remote id: " << remoteId; if (remoteId.isEmpty()) { Warning() << "Returned an empty remoteId from the creation"; } else { syncStore().recordRemoteId(type, uid, remoteId); } } else if (operation == Sink::Operation_Modification) { + Trace() << "Replayed modification with remote id: " << remoteId; if (remoteId.isEmpty()) { Warning() << "Returned an empty remoteId from the creation"; } else { syncStore().updateRemoteId(type, uid, remoteId); } } else if (operation == Sink::Operation_Removal) { - syncStore().removeRemoteId(type, uid, remoteId); + Trace() << "Replayed removal with remote id: " << oldRemoteId; + syncStore().removeRemoteId(type, uid, oldRemoteId); } else { - Warning() << "Unkown operation" << operation; + ErrorMsg() << "Unkown operation" << operation; } + mSyncStore.clear(); + mEntityStore.clear(); mTransaction.abort(); mSyncTransaction.commit(); + }, [this](int errorCode, const QString &errorMessage) { + Warning() << "Failed to replay change: " << errorMessage; mSyncStore.clear(); mEntityStore.clear(); - }, [](int errorCode, const QString &errorMessage) { - Warning() << "Failed to replay change: " << errorMessage; + mTransaction.abort(); + mSyncTransaction.commit(); }); } diff --git a/common/storage.h b/common/storage.h index b051daa..87573e2 100644 --- a/common/storage.h +++ b/common/storage.h @@ -157,10 +157,7 @@ public: return *this; } - operator bool() const - { - return (d != nullptr); - } + operator bool() const; private: Transaction(Transaction &other); diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp index 4ed9525..cc8b28d 100644 --- a/common/storage_lmdb.cpp +++ b/common/storage_lmdb.cpp @@ -347,7 +347,7 @@ class Storage::Transaction::Private { public: Private(bool _requestRead, const std::function &_defaultErrorHandler, const QString &_name, MDB_env *_env) - : env(_env), requestedRead(_requestRead), defaultErrorHandler(_defaultErrorHandler), name(_name), implicitCommit(false), error(false), modificationCounter(0) + : env(_env), transaction(nullptr), requestedRead(_requestRead), defaultErrorHandler(_defaultErrorHandler), name(_name), implicitCommit(false), error(false), modificationCounter(0) { } ~Private() @@ -366,8 +366,15 @@ public: void startTransaction() { - // qDebug() << "Opening transaction " << requestedRead; + Q_ASSERT(!transaction); + // auto f = [](const char *msg, void *ctx) -> int { + // qDebug() << msg; + // return 0; + // }; + // mdb_reader_list(env, f, nullptr); + // Trace_area("storage." + name.toLatin1()) << "Opening transaction " << requestedRead; const int rc = mdb_txn_begin(env, NULL, requestedRead ? MDB_RDONLY : 0, &transaction); + // Trace_area("storage." + name.toLatin1()) << "Started transaction " << mdb_txn_id(transaction) << transaction; if (rc) { defaultErrorHandler(Error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening transaction: " + QByteArray(mdb_strerror(rc)))); } @@ -387,22 +394,27 @@ Storage::Transaction::~Transaction() { if (d && d->transaction) { if (d->implicitCommit && !d->error) { - // qDebug() << "implicit commit"; commit(); } else { - // qDebug() << "Aorting transaction"; + // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction; mdb_txn_abort(d->transaction); } } delete d; } +Storage::Transaction::operator bool() const +{ + return (d && d->transaction); +} + bool Storage::Transaction::commit(const std::function &errorHandler) { if (!d || !d->transaction) { return false; } + // Trace_area("storage." + d->name.toLatin1()) << "Committing transaction" << mdb_txn_id(d->transaction) << d->transaction; const int rc = mdb_txn_commit(d->transaction); if (rc) { mdb_txn_abort(d->transaction); @@ -420,6 +432,7 @@ void Storage::Transaction::abort() return; } + // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction; mdb_txn_abort(d->transaction); d->transaction = nullptr; } diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index fb0baaa..b264662 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -54,7 +54,7 @@ void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) EntityStore &Synchronizer::store() { if (!mEntityStore) { - mEntityStore = QSharedPointer::create(mResourceType, mResourceInstanceIdentifier, mTransaction); + mEntityStore = QSharedPointer::create(mResourceType, mResourceInstanceIdentifier, transaction()); } return *mEntityStore; } @@ -62,7 +62,7 @@ EntityStore &Synchronizer::store() RemoteIdMap &Synchronizer::syncStore() { if (!mSyncStore) { - mSyncStore = QSharedPointer::create(mSyncTransaction); + mSyncStore = QSharedPointer::create(syncTransaction()); } return *mSyncStore; } @@ -125,7 +125,7 @@ void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::func if (!remoteId.isEmpty()) { if (!exists(remoteId)) { Trace() << "Found a removed entity: " << sinkId; - deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, + deleteEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); } } @@ -135,7 +135,7 @@ void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::func void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) { Trace() << "Create or modify" << bufferType << remoteId; - auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType); + auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); const auto found = mainDatabase.contains(sinkId); auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); @@ -154,7 +154,7 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray } if (changed) { Trace() << "Found a modified entity: " << remoteId; - modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory, + modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); } } else { @@ -165,12 +165,37 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray KAsync::Job Synchronizer::synchronize() { - mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); - mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); + Trace() << "Synchronizing"; return synchronizeWithSource().then([this]() { - mTransaction.abort(); - mSyncTransaction.commit(); mSyncStore.clear(); mEntityStore.clear(); }); } + +void Synchronizer::commit() +{ + mTransaction.abort(); +} + +void Synchronizer::commitSync() +{ + mSyncTransaction.commit(); +} + +Sink::Storage::Transaction &Synchronizer::transaction() +{ + if (!mTransaction) { + Trace() << "Starting transaction"; + mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); + } + return mTransaction; +} + +Sink::Storage::Transaction &Synchronizer::syncTransaction() +{ + if (!mSyncTransaction) { + Trace() << "Starting transaction"; + mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); + } + return mSyncTransaction; +} diff --git a/common/synchronizer.h b/common/synchronizer.h index 61bca7d..17e7003 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h @@ -41,6 +41,17 @@ public: void setup(const std::function &enqueueCommandCallback); KAsync::Job synchronize(); + //Read only access to main storage + EntityStore &store(); + + //Read/Write access to sync storage + RemoteIdMap &syncStore(); + + void commit(); + void commitSync(); + Sink::Storage::Transaction &transaction(); + Sink::Storage::Transaction &syncTransaction(); + protected: ///Calls the callback to enqueue the command void enqueueCommand(int commandId, const QByteArray &data); @@ -71,12 +82,6 @@ protected: */ void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); - //Read only access to main storage - EntityStore &store(); - - //Read/Write access to sync storage - RemoteIdMap &syncStore(); - virtual KAsync::Job synchronizeWithSource() = 0; private: -- cgit v1.2.3