diff options
-rw-r--r-- | common/genericresource.cpp | 75 | ||||
-rw-r--r-- | common/messagequeue.cpp | 50 | ||||
-rw-r--r-- | common/messagequeue.h | 2 |
3 files changed, 92 insertions, 35 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index a86b518..b3df389 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -77,6 +77,27 @@ private slots: | |||
77 | return KAsync::null<void>(); | 77 | return KAsync::null<void>(); |
78 | } | 78 | } |
79 | 79 | ||
80 | KAsync::Job<void> processQueuedCommand(const QByteArray &data) | ||
81 | { | ||
82 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); | ||
83 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { | ||
84 | Warning() << "invalid buffer"; | ||
85 | return KAsync::error<void>(1, "Invalid Buffer"); | ||
86 | } | ||
87 | auto queuedCommand = Akonadi2::GetQueuedCommand(data.constData()); | ||
88 | const auto commandId = queuedCommand->commandId(); | ||
89 | Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId); | ||
90 | return processQueuedCommand(queuedCommand).then<void>( | ||
91 | [commandId]() { | ||
92 | Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId); | ||
93 | }, | ||
94 | [](int errorCode, QString errorMessage) { | ||
95 | //FIXME propagate error, we didn't handle it | ||
96 | Warning() << "Error while processing queue command: " << errorMessage; | ||
97 | } | ||
98 | ); | ||
99 | } | ||
100 | |||
80 | //Process all messages of this queue | 101 | //Process all messages of this queue |
81 | KAsync::Job<void> processQueue(MessageQueue *queue) | 102 | KAsync::Job<void> processQueue(MessageQueue *queue) |
82 | { | 103 | { |
@@ -85,45 +106,29 @@ private slots: | |||
85 | //KAsync::foreach("pass iterator here").parallel("process value here").join(); | 106 | //KAsync::foreach("pass iterator here").parallel("process value here").join(); |
86 | return KAsync::dowhile( | 107 | return KAsync::dowhile( |
87 | [this, queue](KAsync::Future<bool> &future) { | 108 | [this, queue](KAsync::Future<bool> &future) { |
88 | if (queue->isEmpty()) { | 109 | queue->dequeueBatch(100, [this](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { |
89 | future.setValue(false); | 110 | Trace() << "Got value"; |
90 | future.setFinished(); | 111 | processQueuedCommand(QByteArray::fromRawData(static_cast<char*>(ptr), size)).then<void>( |
91 | return; | 112 | [&messageQueueCallback]() { |
92 | } | 113 | Trace() << "done"; |
93 | queue->dequeue( | 114 | messageQueueCallback(true); |
94 | [this, &future](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { | ||
95 | auto callback = [messageQueueCallback, &future](bool success) { | ||
96 | messageQueueCallback(true); | ||
97 | future.setValue(!success); | ||
98 | future.setFinished(); | ||
99 | }; | ||
100 | |||
101 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); | ||
102 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { | ||
103 | Warning() << "invalid buffer"; | ||
104 | callback(false); | ||
105 | return; | ||
106 | } | ||
107 | auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); | ||
108 | const auto commandId = queuedCommand->commandId(); | ||
109 | Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId); | ||
110 | processQueuedCommand(queuedCommand).then<void>( | ||
111 | [callback, commandId]() { | ||
112 | Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId); | ||
113 | callback(true); | ||
114 | }, | 115 | }, |
115 | [callback](int errorCode, QString errorMessage) { | 116 | [&messageQueueCallback](int errorCode, QString errorMessage) { |
116 | Warning() << "Error while processing queue command: " << errorMessage; | 117 | //Use false? |
117 | callback(false); | 118 | //For now we use true to make sure we don't get stuck on messages we fail to process |
119 | messageQueueCallback(true); | ||
118 | } | 120 | } |
119 | ).exec(); | 121 | ).exec(); |
120 | }, | ||
121 | [&future](const MessageQueue::Error &error) { | ||
122 | Warning() << "Error while getting message from messagequeue: " << error.message; | ||
123 | future.setValue(false); | ||
124 | future.setFinished(); | ||
125 | } | 122 | } |
126 | ); | 123 | ).then<void>([&future](){ |
124 | future.setValue(true); | ||
125 | future.setFinished(); | ||
126 | }, | ||
127 | [&future](int i, QString error) { | ||
128 | Warning() << "Error while getting message from messagequeue: " << error; | ||
129 | future.setValue(false); | ||
130 | future.setFinished(); | ||
131 | }).exec(); | ||
127 | } | 132 | } |
128 | ); | 133 | ); |
129 | } | 134 | } |
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 84385ca..598c63a 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp | |||
@@ -61,6 +61,56 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu | |||
61 | } | 61 | } |
62 | } | 62 | } |
63 | 63 | ||
64 | KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler) | ||
65 | { | ||
66 | auto resultCount = QSharedPointer<int>::create(0); | ||
67 | auto keyList = QSharedPointer<QByteArrayList>::create(); | ||
68 | return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount, keyList](KAsync::Future<void> &future) { | ||
69 | bool readValue = false; | ||
70 | int count = 0; | ||
71 | mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, &readValue, resultCount, keyList, &count, maxBatchSize](const QByteArray &key, const QByteArray &value) -> bool { | ||
72 | if (Akonadi2::Storage::isInternalKey(key)) { | ||
73 | return true; | ||
74 | } | ||
75 | readValue = true; | ||
76 | //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) | ||
77 | keyList->append(QByteArray(key.constData(), key.size())); | ||
78 | resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, resultCount, keyList, &count](bool success) { | ||
79 | *resultCount += 1; | ||
80 | //We're done | ||
81 | //FIXME the check below should only be done once we finished reading | ||
82 | if (*resultCount >= count) { | ||
83 | //FIXME do this from the caller thread | ||
84 | auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); | ||
85 | for (const auto &key : *keyList) { | ||
86 | transaction.remove(key, [key](const Akonadi2::Storage::Error &error) { | ||
87 | ErrorMsg() << "Error while removing value" << error.message << key; | ||
88 | //Don't call the errorhandler in here, we already called the result handler | ||
89 | }); | ||
90 | } | ||
91 | if (isEmpty()) { | ||
92 | emit this->drained(); | ||
93 | } | ||
94 | } | ||
95 | }); | ||
96 | count++; | ||
97 | if (count <= maxBatchSize) { | ||
98 | return true; | ||
99 | } | ||
100 | return false; | ||
101 | }, | ||
102 | [](const Akonadi2::Storage::Error &error) { | ||
103 | ErrorMsg() << "Error while retrieving value" << error.message; | ||
104 | // errorHandler(Error(error.store, error.code, error.message)); | ||
105 | } | ||
106 | ); | ||
107 | if (!readValue) { | ||
108 | future.setError(-1, "No message found"); | ||
109 | future.setFinished(); | ||
110 | } | ||
111 | }); | ||
112 | } | ||
113 | |||
64 | bool MessageQueue::isEmpty() | 114 | bool MessageQueue::isEmpty() |
65 | { | 115 | { |
66 | int count = 0; | 116 | int count = 0; |
diff --git a/common/messagequeue.h b/common/messagequeue.h index 3393394..c5e32db 100644 --- a/common/messagequeue.h +++ b/common/messagequeue.h | |||
@@ -4,6 +4,7 @@ | |||
4 | #include <string> | 4 | #include <string> |
5 | #include <functional> | 5 | #include <functional> |
6 | #include <QString> | 6 | #include <QString> |
7 | #include <Async/Async> | ||
7 | #include "storage.h" | 8 | #include "storage.h" |
8 | 9 | ||
9 | /** | 10 | /** |
@@ -32,6 +33,7 @@ public: | |||
32 | //TODO track processing progress to avoid processing the same message with the same preprocessor twice? | 33 | //TODO track processing progress to avoid processing the same message with the same preprocessor twice? |
33 | void dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> & resultHandler, | 34 | void dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> & resultHandler, |
34 | const std::function<void(const Error &error)> &errorHandler); | 35 | const std::function<void(const Error &error)> &errorHandler); |
36 | KAsync::Job<void> dequeueBatch(int maxBatchSize, const std::function<void(void *ptr, int size, std::function<void(bool success)>)> & resultHandler); | ||
35 | bool isEmpty(); | 37 | bool isEmpty(); |
36 | signals: | 38 | signals: |
37 | void messageReady(); | 39 | void messageReady(); |