summaryrefslogtreecommitdiffstats
path: root/common/messagequeue.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-16 14:55:20 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-21 09:02:21 +0200
commit237b9ae4113e7a9f489632296941becb71afdb45 (patch)
tree01cde58f495944f01cad9d282391d4efd2897141 /common/messagequeue.cpp
parent95d11bf0be98a4e3c08502fe23417b800233ce14 (diff)
downloadsink-237b9ae4113e7a9f489632296941becb71afdb45.tar.gz
sink-237b9ae4113e7a9f489632296941becb71afdb45.zip
Refactor how the storage is used.
This is the initial refactoring to improve how we deal with the storage. It does a couple of things: * Rename Sink::Storage to Sink::Storage::DataStore to free up the Sink::Storage namespace * Introduce a Sink::ResourceContext to have a single object that can be passed around containing everything that is necessary to operate on a resource. This is a lot better than the multiple separate parameters that we used to pass around all over the place, while still allowing for dependency injection for tests. * Tie storage access together using the new EntityStore that directly works with ApplicationDomainTypes. This gives us a central place where main storage, indexes and buffer adaptors are tied together, which will also give us a place to implement external indexes, such as a fulltextindex using xapian. * Use ApplicationDomainTypes as the default way to pass around entities. Instead of using various ways to pass around entities (buffers, buffer adaptors, ApplicationDomainTypes), only use a single way. The old approach was confusing, and was only done as: * optimization; really shouldn't be necessary and otherwise I'm sure we can find better ways to optimize ApplicationDomainType itself. * a way to account for entities that have multiple buffers, a concept that I no longer deem relevant. While this commit does the bulk of the work to get there, the following commits will refactor more stuff to get things back to normal.
Diffstat (limited to 'common/messagequeue.cpp')
-rw-r--r--common/messagequeue.cpp20
1 files changed, 10 insertions, 10 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
index e050bcd..0fcbf99 100644
--- a/common/messagequeue.cpp
+++ b/common/messagequeue.cpp
@@ -5,7 +5,7 @@
5 5
6SINK_DEBUG_AREA("messagequeue") 6SINK_DEBUG_AREA("messagequeue")
7 7
8MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::ReadWrite) 8MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::DataStore::ReadWrite)
9{ 9{
10} 10}
11 11
@@ -27,13 +27,13 @@ void MessageQueue::startTransaction()
27 return; 27 return;
28 } 28 }
29 processRemovals(); 29 processRemovals();
30 mWriteTransaction = mStorage.createTransaction(Sink::Storage::ReadWrite); 30 mWriteTransaction = mStorage.createTransaction(Sink::Storage::DataStore::ReadWrite);
31} 31}
32 32
33void MessageQueue::commit() 33void MessageQueue::commit()
34{ 34{
35 mWriteTransaction.commit(); 35 mWriteTransaction.commit();
36 mWriteTransaction = Sink::Storage::Transaction(); 36 mWriteTransaction = Sink::Storage::DataStore::Transaction();
37 processRemovals(); 37 processRemovals();
38 emit messageReady(); 38 emit messageReady();
39} 39}
@@ -45,10 +45,10 @@ void MessageQueue::enqueue(const QByteArray &value)
45 implicitTransaction = true; 45 implicitTransaction = true;
46 startTransaction(); 46 startTransaction();
47 } 47 }
48 const qint64 revision = Sink::Storage::maxRevision(mWriteTransaction) + 1; 48 const qint64 revision = Sink::Storage::DataStore::maxRevision(mWriteTransaction) + 1;
49 const QByteArray key = QString("%1").arg(revision).toUtf8(); 49 const QByteArray key = QString("%1").arg(revision).toUtf8();
50 mWriteTransaction.openDatabase().write(key, value); 50 mWriteTransaction.openDatabase().write(key, value);
51 Sink::Storage::setMaxRevision(mWriteTransaction, revision); 51 Sink::Storage::DataStore::setMaxRevision(mWriteTransaction, revision);
52 if (implicitTransaction) { 52 if (implicitTransaction) {
53 commit(); 53 commit();
54 } 54 }
@@ -59,7 +59,7 @@ void MessageQueue::processRemovals()
59 if (mWriteTransaction) { 59 if (mWriteTransaction) {
60 return; 60 return;
61 } 61 }
62 auto transaction = mStorage.createTransaction(Sink::Storage::ReadWrite); 62 auto transaction = mStorage.createTransaction(Sink::Storage::DataStore::ReadWrite);
63 for (const auto &key : mPendingRemoval) { 63 for (const auto &key : mPendingRemoval) {
64 transaction.openDatabase().remove(key); 64 transaction.openDatabase().remove(key);
65 } 65 }
@@ -82,7 +82,7 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi
82 return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount](KAsync::Future<void> &future) { 82 return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount](KAsync::Future<void> &future) {
83 int count = 0; 83 int count = 0;
84 QList<KAsync::Future<void>> waitCondition; 84 QList<KAsync::Future<void>> waitCondition;
85 mStorage.createTransaction(Sink::Storage::ReadOnly) 85 mStorage.createTransaction(Sink::Storage::DataStore::ReadOnly)
86 .openDatabase() 86 .openDatabase()
87 .scan("", 87 .scan("",
88 [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { 88 [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool {
@@ -101,7 +101,7 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi
101 } 101 }
102 return false; 102 return false;
103 }, 103 },
104 [](const Sink::Storage::Error &error) { 104 [](const Sink::Storage::DataStore::Error &error) {
105 SinkError() << "Error while retrieving value" << error.message; 105 SinkError() << "Error while retrieving value" << error.message;
106 // errorHandler(Error(error.store, error.code, error.message)); 106 // errorHandler(Error(error.store, error.code, error.message));
107 }); 107 });
@@ -126,7 +126,7 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi
126bool MessageQueue::isEmpty() 126bool MessageQueue::isEmpty()
127{ 127{
128 int count = 0; 128 int count = 0;
129 auto t = mStorage.createTransaction(Sink::Storage::ReadOnly); 129 auto t = mStorage.createTransaction(Sink::Storage::DataStore::ReadOnly);
130 auto db = t.openDatabase(); 130 auto db = t.openDatabase();
131 if (db) { 131 if (db) {
132 db.scan("", 132 db.scan("",
@@ -137,7 +137,7 @@ bool MessageQueue::isEmpty()
137 } 137 }
138 return true; 138 return true;
139 }, 139 },
140 [](const Sink::Storage::Error &error) { SinkError() << "Error while checking if empty" << error.message; }); 140 [](const Sink::Storage::DataStore::Error &error) { SinkError() << "Error while checking if empty" << error.message; });
141 } 141 }
142 return count == 0; 142 return count == 0;
143} 143}