diff options
-rw-r--r-- | common/messagequeue.cpp | 21 | ||||
-rw-r--r-- | common/messagequeue.h | 2 | ||||
-rw-r--r-- | tests/messagequeuetest.cpp | 90 |
3 files changed, 97 insertions, 16 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index a92d6be..ecc4d1a 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp | |||
@@ -11,29 +11,38 @@ MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) | |||
11 | 11 | ||
12 | void MessageQueue::enqueue(void const *msg, size_t size) | 12 | void MessageQueue::enqueue(void const *msg, size_t size) |
13 | { | 13 | { |
14 | auto transaction = mStorage.createTransaction(Akonadi2::Storage::ReadWrite); | 14 | auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); |
15 | const qint64 revision = mStorage.maxRevision() + 1; | 15 | const qint64 revision = Akonadi2::Storage::maxRevision(transaction) + 1; |
16 | const QByteArray key = QString("%1").arg(revision).toUtf8(); | 16 | const QByteArray key = QString("%1").arg(revision).toUtf8(); |
17 | transaction.write(key, QByteArray::fromRawData(static_cast<const char*>(msg), size)); | 17 | transaction.write(key, QByteArray::fromRawData(static_cast<const char*>(msg), size)); |
18 | Akonadi2::Storage::setMaxRevision(transaction, revision); | 18 | Akonadi2::Storage::setMaxRevision(transaction, revision); |
19 | transaction.commit(); | ||
19 | emit messageReady(); | 20 | emit messageReady(); |
20 | } | 21 | } |
21 | 22 | ||
23 | void MessageQueue::enqueue(const QByteArray &value) | ||
24 | { | ||
25 | enqueue(value.data(), value.size()); | ||
26 | } | ||
27 | |||
22 | void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler, | 28 | void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler, |
23 | const std::function<void(const Error &error)> &errorHandler) | 29 | const std::function<void(const Error &error)> &errorHandler) |
24 | { | 30 | { |
25 | bool readValue = false; | 31 | bool readValue = false; |
26 | mTransaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); | 32 | auto readTransaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadOnly)); |
27 | mTransaction.scan("", [this, resultHandler, errorHandler, &readValue](const QByteArray &key, const QByteArray &value) -> bool { | 33 | readTransaction.scan("", [this, resultHandler, errorHandler, &readValue, &readTransaction](const QByteArray &key, const QByteArray &value) -> bool { |
28 | if (Akonadi2::Storage::isInternalKey(key)) { | 34 | if (Akonadi2::Storage::isInternalKey(key)) { |
29 | return true; | 35 | return true; |
30 | } | 36 | } |
31 | readValue = true; | 37 | readValue = true; |
32 | //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) | 38 | //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) |
33 | const auto keyCopy = QByteArray(key.constData(), key.size()); | 39 | const auto keyCopy = QByteArray(key.constData(), key.size()); |
34 | resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, keyCopy, errorHandler](bool success) { | 40 | //TODO The value copy and the early transaction abort is necessary because we don't support parallel read-transactions yet (in case of a synchronous callback) |
41 | const auto valueCopy = QByteArray(value.constData(), value.size()); | ||
42 | readTransaction.abort(); | ||
43 | resultHandler(const_cast<void*>(static_cast<const void*>(valueCopy.data())), valueCopy.size(), [this, keyCopy, errorHandler](bool success) { | ||
35 | if (success) { | 44 | if (success) { |
36 | mTransaction.remove(keyCopy, [errorHandler, keyCopy](const Akonadi2::Storage::Error &error) { | 45 | mStorage.createTransaction(Akonadi2::Storage::ReadWrite).remove(keyCopy, [errorHandler, keyCopy](const Akonadi2::Storage::Error &error) { |
37 | ErrorMsg() << "Error while removing value" << error.message << keyCopy; | 46 | ErrorMsg() << "Error while removing value" << error.message << keyCopy; |
38 | //Don't call the errorhandler in here, we already called the result handler | 47 | //Don't call the errorhandler in here, we already called the result handler |
39 | }); | 48 | }); |
diff --git a/common/messagequeue.h b/common/messagequeue.h index ffc1ff2..3393394 100644 --- a/common/messagequeue.h +++ b/common/messagequeue.h | |||
@@ -26,6 +26,7 @@ public: | |||
26 | MessageQueue(const QString &storageRoot, const QString &name); | 26 | MessageQueue(const QString &storageRoot, const QString &name); |
27 | 27 | ||
28 | void enqueue(void const *msg, size_t size); | 28 | void enqueue(void const *msg, size_t size); |
29 | void enqueue(const QByteArray &value); | ||
29 | //Dequeue a message. This will return a new message everytime called. | 30 | //Dequeue a message. This will return a new message everytime called. |
30 | //Call the result handler with a success response to remove the message from the store. | 31 | //Call the result handler with a success response to remove the message from the store. |
31 | //TODO track processing progress to avoid processing the same message with the same preprocessor twice? | 32 | //TODO track processing progress to avoid processing the same message with the same preprocessor twice? |
@@ -39,5 +40,4 @@ signals: | |||
39 | private: | 40 | private: |
40 | Q_DISABLE_COPY(MessageQueue); | 41 | Q_DISABLE_COPY(MessageQueue); |
41 | Akonadi2::Storage mStorage; | 42 | Akonadi2::Storage mStorage; |
42 | Akonadi2::Storage::Transaction mTransaction; | ||
43 | }; | 43 | }; |
diff --git a/tests/messagequeuetest.cpp b/tests/messagequeuetest.cpp index c43b192..d5c47f5 100644 --- a/tests/messagequeuetest.cpp +++ b/tests/messagequeuetest.cpp | |||
@@ -31,12 +31,38 @@ private Q_SLOTS: | |||
31 | { | 31 | { |
32 | MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); | 32 | MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); |
33 | QVERIFY(queue.isEmpty()); | 33 | QVERIFY(queue.isEmpty()); |
34 | QByteArray value("value"); | 34 | queue.enqueue("value"); |
35 | queue.enqueue(value.data(), value.size()); | ||
36 | QVERIFY(!queue.isEmpty()); | 35 | QVERIFY(!queue.isEmpty()); |
37 | } | 36 | } |
38 | 37 | ||
39 | void testQueue() | 38 | void testDequeueEmpty() |
39 | { | ||
40 | MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); | ||
41 | bool gotValue = false; | ||
42 | bool gotError = false; | ||
43 | queue.dequeue([&](void *ptr, int size, std::function<void(bool success)> callback) { | ||
44 | gotValue = true; | ||
45 | }, | ||
46 | [&](const MessageQueue::Error &error) { | ||
47 | gotError = true; | ||
48 | }); | ||
49 | QVERIFY(!gotValue); | ||
50 | QVERIFY(gotError); | ||
51 | } | ||
52 | |||
53 | void testDrained() | ||
54 | { | ||
55 | MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); | ||
56 | QSignalSpy spy(&queue, SIGNAL(drained())); | ||
57 | queue.enqueue("value1"); | ||
58 | |||
59 | queue.dequeue([](void *ptr, int size, std::function<void(bool success)> callback) { | ||
60 | callback(true); | ||
61 | }, [](const MessageQueue::Error &error) {}); | ||
62 | QCOMPARE(spy.size(), 1); | ||
63 | } | ||
64 | |||
65 | void testSyncDequeue() | ||
40 | { | 66 | { |
41 | QQueue<QByteArray> values; | 67 | QQueue<QByteArray> values; |
42 | values << "value1"; | 68 | values << "value1"; |
@@ -44,10 +70,11 @@ private Q_SLOTS: | |||
44 | 70 | ||
45 | MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); | 71 | MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); |
46 | for (const QByteArray &value : values) { | 72 | for (const QByteArray &value : values) { |
47 | queue.enqueue(value.data(), value.size()); | 73 | queue.enqueue(value); |
48 | } | 74 | } |
49 | 75 | ||
50 | while (!queue.isEmpty()) { | 76 | while (!queue.isEmpty()) { |
77 | Log() << "start"; | ||
51 | const auto expected = values.dequeue(); | 78 | const auto expected = values.dequeue(); |
52 | bool gotValue = false; | 79 | bool gotValue = false; |
53 | bool gotError = false; | 80 | bool gotError = false; |
@@ -66,21 +93,66 @@ private Q_SLOTS: | |||
66 | QVERIFY(values.isEmpty()); | 93 | QVERIFY(values.isEmpty()); |
67 | } | 94 | } |
68 | 95 | ||
69 | void testDequeueEmpty() | 96 | void testAsyncDequeue() |
70 | { | 97 | { |
98 | QQueue<QByteArray> values; | ||
99 | values << "value1"; | ||
100 | values << "value2"; | ||
101 | |||
71 | MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); | 102 | MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); |
72 | bool gotValue = false; | 103 | for (const QByteArray &value : values) { |
104 | queue.enqueue(value); | ||
105 | } | ||
106 | |||
107 | while (!queue.isEmpty()) { | ||
108 | QEventLoop eventLoop; | ||
109 | const auto expected = values.dequeue(); | ||
110 | bool gotValue = false; | ||
111 | bool gotError = false; | ||
112 | |||
113 | queue.dequeue([&](void *ptr, int size, std::function<void(bool success)> callback) { | ||
114 | if (QByteArray(static_cast<char*>(ptr), size) == expected) { | ||
115 | gotValue = true; | ||
116 | } | ||
117 | auto timer = new QTimer(); | ||
118 | timer->setSingleShot(true); | ||
119 | QObject::connect(timer, &QTimer::timeout, [timer, callback, &eventLoop]() { | ||
120 | delete timer; | ||
121 | callback(true); | ||
122 | eventLoop.exit(); | ||
123 | }); | ||
124 | timer->start(0); | ||
125 | }, | ||
126 | [&](const MessageQueue::Error &error) { | ||
127 | gotError = true; | ||
128 | }); | ||
129 | eventLoop.exec(); | ||
130 | QVERIFY(gotValue); | ||
131 | QVERIFY(!gotError); | ||
132 | } | ||
133 | QVERIFY(values.isEmpty()); | ||
134 | } | ||
135 | |||
136 | /* | ||
137 | * Dequeue's are async and we want to be able to enqueue new items in between. | ||
138 | */ | ||
139 | void testNestedEnqueue() | ||
140 | { | ||
141 | MessageQueue queue(Akonadi2::Store::storageLocation(), "org.kde.dummy.testqueue"); | ||
142 | queue.enqueue("value1"); | ||
143 | |||
73 | bool gotError = false; | 144 | bool gotError = false; |
74 | queue.dequeue([&](void *ptr, int size, std::function<void(bool success)> callback) { | 145 | queue.dequeue([&](void *ptr, int size, std::function<void(bool success)> callback) { |
75 | gotValue = true; | 146 | queue.enqueue("value3"); |
147 | callback(true); | ||
76 | }, | 148 | }, |
77 | [&](const MessageQueue::Error &error) { | 149 | [&](const MessageQueue::Error &error) { |
78 | gotError = true; | 150 | gotError = true; |
79 | }); | 151 | }); |
80 | QVERIFY(!gotValue); | 152 | QVERIFY(!gotError); |
81 | QVERIFY(gotError); | ||
82 | } | 153 | } |
83 | 154 | ||
155 | |||
84 | }; | 156 | }; |
85 | 157 | ||
86 | QTEST_MAIN(MessageQueueTest) | 158 | QTEST_MAIN(MessageQueueTest) |