diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-08-14 17:11:40 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-08-14 17:11:40 +0200 |
commit | 9f3a6ff5d27e4983ee626231e43210d2bbb95dd6 (patch) | |
tree | 992cdc5c6d51ae3b430dbcd9fadb30b7ea3d76e1 /common/messagequeue.cpp | |
parent | 4385c6bae1a66aa94beb703dcc16e12bdf0ebb0e (diff) | |
download | sink-9f3a6ff5d27e4983ee626231e43210d2bbb95dd6.tar.gz sink-9f3a6ff5d27e4983ee626231e43210d2bbb95dd6.zip |
Almost working batch dequeues
Diffstat (limited to 'common/messagequeue.cpp')
-rw-r--r-- | common/messagequeue.cpp | 50 |
1 files changed, 50 insertions, 0 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 84385ca..598c63a 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp | |||
@@ -61,6 +61,56 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu | |||
61 | } | 61 | } |
62 | } | 62 | } |
63 | 63 | ||
64 | KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler) | ||
65 | { | ||
66 | auto resultCount = QSharedPointer<int>::create(0); | ||
67 | auto keyList = QSharedPointer<QByteArrayList>::create(); | ||
68 | return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount, keyList](KAsync::Future<void> &future) { | ||
69 | bool readValue = false; | ||
70 | int count = 0; | ||
71 | mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, &readValue, resultCount, keyList, &count, maxBatchSize](const QByteArray &key, const QByteArray &value) -> bool { | ||
72 | if (Akonadi2::Storage::isInternalKey(key)) { | ||
73 | return true; | ||
74 | } | ||
75 | readValue = true; | ||
76 | //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) | ||
77 | keyList->append(QByteArray(key.constData(), key.size())); | ||
78 | resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, resultCount, keyList, &count](bool success) { | ||
79 | *resultCount += 1; | ||
80 | //We're done | ||
81 | //FIXME the check below should only be done once we finished reading | ||
82 | if (*resultCount >= count) { | ||
83 | //FIXME do this from the caller thread | ||
84 | auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); | ||
85 | for (const auto &key : *keyList) { | ||
86 | transaction.remove(key, [key](const Akonadi2::Storage::Error &error) { | ||
87 | ErrorMsg() << "Error while removing value" << error.message << key; | ||
88 | //Don't call the errorhandler in here, we already called the result handler | ||
89 | }); | ||
90 | } | ||
91 | if (isEmpty()) { | ||
92 | emit this->drained(); | ||
93 | } | ||
94 | } | ||
95 | }); | ||
96 | count++; | ||
97 | if (count <= maxBatchSize) { | ||
98 | return true; | ||
99 | } | ||
100 | return false; | ||
101 | }, | ||
102 | [](const Akonadi2::Storage::Error &error) { | ||
103 | ErrorMsg() << "Error while retrieving value" << error.message; | ||
104 | // errorHandler(Error(error.store, error.code, error.message)); | ||
105 | } | ||
106 | ); | ||
107 | if (!readValue) { | ||
108 | future.setError(-1, "No message found"); | ||
109 | future.setFinished(); | ||
110 | } | ||
111 | }); | ||
112 | } | ||
113 | |||
64 | bool MessageQueue::isEmpty() | 114 | bool MessageQueue::isEmpty() |
65 | { | 115 | { |
66 | int count = 0; | 116 | int count = 0; |