diff options
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r-- | common/changereplay.cpp | 33 |
1 files changed, 17 insertions, 16 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp index e3b7158..6e58564 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp | |||
@@ -27,31 +27,32 @@ | |||
27 | #include <QTimer> | 27 | #include <QTimer> |
28 | 28 | ||
29 | using namespace Sink; | 29 | using namespace Sink; |
30 | using namespace Sink::Storage; | ||
30 | 31 | ||
31 | SINK_DEBUG_AREA("changereplay"); | 32 | SINK_DEBUG_AREA("changereplay"); |
32 | 33 | ||
33 | ChangeReplay::ChangeReplay(const QByteArray &resourceName) | 34 | ChangeReplay::ChangeReplay(const ResourceContext &resourceContext) |
34 | : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayInProgress(false) | 35 | : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false) |
35 | { | 36 | { |
36 | SinkTrace() << "Created change replay: " << resourceName; | 37 | SinkTrace() << "Created change replay: " << resourceContext.instanceId(); |
37 | } | 38 | } |
38 | 39 | ||
39 | qint64 ChangeReplay::getLastReplayedRevision() | 40 | qint64 ChangeReplay::getLastReplayedRevision() |
40 | { | 41 | { |
41 | qint64 lastReplayedRevision = 0; | 42 | qint64 lastReplayedRevision = 0; |
42 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); | 43 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly); |
43 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | 44 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", |
44 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | 45 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { |
45 | lastReplayedRevision = value.toLongLong(); | 46 | lastReplayedRevision = value.toLongLong(); |
46 | return false; | 47 | return false; |
47 | }, | 48 | }, |
48 | [](const Storage::Error &) {}); | 49 | [](const DataStore::Error &) {}); |
49 | return lastReplayedRevision; | 50 | return lastReplayedRevision; |
50 | } | 51 | } |
51 | 52 | ||
52 | bool ChangeReplay::allChangesReplayed() | 53 | bool ChangeReplay::allChangesReplayed() |
53 | { | 54 | { |
54 | const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | 55 | const qint64 topRevision = DataStore::maxRevision(mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { |
55 | SinkWarning() << error.message; | 56 | SinkWarning() << error.message; |
56 | })); | 57 | })); |
57 | const qint64 lastReplayedRevision = getLastReplayedRevision(); | 58 | const qint64 lastReplayedRevision = getLastReplayedRevision(); |
@@ -61,7 +62,7 @@ bool ChangeReplay::allChangesReplayed() | |||
61 | 62 | ||
62 | void ChangeReplay::recordReplayedRevision(qint64 revision) | 63 | void ChangeReplay::recordReplayedRevision(qint64 revision) |
63 | { | 64 | { |
64 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { | 65 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadWrite, [](const Sink::Storage::DataStore::Error &error) { |
65 | SinkWarning() << error.message; | 66 | SinkWarning() << error.message; |
66 | }); | 67 | }); |
67 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); | 68 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); |
@@ -74,10 +75,10 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
74 | auto topRevision = QSharedPointer<qint64>::create(0); | 75 | auto topRevision = QSharedPointer<qint64>::create(0); |
75 | return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() { | 76 | return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() { |
76 | mReplayInProgress = true; | 77 | mReplayInProgress = true; |
77 | mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | 78 | mMainStoreTransaction = mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { |
78 | SinkWarning() << error.message; | 79 | SinkWarning() << error.message; |
79 | }); | 80 | }); |
80 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | 81 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { |
81 | SinkWarning() << error.message; | 82 | SinkWarning() << error.message; |
82 | }); | 83 | }); |
83 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | 84 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", |
@@ -85,8 +86,8 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
85 | *lastReplayedRevision = value.toLongLong(); | 86 | *lastReplayedRevision = value.toLongLong(); |
86 | return false; | 87 | return false; |
87 | }, | 88 | }, |
88 | [](const Storage::Error &) {}); | 89 | [](const DataStore::Error &) {}); |
89 | *topRevision = Storage::maxRevision(mMainStoreTransaction); | 90 | *topRevision = DataStore::maxRevision(mMainStoreTransaction); |
90 | SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; | 91 | SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; |
91 | }) | 92 | }) |
92 | .then(KAsync::dowhile( | 93 | .then(KAsync::dowhile( |
@@ -98,11 +99,11 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
98 | qint64 revision = *lastReplayedRevision + 1; | 99 | qint64 revision = *lastReplayedRevision + 1; |
99 | KAsync::Job<void> replayJob = KAsync::null<void>(); | 100 | KAsync::Job<void> replayJob = KAsync::null<void>(); |
100 | while (revision <= *topRevision) { | 101 | while (revision <= *topRevision) { |
101 | const auto uid = Storage::getUidFromRevision(mMainStoreTransaction, revision); | 102 | const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); |
102 | const auto type = Storage::getTypeFromRevision(mMainStoreTransaction, revision); | 103 | const auto type = DataStore::getTypeFromRevision(mMainStoreTransaction, revision); |
103 | const auto key = Storage::assembleKey(uid, revision); | 104 | const auto key = DataStore::assembleKey(uid, revision); |
104 | bool exitLoop = false; | 105 | bool exitLoop = false; |
105 | Storage::mainDatabase(mMainStoreTransaction, type) | 106 | DataStore::mainDatabase(mMainStoreTransaction, type) |
106 | .scan(key, | 107 | .scan(key, |
107 | [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { | 108 | [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { |
108 | SinkTrace() << "Replaying " << key; | 109 | SinkTrace() << "Replaying " << key; |
@@ -123,7 +124,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
123 | } | 124 | } |
124 | return false; | 125 | return false; |
125 | }, | 126 | }, |
126 | [key](const Storage::Error &) { SinkError() << "Failed to replay change " << key; }); | 127 | [key](const DataStore::Error &) { SinkError() << "Failed to replay change " << key; }); |
127 | if (exitLoop) { | 128 | if (exitLoop) { |
128 | break; | 129 | break; |
129 | } | 130 | } |