From 091412c472b10ca61ada445c19bf3c95cd4e8e40 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 11 Jan 2015 16:10:01 +0100 Subject: A messagequeue. --- common/CMakeLists.txt | 1 + common/messagequeue.cpp | 47 +++++++++++++++++++++++++++++++++++++++++++++++ common/messagequeue.h | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+) create mode 100644 common/messagequeue.cpp create mode 100644 common/messagequeue.h (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 1a9a812..671d1cd 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -30,6 +30,7 @@ set(command_SRCS resourceaccess.cpp storage_common.cpp threadboundary.cpp + messagequeue.cpp ${storage_SRCS}) add_library(${PROJECT_NAME} SHARED ${command_SRCS}) diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp new file mode 100644 index 0000000..cf63e57 --- /dev/null +++ b/common/messagequeue.cpp @@ -0,0 +1,47 @@ +#include "messagequeue.h" +#include + +MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) + : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite) +{ + +} + +void MessageQueue::enqueue(void const *msg, size_t size) +{ + mStorage.startTransaction(Akonadi2::Storage::ReadWrite); + const qint64 revision = mStorage.maxRevision() + 1; + const QByteArray key = QString("%1").arg(revision).toUtf8(); + mStorage.write(key.data(), key.size(), msg, size); + mStorage.setMaxRevision(revision); + mStorage.commitTransaction(); +} + +void MessageQueue::dequeue(const std::function)> &resultHandler, + const std::function &errorHandler) +{ + mStorage.scan("", [this, resultHandler](void *keyPtr, int keySize, void *valuePtr, int valueSize) -> bool { + const std::string key(static_cast(keyPtr), keySize); + resultHandler(valuePtr, valueSize, [this, key](bool success) { + if (success) { + mStorage.remove(key.data(), key.size()); + } else { + //TODO re-enqueue? + } + }); + return false; + }); +} + +bool MessageQueue::isEmpty() +{ + int count = 0; + mStorage.scan("", [&count](void *keyPtr, int keySize, void *valuePtr, int valueSize) -> bool { + const QByteArray key(static_cast(keyPtr), keySize); + if (!key.startsWith("__internal")) { + count++; + } + }); + return count == 0; +} + diff --git a/common/messagequeue.h b/common/messagequeue.h new file mode 100644 index 0000000..b56b3cd --- /dev/null +++ b/common/messagequeue.h @@ -0,0 +1,34 @@ +#pragma once + +#include +#include +#include +#include "storage.h" + +/** + * A persistent FIFO message queue. + */ +class MessageQueue { +public: + class Error + { + public: + Error(const std::string &s, int c, const std::string &m) + : store(s), message(m), code(c) {} + std::string store; + std::string message; + int code; + }; + + MessageQueue(const QString &storageRoot, const QString &name); + + void enqueue(void const *msg, size_t size); + //Dequeue a message. This will return a new message everytime called. + //Call the result handler with a success response to remove the message from the store. + //TODO track processing progress to avoid processing the same message with the same preprocessor twice? + void dequeue(const std::function)> & resultHandler, + const std::function &errorHandler); + bool isEmpty(); +private: + Akonadi2::Storage mStorage; +}; -- cgit v1.2.3