diff options
Diffstat (limited to 'common/messagequeue.cpp')
-rw-r--r-- | common/messagequeue.cpp | 89 |
1 files changed, 61 insertions, 28 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 598c63a..3b5ca2b 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp | |||
@@ -3,6 +3,39 @@ | |||
3 | #include <QDebug> | 3 | #include <QDebug> |
4 | #include <log.h> | 4 | #include <log.h> |
5 | 5 | ||
6 | static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void> > &futures) | ||
7 | { | ||
8 | auto context = new QObject; | ||
9 | return KAsync::start<void>([futures, context](KAsync::Future<void> &future) { | ||
10 | const auto total = futures.size(); | ||
11 | auto count = QSharedPointer<int>::create(); | ||
12 | int i = 0; | ||
13 | for (KAsync::Future<void> subFuture : futures) { | ||
14 | i++; | ||
15 | if (subFuture.isFinished()) { | ||
16 | *count += 1; | ||
17 | continue; | ||
18 | } | ||
19 | //FIXME bind lifetime all watcher to future (repectively the main job | ||
20 | auto watcher = QSharedPointer<KAsync::FutureWatcher<void> >::create(); | ||
21 | QObject::connect(watcher.data(), &KAsync::FutureWatcher<void>::futureReady, | ||
22 | [count, total, &future](){ | ||
23 | *count += 1; | ||
24 | if (*count == total) { | ||
25 | future.setFinished(); | ||
26 | } | ||
27 | }); | ||
28 | watcher->setFuture(subFuture); | ||
29 | context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher)); | ||
30 | } | ||
31 | if (*count == total) { | ||
32 | future.setFinished(); | ||
33 | } | ||
34 | }).then<void>([context]() { | ||
35 | delete context; | ||
36 | }); | ||
37 | } | ||
38 | |||
6 | MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) | 39 | MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) |
7 | : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite) | 40 | : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite) |
8 | { | 41 | { |
@@ -61,38 +94,22 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu | |||
61 | } | 94 | } |
62 | } | 95 | } |
63 | 96 | ||
64 | KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler) | 97 | KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler) |
65 | { | 98 | { |
66 | auto resultCount = QSharedPointer<int>::create(0); | 99 | auto resultCount = QSharedPointer<int>::create(0); |
67 | auto keyList = QSharedPointer<QByteArrayList>::create(); | 100 | auto keyList = QSharedPointer<QByteArrayList>::create(); |
68 | return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount, keyList](KAsync::Future<void> &future) { | 101 | return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount, keyList](KAsync::Future<void> &future) { |
69 | bool readValue = false; | ||
70 | int count = 0; | 102 | int count = 0; |
71 | mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, &readValue, resultCount, keyList, &count, maxBatchSize](const QByteArray &key, const QByteArray &value) -> bool { | 103 | QList<KAsync::Future<void> > waitCondition; |
104 | mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, resultCount, keyList, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { | ||
72 | if (Akonadi2::Storage::isInternalKey(key)) { | 105 | if (Akonadi2::Storage::isInternalKey(key)) { |
73 | return true; | 106 | return true; |
74 | } | 107 | } |
75 | readValue = true; | ||
76 | //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) | 108 | //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) |
77 | keyList->append(QByteArray(key.constData(), key.size())); | 109 | keyList->append(QByteArray(key.constData(), key.size())); |
78 | resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, resultCount, keyList, &count](bool success) { | 110 | |
79 | *resultCount += 1; | 111 | waitCondition << resultHandler(value).exec(); |
80 | //We're done | 112 | |
81 | //FIXME the check below should only be done once we finished reading | ||
82 | if (*resultCount >= count) { | ||
83 | //FIXME do this from the caller thread | ||
84 | auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); | ||
85 | for (const auto &key : *keyList) { | ||
86 | transaction.remove(key, [key](const Akonadi2::Storage::Error &error) { | ||
87 | ErrorMsg() << "Error while removing value" << error.message << key; | ||
88 | //Don't call the errorhandler in here, we already called the result handler | ||
89 | }); | ||
90 | } | ||
91 | if (isEmpty()) { | ||
92 | emit this->drained(); | ||
93 | } | ||
94 | } | ||
95 | }); | ||
96 | count++; | 113 | count++; |
97 | if (count <= maxBatchSize) { | 114 | if (count <= maxBatchSize) { |
98 | return true; | 115 | return true; |
@@ -102,12 +119,28 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi | |||
102 | [](const Akonadi2::Storage::Error &error) { | 119 | [](const Akonadi2::Storage::Error &error) { |
103 | ErrorMsg() << "Error while retrieving value" << error.message; | 120 | ErrorMsg() << "Error while retrieving value" << error.message; |
104 | // errorHandler(Error(error.store, error.code, error.message)); | 121 | // errorHandler(Error(error.store, error.code, error.message)); |
105 | } | 122 | }); |
106 | ); | 123 | |
107 | if (!readValue) { | 124 | ::waitForCompletion(waitCondition).then<void>([this, keyList, &future]() { |
108 | future.setError(-1, "No message found"); | 125 | Trace() << "Dequeue complete, removing values " << *keyList; |
109 | future.setFinished(); | 126 | auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); |
110 | } | 127 | for (const auto &key : *keyList) { |
128 | transaction.remove(key, [key](const Akonadi2::Storage::Error &error) { | ||
129 | ErrorMsg() << "Error while removing value" << error.message << key; | ||
130 | //Don't call the errorhandler in here, we already called the result handler | ||
131 | }); | ||
132 | } | ||
133 | transaction.commit(); | ||
134 | if (keyList->isEmpty()) { | ||
135 | future.setError(-1, "No message found"); | ||
136 | future.setFinished(); | ||
137 | } else { | ||
138 | if (isEmpty()) { | ||
139 | emit this->drained(); | ||
140 | } | ||
141 | future.setFinished(); | ||
142 | } | ||
143 | }).exec(); | ||
111 | }); | 144 | }); |
112 | } | 145 | } |
113 | 146 | ||