summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/genericresource.cpp75
-rw-r--r--common/messagequeue.cpp50
-rw-r--r--common/messagequeue.h2
3 files changed, 92 insertions, 35 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index a86b518..b3df389 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -77,6 +77,27 @@ private slots:
77 return KAsync::null<void>(); 77 return KAsync::null<void>();
78 } 78 }
79 79
80 KAsync::Job<void> processQueuedCommand(const QByteArray &data)
81 {
82 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size());
83 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) {
84 Warning() << "invalid buffer";
85 return KAsync::error<void>(1, "Invalid Buffer");
86 }
87 auto queuedCommand = Akonadi2::GetQueuedCommand(data.constData());
88 const auto commandId = queuedCommand->commandId();
89 Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId);
90 return processQueuedCommand(queuedCommand).then<void>(
91 [commandId]() {
92 Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId);
93 },
94 [](int errorCode, QString errorMessage) {
95 //FIXME propagate error, we didn't handle it
96 Warning() << "Error while processing queue command: " << errorMessage;
97 }
98 );
99 }
100
80 //Process all messages of this queue 101 //Process all messages of this queue
81 KAsync::Job<void> processQueue(MessageQueue *queue) 102 KAsync::Job<void> processQueue(MessageQueue *queue)
82 { 103 {
@@ -85,45 +106,29 @@ private slots:
85 //KAsync::foreach("pass iterator here").parallel("process value here").join(); 106 //KAsync::foreach("pass iterator here").parallel("process value here").join();
86 return KAsync::dowhile( 107 return KAsync::dowhile(
87 [this, queue](KAsync::Future<bool> &future) { 108 [this, queue](KAsync::Future<bool> &future) {
88 if (queue->isEmpty()) { 109 queue->dequeueBatch(100, [this](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) {
89 future.setValue(false); 110 Trace() << "Got value";
90 future.setFinished(); 111 processQueuedCommand(QByteArray::fromRawData(static_cast<char*>(ptr), size)).then<void>(
91 return; 112 [&messageQueueCallback]() {
92 } 113 Trace() << "done";
93 queue->dequeue( 114 messageQueueCallback(true);
94 [this, &future](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) {
95 auto callback = [messageQueueCallback, &future](bool success) {
96 messageQueueCallback(true);
97 future.setValue(!success);
98 future.setFinished();
99 };
100
101 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size);
102 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) {
103 Warning() << "invalid buffer";
104 callback(false);
105 return;
106 }
107 auto queuedCommand = Akonadi2::GetQueuedCommand(ptr);
108 const auto commandId = queuedCommand->commandId();
109 Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId);
110 processQueuedCommand(queuedCommand).then<void>(
111 [callback, commandId]() {
112 Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId);
113 callback(true);
114 }, 115 },
115 [callback](int errorCode, QString errorMessage) { 116 [&messageQueueCallback](int errorCode, QString errorMessage) {
116 Warning() << "Error while processing queue command: " << errorMessage; 117 //Use false?
117 callback(false); 118 //For now we use true to make sure we don't get stuck on messages we fail to process
119 messageQueueCallback(true);
118 } 120 }
119 ).exec(); 121 ).exec();
120 },
121 [&future](const MessageQueue::Error &error) {
122 Warning() << "Error while getting message from messagequeue: " << error.message;
123 future.setValue(false);
124 future.setFinished();
125 } 122 }
126 ); 123 ).then<void>([&future](){
124 future.setValue(true);
125 future.setFinished();
126 },
127 [&future](int i, QString error) {
128 Warning() << "Error while getting message from messagequeue: " << error;
129 future.setValue(false);
130 future.setFinished();
131 }).exec();
127 } 132 }
128 ); 133 );
129 } 134 }
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
index 84385ca..598c63a 100644
--- a/common/messagequeue.cpp
+++ b/common/messagequeue.cpp
@@ -61,6 +61,56 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu
61 } 61 }
62} 62}
63 63
64KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler)
65{
66 auto resultCount = QSharedPointer<int>::create(0);
67 auto keyList = QSharedPointer<QByteArrayList>::create();
68 return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount, keyList](KAsync::Future<void> &future) {
69 bool readValue = false;
70 int count = 0;
71 mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, &readValue, resultCount, keyList, &count, maxBatchSize](const QByteArray &key, const QByteArray &value) -> bool {
72 if (Akonadi2::Storage::isInternalKey(key)) {
73 return true;
74 }
75 readValue = true;
76 //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid)
77 keyList->append(QByteArray(key.constData(), key.size()));
78 resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, resultCount, keyList, &count](bool success) {
79 *resultCount += 1;
80 //We're done
81 //FIXME the check below should only be done once we finished reading
82 if (*resultCount >= count) {
83 //FIXME do this from the caller thread
84 auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite));
85 for (const auto &key : *keyList) {
86 transaction.remove(key, [key](const Akonadi2::Storage::Error &error) {
87 ErrorMsg() << "Error while removing value" << error.message << key;
88 //Don't call the errorhandler in here, we already called the result handler
89 });
90 }
91 if (isEmpty()) {
92 emit this->drained();
93 }
94 }
95 });
96 count++;
97 if (count <= maxBatchSize) {
98 return true;
99 }
100 return false;
101 },
102 [](const Akonadi2::Storage::Error &error) {
103 ErrorMsg() << "Error while retrieving value" << error.message;
104 // errorHandler(Error(error.store, error.code, error.message));
105 }
106 );
107 if (!readValue) {
108 future.setError(-1, "No message found");
109 future.setFinished();
110 }
111 });
112}
113
64bool MessageQueue::isEmpty() 114bool MessageQueue::isEmpty()
65{ 115{
66 int count = 0; 116 int count = 0;
diff --git a/common/messagequeue.h b/common/messagequeue.h
index 3393394..c5e32db 100644
--- a/common/messagequeue.h
+++ b/common/messagequeue.h
@@ -4,6 +4,7 @@
4#include <string> 4#include <string>
5#include <functional> 5#include <functional>
6#include <QString> 6#include <QString>
7#include <Async/Async>
7#include "storage.h" 8#include "storage.h"
8 9
9/** 10/**
@@ -32,6 +33,7 @@ public:
32 //TODO track processing progress to avoid processing the same message with the same preprocessor twice? 33 //TODO track processing progress to avoid processing the same message with the same preprocessor twice?
33 void dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> & resultHandler, 34 void dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> & resultHandler,
34 const std::function<void(const Error &error)> &errorHandler); 35 const std::function<void(const Error &error)> &errorHandler);
36 KAsync::Job<void> dequeueBatch(int maxBatchSize, const std::function<void(void *ptr, int size, std::function<void(bool success)>)> & resultHandler);
35 bool isEmpty(); 37 bool isEmpty();
36signals: 38signals:
37 void messageReady(); 39 void messageReady();