diff options
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r-- | common/changereplay.cpp | 36 |
1 files changed, 18 insertions, 18 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp index da36b3e..75b9a4c 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp | |||
@@ -105,7 +105,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
105 | } | 105 | } |
106 | Q_ASSERT(mMainStoreTransaction); | 106 | Q_ASSERT(mMainStoreTransaction); |
107 | 107 | ||
108 | KAsync::Job<void> replayJob = KAsync::null<void>(); | 108 | auto replayJob = KAsync::null(); |
109 | qint64 revision = *lastReplayedRevision + 1; | 109 | qint64 revision = *lastReplayedRevision + 1; |
110 | while (revision <= *topRevision) { | 110 | while (revision <= *topRevision) { |
111 | const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); | 111 | const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); |
@@ -129,14 +129,14 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
129 | if (canReplay(type, key, entityBuffer)) { | 129 | if (canReplay(type, key, entityBuffer)) { |
130 | SinkTraceCtx(mLogCtx) << "Replaying " << key; | 130 | SinkTraceCtx(mLogCtx) << "Replaying " << key; |
131 | replayJob = replay(type, key, entityBuffer); | 131 | replayJob = replay(type, key, entityBuffer); |
132 | //Set the last revision we tried to replay | ||
133 | *lastReplayedRevision = revision; | ||
134 | //Execute replay job and commit | ||
135 | break; | ||
132 | } else { | 136 | } else { |
133 | SinkTraceCtx(mLogCtx) << "Cannot replay " << key; | 137 | SinkTraceCtx(mLogCtx) << "Not replaying " << key; |
134 | //We silently skip over revisions that cannot be replayed, as this is not an error. | 138 | //We silently skip over revisions that cannot be replayed, as this is not an error. |
135 | replayJob = KAsync::null(); | ||
136 | } | 139 | } |
137 | //Set the last revision we tried to replay | ||
138 | *lastReplayedRevision = revision; | ||
139 | break; | ||
140 | } | 140 | } |
141 | } | 141 | } |
142 | //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations. | 142 | //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations. |
@@ -148,20 +148,20 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
148 | SinkWarningCtx(mLogCtx) << "Change replay failed: " << error << "Last replayed revision: " << *lastReplayedRevision; | 148 | SinkWarningCtx(mLogCtx) << "Change replay failed: " << error << "Last replayed revision: " << *lastReplayedRevision; |
149 | //We're probably not online or so, so postpone retrying | 149 | //We're probably not online or so, so postpone retrying |
150 | return KAsync::value(KAsync::Break); | 150 | return KAsync::value(KAsync::Break); |
151 | } | ||
152 | SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision; | ||
153 | |||
154 | recordReplayedRevision(*lastReplayedRevision); | ||
155 | reportProgress(*lastReplayedRevision, *topRevision); | ||
156 | |||
157 | const bool gotMoreToReplay = (*lastReplayedRevision < *topRevision); | ||
158 | if (gotMoreToReplay) { | ||
159 | SinkTraceCtx(mLogCtx) << "Replaying some more..."; | ||
160 | //Replay more if we have more | ||
161 | return KAsync::wait(0).then(KAsync::value(KAsync::Continue)); | ||
151 | } else { | 162 | } else { |
152 | SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision; | 163 | return KAsync::value(KAsync::Break); |
153 | recordReplayedRevision(*lastReplayedRevision); | ||
154 | if (*lastReplayedRevision < *topRevision) { | ||
155 | SinkTraceCtx(mLogCtx) << "Replaying some more..."; | ||
156 | //Replay more if we have more | ||
157 | return KAsync::wait(0).then(KAsync::value(KAsync::Continue)); | ||
158 | } else { | ||
159 | return KAsync::value(KAsync::Break); | ||
160 | } | ||
161 | } | 164 | } |
162 | //We shouldn't ever get here | ||
163 | Q_ASSERT(false); | ||
164 | return KAsync::value(KAsync::Break); | ||
165 | }).guard(&mGuard); | 165 | }).guard(&mGuard); |
166 | }); | 166 | }); |
167 | }) | 167 | }) |