diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-05-12 18:57:07 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-05-12 18:57:07 +0200 |
commit | 256fe3fc561f1690e5c29640b9081e805ceb5532 (patch) | |
tree | 5bf46688b732b534eb30cec9620393f8c6eb487f | |
parent | 58b9fa88198eecc224597e52d8bbd7f833fca63b (diff) | |
download | sink-256fe3fc561f1690e5c29640b9081e805ceb5532.tar.gz sink-256fe3fc561f1690e5c29640b9081e805ceb5532.zip |
Don't commit after every replayed revision
If we didn't actually do anything we just carry on.
Failing to commit is harmless in that case and committing for every
revision is rather expensive.
-rw-r--r-- | common/changereplay.cpp | 36 | ||||
-rw-r--r-- | common/changereplay.h | 1 | ||||
-rw-r--r-- | common/synchronizer.h | 2 |
3 files changed, 20 insertions, 19 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp index da36b3e..75b9a4c 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp | |||
@@ -105,7 +105,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
105 | } | 105 | } |
106 | Q_ASSERT(mMainStoreTransaction); | 106 | Q_ASSERT(mMainStoreTransaction); |
107 | 107 | ||
108 | KAsync::Job<void> replayJob = KAsync::null<void>(); | 108 | auto replayJob = KAsync::null(); |
109 | qint64 revision = *lastReplayedRevision + 1; | 109 | qint64 revision = *lastReplayedRevision + 1; |
110 | while (revision <= *topRevision) { | 110 | while (revision <= *topRevision) { |
111 | const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); | 111 | const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); |
@@ -129,14 +129,14 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
129 | if (canReplay(type, key, entityBuffer)) { | 129 | if (canReplay(type, key, entityBuffer)) { |
130 | SinkTraceCtx(mLogCtx) << "Replaying " << key; | 130 | SinkTraceCtx(mLogCtx) << "Replaying " << key; |
131 | replayJob = replay(type, key, entityBuffer); | 131 | replayJob = replay(type, key, entityBuffer); |
132 | //Set the last revision we tried to replay | ||
133 | *lastReplayedRevision = revision; | ||
134 | //Execute replay job and commit | ||
135 | break; | ||
132 | } else { | 136 | } else { |
133 | SinkTraceCtx(mLogCtx) << "Cannot replay " << key; | 137 | SinkTraceCtx(mLogCtx) << "Not replaying " << key; |
134 | //We silently skip over revisions that cannot be replayed, as this is not an error. | 138 | //We silently skip over revisions that cannot be replayed, as this is not an error. |
135 | replayJob = KAsync::null(); | ||
136 | } | 139 | } |
137 | //Set the last revision we tried to replay | ||
138 | *lastReplayedRevision = revision; | ||
139 | break; | ||
140 | } | 140 | } |
141 | } | 141 | } |
142 | //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations. | 142 | //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations. |
@@ -148,20 +148,20 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
148 | SinkWarningCtx(mLogCtx) << "Change replay failed: " << error << "Last replayed revision: " << *lastReplayedRevision; | 148 | SinkWarningCtx(mLogCtx) << "Change replay failed: " << error << "Last replayed revision: " << *lastReplayedRevision; |
149 | //We're probably not online or so, so postpone retrying | 149 | //We're probably not online or so, so postpone retrying |
150 | return KAsync::value(KAsync::Break); | 150 | return KAsync::value(KAsync::Break); |
151 | } | ||
152 | SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision; | ||
153 | |||
154 | recordReplayedRevision(*lastReplayedRevision); | ||
155 | reportProgress(*lastReplayedRevision, *topRevision); | ||
156 | |||
157 | const bool gotMoreToReplay = (*lastReplayedRevision < *topRevision); | ||
158 | if (gotMoreToReplay) { | ||
159 | SinkTraceCtx(mLogCtx) << "Replaying some more..."; | ||
160 | //Replay more if we have more | ||
161 | return KAsync::wait(0).then(KAsync::value(KAsync::Continue)); | ||
151 | } else { | 162 | } else { |
152 | SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision; | 163 | return KAsync::value(KAsync::Break); |
153 | recordReplayedRevision(*lastReplayedRevision); | ||
154 | if (*lastReplayedRevision < *topRevision) { | ||
155 | SinkTraceCtx(mLogCtx) << "Replaying some more..."; | ||
156 | //Replay more if we have more | ||
157 | return KAsync::wait(0).then(KAsync::value(KAsync::Continue)); | ||
158 | } else { | ||
159 | return KAsync::value(KAsync::Break); | ||
160 | } | ||
161 | } | 164 | } |
162 | //We shouldn't ever get here | ||
163 | Q_ASSERT(false); | ||
164 | return KAsync::value(KAsync::Break); | ||
165 | }).guard(&mGuard); | 165 | }).guard(&mGuard); |
166 | }); | 166 | }); |
167 | }) | 167 | }) |
diff --git a/common/changereplay.h b/common/changereplay.h index ab2d857..c509735 100644 --- a/common/changereplay.h +++ b/common/changereplay.h | |||
@@ -54,6 +54,7 @@ public slots: | |||
54 | protected: | 54 | protected: |
55 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; | 55 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; |
56 | virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; | 56 | virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; |
57 | virtual void reportProgress(int progress, int total){}; | ||
57 | Sink::Storage::DataStore mStorage; | 58 | Sink::Storage::DataStore mStorage; |
58 | KAsync::Job<void> replayNextRevision(); | 59 | KAsync::Job<void> replayNextRevision(); |
59 | 60 | ||
diff --git a/common/synchronizer.h b/common/synchronizer.h index bb24c2b..935c139 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h | |||
@@ -197,7 +197,7 @@ protected: | |||
197 | /** | 197 | /** |
198 | * Report progress for current task | 198 | * Report progress for current task |
199 | */ | 199 | */ |
200 | void reportProgress(int progress, int total); | 200 | virtual void reportProgress(int progress, int total) Q_DECL_OVERRIDE; |
201 | 201 | ||
202 | protected: | 202 | protected: |
203 | Sink::Log::Context mLogCtx; | 203 | Sink::Log::Context mLogCtx; |