diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-10-16 14:55:20 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-10-21 09:02:21 +0200 |
commit | 237b9ae4113e7a9f489632296941becb71afdb45 (patch) | |
tree | 01cde58f495944f01cad9d282391d4efd2897141 /common/messagequeue.cpp | |
parent | 95d11bf0be98a4e3c08502fe23417b800233ce14 (diff) | |
download | sink-237b9ae4113e7a9f489632296941becb71afdb45.tar.gz sink-237b9ae4113e7a9f489632296941becb71afdb45.zip |
Refactor how the storage is used.
This is the initial refactoring to improve how we deal with the storage.
It does a couple of things:
* Rename Sink::Storage to Sink::Storage::DataStore to free up the
Sink::Storage namespace
* Introduce a Sink::ResourceContext to have a single object that can be
passed around containing everything that is necessary to operate on a
resource. This is a lot better than the multiple separate parameters
that we used to pass around all over the place, while still allowing
for dependency injection for tests.
* Tie storage access together using the new EntityStore that directly
works with ApplicationDomainTypes. This gives us a central place where
main storage, indexes and buffer adaptors are tied together, which
will also give us a place to implement external indexes, such as a
fulltextindex using xapian.
* Use ApplicationDomainTypes as the default way to pass around entities.
Instead of using various ways to pass around entities (buffers,
buffer adaptors, ApplicationDomainTypes), only use a single way.
The old approach was confusing, and was only done as:
* optimization; really shouldn't be necessary and otherwise I'm sure
we can find better ways to optimize ApplicationDomainType itself.
* a way to account for entities that have multiple buffers, a concept
that I no longer deem relevant.
While this commit does the bulk of the work to get there, the following
commits will refactor more stuff to get things back to normal.
Diffstat (limited to 'common/messagequeue.cpp')
-rw-r--r-- | common/messagequeue.cpp | 20 |
1 files changed, 10 insertions, 10 deletions
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index e050bcd..0fcbf99 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp | |||
@@ -5,7 +5,7 @@ | |||
5 | 5 | ||
6 | SINK_DEBUG_AREA("messagequeue") | 6 | SINK_DEBUG_AREA("messagequeue") |
7 | 7 | ||
8 | MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::ReadWrite) | 8 | MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::DataStore::ReadWrite) |
9 | { | 9 | { |
10 | } | 10 | } |
11 | 11 | ||
@@ -27,13 +27,13 @@ void MessageQueue::startTransaction() | |||
27 | return; | 27 | return; |
28 | } | 28 | } |
29 | processRemovals(); | 29 | processRemovals(); |
30 | mWriteTransaction = mStorage.createTransaction(Sink::Storage::ReadWrite); | 30 | mWriteTransaction = mStorage.createTransaction(Sink::Storage::DataStore::ReadWrite); |
31 | } | 31 | } |
32 | 32 | ||
33 | void MessageQueue::commit() | 33 | void MessageQueue::commit() |
34 | { | 34 | { |
35 | mWriteTransaction.commit(); | 35 | mWriteTransaction.commit(); |
36 | mWriteTransaction = Sink::Storage::Transaction(); | 36 | mWriteTransaction = Sink::Storage::DataStore::Transaction(); |
37 | processRemovals(); | 37 | processRemovals(); |
38 | emit messageReady(); | 38 | emit messageReady(); |
39 | } | 39 | } |
@@ -45,10 +45,10 @@ void MessageQueue::enqueue(const QByteArray &value) | |||
45 | implicitTransaction = true; | 45 | implicitTransaction = true; |
46 | startTransaction(); | 46 | startTransaction(); |
47 | } | 47 | } |
48 | const qint64 revision = Sink::Storage::maxRevision(mWriteTransaction) + 1; | 48 | const qint64 revision = Sink::Storage::DataStore::maxRevision(mWriteTransaction) + 1; |
49 | const QByteArray key = QString("%1").arg(revision).toUtf8(); | 49 | const QByteArray key = QString("%1").arg(revision).toUtf8(); |
50 | mWriteTransaction.openDatabase().write(key, value); | 50 | mWriteTransaction.openDatabase().write(key, value); |
51 | Sink::Storage::setMaxRevision(mWriteTransaction, revision); | 51 | Sink::Storage::DataStore::setMaxRevision(mWriteTransaction, revision); |
52 | if (implicitTransaction) { | 52 | if (implicitTransaction) { |
53 | commit(); | 53 | commit(); |
54 | } | 54 | } |
@@ -59,7 +59,7 @@ void MessageQueue::processRemovals() | |||
59 | if (mWriteTransaction) { | 59 | if (mWriteTransaction) { |
60 | return; | 60 | return; |
61 | } | 61 | } |
62 | auto transaction = mStorage.createTransaction(Sink::Storage::ReadWrite); | 62 | auto transaction = mStorage.createTransaction(Sink::Storage::DataStore::ReadWrite); |
63 | for (const auto &key : mPendingRemoval) { | 63 | for (const auto &key : mPendingRemoval) { |
64 | transaction.openDatabase().remove(key); | 64 | transaction.openDatabase().remove(key); |
65 | } | 65 | } |
@@ -82,7 +82,7 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi | |||
82 | return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount](KAsync::Future<void> &future) { | 82 | return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount](KAsync::Future<void> &future) { |
83 | int count = 0; | 83 | int count = 0; |
84 | QList<KAsync::Future<void>> waitCondition; | 84 | QList<KAsync::Future<void>> waitCondition; |
85 | mStorage.createTransaction(Sink::Storage::ReadOnly) | 85 | mStorage.createTransaction(Sink::Storage::DataStore::ReadOnly) |
86 | .openDatabase() | 86 | .openDatabase() |
87 | .scan("", | 87 | .scan("", |
88 | [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { | 88 | [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { |
@@ -101,7 +101,7 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi | |||
101 | } | 101 | } |
102 | return false; | 102 | return false; |
103 | }, | 103 | }, |
104 | [](const Sink::Storage::Error &error) { | 104 | [](const Sink::Storage::DataStore::Error &error) { |
105 | SinkError() << "Error while retrieving value" << error.message; | 105 | SinkError() << "Error while retrieving value" << error.message; |
106 | // errorHandler(Error(error.store, error.code, error.message)); | 106 | // errorHandler(Error(error.store, error.code, error.message)); |
107 | }); | 107 | }); |
@@ -126,7 +126,7 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi | |||
126 | bool MessageQueue::isEmpty() | 126 | bool MessageQueue::isEmpty() |
127 | { | 127 | { |
128 | int count = 0; | 128 | int count = 0; |
129 | auto t = mStorage.createTransaction(Sink::Storage::ReadOnly); | 129 | auto t = mStorage.createTransaction(Sink::Storage::DataStore::ReadOnly); |
130 | auto db = t.openDatabase(); | 130 | auto db = t.openDatabase(); |
131 | if (db) { | 131 | if (db) { |
132 | db.scan("", | 132 | db.scan("", |
@@ -137,7 +137,7 @@ bool MessageQueue::isEmpty() | |||
137 | } | 137 | } |
138 | return true; | 138 | return true; |
139 | }, | 139 | }, |
140 | [](const Sink::Storage::Error &error) { SinkError() << "Error while checking if empty" << error.message; }); | 140 | [](const Sink::Storage::DataStore::Error &error) { SinkError() << "Error while checking if empty" << error.message; }); |
141 | } | 141 | } |
142 | return count == 0; | 142 | return count == 0; |
143 | } | 143 | } |