diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-07-03 14:02:27 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-07-03 14:02:27 +0200 |
commit | 55fe06979ceebe67553135b43aa47e70d931304b (patch) | |
tree | 16b10a744879cc1872d6c07624b59ae64469ddbf /common/changereplay.cpp | |
parent | 56fae95f49a1ca8ca614bd9f89b0ea5f872765e9 (diff) | |
parent | 288946f1694c2abe1d2c5800c87339d1e8780e4b (diff) | |
download | sink-55fe06979ceebe67553135b43aa47e70d931304b.tar.gz sink-55fe06979ceebe67553135b43aa47e70d931304b.zip |
Merge branch 'develop'
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r-- | common/changereplay.cpp | 57 |
1 files changed, 30 insertions, 27 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp index 7895b66..0adbd78 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp | |||
@@ -30,8 +30,7 @@ using namespace Sink; | |||
30 | using namespace Sink::Storage; | 30 | using namespace Sink::Storage; |
31 | 31 | ||
32 | ChangeReplay::ChangeReplay(const ResourceContext &resourceContext, const Sink::Log::Context &ctx) | 32 | ChangeReplay::ChangeReplay(const ResourceContext &resourceContext, const Sink::Log::Context &ctx) |
33 | : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false), mLogCtx{ctx.subContext("changereplay")}, | 33 | : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false), mLogCtx{ctx.subContext("changereplay")} |
34 | mGuard{new QObject} | ||
35 | { | 34 | { |
36 | } | 35 | } |
37 | 36 | ||
@@ -84,6 +83,8 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
84 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [this](const DataStore::Error &error) { | 83 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [this](const DataStore::Error &error) { |
85 | SinkWarningCtx(mLogCtx) << error.message; | 84 | SinkWarningCtx(mLogCtx) << error.message; |
86 | }); | 85 | }); |
86 | Q_ASSERT(mMainStoreTransaction); | ||
87 | Q_ASSERT(replayStoreTransaction); | ||
87 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | 88 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", |
88 | [lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | 89 | [lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { |
89 | *lastReplayedRevision = value.toLongLong(); | 90 | *lastReplayedRevision = value.toLongLong(); |
@@ -98,16 +99,13 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
98 | SinkTraceCtx(mLogCtx) << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; | 99 | SinkTraceCtx(mLogCtx) << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; |
99 | return KAsync::doWhile( | 100 | return KAsync::doWhile( |
100 | [this, lastReplayedRevision, topRevision]() -> KAsync::Job<KAsync::ControlFlowFlag> { | 101 | [this, lastReplayedRevision, topRevision]() -> KAsync::Job<KAsync::ControlFlowFlag> { |
101 | if (!mGuard) { | ||
102 | SinkTraceCtx(mLogCtx) << "Exit due to guard"; | ||
103 | return KAsync::value(KAsync::Break); | ||
104 | } | ||
105 | if (*lastReplayedRevision >= *topRevision) { | 102 | if (*lastReplayedRevision >= *topRevision) { |
106 | SinkTraceCtx(mLogCtx) << "Done replaying" << *lastReplayedRevision << *topRevision; | 103 | SinkTraceCtx(mLogCtx) << "Done replaying" << *lastReplayedRevision << *topRevision; |
107 | return KAsync::value(KAsync::Break); | 104 | return KAsync::value(KAsync::Break); |
108 | } | 105 | } |
106 | Q_ASSERT(mMainStoreTransaction); | ||
109 | 107 | ||
110 | KAsync::Job<void> replayJob = KAsync::null<void>(); | 108 | auto replayJob = KAsync::null(); |
111 | qint64 revision = *lastReplayedRevision + 1; | 109 | qint64 revision = *lastReplayedRevision + 1; |
112 | while (revision <= *topRevision) { | 110 | while (revision <= *topRevision) { |
113 | const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); | 111 | const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); |
@@ -131,14 +129,14 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
131 | if (canReplay(type, key, entityBuffer)) { | 129 | if (canReplay(type, key, entityBuffer)) { |
132 | SinkTraceCtx(mLogCtx) << "Replaying " << key; | 130 | SinkTraceCtx(mLogCtx) << "Replaying " << key; |
133 | replayJob = replay(type, key, entityBuffer); | 131 | replayJob = replay(type, key, entityBuffer); |
132 | //Set the last revision we tried to replay | ||
133 | *lastReplayedRevision = revision; | ||
134 | //Execute replay job and commit | ||
135 | break; | ||
134 | } else { | 136 | } else { |
135 | SinkTraceCtx(mLogCtx) << "Cannot replay " << key; | 137 | SinkTraceCtx(mLogCtx) << "Not replaying " << key; |
136 | //We silently skip over revisions that cannot be replayed, as this is not an error. | 138 | //We silently skip over revisions that cannot be replayed, as this is not an error. |
137 | replayJob = KAsync::null(); | ||
138 | } | 139 | } |
139 | //Set the last revision we tried to replay | ||
140 | *lastReplayedRevision = revision; | ||
141 | break; | ||
142 | } | 140 | } |
143 | } | 141 | } |
144 | //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations. | 142 | //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations. |
@@ -149,22 +147,22 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
149 | if (error) { | 147 | if (error) { |
150 | SinkWarningCtx(mLogCtx) << "Change replay failed: " << error << "Last replayed revision: " << *lastReplayedRevision; | 148 | SinkWarningCtx(mLogCtx) << "Change replay failed: " << error << "Last replayed revision: " << *lastReplayedRevision; |
151 | //We're probably not online or so, so postpone retrying | 149 | //We're probably not online or so, so postpone retrying |
152 | return KAsync::value(KAsync::Break); | 150 | return KAsync::value(KAsync::Break).then(KAsync::error<KAsync::ControlFlowFlag>(error)); |
151 | } | ||
152 | SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision; | ||
153 | |||
154 | recordReplayedRevision(*lastReplayedRevision); | ||
155 | reportProgress(*lastReplayedRevision, *topRevision); | ||
156 | |||
157 | const bool gotMoreToReplay = (*lastReplayedRevision < *topRevision); | ||
158 | if (gotMoreToReplay) { | ||
159 | SinkTraceCtx(mLogCtx) << "Replaying some more..."; | ||
160 | //Replay more if we have more | ||
161 | return KAsync::wait(0).then(KAsync::value(KAsync::Continue)); | ||
153 | } else { | 162 | } else { |
154 | SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision; | 163 | return KAsync::value(KAsync::Break); |
155 | recordReplayedRevision(*lastReplayedRevision); | ||
156 | if (*lastReplayedRevision < *topRevision) { | ||
157 | SinkTraceCtx(mLogCtx) << "Replaying some more..."; | ||
158 | //Replay more if we have more | ||
159 | return KAsync::wait(0).then(KAsync::value(KAsync::Continue)); | ||
160 | } else { | ||
161 | return KAsync::value(KAsync::Break); | ||
162 | } | ||
163 | } | 164 | } |
164 | //We shouldn't ever get here | 165 | }).guard(&mGuard); |
165 | Q_ASSERT(false); | ||
166 | return KAsync::value(KAsync::Break); | ||
167 | }); | ||
168 | }); | 166 | }); |
169 | }) | 167 | }) |
170 | .then([this](const KAsync::Error &error) { | 168 | .then([this](const KAsync::Error &error) { |
@@ -181,7 +179,12 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
181 | emit changesReplayed(); | 179 | emit changesReplayed(); |
182 | } | 180 | } |
183 | } | 181 | } |
184 | }); | 182 | if (error) { |
183 | return KAsync::error(error); | ||
184 | } else { | ||
185 | return KAsync::null(); | ||
186 | } | ||
187 | }).guard(&mGuard); | ||
185 | } | 188 | } |
186 | 189 | ||
187 | void ChangeReplay::revisionChanged() | 190 | void ChangeReplay::revisionChanged() |