From 091412c472b10ca61ada445c19bf3c95cd4e8e40 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 11 Jan 2015 16:10:01 +0100 Subject: A messagequeue. --- common/messagequeue.cpp | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 common/messagequeue.cpp (limited to 'common/messagequeue.cpp') diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp new file mode 100644 index 0000000..cf63e57 --- /dev/null +++ b/common/messagequeue.cpp @@ -0,0 +1,47 @@ +#include "messagequeue.h" +#include + +MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) + : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite) +{ + +} + +void MessageQueue::enqueue(void const *msg, size_t size) +{ + mStorage.startTransaction(Akonadi2::Storage::ReadWrite); + const qint64 revision = mStorage.maxRevision() + 1; + const QByteArray key = QString("%1").arg(revision).toUtf8(); + mStorage.write(key.data(), key.size(), msg, size); + mStorage.setMaxRevision(revision); + mStorage.commitTransaction(); +} + +void MessageQueue::dequeue(const std::function)> &resultHandler, + const std::function &errorHandler) +{ + mStorage.scan("", [this, resultHandler](void *keyPtr, int keySize, void *valuePtr, int valueSize) -> bool { + const std::string key(static_cast(keyPtr), keySize); + resultHandler(valuePtr, valueSize, [this, key](bool success) { + if (success) { + mStorage.remove(key.data(), key.size()); + } else { + //TODO re-enqueue? + } + }); + return false; + }); +} + +bool MessageQueue::isEmpty() +{ + int count = 0; + mStorage.scan("", [&count](void *keyPtr, int keySize, void *valuePtr, int valueSize) -> bool { + const QByteArray key(static_cast(keyPtr), keySize); + if (!key.startsWith("__internal")) { + count++; + } + }); + return count == 0; +} + -- cgit v1.2.3