summaryrefslogtreecommitdiffstats
path: root/common/messagequeue.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-08-17 00:19:50 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-08-18 18:54:07 +0200
commite19bad87f43caf602793d8297562804b17383f7d (patch)
tree401837d8ea4fe07d2ca174e06989c72f02de633f /common/messagequeue.cpp
parentca3ce44c7a565df701d9e1b1556cbf2b6819f37c (diff)
downloadsink-e19bad87f43caf602793d8297562804b17383f7d.tar.gz
sink-e19bad87f43caf602793d8297562804b17383f7d.zip
Transactions for messagequeue
Diffstat (limited to 'common/messagequeue.cpp')
-rw-r--r--common/messagequeue.cpp123
1 files changed, 68 insertions, 55 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
index 2a046d1..ab4b1cf 100644
--- a/common/messagequeue.cpp
+++ b/common/messagequeue.cpp
@@ -39,7 +39,10 @@ static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void> > &futures
39MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) 39MessageQueue::MessageQueue(const QString &storageRoot, const QString &name)
40 : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite) 40 : mStorage(storageRoot, name, Akonadi2::Storage::ReadWrite)
41{ 41{
42}
42 43
44MessageQueue::~MessageQueue()
45{
43} 46}
44 47
45void MessageQueue::enqueue(void const *msg, size_t size) 48void MessageQueue::enqueue(void const *msg, size_t size)
@@ -47,71 +50,88 @@ void MessageQueue::enqueue(void const *msg, size_t size)
47 enqueue(QByteArray::fromRawData(static_cast<const char*>(msg), size)); 50 enqueue(QByteArray::fromRawData(static_cast<const char*>(msg), size));
48} 51}
49 52
53void MessageQueue::startTransaction()
54{
55 if (mWriteTransaction) {
56 return;
57 }
58 processRemovals();
59 mWriteTransaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite));
60}
61
62void MessageQueue::commit()
63{
64 mWriteTransaction.commit();
65 mWriteTransaction = Akonadi2::Storage::Transaction();
66 processRemovals();
67 emit messageReady();
68}
69
50void MessageQueue::enqueue(const QByteArray &value) 70void MessageQueue::enqueue(const QByteArray &value)
51{ 71{
52 auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); 72 bool implicitTransaction = false;
53 const qint64 revision = Akonadi2::Storage::maxRevision(transaction) + 1; 73 if (!mWriteTransaction) {
74 implicitTransaction = true;
75 startTransaction();
76 }
77 const qint64 revision = Akonadi2::Storage::maxRevision(mWriteTransaction) + 1;
54 const QByteArray key = QString("%1").arg(revision).toUtf8(); 78 const QByteArray key = QString("%1").arg(revision).toUtf8();
55 transaction.write(key, value); 79 mWriteTransaction.write(key, value);
56 Akonadi2::Storage::setMaxRevision(transaction, revision); 80 Akonadi2::Storage::setMaxRevision(mWriteTransaction, revision);
81 if (implicitTransaction) {
82 commit();
83 }
84}
85
86void MessageQueue::processRemovals()
87{
88 if (mWriteTransaction) {
89 return;
90 }
91 auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite));
92 for (const auto &key : mPendingRemoval) {
93 transaction.remove(key);
94 }
57 transaction.commit(); 95 transaction.commit();
58 emit messageReady(); 96 mPendingRemoval.clear();
59} 97}
60 98
61void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler, 99void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::function<void(bool success)>)> &resultHandler,
62 const std::function<void(const Error &error)> &errorHandler) 100 const std::function<void(const Error &error)> &errorHandler)
63{ 101{
64 bool readValue = false; 102 dequeueBatch(1, [resultHandler](const QByteArray &value) {
65 mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, errorHandler, &readValue](const QByteArray &key, const QByteArray &value) -> bool { 103 return KAsync::start<void>([&value,resultHandler](KAsync::Future<void> &future) {
66 if (Akonadi2::Storage::isInternalKey(key)) { 104 resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [&future](bool success){
67 return true; 105 future.setFinished();
68 } 106 });
69 readValue = true;
70 //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid)
71 const auto keyCopy = QByteArray(key.constData(), key.size());
72 resultHandler(const_cast<void*>(static_cast<const void*>(value.data())), value.size(), [this, keyCopy, errorHandler](bool success) {
73 if (success) {
74 mStorage.createTransaction(Akonadi2::Storage::ReadWrite).remove(keyCopy, [errorHandler, keyCopy](const Akonadi2::Storage::Error &error) {
75 ErrorMsg() << "Error while removing value" << error.message << keyCopy;
76 //Don't call the errorhandler in here, we already called the result handler
77 });
78 if (isEmpty()) {
79 emit this->drained();
80 }
81 } else {
82 //TODO re-enqueue?
83 }
84 }); 107 });
85 return false; 108 }).then<void>([](){},
86 }, 109 [errorHandler](int error, const QString &errorString) {
87 [errorHandler](const Akonadi2::Storage::Error &error) { 110 errorHandler(Error("messagequeue", error, errorString.toLatin1()));
88 ErrorMsg() << "Error while retrieving value" << error.message; 111 }).exec();
89 errorHandler(Error(error.store, error.code, error.message));
90 }
91 );
92 if (!readValue) {
93 errorHandler(Error("messagequeue", -1, "No message found"));
94 }
95} 112}
96 113
97KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler) 114KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler)
98{ 115{
116 Trace() << "Dequeue batch";
99 auto resultCount = QSharedPointer<int>::create(0); 117 auto resultCount = QSharedPointer<int>::create(0);
100 auto keyList = QSharedPointer<QByteArrayList>::create(); 118 return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount](KAsync::Future<void> &future) {
101 return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount, keyList](KAsync::Future<void> &future) {
102 int count = 0; 119 int count = 0;
103 QList<KAsync::Future<void> > waitCondition; 120 QList<KAsync::Future<void> > waitCondition;
104 mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, resultCount, keyList, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { 121 mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool {
105 if (Akonadi2::Storage::isInternalKey(key)) { 122 if (Akonadi2::Storage::isInternalKey(key) || mPendingRemoval.contains(key)) {
106 return true; 123 return true;
107 } 124 }
125 *resultCount += 1;
126 Trace() << "Dequeue value";
108 //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) 127 //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid)
109 keyList->append(QByteArray(key.constData(), key.size())); 128 mPendingRemoval << QByteArray(key.constData(), key.size());
110 129
111 waitCondition << resultHandler(value).exec(); 130 waitCondition << resultHandler(value).exec();
112 131
113 count++; 132 count++;
114 if (count <= maxBatchSize) { 133 Trace() << count << maxBatchSize;
134 if (count < maxBatchSize) {
115 return true; 135 return true;
116 } 136 }
117 return false; 137 return false;
@@ -121,17 +141,10 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi
121 // errorHandler(Error(error.store, error.code, error.message)); 141 // errorHandler(Error(error.store, error.code, error.message));
122 }); 142 });
123 143
124 ::waitForCompletion(waitCondition).then<void>([this, keyList, &future]() { 144 // Trace() << "Waiting on " << waitCondition.size() << " results";
125 Trace() << "Dequeue complete, removing values " << *keyList; 145 ::waitForCompletion(waitCondition).then<void>([this, resultCount, &future]() {
126 auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); 146 processRemovals();
127 for (const auto &key : *keyList) { 147 if (*resultCount == 0) {
128 transaction.remove(key, [key](const Akonadi2::Storage::Error &error) {
129 ErrorMsg() << "Error while removing value" << error.message << key;
130 //Don't call the errorhandler in here, we already called the result handler
131 });
132 }
133 transaction.commit();
134 if (keyList->isEmpty()) {
135 future.setError(-1, "No message found"); 148 future.setError(-1, "No message found");
136 future.setFinished(); 149 future.setFinished();
137 } else { 150 } else {
@@ -147,15 +160,15 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi
147bool MessageQueue::isEmpty() 160bool MessageQueue::isEmpty()
148{ 161{
149 int count = 0; 162 int count = 0;
150 mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [&count](const QByteArray &key, const QByteArray &value) -> bool { 163 mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [&count, this](const QByteArray &key, const QByteArray &value) -> bool {
151 if (!Akonadi2::Storage::isInternalKey(key)) { 164 if (!Akonadi2::Storage::isInternalKey(key) && !mPendingRemoval.contains(key)) {
152 count++; 165 count++;
153 return false; 166 return false;
154 } 167 }
155 return true; 168 return true;
156 }, 169 },
157 [](const Akonadi2::Storage::Error &error) { 170 [](const Akonadi2::Storage::Error &error) {
158 qDebug() << "Error while checking if empty" << error.message; 171 ErrorMsg() << "Error while checking if empty" << error.message;
159 }); 172 });
160 return count == 0; 173 return count == 0;
161} 174}