summaryrefslogtreecommitdiffstats
path: root/common/changereplay.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r--common/changereplay.cpp28
1 files changed, 14 insertions, 14 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index 0096bd0..4b7d593 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -26,13 +26,12 @@
26 26
27using namespace Sink; 27using namespace Sink;
28 28
29#undef DEBUG_AREA 29SINK_DEBUG_AREA("changereplay");
30#define DEBUG_AREA "resource.changereplay"
31 30
32ChangeReplay::ChangeReplay(const QByteArray &resourceName) 31ChangeReplay::ChangeReplay(const QByteArray &resourceName)
33 : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayInProgress(false) 32 : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayInProgress(false)
34{ 33{
35 Trace() << "Created change replay: " << resourceName; 34 SinkTrace() << "Created change replay: " << resourceName;
36} 35}
37 36
38qint64 ChangeReplay::getLastReplayedRevision() 37qint64 ChangeReplay::getLastReplayedRevision()
@@ -51,10 +50,10 @@ qint64 ChangeReplay::getLastReplayedRevision()
51bool ChangeReplay::allChangesReplayed() 50bool ChangeReplay::allChangesReplayed()
52{ 51{
53 const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 52 const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
54 Warning() << error.message; 53 SinkWarning() << error.message;
55 })); 54 }));
56 const qint64 lastReplayedRevision = getLastReplayedRevision(); 55 const qint64 lastReplayedRevision = getLastReplayedRevision();
57 Trace() << "All changes replayed " << topRevision << lastReplayedRevision; 56 SinkTrace() << "All changes replayed " << topRevision << lastReplayedRevision;
58 return (lastReplayedRevision >= topRevision); 57 return (lastReplayedRevision >= topRevision);
59} 58}
60 59
@@ -62,10 +61,10 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
62{ 61{
63 mReplayInProgress = true; 62 mReplayInProgress = true;
64 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 63 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
65 Warning() << error.message; 64 SinkWarning() << error.message;
66 }); 65 });
67 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 66 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
68 Warning() << error.message; 67 SinkWarning() << error.message;
69 }); 68 });
70 qint64 lastReplayedRevision = 0; 69 qint64 lastReplayedRevision = 0;
71 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", 70 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
@@ -78,14 +77,14 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
78 77
79 auto recordReplayedRevision = [this](qint64 revision) { 78 auto recordReplayedRevision = [this](qint64 revision) {
80 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { 79 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) {
81 Warning() << error.message; 80 SinkWarning() << error.message;
82 }); 81 });
83 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); 82 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision));
84 replayStoreTransaction.commit(); 83 replayStoreTransaction.commit();
85 }; 84 };
86 85
87 if (lastReplayedRevision < topRevision) { 86 if (lastReplayedRevision < topRevision) {
88 Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; 87 SinkTrace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision;
89 qint64 revision = lastReplayedRevision + 1; 88 qint64 revision = lastReplayedRevision + 1;
90 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); 89 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision);
91 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); 90 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision);
@@ -94,25 +93,25 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
94 Storage::mainDatabase(mainStoreTransaction, type) 93 Storage::mainDatabase(mainStoreTransaction, type)
95 .scan(key, 94 .scan(key,
96 [&lastReplayedRevision, type, this, &replayJob](const QByteArray &key, const QByteArray &value) -> bool { 95 [&lastReplayedRevision, type, this, &replayJob](const QByteArray &key, const QByteArray &value) -> bool {
97 Trace() << "Replaying " << key; 96 SinkTrace() << "Replaying " << key;
98 replayJob = replay(type, key, value); 97 replayJob = replay(type, key, value);
99 return false; 98 return false;
100 }, 99 },
101 [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); 100 [key](const Storage::Error &) { SinkError() << "Failed to replay change " << key; });
102 return replayJob.then<void>([this, revision, recordReplayedRevision]() { 101 return replayJob.then<void>([this, revision, recordReplayedRevision]() {
103 Trace() << "Replayed until " << revision; 102 SinkTrace() << "Replayed until " << revision;
104 recordReplayedRevision(revision); 103 recordReplayedRevision(revision);
105 //replay until we're done 104 //replay until we're done
106 replayNextRevision().exec(); 105 replayNextRevision().exec();
107 }, 106 },
108 [this, revision, recordReplayedRevision](int, QString) { 107 [this, revision, recordReplayedRevision](int, QString) {
109 Trace() << "Change replay failed" << revision; 108 SinkTrace() << "Change replay failed" << revision;
110 //We're probably not online or so, so postpone retrying 109 //We're probably not online or so, so postpone retrying
111 mReplayInProgress = false; 110 mReplayInProgress = false;
112 emit changesReplayed(); 111 emit changesReplayed();
113 }); 112 });
114 } else { 113 } else {
115 Trace() << "No changes to replay"; 114 SinkTrace() << "No changes to replay";
116 mReplayInProgress = false; 115 mReplayInProgress = false;
117 emit changesReplayed(); 116 emit changesReplayed();
118 } 117 }
@@ -122,6 +121,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
122void ChangeReplay::revisionChanged() 121void ChangeReplay::revisionChanged()
123{ 122{
124 if (!mReplayInProgress) { 123 if (!mReplayInProgress) {
124 emit replayingChanges();
125 replayNextRevision().exec(); 125 replayNextRevision().exec();
126 } 126 }
127} 127}