#include "messagequeue.h" #include "storage.h" #include #include static KAsync::Job waitForCompletion(QList > &futures) { auto context = new QObject; return KAsync::start([futures, context](KAsync::Future &future) { const auto total = futures.size(); auto count = QSharedPointer::create(); int i = 0; for (KAsync::Future subFuture : futures) { i++; if (subFuture.isFinished()) { *count += 1; continue; } //FIXME bind lifetime all watcher to future (repectively the main job auto watcher = QSharedPointer >::create(); QObject::connect(watcher.data(), &KAsync::FutureWatcher::futureReady, [count, total, &future](){ *count += 1; if (*count == total) { future.setFinished(); } }); watcher->setFuture(subFuture); context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher)); } if (*count == total) { future.setFinished(); } }).then([context]() { delete context; }); } MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite) { } MessageQueue::~MessageQueue() { } void MessageQueue::enqueue(void const *msg, size_t size) { enqueue(QByteArray::fromRawData(static_cast(msg), size)); } void MessageQueue::startTransaction() { if (mWriteTransaction) { return; } processRemovals(); mWriteTransaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); } void MessageQueue::commit() { mWriteTransaction.commit(); mWriteTransaction = Akonadi2::Storage::Transaction(); processRemovals(); emit messageReady(); } void MessageQueue::enqueue(const QByteArray &value) { bool implicitTransaction = false; if (!mWriteTransaction) { implicitTransaction = true; startTransaction(); } const qint64 revision = Akonadi2::Storage::maxRevision(mWriteTransaction) + 1; const QByteArray key = QString("%1").arg(revision).toUtf8(); mWriteTransaction.write(key, value); Akonadi2::Storage::setMaxRevision(mWriteTransaction, revision); if (implicitTransaction) { commit(); } } void MessageQueue::processRemovals() { if (mWriteTransaction) { return; } auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); for (const auto &key : mPendingRemoval) { transaction.remove(key); } transaction.commit(); mPendingRemoval.clear(); } void MessageQueue::dequeue(const std::function)> &resultHandler, const std::function &errorHandler) { dequeueBatch(1, [resultHandler](const QByteArray &value) { return KAsync::start([&value,resultHandler](KAsync::Future &future) { resultHandler(const_cast(static_cast(value.data())), value.size(), [&future](bool success){ future.setFinished(); }); }); }).then([](){}, [errorHandler](int error, const QString &errorString) { errorHandler(Error("messagequeue", error, errorString.toLatin1())); }).exec(); } KAsync::Job MessageQueue::dequeueBatch(int maxBatchSize, const std::function(const QByteArray &)> &resultHandler) { Trace() << "Dequeue batch"; auto resultCount = QSharedPointer::create(0); return KAsync::start([this, maxBatchSize, resultHandler, resultCount](KAsync::Future &future) { int count = 0; QList > waitCondition; mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { if (Akonadi2::Storage::isInternalKey(key) || mPendingRemoval.contains(key)) { return true; } *resultCount += 1; Trace() << "Dequeue value"; //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) mPendingRemoval << QByteArray(key.constData(), key.size()); waitCondition << resultHandler(value).exec(); count++; Trace() << count << maxBatchSize; if (count < maxBatchSize) { return true; } return false; }, [](const Akonadi2::Storage::Error &error) { ErrorMsg() << "Error while retrieving value" << error.message; // errorHandler(Error(error.store, error.code, error.message)); }); // Trace() << "Waiting on " << waitCondition.size() << " results"; ::waitForCompletion(waitCondition).then([this, resultCount, &future]() { processRemovals(); if (*resultCount == 0) { future.setError(-1, "No message found"); future.setFinished(); } else { if (isEmpty()) { emit this->drained(); } future.setFinished(); } }).exec(); }); } bool MessageQueue::isEmpty() { int count = 0; mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [&count, this](const QByteArray &key, const QByteArray &value) -> bool { if (!Akonadi2::Storage::isInternalKey(key) && !mPendingRemoval.contains(key)) { count++; return false; } return true; }, [](const Akonadi2::Storage::Error &error) { ErrorMsg() << "Error while checking if empty" << error.message; }); return count == 0; }