From 4f393198d022815c66b10871c7e5efe45a0bf462 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 10 Aug 2015 23:32:31 +0200 Subject: Ported messagequeue to new API --- common/messagequeue.cpp | 24 ++++++++++++------------ common/messagequeue.h | 1 + common/storage.h | 2 ++ common/storage_common.cpp | 21 +++++++++++++++++---- 4 files changed, 32 insertions(+), 16 deletions(-) (limited to 'common') diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 0704dd3..a92d6be 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp @@ -11,12 +11,11 @@ MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) void MessageQueue::enqueue(void const *msg, size_t size) { - mStorage.startTransaction(Akonadi2::Storage::ReadWrite); + auto transaction = mStorage.createTransaction(Akonadi2::Storage::ReadWrite); const qint64 revision = mStorage.maxRevision() + 1; const QByteArray key = QString("%1").arg(revision).toUtf8(); - mStorage.write(key.data(), key.size(), msg, size); - mStorage.setMaxRevision(revision); - mStorage.commitTransaction(); + transaction.write(key, QByteArray::fromRawData(static_cast(msg), size)); + Akonadi2::Storage::setMaxRevision(transaction, revision); emit messageReady(); } @@ -24,19 +23,21 @@ void MessageQueue::dequeue(const std::function &errorHandler) { bool readValue = false; - mStorage.scan("", [this, resultHandler, errorHandler, &readValue](void *keyPtr, int keySize, void *valuePtr, int valueSize) -> bool { - //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) - const auto key = QByteArray(static_cast(keyPtr), keySize); + mTransaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); + mTransaction.scan("", [this, resultHandler, errorHandler, &readValue](const QByteArray &key, const QByteArray &value) -> bool { if (Akonadi2::Storage::isInternalKey(key)) { return true; } readValue = true; - resultHandler(valuePtr, valueSize, [this, key, errorHandler](bool success) { + //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) + const auto keyCopy = QByteArray(key.constData(), key.size()); + resultHandler(const_cast(static_cast(value.data())), value.size(), [this, keyCopy, errorHandler](bool success) { if (success) { - mStorage.remove(key.data(), key.size(), [errorHandler, key](const Akonadi2::Storage::Error &error) { - ErrorMsg() << "Error while removing value" << error.message << key; + mTransaction.remove(keyCopy, [errorHandler, keyCopy](const Akonadi2::Storage::Error &error) { + ErrorMsg() << "Error while removing value" << error.message << keyCopy; //Don't call the errorhandler in here, we already called the result handler }); + mTransaction.commit(); if (isEmpty()) { emit this->drained(); } @@ -59,8 +60,7 @@ void MessageQueue::dequeue(const std::function bool { - const auto key = QByteArray::fromRawData(static_cast(keyPtr), keySize); + mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [&count](const QByteArray &key, const QByteArray &value) -> bool { if (!Akonadi2::Storage::isInternalKey(key)) { count++; return false; diff --git a/common/messagequeue.h b/common/messagequeue.h index b7e3daa..ffc1ff2 100644 --- a/common/messagequeue.h +++ b/common/messagequeue.h @@ -39,4 +39,5 @@ signals: private: Q_DISABLE_COPY(MessageQueue); Akonadi2::Storage mStorage; + Akonadi2::Storage::Transaction mTransaction; }; diff --git a/common/storage.h b/common/storage.h index fd35274..f9a207f 100644 --- a/common/storage.h +++ b/common/storage.h @@ -168,6 +168,8 @@ public: qint64 maxRevision(); void setMaxRevision(qint64 revision); + static qint64 maxRevision(const Akonadi2::Storage::Transaction &); + static void setMaxRevision(Akonadi2::Storage::Transaction &, qint64 revision); bool exists() const; diff --git a/common/storage_common.cpp b/common/storage_common.cpp index 65f6e57..8006a8e 100644 --- a/common/storage_common.cpp +++ b/common/storage_common.cpp @@ -69,18 +69,31 @@ bool Storage::write(const QByteArray &sKey, const QByteArray &sValue, const std: void Storage::setMaxRevision(qint64 revision) { - write("__internal_maxRevision", QByteArray::number(revision)); + auto transaction = createTransaction(Akonadi2::Storage::ReadWrite); + setMaxRevision(transaction, revision); +} + +void Storage::setMaxRevision(Akonadi2::Storage::Transaction &transaction, qint64 revision) +{ + transaction.write("__internal_maxRevision", QByteArray::number(revision)); } qint64 Storage::maxRevision() +{ + auto transaction = createTransaction(Akonadi2::Storage::ReadOnly); + return maxRevision(transaction); +} + +qint64 Storage::maxRevision(const Akonadi2::Storage::Transaction &transaction) { qint64 r = 0; - scan("__internal_maxRevision", [&](const QByteArray &revision) -> bool { + transaction.scan("__internal_maxRevision", [&](const QByteArray &, const QByteArray &revision) -> bool { r = revision.toLongLong(); return false; - }, [this](const Error &error){ + }, [](const Error &error){ if (error.code != ErrorCodes::NotFound) { - defaultErrorHandler()(error); + //FIXME + // defaultErrorHandler()(error); } }); return r; -- cgit v1.2.3