summaryrefslogtreecommitdiffstats
path: root/common/changereplay.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-01-18 15:59:31 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-01-18 15:59:31 +0100
commit62d222f20de7558ebb266efdcadf458e3807e406 (patch)
treeec2f3bbe552bce2e8be8a79d02cf4c63eaa1edeb /common/changereplay.cpp
parent47412280397d66950abd9582048ce8e465fc2844 (diff)
downloadsink-62d222f20de7558ebb266efdcadf458e3807e406.tar.gz
sink-62d222f20de7558ebb266efdcadf458e3807e406.zip
Refactored the changereplay
* use a log context * clearer and simpler control flow * No infinite recursive calling
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r--common/changereplay.cpp132
1 files changed, 72 insertions, 60 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index 224fb25..8ed0532 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -29,12 +29,10 @@
29using namespace Sink; 29using namespace Sink;
30using namespace Sink::Storage; 30using namespace Sink::Storage;
31 31
32SINK_DEBUG_AREA("changereplay");
33
34ChangeReplay::ChangeReplay(const ResourceContext &resourceContext) 32ChangeReplay::ChangeReplay(const ResourceContext &resourceContext)
35 : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false) 33 : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false), mLogCtx{resourceContext.instanceId() + ".changereplay"}
36{ 34{
37 SinkTrace() << "Created change replay: " << resourceContext.instanceId(); 35 SinkTraceCtx(mLogCtx) << "Created change replay: " << resourceContext.instanceId();
38} 36}
39 37
40qint64 ChangeReplay::getLastReplayedRevision() 38qint64 ChangeReplay::getLastReplayedRevision()
@@ -52,18 +50,18 @@ qint64 ChangeReplay::getLastReplayedRevision()
52 50
53bool ChangeReplay::allChangesReplayed() 51bool ChangeReplay::allChangesReplayed()
54{ 52{
55 const qint64 topRevision = DataStore::maxRevision(mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { 53 const qint64 topRevision = DataStore::maxRevision(mStorage.createTransaction(DataStore::ReadOnly, [this](const Sink::Storage::DataStore::Error &error) {
56 SinkWarning() << error.message; 54 SinkWarningCtx(mLogCtx) << error.message;
57 })); 55 }));
58 const qint64 lastReplayedRevision = getLastReplayedRevision(); 56 const qint64 lastReplayedRevision = getLastReplayedRevision();
59 SinkTrace() << "All changes replayed " << topRevision << lastReplayedRevision; 57 SinkTraceCtx(mLogCtx) << "Checking if all replayed. Top revision:" << topRevision << "Last replayed:" << lastReplayedRevision;
60 return (lastReplayedRevision >= topRevision); 58 return (lastReplayedRevision >= topRevision);
61} 59}
62 60
63void ChangeReplay::recordReplayedRevision(qint64 revision) 61void ChangeReplay::recordReplayedRevision(qint64 revision)
64{ 62{
65 auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadWrite, [](const Sink::Storage::DataStore::Error &error) { 63 auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadWrite, [this](const Sink::Storage::DataStore::Error &error) {
66 SinkWarning() << error.message; 64 SinkWarningCtx(mLogCtx) << error.message;
67 }); 65 });
68 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); 66 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision));
69 replayStoreTransaction.commit(); 67 replayStoreTransaction.commit();
@@ -72,16 +70,20 @@ void ChangeReplay::recordReplayedRevision(qint64 revision)
72KAsync::Job<void> ChangeReplay::replayNextRevision() 70KAsync::Job<void> ChangeReplay::replayNextRevision()
73{ 71{
74 Q_ASSERT(!mReplayInProgress); 72 Q_ASSERT(!mReplayInProgress);
75 auto lastReplayedRevision = QSharedPointer<qint64>::create(0); 73 return KAsync::start<void>([this]() {
76 auto topRevision = QSharedPointer<qint64>::create(0); 74 if (mReplayInProgress) {
77 emit replayingChanges(); 75 SinkErrorCtx(mLogCtx) << "Replay still in progress!!!!!";
78 return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() { 76 return KAsync::null<void>();
77 }
78 auto lastReplayedRevision = QSharedPointer<qint64>::create(0);
79 auto topRevision = QSharedPointer<qint64>::create(0);
80 emit replayingChanges();
79 mReplayInProgress = true; 81 mReplayInProgress = true;
80 mMainStoreTransaction = mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { 82 mMainStoreTransaction = mStorage.createTransaction(DataStore::ReadOnly, [this](const DataStore::Error &error) {
81 SinkWarning() << error.message; 83 SinkWarningCtx(mLogCtx) << error.message;
82 }); 84 });
83 auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { 85 auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [this](const DataStore::Error &error) {
84 SinkWarning() << error.message; 86 SinkWarningCtx(mLogCtx) << error.message;
85 }); 87 });
86 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", 88 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
87 [lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { 89 [lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
@@ -90,57 +92,67 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
90 }, 92 },
91 [](const DataStore::Error &) {}); 93 [](const DataStore::Error &) {});
92 *topRevision = DataStore::maxRevision(mMainStoreTransaction); 94 *topRevision = DataStore::maxRevision(mMainStoreTransaction);
93 SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; 95 if (*lastReplayedRevision >= *topRevision) {
94 }) 96 SinkTraceCtx(mLogCtx) << "Nothing to replay";
95 .then(KAsync::doWhile( 97 return KAsync::null();
96 [this, lastReplayedRevision, topRevision]() -> KAsync::Job<KAsync::ControlFlowFlag> { 98 }
99 SinkTraceCtx(mLogCtx) << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision;
100 return KAsync::doWhile(
101 [this, lastReplayedRevision, topRevision]() -> KAsync::Job<KAsync::ControlFlowFlag> {
97 if (*lastReplayedRevision >= *topRevision) { 102 if (*lastReplayedRevision >= *topRevision) {
103 SinkTraceCtx(mLogCtx) << "Done replaying";
98 return KAsync::value(KAsync::Break); 104 return KAsync::value(KAsync::Break);
99 } 105 }
100 106
101 qint64 revision = *lastReplayedRevision + 1;
102 KAsync::Job<void> replayJob = KAsync::null<void>(); 107 KAsync::Job<void> replayJob = KAsync::null<void>();
108 qint64 revision = *lastReplayedRevision + 1;
103 while (revision <= *topRevision) { 109 while (revision <= *topRevision) {
104 const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); 110 const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision);
105 const auto type = DataStore::getTypeFromRevision(mMainStoreTransaction, revision); 111 const auto type = DataStore::getTypeFromRevision(mMainStoreTransaction, revision);
106 const auto key = DataStore::assembleKey(uid, revision); 112 if (uid.isEmpty() || type.isEmpty()) {
107 bool exitLoop = false; 113 SinkErrorCtx(mLogCtx) << "Failed to read uid or type for revison: " << revision << uid << type;
108 DataStore::mainDatabase(mMainStoreTransaction, type) 114 } else {
109 .scan(key, 115 const auto key = DataStore::assembleKey(uid, revision);
110 [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { 116 QByteArray entityBuffer;
111 SinkTrace() << "Replaying " << key; 117 DataStore::mainDatabase(mMainStoreTransaction, type)
112 if (canReplay(type, key, value)) { 118 .scan(key,
113 replayJob = replay(type, key, value).then<void>([this, revision, lastReplayedRevision](const KAsync::Error &error) { 119 [&entityBuffer](const QByteArray &key, const QByteArray &value) -> bool {
114 if (error) { 120 entityBuffer = value;
115 SinkTrace() << "Change replay failed" << revision; 121 return false;
116 return KAsync::error(error); 122 },
117 } else { 123 [this, key](const DataStore::Error &) { SinkErrorCtx(mLogCtx) << "Failed to read the entity buffer " << key; });
118 recordReplayedRevision(revision); 124
119 *lastReplayedRevision = revision; 125 if (entityBuffer.isEmpty()) {
120 } 126 SinkErrorCtx(mLogCtx) << "Failed to replay change " << key;
121 return KAsync::null(); 127 } else {
122 }); 128 if (canReplay(type, key, entityBuffer)) {
123 exitLoop = true; 129 SinkTraceCtx(mLogCtx) << "Replaying " << key;
124 } else { 130 replayJob = replay(type, key, entityBuffer);
125 *lastReplayedRevision = revision; 131 } else {
126 } 132 SinkTraceCtx(mLogCtx) << "Cannot replay " << key;
127 return false; 133 //We silently skip over revisions that cannot be replayed, as this is not an error.
128 }, 134 replayJob = KAsync::null();
129 [key](const DataStore::Error &) { SinkError() << "Failed to replay change " << key; }); 135 }
130 if (exitLoop) { 136 //Set the last revision we tried to replay
131 break; 137 *lastReplayedRevision = revision;
138 break;
139 }
132 } 140 }
141 //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations.
142 *lastReplayedRevision = revision;
133 revision++; 143 revision++;
134 } 144 }
135 return replayJob.then<KAsync::ControlFlowFlag>([this, revision, lastReplayedRevision, topRevision](const KAsync::Error &error) ->KAsync::Job<KAsync::ControlFlowFlag> { 145 return replayJob.then([=](const KAsync::Error &error) {
136 if (error) { 146 if (error) {
137 SinkTrace() << "Change replay failed" << revision; 147 SinkWarningCtx(mLogCtx) << "Change replay failed: " << error << "Last replayed revision: " << *lastReplayedRevision;
138 //We're probably not online or so, so postpone retrying 148 //We're probably not online or so, so postpone retrying
139 return KAsync::value(KAsync::Break); 149 return KAsync::value(KAsync::Break);
140 } else { 150 } else {
141 SinkTrace() << "Replayed until " << revision; 151 SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision;
142 recordReplayedRevision(*lastReplayedRevision); 152 recordReplayedRevision(*lastReplayedRevision);
143 if (*lastReplayedRevision < *topRevision) { 153 if (*lastReplayedRevision < *topRevision) {
154 SinkTraceCtx(mLogCtx) << "Replaying some more...";
155 //Replay more if we have more
144 return KAsync::wait(0).then(KAsync::value(KAsync::Continue)); 156 return KAsync::wait(0).then(KAsync::value(KAsync::Continue));
145 } else { 157 } else {
146 return KAsync::value(KAsync::Break); 158 return KAsync::value(KAsync::Break);
@@ -150,21 +162,21 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
150 Q_ASSERT(false); 162 Q_ASSERT(false);
151 return KAsync::value(KAsync::Break); 163 return KAsync::value(KAsync::Break);
152 }); 164 });
153 })) 165 });
154 .then([this, lastReplayedRevision]() { 166 })
155 recordReplayedRevision(*lastReplayedRevision); 167 .then([this](const KAsync::Error &error) {
168 SinkTraceCtx(mLogCtx) << "Change replay complete.";
169 if (error) {
170 SinkWarningCtx(mLogCtx) << "Error during change replay: " << error;
171 }
156 mMainStoreTransaction.abort(); 172 mMainStoreTransaction.abort();
173 mReplayInProgress = false;
157 if (ChangeReplay::allChangesReplayed()) { 174 if (ChangeReplay::allChangesReplayed()) {
158 mReplayInProgress = false;
159 //In case we have a derived implementation 175 //In case we have a derived implementation
160 if (allChangesReplayed()) { 176 if (allChangesReplayed()) {
177 SinkTraceCtx(mLogCtx) << "All changes replayed";
161 emit changesReplayed(); 178 emit changesReplayed();
162 } 179 }
163 } else {
164 QTimer::singleShot(0, [this]() {
165 mReplayInProgress = false;
166 replayNextRevision().exec();
167 });
168 } 180 }
169 }); 181 });
170} 182}