diff options
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r-- | common/changereplay.cpp | 28 |
1 files changed, 14 insertions, 14 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp index 0096bd0..4b7d593 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp | |||
@@ -26,13 +26,12 @@ | |||
26 | 26 | ||
27 | using namespace Sink; | 27 | using namespace Sink; |
28 | 28 | ||
29 | #undef DEBUG_AREA | 29 | SINK_DEBUG_AREA("changereplay"); |
30 | #define DEBUG_AREA "resource.changereplay" | ||
31 | 30 | ||
32 | ChangeReplay::ChangeReplay(const QByteArray &resourceName) | 31 | ChangeReplay::ChangeReplay(const QByteArray &resourceName) |
33 | : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayInProgress(false) | 32 | : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayInProgress(false) |
34 | { | 33 | { |
35 | Trace() << "Created change replay: " << resourceName; | 34 | SinkTrace() << "Created change replay: " << resourceName; |
36 | } | 35 | } |
37 | 36 | ||
38 | qint64 ChangeReplay::getLastReplayedRevision() | 37 | qint64 ChangeReplay::getLastReplayedRevision() |
@@ -51,10 +50,10 @@ qint64 ChangeReplay::getLastReplayedRevision() | |||
51 | bool ChangeReplay::allChangesReplayed() | 50 | bool ChangeReplay::allChangesReplayed() |
52 | { | 51 | { |
53 | const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | 52 | const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
54 | Warning() << error.message; | 53 | SinkWarning() << error.message; |
55 | })); | 54 | })); |
56 | const qint64 lastReplayedRevision = getLastReplayedRevision(); | 55 | const qint64 lastReplayedRevision = getLastReplayedRevision(); |
57 | Trace() << "All changes replayed " << topRevision << lastReplayedRevision; | 56 | SinkTrace() << "All changes replayed " << topRevision << lastReplayedRevision; |
58 | return (lastReplayedRevision >= topRevision); | 57 | return (lastReplayedRevision >= topRevision); |
59 | } | 58 | } |
60 | 59 | ||
@@ -62,10 +61,10 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
62 | { | 61 | { |
63 | mReplayInProgress = true; | 62 | mReplayInProgress = true; |
64 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | 63 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
65 | Warning() << error.message; | 64 | SinkWarning() << error.message; |
66 | }); | 65 | }); |
67 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | 66 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
68 | Warning() << error.message; | 67 | SinkWarning() << error.message; |
69 | }); | 68 | }); |
70 | qint64 lastReplayedRevision = 0; | 69 | qint64 lastReplayedRevision = 0; |
71 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | 70 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", |
@@ -78,14 +77,14 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
78 | 77 | ||
79 | auto recordReplayedRevision = [this](qint64 revision) { | 78 | auto recordReplayedRevision = [this](qint64 revision) { |
80 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { | 79 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { |
81 | Warning() << error.message; | 80 | SinkWarning() << error.message; |
82 | }); | 81 | }); |
83 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); | 82 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); |
84 | replayStoreTransaction.commit(); | 83 | replayStoreTransaction.commit(); |
85 | }; | 84 | }; |
86 | 85 | ||
87 | if (lastReplayedRevision < topRevision) { | 86 | if (lastReplayedRevision < topRevision) { |
88 | Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; | 87 | SinkTrace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; |
89 | qint64 revision = lastReplayedRevision + 1; | 88 | qint64 revision = lastReplayedRevision + 1; |
90 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); | 89 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); |
91 | const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); | 90 | const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); |
@@ -94,25 +93,25 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
94 | Storage::mainDatabase(mainStoreTransaction, type) | 93 | Storage::mainDatabase(mainStoreTransaction, type) |
95 | .scan(key, | 94 | .scan(key, |
96 | [&lastReplayedRevision, type, this, &replayJob](const QByteArray &key, const QByteArray &value) -> bool { | 95 | [&lastReplayedRevision, type, this, &replayJob](const QByteArray &key, const QByteArray &value) -> bool { |
97 | Trace() << "Replaying " << key; | 96 | SinkTrace() << "Replaying " << key; |
98 | replayJob = replay(type, key, value); | 97 | replayJob = replay(type, key, value); |
99 | return false; | 98 | return false; |
100 | }, | 99 | }, |
101 | [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); | 100 | [key](const Storage::Error &) { SinkError() << "Failed to replay change " << key; }); |
102 | return replayJob.then<void>([this, revision, recordReplayedRevision]() { | 101 | return replayJob.then<void>([this, revision, recordReplayedRevision]() { |
103 | Trace() << "Replayed until " << revision; | 102 | SinkTrace() << "Replayed until " << revision; |
104 | recordReplayedRevision(revision); | 103 | recordReplayedRevision(revision); |
105 | //replay until we're done | 104 | //replay until we're done |
106 | replayNextRevision().exec(); | 105 | replayNextRevision().exec(); |
107 | }, | 106 | }, |
108 | [this, revision, recordReplayedRevision](int, QString) { | 107 | [this, revision, recordReplayedRevision](int, QString) { |
109 | Trace() << "Change replay failed" << revision; | 108 | SinkTrace() << "Change replay failed" << revision; |
110 | //We're probably not online or so, so postpone retrying | 109 | //We're probably not online or so, so postpone retrying |
111 | mReplayInProgress = false; | 110 | mReplayInProgress = false; |
112 | emit changesReplayed(); | 111 | emit changesReplayed(); |
113 | }); | 112 | }); |
114 | } else { | 113 | } else { |
115 | Trace() << "No changes to replay"; | 114 | SinkTrace() << "No changes to replay"; |
116 | mReplayInProgress = false; | 115 | mReplayInProgress = false; |
117 | emit changesReplayed(); | 116 | emit changesReplayed(); |
118 | } | 117 | } |
@@ -122,6 +121,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
122 | void ChangeReplay::revisionChanged() | 121 | void ChangeReplay::revisionChanged() |
123 | { | 122 | { |
124 | if (!mReplayInProgress) { | 123 | if (!mReplayInProgress) { |
124 | emit replayingChanges(); | ||
125 | replayNextRevision().exec(); | 125 | replayNextRevision().exec(); |
126 | } | 126 | } |
127 | } | 127 | } |