summaryrefslogtreecommitdiffstats
path: root/common/messagequeue.cpp
blob: 2a046d1d7969c33169d7752e6726be40b17cd274 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#include "messagequeue.h"
#include "storage.h"
#include <QDebug>
#include <log.h>

/**
 * Builds a job that finishes once every future in @p futures has finished.
 *
 * The list is copied into the job, so the caller's list does not need to
 * outlive this call. Futures that are already finished when the job runs are
 * counted immediately; for the others a FutureWatcher is installed that bumps
 * the shared counter when the future becomes ready.
 *
 * The watchers are kept alive by storing them as dynamic properties on a
 * helper QObject, which the continuation of the returned job deletes.
 *
 * NOTE(review): the helper QObject is only deleted by that continuation, so it
 * looks like it leaks if the returned job is never executed - confirm.
 */
static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void> > &futures)
{
    auto watcherOwner = new QObject;
    return KAsync::start<void>([futures, watcherOwner](KAsync::Future<void> &future) {
        const auto expected = futures.size();
        auto finished = QSharedPointer<int>::create();
        int index = 0;
        for (KAsync::Future<void> pending : futures) {
            ++index;
            if (pending.isFinished()) {
                //Nothing to wait for, just account for it
                ++(*finished);
            } else {
                //FIXME bind the lifetime of all watchers to the future (respectively the main job)
                auto watcher = QSharedPointer<KAsync::FutureWatcher<void> >::create();
                //NOTE(review): future is captured by reference and used when the watcher fires;
                //this presumably relies on KAsync keeping it alive until setFinished() - confirm.
                const auto onReady = [finished, expected, &future]() {
                    ++(*finished);
                    if (*finished == expected) {
                        future.setFinished();
                    }
                };
                QObject::connect(watcher.data(), &KAsync::FutureWatcher<void>::futureReady, onReady);
                watcher->setFuture(pending);
                //One uniquely named dynamic property per watcher keeps the shared pointer referenced
                const QByteArray propertyName = QString("future%1").arg(index).toLatin1();
                watcherOwner->setProperty(propertyName.constData(), QVariant::fromValue(watcher));
            }
        }
        //Covers an empty list and the case where everything was already finished
        if (*finished == expected) {
            future.setFinished();
        }
    }).then<void>([watcherOwner]() {
        delete watcherOwner;
    });
}

/**
 * Constructs the queue on top of an Akonadi2::Storage that is identified by
 * @p storageRoot and @p name and requested with ReadWrite access.
 */
MessageQueue::MessageQueue(const QString &storageRoot, const QString &name)
    : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite)
{
}

void MessageQueue::enqueue(void const *msg, size_t size)
{
    enqueue(QByteArray::fromRawData(static_cast<const char*>(msg), size));
}

/**
 * Appends @p value to the queue.
 *
 * Within a single read-write transaction the next revision number
 * (maxRevision + 1) is determined, @p value is written under the decimal
 * representation of that revision, and the stored maximum revision is
 * advanced. After the commit call messageReady() is emitted.
 *
 * NOTE(review): the key is the unpadded decimal revision; if the store orders
 * keys bytewise, "10" sorts before "9" - confirm the ordering the scan relies on.
 * NOTE(review): messageReady() is emitted without looking at the outcome of
 * commit() - confirm that this is intended.
 */
void MessageQueue::enqueue(const QByteArray &value)
{
    //No std::move around the call: initialising from the returned temporary already
    //moves (or elides the move), an explicit std::move would only prevent the elision.
    auto transaction = mStorage.createTransaction(Akonadi2::Storage::ReadWrite);
    const qint64 revision = Akonadi2::Storage::maxRevision(transaction) + 1;
    const QByteArray key = QString("%1").arg(revision).toUtf8();
    transaction.write(key, value);
    Akonadi2::Storage::setMaxRevision(transaction, revision);
    transaction.commit();
    emit messageReady();
}

void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler,
                           const std::function<void(const Error &error)> &errorHandler)
{
    bool readValue = false;
    mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, errorHandler, &readValue](const QByteArray &key, const QByteArray &value) -> bool {
        if (Akonadi2::Storage::isInternalKey(key)) {
            return true;
        }
        readValue = true;
        //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid)
        const auto keyCopy  = QByteArray(key.constData(), key.size());
        resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, keyCopy, errorHandler](bool success) {
            if (success) {
                mStorage.createTransaction(Akonadi2::Storage::ReadWrite).remove(keyCopy, [errorHandler, keyCopy](const Akonadi2::Storage::Error &error) {
                    ErrorMsg() << "Error while removing value" << error.message << keyCopy;
                    //Don't call the errorhandler in here, we already called the result handler
                });
                if (isEmpty()) {
                    emit this->drained();
                }
            } else {
                //TODO re-enqueue?
            }
        });
        return false;
    },
    [errorHandler](const Akonadi2::Storage::Error &error) {
        ErrorMsg() << "Error while retrieving value" << error.message;
        errorHandler(Error(error.store, error.code, error.message));
    }
    );
    if (!readValue) {
        errorHandler(Error("messagequeue", -1, "No message found"));
    }
}

/**
 * Returns a job that processes up to @p maxBatchSize messages in one go.
 *
 * When the job runs, the store is scanned and internal keys are skipped. For
 * every message @p resultHandler is called with the value and the job it
 * returns is executed right away; the resulting futures are collected. Once
 * all of them have finished, the keys of the handled messages are removed in a
 * single read-write transaction.
 *
 * If no message was found the job's future gets the error (-1, "No message
 * found"). Otherwise drained() is emitted if isEmpty() reports that the queue
 * is empty after the removal. The future is set to finished in both cases.
 *
 * The batch limit is checked after a message has been handled, so at least one
 * message is processed even if @p maxBatchSize is zero or negative.
 *
 * Errors reported by the scan are only logged.
 */
KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler)
{
    auto keyList = QSharedPointer<QByteArrayList>::create();
    return KAsync::start<void>([this, maxBatchSize, resultHandler, keyList](KAsync::Future<void> &future) {
        int count = 0;
        QList<KAsync::Future<void> > waitCondition;
        mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [resultHandler, keyList, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool {
            if (Akonadi2::Storage::isInternalKey(key)) {
                return true;
            }
            //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid)
            keyList->append(QByteArray(key.constData(), key.size()));

            waitCondition << resultHandler(value).exec();

            count++;
            //Continue only while the batch is not full yet. The previous "<=" comparison
            //let one message more than maxBatchSize through.
            return count < maxBatchSize;
        },
        [](const Akonadi2::Storage::Error &error) {
            ErrorMsg() << "Error while retrieving value" << error.message;
            // errorHandler(Error(error.store, error.code, error.message));
        });

        //NOTE(review): future is captured by reference and used after this lambda returned;
        //this presumably relies on KAsync keeping it alive until setFinished() - confirm.
        ::waitForCompletion(waitCondition).then<void>([this, keyList, &future]() {
            Trace() << "Dequeue complete, removing values " << *keyList;
            //No std::move around the call: the returned temporary is moved (or elided) anyway
            auto transaction = mStorage.createTransaction(Akonadi2::Storage::ReadWrite);
            for (const auto &key : *keyList) {
                transaction.remove(key, [key](const Akonadi2::Storage::Error &error) {
                    ErrorMsg() << "Error while removing value" << error.message << key;
                    //Don't call the errorhandler in here, we already called the result handler
                });
            }
            transaction.commit();
            if (keyList->isEmpty()) {
                future.setError(-1, "No message found");
            } else if (isEmpty()) {
                emit this->drained();
            }
            future.setFinished();
        }).exec();
    });
}

bool MessageQueue::isEmpty()
{
    int count = 0;
    mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [&count](const QByteArray &key, const QByteArray &value) -> bool {
        if (!Akonadi2::Storage::isInternalKey(key)) {
            count++;
            return false;
        }
        return true;
    },
    [](const Akonadi2::Storage::Error &error) {
        qDebug() << "Error while checking if empty" << error.message;
    });
    return count == 0;
}