summaryrefslogtreecommitdiffstats
path: root/common/messagequeue.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-27 02:26:47 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-09-15 16:14:19 +0200
commit26816c21f60450e461a5b6ef4ef740f6070ce278 (patch)
tree55e8aee03e094abf702438e6cd26233047345e70 /common/messagequeue.cpp
parent9a9bb39f7641a818434cafa0dae0c8aa47124c0b (diff)
downloadsink-26816c21f60450e461a5b6ef4ef740f6070ce278.tar.gz
sink-26816c21f60450e461a5b6ef4ef740f6070ce278.zip
Ported to the kasync revamp
Diffstat (limited to 'common/messagequeue.cpp')
-rw-r--r--common/messagequeue.cpp37
1 files changed, 3 insertions, 34 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
index 3567a10..28eacb7 100644
--- a/common/messagequeue.cpp
+++ b/common/messagequeue.cpp
@@ -5,37 +5,6 @@
5 5
6SINK_DEBUG_AREA("messagequeue") 6SINK_DEBUG_AREA("messagequeue")
7 7
8static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void>> &futures)
9{
10 auto context = new QObject;
11 return KAsync::start<void>([futures, context](KAsync::Future<void> &future) {
12 const auto total = futures.size();
13 auto count = QSharedPointer<int>::create();
14 int i = 0;
15 for (KAsync::Future<void> subFuture : futures) {
16 i++;
17 if (subFuture.isFinished()) {
18 *count += 1;
19 continue;
20 }
21 // FIXME bind lifetime all watcher to future (repectively the main job
22 auto watcher = QSharedPointer<KAsync::FutureWatcher<void>>::create();
23 QObject::connect(watcher.data(), &KAsync::FutureWatcher<void>::futureReady, [count, total, &future]() {
24 *count += 1;
25 if (*count == total) {
26 future.setFinished();
27 }
28 });
29 watcher->setFuture(subFuture);
30 context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher));
31 }
32 if (*count == total) {
33 future.setFinished();
34 }
35 })
36 .then<void>([context]() { delete context; });
37}
38
39MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::ReadWrite) 8MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::ReadWrite)
40{ 9{
41} 10}
@@ -101,7 +70,7 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu
101 return KAsync::start<void>([&value, resultHandler](KAsync::Future<void> &future) { 70 return KAsync::start<void>([&value, resultHandler](KAsync::Future<void> &future) {
102 resultHandler(const_cast<void *>(static_cast<const void *>(value.data())), value.size(), [&future](bool success) { future.setFinished(); }); 71 resultHandler(const_cast<void *>(static_cast<const void *>(value.data())), value.size(), [&future](bool success) { future.setFinished(); });
103 }); 72 });
104 }).then<void>([]() {}, [errorHandler](int error, const QString &errorString) { errorHandler(Error("messagequeue", error, errorString.toLatin1())); }).exec(); 73 }).onError([errorHandler](const KAsync::Error &error) { errorHandler(Error("messagequeue", error.errorCode, error.errorMessage.toLatin1())); }).exec();
105} 74}
106 75
107KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler) 76KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler)
@@ -135,8 +104,8 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi
135 }); 104 });
136 105
137 // Trace() << "Waiting on " << waitCondition.size() << " results"; 106 // Trace() << "Waiting on " << waitCondition.size() << " results";
138 ::waitForCompletion(waitCondition) 107 KAsync::waitForCompletion(waitCondition)
139 .then<void>([this, resultCount, &future]() { 108 .syncThen<void>([this, resultCount, &future]() {
140 processRemovals(); 109 processRemovals();
141 if (*resultCount == 0) { 110 if (*resultCount == 0) {
142 future.setError(static_cast<int>(ErrorCodes::NoMessageFound), "No message found"); 111 future.setError(static_cast<int>(ErrorCodes::NoMessageFound), "No message found");