summaryrefslogtreecommitdiffstats
path: root/common/changereplay.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-16 14:55:20 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-21 09:02:21 +0200
commit237b9ae4113e7a9f489632296941becb71afdb45 (patch)
tree01cde58f495944f01cad9d282391d4efd2897141 /common/changereplay.cpp
parent95d11bf0be98a4e3c08502fe23417b800233ce14 (diff)
downloadsink-237b9ae4113e7a9f489632296941becb71afdb45.tar.gz
sink-237b9ae4113e7a9f489632296941becb71afdb45.zip
Refactor how the storage is used.
This is the initial refactoring to improve how we deal with the storage. It does a couple of things: * Rename Sink::Storage to Sink::Storage::DataStore to free up the Sink::Storage namespace * Introduce a Sink::ResourceContext to have a single object that can be passed around containing everything that is necessary to operate on a resource. This is a lot better than the multiple separate parameters that we used to pass around all over the place, while still allowing for dependency injection for tests. * Tie storage access together using the new EntityStore that directly works with ApplicationDomainTypes. This gives us a central place where main storage, indexes and buffer adaptors are tied together, which will also give us a place to implement external indexes, such as a fulltextindex using xapian. * Use ApplicationDomainTypes as the default way to pass around entities. Instead of using various ways to pass around entities (buffers, buffer adaptors, ApplicationDomainTypes), only use a single way. The old approach was confusing, and was only done as: * optimization; really shouldn't be necessary and otherwise I'm sure we can find better ways to optimize ApplicationDomainType itself. * a way to account for entities that have multiple buffers, a concept that I no longer deem relevant. While this commit does the bulk of the work to get there, the following commits will refactor more stuff to get things back to normal.
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r--common/changereplay.cpp33
1 files changed, 17 insertions, 16 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index e3b7158..6e58564 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -27,31 +27,32 @@
27#include <QTimer> 27#include <QTimer>
28 28
29using namespace Sink; 29using namespace Sink;
30using namespace Sink::Storage;
30 31
31SINK_DEBUG_AREA("changereplay"); 32SINK_DEBUG_AREA("changereplay");
32 33
33ChangeReplay::ChangeReplay(const QByteArray &resourceName) 34ChangeReplay::ChangeReplay(const ResourceContext &resourceContext)
34 : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayInProgress(false) 35 : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false)
35{ 36{
36 SinkTrace() << "Created change replay: " << resourceName; 37 SinkTrace() << "Created change replay: " << resourceContext.instanceId();
37} 38}
38 39
39qint64 ChangeReplay::getLastReplayedRevision() 40qint64 ChangeReplay::getLastReplayedRevision()
40{ 41{
41 qint64 lastReplayedRevision = 0; 42 qint64 lastReplayedRevision = 0;
42 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); 43 auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly);
43 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", 44 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
44 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { 45 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
45 lastReplayedRevision = value.toLongLong(); 46 lastReplayedRevision = value.toLongLong();
46 return false; 47 return false;
47 }, 48 },
48 [](const Storage::Error &) {}); 49 [](const DataStore::Error &) {});
49 return lastReplayedRevision; 50 return lastReplayedRevision;
50} 51}
51 52
52bool ChangeReplay::allChangesReplayed() 53bool ChangeReplay::allChangesReplayed()
53{ 54{
54 const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 55 const qint64 topRevision = DataStore::maxRevision(mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) {
55 SinkWarning() << error.message; 56 SinkWarning() << error.message;
56 })); 57 }));
57 const qint64 lastReplayedRevision = getLastReplayedRevision(); 58 const qint64 lastReplayedRevision = getLastReplayedRevision();
@@ -61,7 +62,7 @@ bool ChangeReplay::allChangesReplayed()
61 62
62void ChangeReplay::recordReplayedRevision(qint64 revision) 63void ChangeReplay::recordReplayedRevision(qint64 revision)
63{ 64{
64 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { 65 auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadWrite, [](const Sink::Storage::DataStore::Error &error) {
65 SinkWarning() << error.message; 66 SinkWarning() << error.message;
66 }); 67 });
67 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); 68 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision));
@@ -74,10 +75,10 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
74 auto topRevision = QSharedPointer<qint64>::create(0); 75 auto topRevision = QSharedPointer<qint64>::create(0);
75 return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() { 76 return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() {
76 mReplayInProgress = true; 77 mReplayInProgress = true;
77 mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 78 mMainStoreTransaction = mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) {
78 SinkWarning() << error.message; 79 SinkWarning() << error.message;
79 }); 80 });
80 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 81 auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) {
81 SinkWarning() << error.message; 82 SinkWarning() << error.message;
82 }); 83 });
83 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", 84 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
@@ -85,8 +86,8 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
85 *lastReplayedRevision = value.toLongLong(); 86 *lastReplayedRevision = value.toLongLong();
86 return false; 87 return false;
87 }, 88 },
88 [](const Storage::Error &) {}); 89 [](const DataStore::Error &) {});
89 *topRevision = Storage::maxRevision(mMainStoreTransaction); 90 *topRevision = DataStore::maxRevision(mMainStoreTransaction);
90 SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; 91 SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision;
91 }) 92 })
92 .then(KAsync::dowhile( 93 .then(KAsync::dowhile(
@@ -98,11 +99,11 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
98 qint64 revision = *lastReplayedRevision + 1; 99 qint64 revision = *lastReplayedRevision + 1;
99 KAsync::Job<void> replayJob = KAsync::null<void>(); 100 KAsync::Job<void> replayJob = KAsync::null<void>();
100 while (revision <= *topRevision) { 101 while (revision <= *topRevision) {
101 const auto uid = Storage::getUidFromRevision(mMainStoreTransaction, revision); 102 const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision);
102 const auto type = Storage::getTypeFromRevision(mMainStoreTransaction, revision); 103 const auto type = DataStore::getTypeFromRevision(mMainStoreTransaction, revision);
103 const auto key = Storage::assembleKey(uid, revision); 104 const auto key = DataStore::assembleKey(uid, revision);
104 bool exitLoop = false; 105 bool exitLoop = false;
105 Storage::mainDatabase(mMainStoreTransaction, type) 106 DataStore::mainDatabase(mMainStoreTransaction, type)
106 .scan(key, 107 .scan(key,
107 [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { 108 [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool {
108 SinkTrace() << "Replaying " << key; 109 SinkTrace() << "Replaying " << key;
@@ -123,7 +124,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
123 } 124 }
124 return false; 125 return false;
125 }, 126 },
126 [key](const Storage::Error &) { SinkError() << "Failed to replay change " << key; }); 127 [key](const DataStore::Error &) { SinkError() << "Failed to replay change " << key; });
127 if (exitLoop) { 128 if (exitLoop) {
128 break; 129 break;
129 } 130 }