summaryrefslogtreecommitdiffstats
path: root/common/changereplay.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-07-03 14:02:27 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-07-03 14:02:27 +0200
commit55fe06979ceebe67553135b43aa47e70d931304b (patch)
tree16b10a744879cc1872d6c07624b59ae64469ddbf /common/changereplay.cpp
parent56fae95f49a1ca8ca614bd9f89b0ea5f872765e9 (diff)
parent288946f1694c2abe1d2c5800c87339d1e8780e4b (diff)
downloadsink-55fe06979ceebe67553135b43aa47e70d931304b.tar.gz
sink-55fe06979ceebe67553135b43aa47e70d931304b.zip
Merge branch 'develop'
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r--common/changereplay.cpp57
1 files changed, 30 insertions, 27 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index 7895b66..0adbd78 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -30,8 +30,7 @@ using namespace Sink;
30using namespace Sink::Storage; 30using namespace Sink::Storage;
31 31
32ChangeReplay::ChangeReplay(const ResourceContext &resourceContext, const Sink::Log::Context &ctx) 32ChangeReplay::ChangeReplay(const ResourceContext &resourceContext, const Sink::Log::Context &ctx)
33 : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false), mLogCtx{ctx.subContext("changereplay")}, 33 : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false), mLogCtx{ctx.subContext("changereplay")}
34 mGuard{new QObject}
35{ 34{
36} 35}
37 36
@@ -84,6 +83,8 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
84 auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [this](const DataStore::Error &error) { 83 auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [this](const DataStore::Error &error) {
85 SinkWarningCtx(mLogCtx) << error.message; 84 SinkWarningCtx(mLogCtx) << error.message;
86 }); 85 });
86 Q_ASSERT(mMainStoreTransaction);
87 Q_ASSERT(replayStoreTransaction);
87 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", 88 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
88 [lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { 89 [lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
89 *lastReplayedRevision = value.toLongLong(); 90 *lastReplayedRevision = value.toLongLong();
@@ -98,16 +99,13 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
98 SinkTraceCtx(mLogCtx) << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; 99 SinkTraceCtx(mLogCtx) << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision;
99 return KAsync::doWhile( 100 return KAsync::doWhile(
100 [this, lastReplayedRevision, topRevision]() -> KAsync::Job<KAsync::ControlFlowFlag> { 101 [this, lastReplayedRevision, topRevision]() -> KAsync::Job<KAsync::ControlFlowFlag> {
101 if (!mGuard) {
102 SinkTraceCtx(mLogCtx) << "Exit due to guard";
103 return KAsync::value(KAsync::Break);
104 }
105 if (*lastReplayedRevision >= *topRevision) { 102 if (*lastReplayedRevision >= *topRevision) {
106 SinkTraceCtx(mLogCtx) << "Done replaying" << *lastReplayedRevision << *topRevision; 103 SinkTraceCtx(mLogCtx) << "Done replaying" << *lastReplayedRevision << *topRevision;
107 return KAsync::value(KAsync::Break); 104 return KAsync::value(KAsync::Break);
108 } 105 }
106 Q_ASSERT(mMainStoreTransaction);
109 107
110 KAsync::Job<void> replayJob = KAsync::null<void>(); 108 auto replayJob = KAsync::null();
111 qint64 revision = *lastReplayedRevision + 1; 109 qint64 revision = *lastReplayedRevision + 1;
112 while (revision <= *topRevision) { 110 while (revision <= *topRevision) {
113 const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); 111 const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision);
@@ -131,14 +129,14 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
131 if (canReplay(type, key, entityBuffer)) { 129 if (canReplay(type, key, entityBuffer)) {
132 SinkTraceCtx(mLogCtx) << "Replaying " << key; 130 SinkTraceCtx(mLogCtx) << "Replaying " << key;
133 replayJob = replay(type, key, entityBuffer); 131 replayJob = replay(type, key, entityBuffer);
132 //Set the last revision we tried to replay
133 *lastReplayedRevision = revision;
134 //Execute replay job and commit
135 break;
134 } else { 136 } else {
135 SinkTraceCtx(mLogCtx) << "Cannot replay " << key; 137 SinkTraceCtx(mLogCtx) << "Not replaying " << key;
136 //We silently skip over revisions that cannot be replayed, as this is not an error. 138 //We silently skip over revisions that cannot be replayed, as this is not an error.
137 replayJob = KAsync::null();
138 } 139 }
139 //Set the last revision we tried to replay
140 *lastReplayedRevision = revision;
141 break;
142 } 140 }
143 } 141 }
144 //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations. 142 //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations.
@@ -149,22 +147,22 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
149 if (error) { 147 if (error) {
150 SinkWarningCtx(mLogCtx) << "Change replay failed: " << error << "Last replayed revision: " << *lastReplayedRevision; 148 SinkWarningCtx(mLogCtx) << "Change replay failed: " << error << "Last replayed revision: " << *lastReplayedRevision;
151 //We're probably not online or so, so postpone retrying 149 //We're probably not online or so, so postpone retrying
152 return KAsync::value(KAsync::Break); 150 return KAsync::value(KAsync::Break).then(KAsync::error<KAsync::ControlFlowFlag>(error));
151 }
152 SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision;
153
154 recordReplayedRevision(*lastReplayedRevision);
155 reportProgress(*lastReplayedRevision, *topRevision);
156
157 const bool gotMoreToReplay = (*lastReplayedRevision < *topRevision);
158 if (gotMoreToReplay) {
159 SinkTraceCtx(mLogCtx) << "Replaying some more...";
160 //Replay more if we have more
161 return KAsync::wait(0).then(KAsync::value(KAsync::Continue));
153 } else { 162 } else {
154 SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision; 163 return KAsync::value(KAsync::Break);
155 recordReplayedRevision(*lastReplayedRevision);
156 if (*lastReplayedRevision < *topRevision) {
157 SinkTraceCtx(mLogCtx) << "Replaying some more...";
158 //Replay more if we have more
159 return KAsync::wait(0).then(KAsync::value(KAsync::Continue));
160 } else {
161 return KAsync::value(KAsync::Break);
162 }
163 } 164 }
164 //We shouldn't ever get here 165 }).guard(&mGuard);
165 Q_ASSERT(false);
166 return KAsync::value(KAsync::Break);
167 });
168 }); 166 });
169 }) 167 })
170 .then([this](const KAsync::Error &error) { 168 .then([this](const KAsync::Error &error) {
@@ -181,7 +179,12 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
181 emit changesReplayed(); 179 emit changesReplayed();
182 } 180 }
183 } 181 }
184 }); 182 if (error) {
183 return KAsync::error(error);
184 } else {
185 return KAsync::null();
186 }
187 }).guard(&mGuard);
185} 188}
186 189
187void ChangeReplay::revisionChanged() 190void ChangeReplay::revisionChanged()