diff options
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r-- | common/changereplay.cpp | 27 |
1 files changed, 13 insertions, 14 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp index 78c0ff5..99bbaab 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp | |||
@@ -26,13 +26,12 @@ | |||
26 | 26 | ||
27 | using namespace Sink; | 27 | using namespace Sink; |
28 | 28 | ||
29 | #undef DEBUG_AREA | 29 | SINK_DEBUG_AREA("changereplay"); |
30 | #define DEBUG_AREA "resource.changereplay" | ||
31 | 30 | ||
32 | ChangeReplay::ChangeReplay(const QByteArray &resourceName) | 31 | ChangeReplay::ChangeReplay(const QByteArray &resourceName) |
33 | : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayInProgress(false) | 32 | : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayInProgress(false) |
34 | { | 33 | { |
35 | Trace() << "Created change replay: " << resourceName; | 34 | SinkTrace() << "Created change replay: " << resourceName; |
36 | } | 35 | } |
37 | 36 | ||
38 | qint64 ChangeReplay::getLastReplayedRevision() | 37 | qint64 ChangeReplay::getLastReplayedRevision() |
@@ -51,10 +50,10 @@ qint64 ChangeReplay::getLastReplayedRevision() | |||
51 | bool ChangeReplay::allChangesReplayed() | 50 | bool ChangeReplay::allChangesReplayed() |
52 | { | 51 | { |
53 | const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | 52 | const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
54 | Warning() << error.message; | 53 | SinkWarning() << error.message; |
55 | })); | 54 | })); |
56 | const qint64 lastReplayedRevision = getLastReplayedRevision(); | 55 | const qint64 lastReplayedRevision = getLastReplayedRevision(); |
57 | Trace() << "All changes replayed " << topRevision << lastReplayedRevision; | 56 | SinkTrace() << "All changes replayed " << topRevision << lastReplayedRevision; |
58 | return (lastReplayedRevision >= topRevision); | 57 | return (lastReplayedRevision >= topRevision); |
59 | } | 58 | } |
60 | 59 | ||
@@ -62,10 +61,10 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
62 | { | 61 | { |
63 | mReplayInProgress = true; | 62 | mReplayInProgress = true; |
64 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | 63 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
65 | Warning() << error.message; | 64 | SinkWarning() << error.message; |
66 | }); | 65 | }); |
67 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | 66 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
68 | Warning() << error.message; | 67 | SinkWarning() << error.message; |
69 | }); | 68 | }); |
70 | qint64 lastReplayedRevision = 0; | 69 | qint64 lastReplayedRevision = 0; |
71 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | 70 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", |
@@ -78,14 +77,14 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
78 | 77 | ||
79 | auto recordReplayedRevision = [this](qint64 revision) { | 78 | auto recordReplayedRevision = [this](qint64 revision) { |
80 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { | 79 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { |
81 | Warning() << error.message; | 80 | SinkWarning() << error.message; |
82 | }); | 81 | }); |
83 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); | 82 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); |
84 | replayStoreTransaction.commit(); | 83 | replayStoreTransaction.commit(); |
85 | }; | 84 | }; |
86 | 85 | ||
87 | if (lastReplayedRevision < topRevision) { | 86 | if (lastReplayedRevision < topRevision) { |
88 | Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; | 87 | SinkTrace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; |
89 | emit replayingChanges(); | 88 | emit replayingChanges(); |
90 | qint64 revision = lastReplayedRevision + 1; | 89 | qint64 revision = lastReplayedRevision + 1; |
91 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); | 90 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); |
@@ -95,25 +94,25 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
95 | Storage::mainDatabase(mainStoreTransaction, type) | 94 | Storage::mainDatabase(mainStoreTransaction, type) |
96 | .scan(key, | 95 | .scan(key, |
97 | [&lastReplayedRevision, type, this, &replayJob](const QByteArray &key, const QByteArray &value) -> bool { | 96 | [&lastReplayedRevision, type, this, &replayJob](const QByteArray &key, const QByteArray &value) -> bool { |
98 | Trace() << "Replaying " << key; | 97 | SinkTrace() << "Replaying " << key; |
99 | replayJob = replay(type, key, value); | 98 | replayJob = replay(type, key, value); |
100 | return false; | 99 | return false; |
101 | }, | 100 | }, |
102 | [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); | 101 | [key](const Storage::Error &) { SinkError() << "Failed to replay change " << key; }); |
103 | return replayJob.then<void>([this, revision, recordReplayedRevision]() { | 102 | return replayJob.then<void>([this, revision, recordReplayedRevision]() { |
104 | Trace() << "Replayed until " << revision; | 103 | SinkTrace() << "Replayed until " << revision; |
105 | recordReplayedRevision(revision); | 104 | recordReplayedRevision(revision); |
106 | //replay until we're done | 105 | //replay until we're done |
107 | replayNextRevision().exec(); | 106 | replayNextRevision().exec(); |
108 | }, | 107 | }, |
109 | [this, revision, recordReplayedRevision](int, QString) { | 108 | [this, revision, recordReplayedRevision](int, QString) { |
110 | Trace() << "Change replay failed" << revision; | 109 | SinkTrace() << "Change replay failed" << revision; |
111 | //We're probably not online or so, so postpone retrying | 110 | //We're probably not online or so, so postpone retrying |
112 | mReplayInProgress = false; | 111 | mReplayInProgress = false; |
113 | emit changesReplayed(); | 112 | emit changesReplayed(); |
114 | }); | 113 | }); |
115 | } else { | 114 | } else { |
116 | Trace() << "No changes to replay"; | 115 | SinkTrace() << "No changes to replay"; |
117 | mReplayInProgress = false; | 116 | mReplayInProgress = false; |
118 | emit changesReplayed(); | 117 | emit changesReplayed(); |
119 | } | 118 | } |