diff options
-rw-r--r-- | common/changereplay.cpp | 36 | ||||
-rw-r--r-- | common/changereplay.h | 1 | ||||
-rw-r--r-- | common/synchronizer.h | 2 |
3 files changed, 20 insertions, 19 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp index da36b3e..75b9a4c 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp | |||
@@ -105,7 +105,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
105 | } | 105 | } |
106 | Q_ASSERT(mMainStoreTransaction); | 106 | Q_ASSERT(mMainStoreTransaction); |
107 | 107 | ||
108 | KAsync::Job<void> replayJob = KAsync::null<void>(); | 108 | auto replayJob = KAsync::null(); |
109 | qint64 revision = *lastReplayedRevision + 1; | 109 | qint64 revision = *lastReplayedRevision + 1; |
110 | while (revision <= *topRevision) { | 110 | while (revision <= *topRevision) { |
111 | const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); | 111 | const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); |
@@ -129,14 +129,14 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
129 | if (canReplay(type, key, entityBuffer)) { | 129 | if (canReplay(type, key, entityBuffer)) { |
130 | SinkTraceCtx(mLogCtx) << "Replaying " << key; | 130 | SinkTraceCtx(mLogCtx) << "Replaying " << key; |
131 | replayJob = replay(type, key, entityBuffer); | 131 | replayJob = replay(type, key, entityBuffer); |
132 | //Set the last revision we tried to replay | ||
133 | *lastReplayedRevision = revision; | ||
134 | //Execute replay job and commit | ||
135 | break; | ||
132 | } else { | 136 | } else { |
133 | SinkTraceCtx(mLogCtx) << "Cannot replay " << key; | 137 | SinkTraceCtx(mLogCtx) << "Not replaying " << key; |
134 | //We silently skip over revisions that cannot be replayed, as this is not an error. | 138 | //We silently skip over revisions that cannot be replayed, as this is not an error. |
135 | replayJob = KAsync::null(); | ||
136 | } | 139 | } |
137 | //Set the last revision we tried to replay | ||
138 | *lastReplayedRevision = revision; | ||
139 | break; | ||
140 | } | 140 | } |
141 | } | 141 | } |
142 | //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations. | 142 | //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations. |
@@ -148,20 +148,20 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
148 | SinkWarningCtx(mLogCtx) << "Change replay failed: " << error << "Last replayed revision: " << *lastReplayedRevision; | 148 | SinkWarningCtx(mLogCtx) << "Change replay failed: " << error << "Last replayed revision: " << *lastReplayedRevision; |
149 | //We're probably not online or so, so postpone retrying | 149 | //We're probably not online or so, so postpone retrying |
150 | return KAsync::value(KAsync::Break); | 150 | return KAsync::value(KAsync::Break); |
151 | } | ||
152 | SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision; | ||
153 | |||
154 | recordReplayedRevision(*lastReplayedRevision); | ||
155 | reportProgress(*lastReplayedRevision, *topRevision); | ||
156 | |||
157 | const bool gotMoreToReplay = (*lastReplayedRevision < *topRevision); | ||
158 | if (gotMoreToReplay) { | ||
159 | SinkTraceCtx(mLogCtx) << "Replaying some more..."; | ||
160 | //Replay more if we have more | ||
161 | return KAsync::wait(0).then(KAsync::value(KAsync::Continue)); | ||
151 | } else { | 162 | } else { |
152 | SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision; | 163 | return KAsync::value(KAsync::Break); |
153 | recordReplayedRevision(*lastReplayedRevision); | ||
154 | if (*lastReplayedRevision < *topRevision) { | ||
155 | SinkTraceCtx(mLogCtx) << "Replaying some more..."; | ||
156 | //Replay more if we have more | ||
157 | return KAsync::wait(0).then(KAsync::value(KAsync::Continue)); | ||
158 | } else { | ||
159 | return KAsync::value(KAsync::Break); | ||
160 | } | ||
161 | } | 164 | } |
162 | //We shouldn't ever get here | ||
163 | Q_ASSERT(false); | ||
164 | return KAsync::value(KAsync::Break); | ||
165 | }).guard(&mGuard); | 165 | }).guard(&mGuard); |
166 | }); | 166 | }); |
167 | }) | 167 | }) |
diff --git a/common/changereplay.h b/common/changereplay.h index ab2d857..c509735 100644 --- a/common/changereplay.h +++ b/common/changereplay.h | |||
@@ -54,6 +54,7 @@ public slots: | |||
54 | protected: | 54 | protected: |
55 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; | 55 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; |
56 | virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; | 56 | virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; |
57 | virtual void reportProgress(int progress, int total){}; | ||
57 | Sink::Storage::DataStore mStorage; | 58 | Sink::Storage::DataStore mStorage; |
58 | KAsync::Job<void> replayNextRevision(); | 59 | KAsync::Job<void> replayNextRevision(); |
59 | 60 | ||
diff --git a/common/synchronizer.h b/common/synchronizer.h index bb24c2b..935c139 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h | |||
@@ -197,7 +197,7 @@ protected: | |||
197 | /** | 197 | /** |
198 | * Report progress for current task | 198 | * Report progress for current task |
199 | */ | 199 | */ |
200 | void reportProgress(int progress, int total); | 200 | virtual void reportProgress(int progress, int total) Q_DECL_OVERRIDE; |
201 | 201 | ||
202 | protected: | 202 | protected: |
203 | Sink::Log::Context mLogCtx; | 203 | Sink::Log::Context mLogCtx; |