summaryrefslogtreecommitdiffstats
path: root/common/messagequeue.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-08-14 17:11:40 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-08-14 17:11:40 +0200
commit9f3a6ff5d27e4983ee626231e43210d2bbb95dd6 (patch)
tree992cdc5c6d51ae3b430dbcd9fadb30b7ea3d76e1 /common/messagequeue.cpp
parent4385c6bae1a66aa94beb703dcc16e12bdf0ebb0e (diff)
downloadsink-9f3a6ff5d27e4983ee626231e43210d2bbb95dd6.tar.gz
sink-9f3a6ff5d27e4983ee626231e43210d2bbb95dd6.zip
Almost working batch dequeues
Diffstat (limited to 'common/messagequeue.cpp')
-rw-r--r--common/messagequeue.cpp50
1 files changed, 50 insertions, 0 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
index 84385ca..598c63a 100644
--- a/common/messagequeue.cpp
+++ b/common/messagequeue.cpp
@@ -61,6 +61,56 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu
61 } 61 }
62} 62}
63 63
64KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler)
65{
66 auto resultCount = QSharedPointer<int>::create(0);
67 auto keyList = QSharedPointer<QByteArrayList>::create();
68 return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount, keyList](KAsync::Future<void> &future) {
69 bool readValue = false;
70 int count = 0;
71 mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, &readValue, resultCount, keyList, &count, maxBatchSize](const QByteArray &key, const QByteArray &value) -> bool {
72 if (Akonadi2::Storage::isInternalKey(key)) {
73 return true;
74 }
75 readValue = true;
76 //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid)
77 keyList->append(QByteArray(key.constData(), key.size()));
78 resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, resultCount, keyList, &count](bool success) {
79 *resultCount += 1;
80 //We're done
81 //FIXME the check below should only be done once we finished reading
82 if (*resultCount >= count) {
83 //FIXME do this from the caller thread
84 auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite));
85 for (const auto &key : *keyList) {
86 transaction.remove(key, [key](const Akonadi2::Storage::Error &error) {
87 ErrorMsg() << "Error while removing value" << error.message << key;
88 //Don't call the errorhandler in here, we already called the result handler
89 });
90 }
91 if (isEmpty()) {
92 emit this->drained();
93 }
94 }
95 });
96 count++;
97 if (count <= maxBatchSize) {
98 return true;
99 }
100 return false;
101 },
102 [](const Akonadi2::Storage::Error &error) {
103 ErrorMsg() << "Error while retrieving value" << error.message;
104 // errorHandler(Error(error.store, error.code, error.message));
105 }
106 );
107 if (!readValue) {
108 future.setError(-1, "No message found");
109 future.setFinished();
110 }
111 });
112}
113
64bool MessageQueue::isEmpty() 114bool MessageQueue::isEmpty()
65{ 115{
66 int count = 0; 116 int count = 0;