summaryrefslogtreecommitdiffstats
path: root/common/changereplay.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r--common/changereplay.cpp27
1 files changed, 13 insertions, 14 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index 78c0ff5..99bbaab 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -26,13 +26,12 @@
26 26
27using namespace Sink; 27using namespace Sink;
28 28
29#undef DEBUG_AREA 29SINK_DEBUG_AREA("changereplay");
30#define DEBUG_AREA "resource.changereplay"
31 30
32ChangeReplay::ChangeReplay(const QByteArray &resourceName) 31ChangeReplay::ChangeReplay(const QByteArray &resourceName)
33 : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayInProgress(false) 32 : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayInProgress(false)
34{ 33{
35 Trace() << "Created change replay: " << resourceName; 34 SinkTrace() << "Created change replay: " << resourceName;
36} 35}
37 36
38qint64 ChangeReplay::getLastReplayedRevision() 37qint64 ChangeReplay::getLastReplayedRevision()
@@ -51,10 +50,10 @@ qint64 ChangeReplay::getLastReplayedRevision()
51bool ChangeReplay::allChangesReplayed() 50bool ChangeReplay::allChangesReplayed()
52{ 51{
53 const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 52 const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
54 Warning() << error.message; 53 SinkWarning() << error.message;
55 })); 54 }));
56 const qint64 lastReplayedRevision = getLastReplayedRevision(); 55 const qint64 lastReplayedRevision = getLastReplayedRevision();
57 Trace() << "All changes replayed " << topRevision << lastReplayedRevision; 56 SinkTrace() << "All changes replayed " << topRevision << lastReplayedRevision;
58 return (lastReplayedRevision >= topRevision); 57 return (lastReplayedRevision >= topRevision);
59} 58}
60 59
@@ -62,10 +61,10 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
62{ 61{
63 mReplayInProgress = true; 62 mReplayInProgress = true;
64 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 63 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
65 Warning() << error.message; 64 SinkWarning() << error.message;
66 }); 65 });
67 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 66 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
68 Warning() << error.message; 67 SinkWarning() << error.message;
69 }); 68 });
70 qint64 lastReplayedRevision = 0; 69 qint64 lastReplayedRevision = 0;
71 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", 70 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
@@ -78,14 +77,14 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
78 77
79 auto recordReplayedRevision = [this](qint64 revision) { 78 auto recordReplayedRevision = [this](qint64 revision) {
80 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { 79 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) {
81 Warning() << error.message; 80 SinkWarning() << error.message;
82 }); 81 });
83 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); 82 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision));
84 replayStoreTransaction.commit(); 83 replayStoreTransaction.commit();
85 }; 84 };
86 85
87 if (lastReplayedRevision < topRevision) { 86 if (lastReplayedRevision < topRevision) {
88 Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; 87 SinkTrace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision;
89 emit replayingChanges(); 88 emit replayingChanges();
90 qint64 revision = lastReplayedRevision + 1; 89 qint64 revision = lastReplayedRevision + 1;
91 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); 90 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision);
@@ -95,25 +94,25 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
95 Storage::mainDatabase(mainStoreTransaction, type) 94 Storage::mainDatabase(mainStoreTransaction, type)
96 .scan(key, 95 .scan(key,
97 [&lastReplayedRevision, type, this, &replayJob](const QByteArray &key, const QByteArray &value) -> bool { 96 [&lastReplayedRevision, type, this, &replayJob](const QByteArray &key, const QByteArray &value) -> bool {
98 Trace() << "Replaying " << key; 97 SinkTrace() << "Replaying " << key;
99 replayJob = replay(type, key, value); 98 replayJob = replay(type, key, value);
100 return false; 99 return false;
101 }, 100 },
102 [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); 101 [key](const Storage::Error &) { SinkError() << "Failed to replay change " << key; });
103 return replayJob.then<void>([this, revision, recordReplayedRevision]() { 102 return replayJob.then<void>([this, revision, recordReplayedRevision]() {
104 Trace() << "Replayed until " << revision; 103 SinkTrace() << "Replayed until " << revision;
105 recordReplayedRevision(revision); 104 recordReplayedRevision(revision);
106 //replay until we're done 105 //replay until we're done
107 replayNextRevision().exec(); 106 replayNextRevision().exec();
108 }, 107 },
109 [this, revision, recordReplayedRevision](int, QString) { 108 [this, revision, recordReplayedRevision](int, QString) {
110 Trace() << "Change replay failed" << revision; 109 SinkTrace() << "Change replay failed" << revision;
111 //We're probably not online or so, so postpone retrying 110 //We're probably not online or so, so postpone retrying
112 mReplayInProgress = false; 111 mReplayInProgress = false;
113 emit changesReplayed(); 112 emit changesReplayed();
114 }); 113 });
115 } else { 114 } else {
116 Trace() << "No changes to replay"; 115 SinkTrace() << "No changes to replay";
117 mReplayInProgress = false; 116 mReplayInProgress = false;
118 emit changesReplayed(); 117 emit changesReplayed();
119 } 118 }