diff options
-rw-r--r-- | common/genericresource.cpp | 14 | ||||
-rw-r--r-- | common/messagequeue.cpp | 89 | ||||
-rw-r--r-- | common/messagequeue.h | 2 | ||||
-rw-r--r-- | tests/genericresourcetest.cpp | 8 |
4 files changed, 67 insertions, 46 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index b3df389..bbd992b 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -106,19 +106,9 @@ private slots: | |||
106 | //KAsync::foreach("pass iterator here").parallel("process value here").join(); | 106 | //KAsync::foreach("pass iterator here").parallel("process value here").join(); |
107 | return KAsync::dowhile( | 107 | return KAsync::dowhile( |
108 | [this, queue](KAsync::Future<bool> &future) { | 108 | [this, queue](KAsync::Future<bool> &future) { |
109 | queue->dequeueBatch(100, [this](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { | 109 | queue->dequeueBatch(100, [this](const QByteArray &data) { |
110 | Trace() << "Got value"; | 110 | Trace() << "Got value"; |
111 | processQueuedCommand(QByteArray::fromRawData(static_cast<char*>(ptr), size)).then<void>( | 111 | return processQueuedCommand(data); |
112 | [&messageQueueCallback]() { | ||
113 | Trace() << "done"; | ||
114 | messageQueueCallback(true); | ||
115 | }, | ||
116 | [&messageQueueCallback](int errorCode, QString errorMessage) { | ||
117 | //Use false? | ||
118 | //For now we use true to make sure we don't get stuck on messages we fail to process | ||
119 | messageQueueCallback(true); | ||
120 | } | ||
121 | ).exec(); | ||
122 | } | 112 | } |
123 | ).then<void>([&future](){ | 113 | ).then<void>([&future](){ |
124 | future.setValue(true); | 114 | future.setValue(true); |
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 598c63a..3b5ca2b 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp | |||
@@ -3,6 +3,39 @@ | |||
3 | #include <QDebug> | 3 | #include <QDebug> |
4 | #include <log.h> | 4 | #include <log.h> |
5 | 5 | ||
6 | static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void> > &futures) | ||
7 | { | ||
8 | auto context = new QObject; | ||
9 | return KAsync::start<void>([futures, context](KAsync::Future<void> &future) { | ||
10 | const auto total = futures.size(); | ||
11 | auto count = QSharedPointer<int>::create(); | ||
12 | int i = 0; | ||
13 | for (KAsync::Future<void> subFuture : futures) { | ||
14 | i++; | ||
15 | if (subFuture.isFinished()) { | ||
16 | *count += 1; | ||
17 | continue; | ||
18 | } | ||
19 | //FIXME bind lifetime all watcher to future (repectively the main job | ||
20 | auto watcher = QSharedPointer<KAsync::FutureWatcher<void> >::create(); | ||
21 | QObject::connect(watcher.data(), &KAsync::FutureWatcher<void>::futureReady, | ||
22 | [count, total, &future](){ | ||
23 | *count += 1; | ||
24 | if (*count == total) { | ||
25 | future.setFinished(); | ||
26 | } | ||
27 | }); | ||
28 | watcher->setFuture(subFuture); | ||
29 | context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher)); | ||
30 | } | ||
31 | if (*count == total) { | ||
32 | future.setFinished(); | ||
33 | } | ||
34 | }).then<void>([context]() { | ||
35 | delete context; | ||
36 | }); | ||
37 | } | ||
38 | |||
6 | MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) | 39 | MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) |
7 | : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite) | 40 | : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite) |
8 | { | 41 | { |
@@ -61,38 +94,22 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu | |||
61 | } | 94 | } |
62 | } | 95 | } |
63 | 96 | ||
64 | KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler) | 97 | KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler) |
65 | { | 98 | { |
66 | auto resultCount = QSharedPointer<int>::create(0); | 99 | auto resultCount = QSharedPointer<int>::create(0); |
67 | auto keyList = QSharedPointer<QByteArrayList>::create(); | 100 | auto keyList = QSharedPointer<QByteArrayList>::create(); |
68 | return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount, keyList](KAsync::Future<void> &future) { | 101 | return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount, keyList](KAsync::Future<void> &future) { |
69 | bool readValue = false; | ||
70 | int count = 0; | 102 | int count = 0; |
71 | mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, &readValue, resultCount, keyList, &count, maxBatchSize](const QByteArray &key, const QByteArray &value) -> bool { | 103 | QList<KAsync::Future<void> > waitCondition; |
104 | mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, resultCount, keyList, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { | ||
72 | if (Akonadi2::Storage::isInternalKey(key)) { | 105 | if (Akonadi2::Storage::isInternalKey(key)) { |
73 | return true; | 106 | return true; |
74 | } | 107 | } |
75 | readValue = true; | ||
76 | //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) | 108 | //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) |
77 | keyList->append(QByteArray(key.constData(), key.size())); | 109 | keyList->append(QByteArray(key.constData(), key.size())); |
78 | resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, resultCount, keyList, &count](bool success) { | 110 | |
79 | *resultCount += 1; | 111 | waitCondition << resultHandler(value).exec(); |
80 | //We're done | 112 | |
81 | //FIXME the check below should only be done once we finished reading | ||
82 | if (*resultCount >= count) { | ||
83 | //FIXME do this from the caller thread | ||
84 | auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); | ||
85 | for (const auto &key : *keyList) { | ||
86 | transaction.remove(key, [key](const Akonadi2::Storage::Error &error) { | ||
87 | ErrorMsg() << "Error while removing value" << error.message << key; | ||
88 | //Don't call the errorhandler in here, we already called the result handler | ||
89 | }); | ||
90 | } | ||
91 | if (isEmpty()) { | ||
92 | emit this->drained(); | ||
93 | } | ||
94 | } | ||
95 | }); | ||
96 | count++; | 113 | count++; |
97 | if (count <= maxBatchSize) { | 114 | if (count <= maxBatchSize) { |
98 | return true; | 115 | return true; |
@@ -102,12 +119,28 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi | |||
102 | [](const Akonadi2::Storage::Error &error) { | 119 | [](const Akonadi2::Storage::Error &error) { |
103 | ErrorMsg() << "Error while retrieving value" << error.message; | 120 | ErrorMsg() << "Error while retrieving value" << error.message; |
104 | // errorHandler(Error(error.store, error.code, error.message)); | 121 | // errorHandler(Error(error.store, error.code, error.message)); |
105 | } | 122 | }); |
106 | ); | 123 | |
107 | if (!readValue) { | 124 | ::waitForCompletion(waitCondition).then<void>([this, keyList, &future]() { |
108 | future.setError(-1, "No message found"); | 125 | Trace() << "Dequeue complete, removing values " << *keyList; |
109 | future.setFinished(); | 126 | auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); |
110 | } | 127 | for (const auto &key : *keyList) { |
128 | transaction.remove(key, [key](const Akonadi2::Storage::Error &error) { | ||
129 | ErrorMsg() << "Error while removing value" << error.message << key; | ||
130 | //Don't call the errorhandler in here, we already called the result handler | ||
131 | }); | ||
132 | } | ||
133 | transaction.commit(); | ||
134 | if (keyList->isEmpty()) { | ||
135 | future.setError(-1, "No message found"); | ||
136 | future.setFinished(); | ||
137 | } else { | ||
138 | if (isEmpty()) { | ||
139 | emit this->drained(); | ||
140 | } | ||
141 | future.setFinished(); | ||
142 | } | ||
143 | }).exec(); | ||
111 | }); | 144 | }); |
112 | } | 145 | } |
113 | 146 | ||
diff --git a/common/messagequeue.h b/common/messagequeue.h index c5e32db..8ea8d8b 100644 --- a/common/messagequeue.h +++ b/common/messagequeue.h | |||
@@ -33,7 +33,7 @@ public: | |||
33 | //TODO track processing progress to avoid processing the same message with the same preprocessor twice? | 33 | //TODO track processing progress to avoid processing the same message with the same preprocessor twice? |
34 | void dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> & resultHandler, | 34 | void dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> & resultHandler, |
35 | const std::function<void(const Error &error)> &errorHandler); | 35 | const std::function<void(const Error &error)> &errorHandler); |
36 | KAsync::Job<void> dequeueBatch(int maxBatchSize, const std::function<void(void *ptr, int size, std::function<void(bool success)>)> & resultHandler); | 36 | KAsync::Job<void> dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler); |
37 | bool isEmpty(); | 37 | bool isEmpty(); |
38 | signals: | 38 | signals: |
39 | void messageReady(); | 39 | void messageReady(); |
diff --git a/tests/genericresourcetest.cpp b/tests/genericresourcetest.cpp index 0faa484..abac7a6 100644 --- a/tests/genericresourcetest.cpp +++ b/tests/genericresourcetest.cpp | |||
@@ -87,14 +87,12 @@ private Q_SLOTS: | |||
87 | //Actual test | 87 | //Actual test |
88 | auto pipeline = QSharedPointer<Akonadi2::Pipeline>::create("org.kde.test.instance1"); | 88 | auto pipeline = QSharedPointer<Akonadi2::Pipeline>::create("org.kde.test.instance1"); |
89 | QSignalSpy revisionSpy(pipeline.data(), SIGNAL(revisionUpdated(qint64))); | 89 | QSignalSpy revisionSpy(pipeline.data(), SIGNAL(revisionUpdated(qint64))); |
90 | QVERIFY(revisionSpy.isValid()); | ||
90 | TestResource resource("org.kde.test.instance1", pipeline); | 91 | TestResource resource("org.kde.test.instance1", pipeline); |
91 | resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command); | 92 | resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command); |
92 | resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command); | 93 | resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command); |
93 | 94 | resource.processAllMessages().exec().waitForFinished(); | |
94 | QVERIFY(revisionSpy.isValid()); | 95 | QCOMPARE(revisionSpy.last().at(0).toInt(), 2); |
95 | QTRY_COMPARE(revisionSpy.count(), 2); | ||
96 | QTest::qWait(100); | ||
97 | QCOMPARE(revisionSpy.count(), 2); | ||
98 | } | 96 | } |
99 | }; | 97 | }; |
100 | 98 | ||