summaryrefslogtreecommitdiffstats
path: root/common/messagequeue.cpp
blob: 598c63a8c1dc34c4c3021b99ec9fe3bc5424ee7f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#include "messagequeue.h"
#include "storage.h"
#include <QDebug>
#include <log.h>

/**
 * Constructs a queue on top of a storage instance.
 *
 * @param storageRoot Forwarded as the first argument of the storage constructor.
 * @param name        Forwarded as the second argument of the storage constructor.
 *
 * The storage is always set up with Akonadi2::Storage::ReadWrite access.
 */
MessageQueue::MessageQueue(const QString &storageRoot, const QString &name)
    : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite)
{
}

/**
 * Appends a message to the queue.
 *
 * The payload is written under a key derived from the next revision number
 * (current maximum revision + 1, rendered as a decimal string), the maximum
 * revision is updated to that number, and the transaction is committed.
 * messageReady() is emitted afterwards; the outcome of commit() is not
 * inspected here.
 *
 * @param msg  Pointer to the payload. It is wrapped without copying, so it has
 *             to stay valid at least until this function returns.
 * @param size Payload length in bytes.
 *
 * NOTE(review): keys are unpadded decimal strings. If the storage orders keys
 * bytewise, "10" sorts before "2" - confirm the ordering guarantees the queue
 * relies on.
 */
void MessageQueue::enqueue(void const *msg, size_t size)
{
    //createTransaction() already yields a temporary, an extra std::move() would only get in the way of copy elision
    auto transaction = mStorage.createTransaction(Akonadi2::Storage::ReadWrite);
    const qint64 revision = Akonadi2::Storage::maxRevision(transaction) + 1;
    //Produces the same decimal representation as QString("%1").arg(revision).toUtf8(), without the QString detour
    const QByteArray key = QByteArray::number(revision);
    transaction.write(key, QByteArray::fromRawData(static_cast<const char*>(msg), size));
    Akonadi2::Storage::setMaxRevision(transaction, revision);
    transaction.commit();
    emit messageReady();
}

void MessageQueue::enqueue(const QByteArray &value)
{
    enqueue(value.data(), value.size());
}

void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler,
                           const std::function<void(const Error &error)> &errorHandler)
{
    bool readValue = false;
    mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, errorHandler, &readValue](const QByteArray &key, const QByteArray &value) -> bool {
        if (Akonadi2::Storage::isInternalKey(key)) {
            return true;
        }
        readValue = true;
        //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid)
        const auto keyCopy  = QByteArray(key.constData(), key.size());
        resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, keyCopy, errorHandler](bool success) {
            if (success) {
                mStorage.createTransaction(Akonadi2::Storage::ReadWrite).remove(keyCopy, [errorHandler, keyCopy](const Akonadi2::Storage::Error &error) {
                    ErrorMsg() << "Error while removing value" << error.message << keyCopy;
                    //Don't call the errorhandler in here, we already called the result handler
                });
                if (isEmpty()) {
                    emit this->drained();
                }
            } else {
                //TODO re-enqueue?
            }
        });
        return false;
    },
    [errorHandler](const Akonadi2::Storage::Error &error) {
        ErrorMsg() << "Error while retrieving value" << error.message;
        errorHandler(Error(error.store, error.code, error.message));
    }
    );
    if (!readValue) {
        errorHandler(Error("messagequeue", -1, "No message found"));
    }
}

/**
 * Creates a job that hands up to @p maxBatchSize queued messages to @p resultHandler.
 *
 * When the job runs, the storage is scanned and every non-internal entry is
 * passed to @p resultHandler together with an acknowledgement callback. Once
 * the scan is over and every delivered message has been acknowledged, all
 * delivered keys are removed in one committed write transaction and drained()
 * is emitted if the queue is empty afterwards.
 *
 * @param maxBatchSize  Upper bound for the number of messages delivered per
 *                      run. At least one message is delivered if one exists,
 *                      even for values below 1.
 * @param resultHandler Called with a pointer to the value, its size and the
 *                      acknowledgement callback. The bool passed to the
 *                      acknowledgement is currently ignored: the entry is
 *                      removed either way.
 * @return A job that fails with error code -1 ("No message found") if the scan
 *         delivered nothing.
 *
 * NOTE(review): when messages were delivered, the future is not finished
 * anywhere in this function - confirm how completion is supposed to be
 * signalled to whoever waits on the job.
 */
KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler)
{
    return KAsync::start<void>([this, maxBatchSize, resultHandler](KAsync::Future<void> &future) {
        //State of this particular run. It is heap allocated and shared because the acknowledgement
        //callbacks may be invoked after this function has returned.
        auto resultCount = QSharedPointer<int>::create(0);
        auto keyList = QSharedPointer<QByteArrayList>::create();
        auto scanFinished = QSharedPointer<bool>::create(false);

        //Removes every delivered key and reports whether the queue ran dry.
        const auto removeBatch = [this, keyList]() {
            //FIXME do this from the caller thread
            auto transaction = mStorage.createTransaction(Akonadi2::Storage::ReadWrite);
            for (const auto &key : *keyList) {
                transaction.remove(key, [key](const Akonadi2::Storage::Error &error) {
                    ErrorMsg() << "Error while removing value" << error.message << key;
                    //Don't call the errorhandler in here, we already called the result handler
                });
            }
            //Commit before checking for emptiness, otherwise the read-only scan in isEmpty() runs
            //while the removals are still pending in this open transaction
            transaction.commit();
            if (isEmpty()) {
                emit this->drained();
            }
        };

        mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [resultHandler, resultCount, keyList, scanFinished, removeBatch, maxBatchSize](const QByteArray &key, const QByteArray &value) -> bool {
            if (Akonadi2::Storage::isInternalKey(key)) {
                return true;
            }
            //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid)
            keyList->append(QByteArray(key.constData(), key.size()));
            resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [resultCount, keyList, scanFinished, removeBatch](bool /*success*/) {
                *resultCount += 1;
                //Only conclude the batch once the scan is over: before that the number of delivered
                //messages is not final. Acknowledgements arriving during the scan are handled below.
                if (*scanFinished && *resultCount >= keyList->size()) {
                    removeBatch();
                }
            });
            //Continue only while the batch has room left
            return keyList->size() < maxBatchSize;
        },
        [](const Akonadi2::Storage::Error &error) {
            ErrorMsg() << "Error while retrieving value" << error.message;
            // errorHandler(Error(error.store, error.code, error.message));
        }
        );

        *scanFinished = true;
        if (keyList->isEmpty()) {
            future.setError(-1, "No message found");
            future.setFinished();
            return;
        }
        //Every message was acknowledged synchronously while scanning, conclude the batch now
        if (*resultCount >= keyList->size()) {
            removeBatch();
        }
    });
}

bool MessageQueue::isEmpty()
{
    int count = 0;
    mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [&count](const QByteArray &key, const QByteArray &value) -> bool {
        if (!Akonadi2::Storage::isInternalKey(key)) {
            count++;
            return false;
        }
        return true;
    },
    [](const Akonadi2::Storage::Error &error) {
        qDebug() << "Error while checking if empty" << error.message;
    });
    return count == 0;
}