From 67bb6035b6333fe0d6d8566b5962f83c5870185f Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 19 Aug 2015 14:05:05 +0200 Subject: Transactions in the pipeline --- common/genericresource.cpp | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) (limited to 'common/genericresource.cpp') diff --git a/common/genericresource.cpp b/common/genericresource.cpp index bbd992b..3b3fdb0 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -101,26 +101,26 @@ private slots: //Process all messages of this queue KAsync::Job processQueue(MessageQueue *queue) { - //TODO use something like: - //KAsync::foreach("pass iterator here").each("process value here").join(); - //KAsync::foreach("pass iterator here").parallel("process value here").join(); - return KAsync::dowhile( - [this, queue](KAsync::Future &future) { + return KAsync::start([this](){ + mPipeline->startTransaction(); + }).then(KAsync::dowhile( + [queue]() { return !queue->isEmpty(); }, + [this, queue](KAsync::Future &future) { queue->dequeueBatch(100, [this](const QByteArray &data) { Trace() << "Got value"; return processQueuedCommand(data); } - ).then([&future](){ - future.setValue(true); + ).then([&future, queue](){ future.setFinished(); }, [&future](int i, QString error) { Warning() << "Error while getting message from messagequeue: " << error; - future.setValue(false); future.setFinished(); }).exec(); } - ); + )).then([this]() { + mPipeline->commit(); + }); } KAsync::Job processPipeline() @@ -158,6 +158,10 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c mProcessor = new Processor(mPipeline.data(), QList() << &mUserQueue << &mSynchronizerQueue); QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); + + mCommitQueueTimer.setInterval(100); + mCommitQueueTimer.setSingleShot(true); + QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); } GenericResource::~GenericResource() @@ -187,10 +191,16 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt void GenericResource::processCommand(int commandId, const QByteArray &data) { - //TODO instead of copying the command including the full entity first into the command queue, we could directly - //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). - //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). + static int modifications = 0; + mUserQueue.startTransaction(); enqueueCommand(mUserQueue, commandId, data); + modifications++; + if (modifications >= 100) { + mUserQueue.commit(); + modifications = 0; + } else { + mCommitQueueTimer.start(); + } } static void waitForDrained(KAsync::Future &f, MessageQueue &queue) -- cgit v1.2.3