summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp34
1 files changed, 22 insertions, 12 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index bbd992b..3b3fdb0 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -101,26 +101,26 @@ private slots:
101 //Process all messages of this queue 101 //Process all messages of this queue
102 KAsync::Job<void> processQueue(MessageQueue *queue) 102 KAsync::Job<void> processQueue(MessageQueue *queue)
103 { 103 {
104 //TODO use something like: 104 return KAsync::start<void>([this](){
105 //KAsync::foreach("pass iterator here").each("process value here").join(); 105 mPipeline->startTransaction();
106 //KAsync::foreach("pass iterator here").parallel("process value here").join(); 106 }).then(KAsync::dowhile(
107 return KAsync::dowhile( 107 [queue]() { return !queue->isEmpty(); },
108 [this, queue](KAsync::Future<bool> &future) { 108 [this, queue](KAsync::Future<void> &future) {
109 queue->dequeueBatch(100, [this](const QByteArray &data) { 109 queue->dequeueBatch(100, [this](const QByteArray &data) {
110 Trace() << "Got value"; 110 Trace() << "Got value";
111 return processQueuedCommand(data); 111 return processQueuedCommand(data);
112 } 112 }
113 ).then<void>([&future](){ 113 ).then<void>([&future, queue](){
114 future.setValue(true);
115 future.setFinished(); 114 future.setFinished();
116 }, 115 },
117 [&future](int i, QString error) { 116 [&future](int i, QString error) {
118 Warning() << "Error while getting message from messagequeue: " << error; 117 Warning() << "Error while getting message from messagequeue: " << error;
119 future.setValue(false);
120 future.setFinished(); 118 future.setFinished();
121 }).exec(); 119 }).exec();
122 } 120 }
123 ); 121 )).then<void>([this]() {
122 mPipeline->commit();
123 });
124 } 124 }
125 125
126 KAsync::Job<void> processPipeline() 126 KAsync::Job<void> processPipeline()
@@ -158,6 +158,10 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
158 mProcessor = new Processor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); 158 mProcessor = new Processor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue);
159 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 159 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
160 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 160 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
161
162 mCommitQueueTimer.setInterval(100);
163 mCommitQueueTimer.setSingleShot(true);
164 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit);
161} 165}
162 166
163GenericResource::~GenericResource() 167GenericResource::~GenericResource()
@@ -187,10 +191,16 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt
187 191
188void GenericResource::processCommand(int commandId, const QByteArray &data) 192void GenericResource::processCommand(int commandId, const QByteArray &data)
189{ 193{
190 //TODO instead of copying the command including the full entity first into the command queue, we could directly 194 static int modifications = 0;
191 //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). 195 mUserQueue.startTransaction();
192 //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire).
193 enqueueCommand(mUserQueue, commandId, data); 196 enqueueCommand(mUserQueue, commandId, data);
197 modifications++;
198 if (modifications >= 100) {
199 mUserQueue.commit();
200 modifications = 0;
201 } else {
202 mCommitQueueTimer.start();
203 }
194} 204}
195 205
196static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue) 206static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue)