summaryrefslogtreecommitdiffstats
path: root/common/changereplay.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-27 02:26:47 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-09-15 16:14:19 +0200
commit26816c21f60450e461a5b6ef4ef740f6070ce278 (patch)
tree55e8aee03e094abf702438e6cd26233047345e70 /common/changereplay.cpp
parent9a9bb39f7641a818434cafa0dae0c8aa47124c0b (diff)
downloadsink-26816c21f60450e461a5b6ef4ef740f6070ce278.tar.gz
sink-26816c21f60450e461a5b6ef4ef740f6070ce278.zip
Ported to the kasync revamp
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r--common/changereplay.cpp58
1 files changed, 31 insertions, 27 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index fbd556f..e3b7158 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -72,7 +72,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
72{ 72{
73 auto lastReplayedRevision = QSharedPointer<qint64>::create(0); 73 auto lastReplayedRevision = QSharedPointer<qint64>::create(0);
74 auto topRevision = QSharedPointer<qint64>::create(0); 74 auto topRevision = QSharedPointer<qint64>::create(0);
75 return KAsync::start<void>([this, lastReplayedRevision, topRevision]() { 75 return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() {
76 mReplayInProgress = true; 76 mReplayInProgress = true;
77 mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 77 mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
78 SinkWarning() << error.message; 78 SinkWarning() << error.message;
@@ -90,11 +90,9 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
90 SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; 90 SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision;
91 }) 91 })
92 .then(KAsync::dowhile( 92 .then(KAsync::dowhile(
93 [this, lastReplayedRevision, topRevision](KAsync::Future<bool> &future) { 93 [this, lastReplayedRevision, topRevision]() -> KAsync::Job<KAsync::ControlFlowFlag> {
94 if (*lastReplayedRevision >= *topRevision) { 94 if (*lastReplayedRevision >= *topRevision) {
95 future.setValue(false); 95 return KAsync::value(KAsync::Break);
96 future.setFinished();
97 return;
98 } 96 }
99 97
100 qint64 revision = *lastReplayedRevision + 1; 98 qint64 revision = *lastReplayedRevision + 1;
@@ -109,12 +107,15 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
109 [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { 107 [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool {
110 SinkTrace() << "Replaying " << key; 108 SinkTrace() << "Replaying " << key;
111 if (canReplay(type, key, value)) { 109 if (canReplay(type, key, value)) {
112 replayJob = replay(type, key, value).then<void>([this, revision, lastReplayedRevision]() { 110 replayJob = replay(type, key, value).then<void>([this, revision, lastReplayedRevision](const KAsync::Error &error) {
113 recordReplayedRevision(revision); 111 if (error) {
114 *lastReplayedRevision = revision; 112 SinkTrace() << "Change replay failed" << revision;
115 }, 113 return KAsync::error(error);
116 [revision](int, QString) { 114 } else {
117 SinkTrace() << "Change replay failed" << revision; 115 recordReplayedRevision(revision);
116 *lastReplayedRevision = revision;
117 }
118 return KAsync::null();
118 }); 119 });
119 exitLoop = true; 120 exitLoop = true;
120 } else { 121 } else {
@@ -128,23 +129,26 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
128 } 129 }
129 revision++; 130 revision++;
130 } 131 }
131 replayJob.then<void>([this, revision, lastReplayedRevision, topRevision, &future]() { 132 return replayJob.then<KAsync::ControlFlowFlag>([this, revision, lastReplayedRevision, topRevision](const KAsync::Error &error) ->KAsync::Job<KAsync::ControlFlowFlag> {
132 SinkTrace() << "Replayed until " << revision; 133 if (error) {
133 recordReplayedRevision(*lastReplayedRevision); 134 SinkTrace() << "Change replay failed" << revision;
134 QTimer::singleShot(0, [&future, lastReplayedRevision, topRevision]() { 135 //We're probably not online or so, so postpone retrying
135 future.setValue((*lastReplayedRevision < *topRevision)); 136 return KAsync::value(KAsync::Break);
136 future.setFinished(); 137 } else {
137 }); 138 SinkTrace() << "Replayed until " << revision;
138 }, 139 recordReplayedRevision(*lastReplayedRevision);
139 [this, revision, &future](int, QString) { 140 if (*lastReplayedRevision < *topRevision) {
140 SinkTrace() << "Change replay failed" << revision; 141 return KAsync::wait(0).then(KAsync::value(KAsync::Continue));
141 //We're probably not online or so, so postpone retrying 142 } else {
142 future.setValue(false); 143 return KAsync::value(KAsync::Break);
143 future.setFinished(); 144 }
144 }).exec(); 145 }
145 146 //We shouldn't ever get here
147 Q_ASSERT(false);
148 return KAsync::value(KAsync::Break);
149 });
146 })) 150 }))
147 .then<void>([this, lastReplayedRevision]() { 151 .syncThen<void>([this, lastReplayedRevision]() {
148 recordReplayedRevision(*lastReplayedRevision); 152 recordReplayedRevision(*lastReplayedRevision);
149 mMainStoreTransaction.abort(); 153 mMainStoreTransaction.abort();
150 if (allChangesReplayed()) { 154 if (allChangesReplayed()) {