summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/changereplay.cpp36
-rw-r--r--common/changereplay.h1
-rw-r--r--common/synchronizer.h2
3 files changed, 20 insertions, 19 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index da36b3e..75b9a4c 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -105,7 +105,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
105 } 105 }
106 Q_ASSERT(mMainStoreTransaction); 106 Q_ASSERT(mMainStoreTransaction);
107 107
108 KAsync::Job<void> replayJob = KAsync::null<void>(); 108 auto replayJob = KAsync::null();
109 qint64 revision = *lastReplayedRevision + 1; 109 qint64 revision = *lastReplayedRevision + 1;
110 while (revision <= *topRevision) { 110 while (revision <= *topRevision) {
111 const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); 111 const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision);
@@ -129,14 +129,14 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
129 if (canReplay(type, key, entityBuffer)) { 129 if (canReplay(type, key, entityBuffer)) {
130 SinkTraceCtx(mLogCtx) << "Replaying " << key; 130 SinkTraceCtx(mLogCtx) << "Replaying " << key;
131 replayJob = replay(type, key, entityBuffer); 131 replayJob = replay(type, key, entityBuffer);
132 //Set the last revision we tried to replay
133 *lastReplayedRevision = revision;
134 //Execute replay job and commit
135 break;
132 } else { 136 } else {
133 SinkTraceCtx(mLogCtx) << "Cannot replay " << key; 137 SinkTraceCtx(mLogCtx) << "Not replaying " << key;
134 //We silently skip over revisions that cannot be replayed, as this is not an error. 138 //We silently skip over revisions that cannot be replayed, as this is not an error.
135 replayJob = KAsync::null();
136 } 139 }
137 //Set the last revision we tried to replay
138 *lastReplayedRevision = revision;
139 break;
140 } 140 }
141 } 141 }
142 //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations. 142 //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations.
@@ -148,20 +148,20 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
148 SinkWarningCtx(mLogCtx) << "Change replay failed: " << error << "Last replayed revision: " << *lastReplayedRevision; 148 SinkWarningCtx(mLogCtx) << "Change replay failed: " << error << "Last replayed revision: " << *lastReplayedRevision;
149 //We're probably not online or so, so postpone retrying 149 //We're probably not online or so, so postpone retrying
150 return KAsync::value(KAsync::Break); 150 return KAsync::value(KAsync::Break);
151 }
152 SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision;
153
154 recordReplayedRevision(*lastReplayedRevision);
155 reportProgress(*lastReplayedRevision, *topRevision);
156
157 const bool gotMoreToReplay = (*lastReplayedRevision < *topRevision);
158 if (gotMoreToReplay) {
159 SinkTraceCtx(mLogCtx) << "Replaying some more...";
160 //Replay more if we have more
161 return KAsync::wait(0).then(KAsync::value(KAsync::Continue));
151 } else { 162 } else {
152 SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision; 163 return KAsync::value(KAsync::Break);
153 recordReplayedRevision(*lastReplayedRevision);
154 if (*lastReplayedRevision < *topRevision) {
155 SinkTraceCtx(mLogCtx) << "Replaying some more...";
156 //Replay more if we have more
157 return KAsync::wait(0).then(KAsync::value(KAsync::Continue));
158 } else {
159 return KAsync::value(KAsync::Break);
160 }
161 } 164 }
162 //We shouldn't ever get here
163 Q_ASSERT(false);
164 return KAsync::value(KAsync::Break);
165 }).guard(&mGuard); 165 }).guard(&mGuard);
166 }); 166 });
167 }) 167 })
diff --git a/common/changereplay.h b/common/changereplay.h
index ab2d857..c509735 100644
--- a/common/changereplay.h
+++ b/common/changereplay.h
@@ -54,6 +54,7 @@ public slots:
54protected: 54protected:
55 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; 55 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0;
56 virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; 56 virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0;
57 virtual void reportProgress(int progress, int total){};
57 Sink::Storage::DataStore mStorage; 58 Sink::Storage::DataStore mStorage;
58 KAsync::Job<void> replayNextRevision(); 59 KAsync::Job<void> replayNextRevision();
59 60
diff --git a/common/synchronizer.h b/common/synchronizer.h
index bb24c2b..935c139 100644
--- a/common/synchronizer.h
+++ b/common/synchronizer.h
@@ -197,7 +197,7 @@ protected:
197 /** 197 /**
198 * Report progress for current task 198 * Report progress for current task
199 */ 199 */
200 void reportProgress(int progress, int total); 200 virtual void reportProgress(int progress, int total) Q_DECL_OVERRIDE;
201 201
202protected: 202protected:
203 Sink::Log::Context mLogCtx; 203 Sink::Log::Context mLogCtx;