diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-05-12 18:57:07 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-05-12 18:57:07 +0200 |
commit | 256fe3fc561f1690e5c29640b9081e805ceb5532 (patch) | |
tree | 5bf46688b732b534eb30cec9620393f8c6eb487f /common/changereplay.cpp | |
parent | 58b9fa88198eecc224597e52d8bbd7f833fca63b (diff) | |
download | sink-256fe3fc561f1690e5c29640b9081e805ceb5532.tar.gz sink-256fe3fc561f1690e5c29640b9081e805ceb5532.zip |
Don't commit after every replayed revision
If we didn't actually do anything we just carry on.
Failing to commit is harmless in that case and committing for every
revision is rather expensive.
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r-- | common/changereplay.cpp | 36 |
1 files changed, 18 insertions, 18 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp index da36b3e..75b9a4c 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp | |||
@@ -105,7 +105,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
105 | } | 105 | } |
106 | Q_ASSERT(mMainStoreTransaction); | 106 | Q_ASSERT(mMainStoreTransaction); |
107 | 107 | ||
108 | KAsync::Job<void> replayJob = KAsync::null<void>(); | 108 | auto replayJob = KAsync::null(); |
109 | qint64 revision = *lastReplayedRevision + 1; | 109 | qint64 revision = *lastReplayedRevision + 1; |
110 | while (revision <= *topRevision) { | 110 | while (revision <= *topRevision) { |
111 | const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); | 111 | const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); |
@@ -129,14 +129,14 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
129 | if (canReplay(type, key, entityBuffer)) { | 129 | if (canReplay(type, key, entityBuffer)) { |
130 | SinkTraceCtx(mLogCtx) << "Replaying " << key; | 130 | SinkTraceCtx(mLogCtx) << "Replaying " << key; |
131 | replayJob = replay(type, key, entityBuffer); | 131 | replayJob = replay(type, key, entityBuffer); |
132 | //Set the last revision we tried to replay | ||
133 | *lastReplayedRevision = revision; | ||
134 | //Execute replay job and commit | ||
135 | break; | ||
132 | } else { | 136 | } else { |
133 | SinkTraceCtx(mLogCtx) << "Cannot replay " << key; | 137 | SinkTraceCtx(mLogCtx) << "Not replaying " << key; |
134 | //We silently skip over revisions that cannot be replayed, as this is not an error. | 138 | //We silently skip over revisions that cannot be replayed, as this is not an error. |
135 | replayJob = KAsync::null(); | ||
136 | } | 139 | } |
137 | //Set the last revision we tried to replay | ||
138 | *lastReplayedRevision = revision; | ||
139 | break; | ||
140 | } | 140 | } |
141 | } | 141 | } |
142 | //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations. | 142 | //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations. |
@@ -148,20 +148,20 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
148 | SinkWarningCtx(mLogCtx) << "Change replay failed: " << error << "Last replayed revision: " << *lastReplayedRevision; | 148 | SinkWarningCtx(mLogCtx) << "Change replay failed: " << error << "Last replayed revision: " << *lastReplayedRevision; |
149 | //We're probably not online or so, so postpone retrying | 149 | //We're probably not online or so, so postpone retrying |
150 | return KAsync::value(KAsync::Break); | 150 | return KAsync::value(KAsync::Break); |
151 | } | ||
152 | SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision; | ||
153 | |||
154 | recordReplayedRevision(*lastReplayedRevision); | ||
155 | reportProgress(*lastReplayedRevision, *topRevision); | ||
156 | |||
157 | const bool gotMoreToReplay = (*lastReplayedRevision < *topRevision); | ||
158 | if (gotMoreToReplay) { | ||
159 | SinkTraceCtx(mLogCtx) << "Replaying some more..."; | ||
160 | //Replay more if we have more | ||
161 | return KAsync::wait(0).then(KAsync::value(KAsync::Continue)); | ||
151 | } else { | 162 | } else { |
152 | SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision; | 163 | return KAsync::value(KAsync::Break); |
153 | recordReplayedRevision(*lastReplayedRevision); | ||
154 | if (*lastReplayedRevision < *topRevision) { | ||
155 | SinkTraceCtx(mLogCtx) << "Replaying some more..."; | ||
156 | //Replay more if we have more | ||
157 | return KAsync::wait(0).then(KAsync::value(KAsync::Continue)); | ||
158 | } else { | ||
159 | return KAsync::value(KAsync::Break); | ||
160 | } | ||
161 | } | 164 | } |
162 | //We shouldn't ever get here | ||
163 | Q_ASSERT(false); | ||
164 | return KAsync::value(KAsync::Break); | ||
165 | }).guard(&mGuard); | 165 | }).guard(&mGuard); |
166 | }); | 166 | }); |
167 | }) | 167 | }) |