summaryrefslogtreecommitdiffstats
path: root/dummyresource/resourcefactory.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'dummyresource/resourcefactory.cpp')
-rw-r--r--dummyresource/resourcefactory.cpp22
1 files changed, 19 insertions, 3 deletions
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp
index 10c8eaf..6e06250 100644
--- a/dummyresource/resourcefactory.cpp
+++ b/dummyresource/resourcefactory.cpp
@@ -122,6 +122,17 @@ public:
122signals: 122signals:
123 void error(int errorCode, const QString &errorMessage); 123 void error(int errorCode, const QString &errorMessage);
124 124
125private:
126 bool messagesToProcessAvailable()
127 {
128 for (auto queue : mCommandQueues) {
129 if (!queue->isEmpty()) {
130 return true;
131 }
132 }
133 return false;
134 }
135
125private slots: 136private slots:
126 void process() 137 void process()
127 { 138 {
@@ -131,11 +142,15 @@ private slots:
131 mProcessingLock = true; 142 mProcessingLock = true;
132 auto job = processPipeline().then<void>([this]() { 143 auto job = processPipeline().then<void>([this]() {
133 mProcessingLock = false; 144 mProcessingLock = false;
145 if (messagesToProcessAvailable()) {
146 process();
147 }
134 }).exec(); 148 }).exec();
135 } 149 }
136 150
137 Async::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) 151 Async::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand)
138 { 152 {
153 Log() << "Processing command: " << queuedCommand->commandId();
139 //Throw command into appropriate pipeline 154 //Throw command into appropriate pipeline
140 switch (queuedCommand->commandId()) { 155 switch (queuedCommand->commandId()) {
141 case Akonadi2::Commands::DeleteEntityCommand: 156 case Akonadi2::Commands::DeleteEntityCommand:
@@ -170,7 +185,7 @@ private slots:
170 185
171 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); 186 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size);
172 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { 187 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) {
173 qWarning() << "invalid buffer"; 188 Warning() << "invalid buffer";
174 callback(false); 189 callback(false);
175 return; 190 return;
176 } 191 }
@@ -185,13 +200,13 @@ private slots:
185 callback(true); 200 callback(true);
186 }, 201 },
187 [callback](int errorCode, QString errorMessage) { 202 [callback](int errorCode, QString errorMessage) {
188 Warning() << errorMessage; 203 Warning() << "Error while processing queue command: " << errorMessage;
189 callback(false); 204 callback(false);
190 } 205 }
191 ).exec(); 206 ).exec();
192 }, 207 },
193 [&future](const MessageQueue::Error &error) { 208 [&future](const MessageQueue::Error &error) {
194 Warning() << error.message; 209 Warning() << "Error while getting message from messagequeue: " << error.message;
195 future.setValue(false); 210 future.setValue(false);
196 future.setFinished(); 211 future.setFinished();
197 } 212 }
@@ -209,6 +224,7 @@ private slots:
209 [it, this](Async::Future<void> &future) { 224 [it, this](Async::Future<void> &future) {
210 auto queue = it->next(); 225 auto queue = it->next();
211 processQueue(queue).then<void>([&future]() { 226 processQueue(queue).then<void>([&future]() {
227 Trace() << "Queue processed";
212 future.setFinished(); 228 future.setFinished();
213 }).exec(); 229 }).exec();
214 } 230 }