diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-08-19 14:05:05 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-08-19 14:05:05 +0200 |
commit | 67bb6035b6333fe0d6d8566b5962f83c5870185f (patch) | |
tree | 39f2fdbeb4ad814cbe0066f1df627b56328f5fe1 /common/genericresource.cpp | |
parent | b6502ce1137b3ef7af8a908a9fa5d8fbeed6ed32 (diff) | |
download | sink-67bb6035b6333fe0d6d8566b5962f83c5870185f.tar.gz sink-67bb6035b6333fe0d6d8566b5962f83c5870185f.zip |
Transactions in the pipeline
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 34 |
1 files changed, 22 insertions, 12 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index bbd992b..3b3fdb0 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -101,26 +101,26 @@ private slots: | |||
101 | //Process all messages of this queue | 101 | //Process all messages of this queue |
102 | KAsync::Job<void> processQueue(MessageQueue *queue) | 102 | KAsync::Job<void> processQueue(MessageQueue *queue) |
103 | { | 103 | { |
104 | //TODO use something like: | 104 | return KAsync::start<void>([this](){ |
105 | //KAsync::foreach("pass iterator here").each("process value here").join(); | 105 | mPipeline->startTransaction(); |
106 | //KAsync::foreach("pass iterator here").parallel("process value here").join(); | 106 | }).then(KAsync::dowhile( |
107 | return KAsync::dowhile( | 107 | [queue]() { return !queue->isEmpty(); }, |
108 | [this, queue](KAsync::Future<bool> &future) { | 108 | [this, queue](KAsync::Future<void> &future) { |
109 | queue->dequeueBatch(100, [this](const QByteArray &data) { | 109 | queue->dequeueBatch(100, [this](const QByteArray &data) { |
110 | Trace() << "Got value"; | 110 | Trace() << "Got value"; |
111 | return processQueuedCommand(data); | 111 | return processQueuedCommand(data); |
112 | } | 112 | } |
113 | ).then<void>([&future](){ | 113 | ).then<void>([&future, queue](){ |
114 | future.setValue(true); | ||
115 | future.setFinished(); | 114 | future.setFinished(); |
116 | }, | 115 | }, |
117 | [&future](int i, QString error) { | 116 | [&future](int i, QString error) { |
118 | Warning() << "Error while getting message from messagequeue: " << error; | 117 | Warning() << "Error while getting message from messagequeue: " << error; |
119 | future.setValue(false); | ||
120 | future.setFinished(); | 118 | future.setFinished(); |
121 | }).exec(); | 119 | }).exec(); |
122 | } | 120 | } |
123 | ); | 121 | )).then<void>([this]() { |
122 | mPipeline->commit(); | ||
123 | }); | ||
124 | } | 124 | } |
125 | 125 | ||
126 | KAsync::Job<void> processPipeline() | 126 | KAsync::Job<void> processPipeline() |
@@ -158,6 +158,10 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
158 | mProcessor = new Processor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); | 158 | mProcessor = new Processor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); |
159 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 159 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
160 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 160 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
161 | |||
162 | mCommitQueueTimer.setInterval(100); | ||
163 | mCommitQueueTimer.setSingleShot(true); | ||
164 | QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); | ||
161 | } | 165 | } |
162 | 166 | ||
163 | GenericResource::~GenericResource() | 167 | GenericResource::~GenericResource() |
@@ -187,10 +191,16 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt | |||
187 | 191 | ||
188 | void GenericResource::processCommand(int commandId, const QByteArray &data) | 192 | void GenericResource::processCommand(int commandId, const QByteArray &data) |
189 | { | 193 | { |
190 | //TODO instead of copying the command including the full entity first into the command queue, we could directly | 194 | static int modifications = 0; |
191 | //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). | 195 | mUserQueue.startTransaction(); |
192 | //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). | ||
193 | enqueueCommand(mUserQueue, commandId, data); | 196 | enqueueCommand(mUserQueue, commandId, data); |
197 | modifications++; | ||
198 | if (modifications >= 100) { | ||
199 | mUserQueue.commit(); | ||
200 | modifications = 0; | ||
201 | } else { | ||
202 | mCommitQueueTimer.start(); | ||
203 | } | ||
194 | } | 204 | } |
195 | 205 | ||
196 | static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue) | 206 | static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue) |