diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-10-16 14:55:20 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-10-21 09:02:21 +0200 |
commit | 237b9ae4113e7a9f489632296941becb71afdb45 (patch) | |
tree | 01cde58f495944f01cad9d282391d4efd2897141 /common/changereplay.cpp | |
parent | 95d11bf0be98a4e3c08502fe23417b800233ce14 (diff) | |
download | sink-237b9ae4113e7a9f489632296941becb71afdb45.tar.gz sink-237b9ae4113e7a9f489632296941becb71afdb45.zip |
Refactor how the storage is used.
This is the initial refactoring to improve how we deal with the storage.
It does a couple of things:
* Rename Sink::Storage to Sink::Storage::DataStore to free up the
Sink::Storage namespace
* Introduce a Sink::ResourceContext to have a single object that can be
passed around containing everything that is necessary to operate on a
resource. This is a lot better than the multiple separate parameters
that we used to pass around all over the place, while still allowing
for dependency injection for tests.
* Tie storage access together using the new EntityStore that directly
works with ApplicationDomainTypes. This gives us a central place where
main storage, indexes and buffer adaptors are tied together, which
will also give us a place to implement external indexes, such as a
fulltextindex using xapian.
* Use ApplicationDomainTypes as the default way to pass around entities.
Instead of using various ways to pass around entities (buffers,
buffer adaptors, ApplicationDomainTypes), only use a single way.
The old approach was confusing, and was only done as:
* optimization; really shouldn't be necessary and otherwise I'm sure
we can find better ways to optimize ApplicationDomainType itself.
* a way to account for entities that have multiple buffers, a concept
that I no longer deem relevant.
While this commit does the bulk of the work to get there, the following
commits will refactor more stuff to get things back to normal.
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r-- | common/changereplay.cpp | 33 |
1 files changed, 17 insertions, 16 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp index e3b7158..6e58564 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp | |||
@@ -27,31 +27,32 @@ | |||
27 | #include <QTimer> | 27 | #include <QTimer> |
28 | 28 | ||
29 | using namespace Sink; | 29 | using namespace Sink; |
30 | using namespace Sink::Storage; | ||
30 | 31 | ||
31 | SINK_DEBUG_AREA("changereplay"); | 32 | SINK_DEBUG_AREA("changereplay"); |
32 | 33 | ||
33 | ChangeReplay::ChangeReplay(const QByteArray &resourceName) | 34 | ChangeReplay::ChangeReplay(const ResourceContext &resourceContext) |
34 | : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayInProgress(false) | 35 | : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false) |
35 | { | 36 | { |
36 | SinkTrace() << "Created change replay: " << resourceName; | 37 | SinkTrace() << "Created change replay: " << resourceContext.instanceId(); |
37 | } | 38 | } |
38 | 39 | ||
39 | qint64 ChangeReplay::getLastReplayedRevision() | 40 | qint64 ChangeReplay::getLastReplayedRevision() |
40 | { | 41 | { |
41 | qint64 lastReplayedRevision = 0; | 42 | qint64 lastReplayedRevision = 0; |
42 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); | 43 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly); |
43 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | 44 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", |
44 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | 45 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { |
45 | lastReplayedRevision = value.toLongLong(); | 46 | lastReplayedRevision = value.toLongLong(); |
46 | return false; | 47 | return false; |
47 | }, | 48 | }, |
48 | [](const Storage::Error &) {}); | 49 | [](const DataStore::Error &) {}); |
49 | return lastReplayedRevision; | 50 | return lastReplayedRevision; |
50 | } | 51 | } |
51 | 52 | ||
52 | bool ChangeReplay::allChangesReplayed() | 53 | bool ChangeReplay::allChangesReplayed() |
53 | { | 54 | { |
54 | const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | 55 | const qint64 topRevision = DataStore::maxRevision(mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { |
55 | SinkWarning() << error.message; | 56 | SinkWarning() << error.message; |
56 | })); | 57 | })); |
57 | const qint64 lastReplayedRevision = getLastReplayedRevision(); | 58 | const qint64 lastReplayedRevision = getLastReplayedRevision(); |
@@ -61,7 +62,7 @@ bool ChangeReplay::allChangesReplayed() | |||
61 | 62 | ||
62 | void ChangeReplay::recordReplayedRevision(qint64 revision) | 63 | void ChangeReplay::recordReplayedRevision(qint64 revision) |
63 | { | 64 | { |
64 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { | 65 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadWrite, [](const Sink::Storage::DataStore::Error &error) { |
65 | SinkWarning() << error.message; | 66 | SinkWarning() << error.message; |
66 | }); | 67 | }); |
67 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); | 68 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); |
@@ -74,10 +75,10 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
74 | auto topRevision = QSharedPointer<qint64>::create(0); | 75 | auto topRevision = QSharedPointer<qint64>::create(0); |
75 | return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() { | 76 | return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() { |
76 | mReplayInProgress = true; | 77 | mReplayInProgress = true; |
77 | mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | 78 | mMainStoreTransaction = mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { |
78 | SinkWarning() << error.message; | 79 | SinkWarning() << error.message; |
79 | }); | 80 | }); |
80 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | 81 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { |
81 | SinkWarning() << error.message; | 82 | SinkWarning() << error.message; |
82 | }); | 83 | }); |
83 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | 84 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", |
@@ -85,8 +86,8 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
85 | *lastReplayedRevision = value.toLongLong(); | 86 | *lastReplayedRevision = value.toLongLong(); |
86 | return false; | 87 | return false; |
87 | }, | 88 | }, |
88 | [](const Storage::Error &) {}); | 89 | [](const DataStore::Error &) {}); |
89 | *topRevision = Storage::maxRevision(mMainStoreTransaction); | 90 | *topRevision = DataStore::maxRevision(mMainStoreTransaction); |
90 | SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; | 91 | SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; |
91 | }) | 92 | }) |
92 | .then(KAsync::dowhile( | 93 | .then(KAsync::dowhile( |
@@ -98,11 +99,11 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
98 | qint64 revision = *lastReplayedRevision + 1; | 99 | qint64 revision = *lastReplayedRevision + 1; |
99 | KAsync::Job<void> replayJob = KAsync::null<void>(); | 100 | KAsync::Job<void> replayJob = KAsync::null<void>(); |
100 | while (revision <= *topRevision) { | 101 | while (revision <= *topRevision) { |
101 | const auto uid = Storage::getUidFromRevision(mMainStoreTransaction, revision); | 102 | const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); |
102 | const auto type = Storage::getTypeFromRevision(mMainStoreTransaction, revision); | 103 | const auto type = DataStore::getTypeFromRevision(mMainStoreTransaction, revision); |
103 | const auto key = Storage::assembleKey(uid, revision); | 104 | const auto key = DataStore::assembleKey(uid, revision); |
104 | bool exitLoop = false; | 105 | bool exitLoop = false; |
105 | Storage::mainDatabase(mMainStoreTransaction, type) | 106 | DataStore::mainDatabase(mMainStoreTransaction, type) |
106 | .scan(key, | 107 | .scan(key, |
107 | [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { | 108 | [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { |
108 | SinkTrace() << "Replaying " << key; | 109 | SinkTrace() << "Replaying " << key; |
@@ -123,7 +124,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
123 | } | 124 | } |
124 | return false; | 125 | return false; |
125 | }, | 126 | }, |
126 | [key](const Storage::Error &) { SinkError() << "Failed to replay change " << key; }); | 127 | [key](const DataStore::Error &) { SinkError() << "Failed to replay change " << key; }); |
127 | if (exitLoop) { | 128 | if (exitLoop) { |
128 | break; | 129 | break; |
129 | } | 130 | } |