summaryrefslogtreecommitdiffstats
path: root/common/messagequeue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/messagequeue.cpp')
-rw-r--r--common/messagequeue.cpp89
1 files changed, 61 insertions, 28 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
index 598c63a..3b5ca2b 100644
--- a/common/messagequeue.cpp
+++ b/common/messagequeue.cpp
@@ -3,6 +3,39 @@
3#include <QDebug> 3#include <QDebug>
4#include <log.h> 4#include <log.h>
5 5
6static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void> > &futures)
7{
8 auto context = new QObject;
9 return KAsync::start<void>([futures, context](KAsync::Future<void> &future) {
10 const auto total = futures.size();
11 auto count = QSharedPointer<int>::create();
12 int i = 0;
13 for (KAsync::Future<void> subFuture : futures) {
14 i++;
15 if (subFuture.isFinished()) {
16 *count += 1;
17 continue;
18 }
19 //FIXME bind lifetime all watcher to future (repectively the main job
20 auto watcher = QSharedPointer<KAsync::FutureWatcher<void> >::create();
21 QObject::connect(watcher.data(), &KAsync::FutureWatcher<void>::futureReady,
22 [count, total, &future](){
23 *count += 1;
24 if (*count == total) {
25 future.setFinished();
26 }
27 });
28 watcher->setFuture(subFuture);
29 context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher));
30 }
31 if (*count == total) {
32 future.setFinished();
33 }
34 }).then<void>([context]() {
35 delete context;
36 });
37}
38
6MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) 39MessageQueue::MessageQueue(const QString &storageRoot, const QString &name)
7 : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite) 40 : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite)
8{ 41{
@@ -61,38 +94,22 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu
61 } 94 }
62} 95}
63 96
64KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler) 97KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler)
65{ 98{
66 auto resultCount = QSharedPointer<int>::create(0); 99 auto resultCount = QSharedPointer<int>::create(0);
67 auto keyList = QSharedPointer<QByteArrayList>::create(); 100 auto keyList = QSharedPointer<QByteArrayList>::create();
68 return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount, keyList](KAsync::Future<void> &future) { 101 return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount, keyList](KAsync::Future<void> &future) {
69 bool readValue = false;
70 int count = 0; 102 int count = 0;
71 mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, &readValue, resultCount, keyList, &count, maxBatchSize](const QByteArray &key, const QByteArray &value) -> bool { 103 QList<KAsync::Future<void> > waitCondition;
104 mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, resultCount, keyList, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool {
72 if (Akonadi2::Storage::isInternalKey(key)) { 105 if (Akonadi2::Storage::isInternalKey(key)) {
73 return true; 106 return true;
74 } 107 }
75 readValue = true;
76 //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) 108 //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid)
77 keyList->append(QByteArray(key.constData(), key.size())); 109 keyList->append(QByteArray(key.constData(), key.size()));
78 resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, resultCount, keyList, &count](bool success) { 110
79 *resultCount += 1; 111 waitCondition << resultHandler(value).exec();
80 //We're done 112
81 //FIXME the check below should only be done once we finished reading
82 if (*resultCount >= count) {
83 //FIXME do this from the caller thread
84 auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite));
85 for (const auto &key : *keyList) {
86 transaction.remove(key, [key](const Akonadi2::Storage::Error &error) {
87 ErrorMsg() << "Error while removing value" << error.message << key;
88 //Don't call the errorhandler in here, we already called the result handler
89 });
90 }
91 if (isEmpty()) {
92 emit this->drained();
93 }
94 }
95 });
96 count++; 113 count++;
97 if (count <= maxBatchSize) { 114 if (count <= maxBatchSize) {
98 return true; 115 return true;
@@ -102,12 +119,28 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi
102 [](const Akonadi2::Storage::Error &error) { 119 [](const Akonadi2::Storage::Error &error) {
103 ErrorMsg() << "Error while retrieving value" << error.message; 120 ErrorMsg() << "Error while retrieving value" << error.message;
104 // errorHandler(Error(error.store, error.code, error.message)); 121 // errorHandler(Error(error.store, error.code, error.message));
105 } 122 });
106 ); 123
107 if (!readValue) { 124 ::waitForCompletion(waitCondition).then<void>([this, keyList, &future]() {
108 future.setError(-1, "No message found"); 125 Trace() << "Dequeue complete, removing values " << *keyList;
109 future.setFinished(); 126 auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite));
110 } 127 for (const auto &key : *keyList) {
128 transaction.remove(key, [key](const Akonadi2::Storage::Error &error) {
129 ErrorMsg() << "Error while removing value" << error.message << key;
130 //Don't call the errorhandler in here, we already called the result handler
131 });
132 }
133 transaction.commit();
134 if (keyList->isEmpty()) {
135 future.setError(-1, "No message found");
136 future.setFinished();
137 } else {
138 if (isEmpty()) {
139 emit this->drained();
140 }
141 future.setFinished();
142 }
143 }).exec();
111 }); 144 });
112} 145}
113 146