summaryrefslogtreecommitdiffstats
path: root/common/messagequeue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/messagequeue.cpp')
-rw-r--r--common/messagequeue.cpp20
1 files changed, 10 insertions, 10 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
index e050bcd..0fcbf99 100644
--- a/common/messagequeue.cpp
+++ b/common/messagequeue.cpp
@@ -5,7 +5,7 @@
5 5
6SINK_DEBUG_AREA("messagequeue") 6SINK_DEBUG_AREA("messagequeue")
7 7
8MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::ReadWrite) 8MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::DataStore::ReadWrite)
9{ 9{
10} 10}
11 11
@@ -27,13 +27,13 @@ void MessageQueue::startTransaction()
27 return; 27 return;
28 } 28 }
29 processRemovals(); 29 processRemovals();
30 mWriteTransaction = mStorage.createTransaction(Sink::Storage::ReadWrite); 30 mWriteTransaction = mStorage.createTransaction(Sink::Storage::DataStore::ReadWrite);
31} 31}
32 32
33void MessageQueue::commit() 33void MessageQueue::commit()
34{ 34{
35 mWriteTransaction.commit(); 35 mWriteTransaction.commit();
36 mWriteTransaction = Sink::Storage::Transaction(); 36 mWriteTransaction = Sink::Storage::DataStore::Transaction();
37 processRemovals(); 37 processRemovals();
38 emit messageReady(); 38 emit messageReady();
39} 39}
@@ -45,10 +45,10 @@ void MessageQueue::enqueue(const QByteArray &value)
45 implicitTransaction = true; 45 implicitTransaction = true;
46 startTransaction(); 46 startTransaction();
47 } 47 }
48 const qint64 revision = Sink::Storage::maxRevision(mWriteTransaction) + 1; 48 const qint64 revision = Sink::Storage::DataStore::maxRevision(mWriteTransaction) + 1;
49 const QByteArray key = QString("%1").arg(revision).toUtf8(); 49 const QByteArray key = QString("%1").arg(revision).toUtf8();
50 mWriteTransaction.openDatabase().write(key, value); 50 mWriteTransaction.openDatabase().write(key, value);
51 Sink::Storage::setMaxRevision(mWriteTransaction, revision); 51 Sink::Storage::DataStore::setMaxRevision(mWriteTransaction, revision);
52 if (implicitTransaction) { 52 if (implicitTransaction) {
53 commit(); 53 commit();
54 } 54 }
@@ -59,7 +59,7 @@ void MessageQueue::processRemovals()
59 if (mWriteTransaction) { 59 if (mWriteTransaction) {
60 return; 60 return;
61 } 61 }
62 auto transaction = mStorage.createTransaction(Sink::Storage::ReadWrite); 62 auto transaction = mStorage.createTransaction(Sink::Storage::DataStore::ReadWrite);
63 for (const auto &key : mPendingRemoval) { 63 for (const auto &key : mPendingRemoval) {
64 transaction.openDatabase().remove(key); 64 transaction.openDatabase().remove(key);
65 } 65 }
@@ -82,7 +82,7 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi
82 return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount](KAsync::Future<void> &future) { 82 return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount](KAsync::Future<void> &future) {
83 int count = 0; 83 int count = 0;
84 QList<KAsync::Future<void>> waitCondition; 84 QList<KAsync::Future<void>> waitCondition;
85 mStorage.createTransaction(Sink::Storage::ReadOnly) 85 mStorage.createTransaction(Sink::Storage::DataStore::ReadOnly)
86 .openDatabase() 86 .openDatabase()
87 .scan("", 87 .scan("",
88 [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { 88 [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool {
@@ -101,7 +101,7 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi
101 } 101 }
102 return false; 102 return false;
103 }, 103 },
104 [](const Sink::Storage::Error &error) { 104 [](const Sink::Storage::DataStore::Error &error) {
105 SinkError() << "Error while retrieving value" << error.message; 105 SinkError() << "Error while retrieving value" << error.message;
106 // errorHandler(Error(error.store, error.code, error.message)); 106 // errorHandler(Error(error.store, error.code, error.message));
107 }); 107 });
@@ -126,7 +126,7 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi
126bool MessageQueue::isEmpty() 126bool MessageQueue::isEmpty()
127{ 127{
128 int count = 0; 128 int count = 0;
129 auto t = mStorage.createTransaction(Sink::Storage::ReadOnly); 129 auto t = mStorage.createTransaction(Sink::Storage::DataStore::ReadOnly);
130 auto db = t.openDatabase(); 130 auto db = t.openDatabase();
131 if (db) { 131 if (db) {
132 db.scan("", 132 db.scan("",
@@ -137,7 +137,7 @@ bool MessageQueue::isEmpty()
137 } 137 }
138 return true; 138 return true;
139 }, 139 },
140 [](const Sink::Storage::Error &error) { SinkError() << "Error while checking if empty" << error.message; }); 140 [](const Sink::Storage::DataStore::Error &error) { SinkError() << "Error while checking if empty" << error.message; });
141 } 141 }
142 return count == 0; 142 return count == 0;
143} 143}