diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-07-27 02:26:47 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-09-15 16:14:19 +0200 |
commit | 26816c21f60450e461a5b6ef4ef740f6070ce278 (patch) | |
tree | 55e8aee03e094abf702438e6cd26233047345e70 /common/messagequeue.cpp | |
parent | 9a9bb39f7641a818434cafa0dae0c8aa47124c0b (diff) | |
download | sink-26816c21f60450e461a5b6ef4ef740f6070ce278.tar.gz sink-26816c21f60450e461a5b6ef4ef740f6070ce278.zip |
Ported to the kasync revamp
Diffstat (limited to 'common/messagequeue.cpp')
-rw-r--r-- | common/messagequeue.cpp | 37 |
1 files changed, 3 insertions, 34 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 3567a10..28eacb7 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp | |||
@@ -5,37 +5,6 @@ | |||
5 | 5 | ||
6 | SINK_DEBUG_AREA("messagequeue") | 6 | SINK_DEBUG_AREA("messagequeue") |
7 | 7 | ||
8 | static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void>> &futures) | ||
9 | { | ||
10 | auto context = new QObject; | ||
11 | return KAsync::start<void>([futures, context](KAsync::Future<void> &future) { | ||
12 | const auto total = futures.size(); | ||
13 | auto count = QSharedPointer<int>::create(); | ||
14 | int i = 0; | ||
15 | for (KAsync::Future<void> subFuture : futures) { | ||
16 | i++; | ||
17 | if (subFuture.isFinished()) { | ||
18 | *count += 1; | ||
19 | continue; | ||
20 | } | ||
21 | // FIXME bind lifetime all watcher to future (repectively the main job | ||
22 | auto watcher = QSharedPointer<KAsync::FutureWatcher<void>>::create(); | ||
23 | QObject::connect(watcher.data(), &KAsync::FutureWatcher<void>::futureReady, [count, total, &future]() { | ||
24 | *count += 1; | ||
25 | if (*count == total) { | ||
26 | future.setFinished(); | ||
27 | } | ||
28 | }); | ||
29 | watcher->setFuture(subFuture); | ||
30 | context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher)); | ||
31 | } | ||
32 | if (*count == total) { | ||
33 | future.setFinished(); | ||
34 | } | ||
35 | }) | ||
36 | .then<void>([context]() { delete context; }); | ||
37 | } | ||
38 | |||
39 | MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::ReadWrite) | 8 | MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::ReadWrite) |
40 | { | 9 | { |
41 | } | 10 | } |
@@ -101,7 +70,7 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu | |||
101 | return KAsync::start<void>([&value, resultHandler](KAsync::Future<void> &future) { | 70 | return KAsync::start<void>([&value, resultHandler](KAsync::Future<void> &future) { |
102 | resultHandler(const_cast<void *>(static_cast<const void *>(value.data())), value.size(), [&future](bool success) { future.setFinished(); }); | 71 | resultHandler(const_cast<void *>(static_cast<const void *>(value.data())), value.size(), [&future](bool success) { future.setFinished(); }); |
103 | }); | 72 | }); |
104 | }).then<void>([]() {}, [errorHandler](int error, const QString &errorString) { errorHandler(Error("messagequeue", error, errorString.toLatin1())); }).exec(); | 73 | }).onError([errorHandler](const KAsync::Error &error) { errorHandler(Error("messagequeue", error.errorCode, error.errorMessage.toLatin1())); }).exec(); |
105 | } | 74 | } |
106 | 75 | ||
107 | KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler) | 76 | KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler) |
@@ -135,8 +104,8 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi | |||
135 | }); | 104 | }); |
136 | 105 | ||
137 | // Trace() << "Waiting on " << waitCondition.size() << " results"; | 106 | // Trace() << "Waiting on " << waitCondition.size() << " results"; |
138 | ::waitForCompletion(waitCondition) | 107 | KAsync::waitForCompletion(waitCondition) |
139 | .then<void>([this, resultCount, &future]() { | 108 | .syncThen<void>([this, resultCount, &future]() { |
140 | processRemovals(); | 109 | processRemovals(); |
141 | if (*resultCount == 0) { | 110 | if (*resultCount == 0) { |
142 | future.setError(static_cast<int>(ErrorCodes::NoMessageFound), "No message found"); | 111 | future.setError(static_cast<int>(ErrorCodes::NoMessageFound), "No message found"); |