summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/genericresource.cpp14
-rw-r--r--common/messagequeue.cpp89
-rw-r--r--common/messagequeue.h2
-rw-r--r--tests/genericresourcetest.cpp8
4 files changed, 67 insertions, 46 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index b3df389..bbd992b 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -106,19 +106,9 @@ private slots:
106 //KAsync::foreach("pass iterator here").parallel("process value here").join(); 106 //KAsync::foreach("pass iterator here").parallel("process value here").join();
107 return KAsync::dowhile( 107 return KAsync::dowhile(
108 [this, queue](KAsync::Future<bool> &future) { 108 [this, queue](KAsync::Future<bool> &future) {
109 queue->dequeueBatch(100, [this](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { 109 queue->dequeueBatch(100, [this](const QByteArray &data) {
110 Trace() << "Got value"; 110 Trace() << "Got value";
111 processQueuedCommand(QByteArray::fromRawData(static_cast<char*>(ptr), size)).then<void>( 111 return processQueuedCommand(data);
112 [&messageQueueCallback]() {
113 Trace() << "done";
114 messageQueueCallback(true);
115 },
116 [&messageQueueCallback](int errorCode, QString errorMessage) {
117 //Use false?
118 //For now we use true to make sure we don't get stuck on messages we fail to process
119 messageQueueCallback(true);
120 }
121 ).exec();
122 } 112 }
123 ).then<void>([&future](){ 113 ).then<void>([&future](){
124 future.setValue(true); 114 future.setValue(true);
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
index 598c63a..3b5ca2b 100644
--- a/common/messagequeue.cpp
+++ b/common/messagequeue.cpp
@@ -3,6 +3,39 @@
3#include <QDebug> 3#include <QDebug>
4#include <log.h> 4#include <log.h>
5 5
6static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void> > &futures)
7{
8 auto context = new QObject;
9 return KAsync::start<void>([futures, context](KAsync::Future<void> &future) {
10 const auto total = futures.size();
11 auto count = QSharedPointer<int>::create();
12 int i = 0;
13 for (KAsync::Future<void> subFuture : futures) {
14 i++;
15 if (subFuture.isFinished()) {
16 *count += 1;
17 continue;
18 }
19 //FIXME bind lifetime all watcher to future (repectively the main job
20 auto watcher = QSharedPointer<KAsync::FutureWatcher<void> >::create();
21 QObject::connect(watcher.data(), &KAsync::FutureWatcher<void>::futureReady,
22 [count, total, &future](){
23 *count += 1;
24 if (*count == total) {
25 future.setFinished();
26 }
27 });
28 watcher->setFuture(subFuture);
29 context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher));
30 }
31 if (*count == total) {
32 future.setFinished();
33 }
34 }).then<void>([context]() {
35 delete context;
36 });
37}
38
6MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) 39MessageQueue::MessageQueue(const QString &storageRoot, const QString &name)
7 : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite) 40 : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite)
8{ 41{
@@ -61,38 +94,22 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu
61 } 94 }
62} 95}
63 96
64KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler) 97KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler)
65{ 98{
66 auto resultCount = QSharedPointer<int>::create(0); 99 auto resultCount = QSharedPointer<int>::create(0);
67 auto keyList = QSharedPointer<QByteArrayList>::create(); 100 auto keyList = QSharedPointer<QByteArrayList>::create();
68 return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount, keyList](KAsync::Future<void> &future) { 101 return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount, keyList](KAsync::Future<void> &future) {
69 bool readValue = false;
70 int count = 0; 102 int count = 0;
71 mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, &readValue, resultCount, keyList, &count, maxBatchSize](const QByteArray &key, const QByteArray &value) -> bool { 103 QList<KAsync::Future<void> > waitCondition;
104 mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, resultCount, keyList, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool {
72 if (Akonadi2::Storage::isInternalKey(key)) { 105 if (Akonadi2::Storage::isInternalKey(key)) {
73 return true; 106 return true;
74 } 107 }
75 readValue = true;
76 //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) 108 //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid)
77 keyList->append(QByteArray(key.constData(), key.size())); 109 keyList->append(QByteArray(key.constData(), key.size()));
78 resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, resultCount, keyList, &count](bool success) { 110
79 *resultCount += 1; 111 waitCondition << resultHandler(value).exec();
80 //We're done 112
81 //FIXME the check below should only be done once we finished reading
82 if (*resultCount >= count) {
83 //FIXME do this from the caller thread
84 auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite));
85 for (const auto &key : *keyList) {
86 transaction.remove(key, [key](const Akonadi2::Storage::Error &error) {
87 ErrorMsg() << "Error while removing value" << error.message << key;
88 //Don't call the errorhandler in here, we already called the result handler
89 });
90 }
91 if (isEmpty()) {
92 emit this->drained();
93 }
94 }
95 });
96 count++; 113 count++;
97 if (count <= maxBatchSize) { 114 if (count <= maxBatchSize) {
98 return true; 115 return true;
@@ -102,12 +119,28 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi
102 [](const Akonadi2::Storage::Error &error) { 119 [](const Akonadi2::Storage::Error &error) {
103 ErrorMsg() << "Error while retrieving value" << error.message; 120 ErrorMsg() << "Error while retrieving value" << error.message;
104 // errorHandler(Error(error.store, error.code, error.message)); 121 // errorHandler(Error(error.store, error.code, error.message));
105 } 122 });
106 ); 123
107 if (!readValue) { 124 ::waitForCompletion(waitCondition).then<void>([this, keyList, &future]() {
108 future.setError(-1, "No message found"); 125 Trace() << "Dequeue complete, removing values " << *keyList;
109 future.setFinished(); 126 auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite));
110 } 127 for (const auto &key : *keyList) {
128 transaction.remove(key, [key](const Akonadi2::Storage::Error &error) {
129 ErrorMsg() << "Error while removing value" << error.message << key;
130 //Don't call the errorhandler in here, we already called the result handler
131 });
132 }
133 transaction.commit();
134 if (keyList->isEmpty()) {
135 future.setError(-1, "No message found");
136 future.setFinished();
137 } else {
138 if (isEmpty()) {
139 emit this->drained();
140 }
141 future.setFinished();
142 }
143 }).exec();
111 }); 144 });
112} 145}
113 146
diff --git a/common/messagequeue.h b/common/messagequeue.h
index c5e32db..8ea8d8b 100644
--- a/common/messagequeue.h
+++ b/common/messagequeue.h
@@ -33,7 +33,7 @@ public:
33 //TODO track processing progress to avoid processing the same message with the same preprocessor twice? 33 //TODO track processing progress to avoid processing the same message with the same preprocessor twice?
34 void dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> & resultHandler, 34 void dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> & resultHandler,
35 const std::function<void(const Error &error)> &errorHandler); 35 const std::function<void(const Error &error)> &errorHandler);
36 KAsync::Job<void> dequeueBatch(int maxBatchSize, const std::function<void(void *ptr, int size, std::function<void(bool success)>)> & resultHandler); 36 KAsync::Job<void> dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler);
37 bool isEmpty(); 37 bool isEmpty();
38signals: 38signals:
39 void messageReady(); 39 void messageReady();
diff --git a/tests/genericresourcetest.cpp b/tests/genericresourcetest.cpp
index 0faa484..abac7a6 100644
--- a/tests/genericresourcetest.cpp
+++ b/tests/genericresourcetest.cpp
@@ -87,14 +87,12 @@ private Q_SLOTS:
87 //Actual test 87 //Actual test
88 auto pipeline = QSharedPointer<Akonadi2::Pipeline>::create("org.kde.test.instance1"); 88 auto pipeline = QSharedPointer<Akonadi2::Pipeline>::create("org.kde.test.instance1");
89 QSignalSpy revisionSpy(pipeline.data(), SIGNAL(revisionUpdated(qint64))); 89 QSignalSpy revisionSpy(pipeline.data(), SIGNAL(revisionUpdated(qint64)));
90 QVERIFY(revisionSpy.isValid());
90 TestResource resource("org.kde.test.instance1", pipeline); 91 TestResource resource("org.kde.test.instance1", pipeline);
91 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command); 92 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command);
92 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command); 93 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command);
93 94 resource.processAllMessages().exec().waitForFinished();
94 QVERIFY(revisionSpy.isValid()); 95 QCOMPARE(revisionSpy.last().at(0).toInt(), 2);
95 QTRY_COMPARE(revisionSpy.count(), 2);
96 QTest::qWait(100);
97 QCOMPARE(revisionSpy.count(), 2);
98 } 96 }
99}; 97};
100 98