summaryrefslogtreecommitdiffstats
path: root/common/changereplay.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r--common/changereplay.cpp33
1 files changed, 17 insertions, 16 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index e3b7158..6e58564 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -27,31 +27,32 @@
27#include <QTimer> 27#include <QTimer>
28 28
29using namespace Sink; 29using namespace Sink;
30using namespace Sink::Storage;
30 31
31SINK_DEBUG_AREA("changereplay"); 32SINK_DEBUG_AREA("changereplay");
32 33
33ChangeReplay::ChangeReplay(const QByteArray &resourceName) 34ChangeReplay::ChangeReplay(const ResourceContext &resourceContext)
34 : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayInProgress(false) 35 : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false)
35{ 36{
36 SinkTrace() << "Created change replay: " << resourceName; 37 SinkTrace() << "Created change replay: " << resourceContext.instanceId();
37} 38}
38 39
39qint64 ChangeReplay::getLastReplayedRevision() 40qint64 ChangeReplay::getLastReplayedRevision()
40{ 41{
41 qint64 lastReplayedRevision = 0; 42 qint64 lastReplayedRevision = 0;
42 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); 43 auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly);
43 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", 44 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
44 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { 45 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
45 lastReplayedRevision = value.toLongLong(); 46 lastReplayedRevision = value.toLongLong();
46 return false; 47 return false;
47 }, 48 },
48 [](const Storage::Error &) {}); 49 [](const DataStore::Error &) {});
49 return lastReplayedRevision; 50 return lastReplayedRevision;
50} 51}
51 52
52bool ChangeReplay::allChangesReplayed() 53bool ChangeReplay::allChangesReplayed()
53{ 54{
54 const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 55 const qint64 topRevision = DataStore::maxRevision(mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) {
55 SinkWarning() << error.message; 56 SinkWarning() << error.message;
56 })); 57 }));
57 const qint64 lastReplayedRevision = getLastReplayedRevision(); 58 const qint64 lastReplayedRevision = getLastReplayedRevision();
@@ -61,7 +62,7 @@ bool ChangeReplay::allChangesReplayed()
61 62
62void ChangeReplay::recordReplayedRevision(qint64 revision) 63void ChangeReplay::recordReplayedRevision(qint64 revision)
63{ 64{
64 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { 65 auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadWrite, [](const Sink::Storage::DataStore::Error &error) {
65 SinkWarning() << error.message; 66 SinkWarning() << error.message;
66 }); 67 });
67 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); 68 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision));
@@ -74,10 +75,10 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
74 auto topRevision = QSharedPointer<qint64>::create(0); 75 auto topRevision = QSharedPointer<qint64>::create(0);
75 return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() { 76 return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() {
76 mReplayInProgress = true; 77 mReplayInProgress = true;
77 mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 78 mMainStoreTransaction = mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) {
78 SinkWarning() << error.message; 79 SinkWarning() << error.message;
79 }); 80 });
80 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 81 auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) {
81 SinkWarning() << error.message; 82 SinkWarning() << error.message;
82 }); 83 });
83 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", 84 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
@@ -85,8 +86,8 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
85 *lastReplayedRevision = value.toLongLong(); 86 *lastReplayedRevision = value.toLongLong();
86 return false; 87 return false;
87 }, 88 },
88 [](const Storage::Error &) {}); 89 [](const DataStore::Error &) {});
89 *topRevision = Storage::maxRevision(mMainStoreTransaction); 90 *topRevision = DataStore::maxRevision(mMainStoreTransaction);
90 SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; 91 SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision;
91 }) 92 })
92 .then(KAsync::dowhile( 93 .then(KAsync::dowhile(
@@ -98,11 +99,11 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
98 qint64 revision = *lastReplayedRevision + 1; 99 qint64 revision = *lastReplayedRevision + 1;
99 KAsync::Job<void> replayJob = KAsync::null<void>(); 100 KAsync::Job<void> replayJob = KAsync::null<void>();
100 while (revision <= *topRevision) { 101 while (revision <= *topRevision) {
101 const auto uid = Storage::getUidFromRevision(mMainStoreTransaction, revision); 102 const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision);
102 const auto type = Storage::getTypeFromRevision(mMainStoreTransaction, revision); 103 const auto type = DataStore::getTypeFromRevision(mMainStoreTransaction, revision);
103 const auto key = Storage::assembleKey(uid, revision); 104 const auto key = DataStore::assembleKey(uid, revision);
104 bool exitLoop = false; 105 bool exitLoop = false;
105 Storage::mainDatabase(mMainStoreTransaction, type) 106 DataStore::mainDatabase(mMainStoreTransaction, type)
106 .scan(key, 107 .scan(key,
107 [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { 108 [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool {
108 SinkTrace() << "Replaying " << key; 109 SinkTrace() << "Replaying " << key;
@@ -123,7 +124,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
123 } 124 }
124 return false; 125 return false;
125 }, 126 },
126 [key](const Storage::Error &) { SinkError() << "Failed to replay change " << key; }); 127 [key](const DataStore::Error &) { SinkError() << "Failed to replay change " << key; });
127 if (exitLoop) { 128 if (exitLoop) {
128 break; 129 break;
129 } 130 }