From 67bb6035b6333fe0d6d8566b5962f83c5870185f Mon Sep 17 00:00:00 2001
From: Christian Mollekopf
Date: Wed, 19 Aug 2015 14:05:05 +0200
Subject: Transactions in the pipeline

---
 common/genericresource.cpp | 34 ++++++++++++++++++++------------
 common/genericresource.h   |  2 ++
 common/pipeline.cpp        | 48 ++++++++++++++++++++++++++++++--------------
 common/pipeline.h          |  3 +++
 4 files changed, 61 insertions(+), 26 deletions(-)

(limited to 'common')

diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index bbd992b..3b3fdb0 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -101,26 +101,26 @@ private slots:
     //Process all messages of this queue
     KAsync::Job<void> processQueue(MessageQueue *queue)
     {
-        //TODO use something like:
-        //KAsync::foreach("pass iterator here").each("process value here").join();
-        //KAsync::foreach("pass iterator here").parallel("process value here").join();
-        return KAsync::dowhile(
-            [this, queue](KAsync::Future<bool> &future) {
+        return KAsync::start([this](){
+            mPipeline->startTransaction();
+        }).then(KAsync::dowhile(
+            [queue]() { return !queue->isEmpty(); },
+            [this, queue](KAsync::Future<void> &future) {
                 queue->dequeueBatch(100, [this](const QByteArray &data) {
                         Trace() << "Got value";
                         return processQueuedCommand(data);
                     }
-                ).then([&future](){
-                    future.setValue(true);
+                ).then([&future, queue](){
                     future.setFinished();
                 },
                 [&future](int i, QString error) {
                     Warning() << "Error while getting message from messagequeue: " << error;
-                    future.setValue(false);
                     future.setFinished();
                 }).exec();
             }
-        );
+        )).then([this]() {
+            mPipeline->commit();
+        });
     }
 
     KAsync::Job<void> processPipeline()
@@ -158,6 +158,10 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
     mProcessor = new Processor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue);
     QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
     QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
+
+    mCommitQueueTimer.setInterval(100);
+    mCommitQueueTimer.setSingleShot(true);
+    QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit);
 }
 
 GenericResource::~GenericResource()
@@ -187,10 +191,16 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt
 
 void GenericResource::processCommand(int commandId, const QByteArray &data)
 {
-    //TODO instead of copying the command including the full entity first into the command queue, we could directly
-    //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay).
-    //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire).
+    static int modifications = 0;
+    mUserQueue.startTransaction();
     enqueueCommand(mUserQueue, commandId, data);
+    modifications++;
+    if (modifications >= 100) {
+        mUserQueue.commit();
+        modifications = 0;
+    } else {
+        mCommitQueueTimer.start();
+    }
 }
 
 static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue)
diff --git a/common/genericresource.h b/common/genericresource.h
index 4a285ea..532632e 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -23,6 +23,7 @@
 #include
 #include
 #include
+#include <QTimer>
 
 class Processor;
 
@@ -56,6 +57,7 @@ protected:
 private:
     Processor *mProcessor;
     int mError;
+    QTimer mCommitQueueTimer;
 };
 
 }
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 207cc5e..27b9deb 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -47,6 +47,7 @@ public:
     }
 
     Storage storage;
+    Storage::Transaction transaction;
     QHash<QString, QVector<Preprocessor *> > nullPipeline;
     QHash<QString, QVector<Preprocessor *> > newPipeline;
     QHash<QString, QVector<Preprocessor *> > modifiedPipeline;
@@ -89,6 +90,27 @@ void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFac
     d->adaptorFactory.insert(entityType, factory);
 }
 
+void Pipeline::startTransaction()
+{
+    if (d->transaction) {
+        return;
+    }
+    d->transaction = std::move(storage().createTransaction(Akonadi2::Storage::ReadWrite));
+}
+
+void Pipeline::commit()
+{
+    if (d->transaction) {
+        d->transaction.commit();
+    }
+    d->transaction = Storage::Transaction();
+}
+
+Storage::Transaction &Pipeline::transaction()
+{
+    return d->transaction;
+}
+
 Storage &Pipeline::storage() const
 {
     return d->storage;
@@ -109,7 +131,7 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
     //TODO toRFC4122 would probably be more efficient, but results in non-printable keys.
     const auto key = QUuid::createUuid().toString().toUtf8();
 
-    const qint64 newRevision = storage().maxRevision() + 1;
+    const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1;
 
     {
         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
@@ -143,10 +165,8 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
     flatbuffers::FlatBufferBuilder fbb;
     EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
 
-    auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite);
-    transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
-    Akonadi2::Storage::setMaxRevision(transaction, newRevision);
-    transaction.commit();
+    d->transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
+    Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
     Log() << "Pipeline: wrote entity: " << key << newRevision;
 
     return KAsync::start([this, key, entityType](KAsync::Future<void> &future) {
@@ -162,7 +182,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
 {
     Log() << "Pipeline: Modified Entity";
 
-    const qint64 newRevision = storage().maxRevision() + 1;
+    const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1;
 
     {
         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
@@ -245,9 +265,8 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
     adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
 
     //TODO don't overwrite the old entry, but instead store a new revision
-    auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite);
-    transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
-    Akonadi2::Storage::setMaxRevision(transaction, newRevision);
+    d->transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
+    Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
 
     return KAsync::start([this, key, entityType](KAsync::Future<void> &future) {
         PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [&future]() {
@@ -262,7 +281,7 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
 {
     Log() << "Pipeline: Deleted Entity";
 
-    const qint64 newRevision = storage().maxRevision() + 1;
+    const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1;
 
     {
         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
@@ -277,9 +296,8 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
     const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
 
     //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted
-    auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite);
-    transaction.remove(key);
-    Akonadi2::Storage::setMaxRevision(transaction, newRevision);
+    d->transaction.remove(key);
+    Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
     Log() << "Pipeline: deleted entity: "<< newRevision;
 
     return KAsync::start([this, key, entityType](KAsync::Future<void> &future) {
@@ -418,7 +436,9 @@ void PipelineState::step()
         //TODO skip step if already processed
         //FIXME error handling if no result is found
         auto preprocessor = d->filterIt.next();
-        d->pipeline->storage().createTransaction(Akonadi2::Storage::ReadOnly).scan(d->key, [this, preprocessor](const QByteArray &key, const QByteArray &value) -> bool {
+        //FIXME this read should not be necessary
+        //Perhaps simply use entity that is initially stored and synchronously process all filters. (Making the first filter somewhat redundant)
+        d->pipeline->transaction().scan(d->key, [this, preprocessor](const QByteArray &key, const QByteArray &value) -> bool {
             auto entity = Akonadi2::GetEntity(value);
             preprocessor->process(*this, *entity);
             return false;
diff --git a/common/pipeline.h b/common/pipeline.h
index 1a33f9a..7307b2e 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -51,6 +51,9 @@ public:
     Storage &storage() const;
 
    void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors);
+    void startTransaction();
+    void commit();
+    Storage::Transaction &transaction();
     void null();
     void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory);
 
-- 
cgit v1.2.3
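
Note on the batching scheme in this patch: it bounds both batch size and
latency. processCommand() opens a transaction on the user queue and commits
after 100 enqueued modifications, while the single-shot mCommitQueueTimer
commits a smaller batch once 100 ms pass without the threshold being reached.
The sketch below is a minimal stand-alone Qt illustration of that same policy;
the class and its pending list are hypothetical (not code from this patch) and
the list stands in for the open storage transaction.

#include <QCoreApplication>
#include <QObject>
#include <QByteArray>
#include <QList>
#include <QTimer>
#include <QDebug>

class BatchingQueue : public QObject
{
public:
    BatchingQueue()
    {
        // Latency bound: flush at most 100 ms after the last enqueue.
        mCommitTimer.setInterval(100);
        mCommitTimer.setSingleShot(true);
        QObject::connect(&mCommitTimer, &QTimer::timeout, [this]() { commit(); });
    }

    void enqueue(const QByteArray &data)
    {
        mPending.append(data); // one write inside the open "transaction"
        if (mPending.size() >= 100) {
            // Size bound: commit as soon as 100 entries have accumulated.
            commit();
        } else {
            // Restart the latency timer for a partial batch.
            mCommitTimer.start();
        }
    }

    void commit()
    {
        if (mPending.isEmpty()) {
            return;
        }
        qDebug() << "committing batch of" << mPending.size() << "entries";
        mPending.clear(); // stands in for Storage::Transaction::commit()
    }

private:
    QList<QByteArray> mPending;
    QTimer mCommitTimer;
};

int main(int argc, char **argv)
{
    QCoreApplication app(argc, argv);
    BatchingQueue queue;
    for (int i = 0; i < 5; ++i) {
        queue.enqueue("command"); // too few to hit the size bound
    }
    // The 100 ms timer flushes the partial batch; quit shortly after.
    QTimer::singleShot(200, &app, &QCoreApplication::quit);
    return app.exec();
}

The trade-off is the usual one for write batching: a crash can lose up to
100 ms of acknowledged-but-uncommitted commands, in exchange for far fewer
commits under load. The pipeline side of the patch applies the same idea per
queue drain: startTransaction() before processing, one commit() after the
dowhile loop ends.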