summaryrefslogtreecommitdiffstats
path: root/common/changereplay.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-05-12 18:57:07 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-05-12 18:57:07 +0200
commit256fe3fc561f1690e5c29640b9081e805ceb5532 (patch)
tree5bf46688b732b534eb30cec9620393f8c6eb487f /common/changereplay.cpp
parent58b9fa88198eecc224597e52d8bbd7f833fca63b (diff)
downloadsink-256fe3fc561f1690e5c29640b9081e805ceb5532.tar.gz
sink-256fe3fc561f1690e5c29640b9081e805ceb5532.zip
Don't commit after every replayed revision
If we didn't actually do anything we just carry on. Failing to commit is harmless in that case and committing for every revision is rather expensive.
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r--common/changereplay.cpp36
1 files changed, 18 insertions, 18 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index da36b3e..75b9a4c 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -105,7 +105,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
105 } 105 }
106 Q_ASSERT(mMainStoreTransaction); 106 Q_ASSERT(mMainStoreTransaction);
107 107
108 KAsync::Job<void> replayJob = KAsync::null<void>(); 108 auto replayJob = KAsync::null();
109 qint64 revision = *lastReplayedRevision + 1; 109 qint64 revision = *lastReplayedRevision + 1;
110 while (revision <= *topRevision) { 110 while (revision <= *topRevision) {
111 const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); 111 const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision);
@@ -129,14 +129,14 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
129 if (canReplay(type, key, entityBuffer)) { 129 if (canReplay(type, key, entityBuffer)) {
130 SinkTraceCtx(mLogCtx) << "Replaying " << key; 130 SinkTraceCtx(mLogCtx) << "Replaying " << key;
131 replayJob = replay(type, key, entityBuffer); 131 replayJob = replay(type, key, entityBuffer);
132 //Set the last revision we tried to replay
133 *lastReplayedRevision = revision;
134 //Execute replay job and commit
135 break;
132 } else { 136 } else {
133 SinkTraceCtx(mLogCtx) << "Cannot replay " << key; 137 SinkTraceCtx(mLogCtx) << "Not replaying " << key;
134 //We silently skip over revisions that cannot be replayed, as this is not an error. 138 //We silently skip over revisions that cannot be replayed, as this is not an error.
135 replayJob = KAsync::null();
136 } 139 }
137 //Set the last revision we tried to replay
138 *lastReplayedRevision = revision;
139 break;
140 } 140 }
141 } 141 }
142 //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations. 142 //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations.
@@ -148,20 +148,20 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
148 SinkWarningCtx(mLogCtx) << "Change replay failed: " << error << "Last replayed revision: " << *lastReplayedRevision; 148 SinkWarningCtx(mLogCtx) << "Change replay failed: " << error << "Last replayed revision: " << *lastReplayedRevision;
149 //We're probably not online or so, so postpone retrying 149 //We're probably not online or so, so postpone retrying
150 return KAsync::value(KAsync::Break); 150 return KAsync::value(KAsync::Break);
151 }
152 SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision;
153
154 recordReplayedRevision(*lastReplayedRevision);
155 reportProgress(*lastReplayedRevision, *topRevision);
156
157 const bool gotMoreToReplay = (*lastReplayedRevision < *topRevision);
158 if (gotMoreToReplay) {
159 SinkTraceCtx(mLogCtx) << "Replaying some more...";
160 //Replay more if we have more
161 return KAsync::wait(0).then(KAsync::value(KAsync::Continue));
151 } else { 162 } else {
152 SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision; 163 return KAsync::value(KAsync::Break);
153 recordReplayedRevision(*lastReplayedRevision);
154 if (*lastReplayedRevision < *topRevision) {
155 SinkTraceCtx(mLogCtx) << "Replaying some more...";
156 //Replay more if we have more
157 return KAsync::wait(0).then(KAsync::value(KAsync::Continue));
158 } else {
159 return KAsync::value(KAsync::Break);
160 }
161 } 164 }
162 //We shouldn't ever get here
163 Q_ASSERT(false);
164 return KAsync::value(KAsync::Break);
165 }).guard(&mGuard); 165 }).guard(&mGuard);
166 }); 166 });
167 }) 167 })