diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-07-27 02:26:47 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-09-15 16:14:19 +0200 |
commit | 26816c21f60450e461a5b6ef4ef740f6070ce278 (patch) | |
tree | 55e8aee03e094abf702438e6cd26233047345e70 /common/changereplay.cpp | |
parent | 9a9bb39f7641a818434cafa0dae0c8aa47124c0b (diff) | |
download | sink-26816c21f60450e461a5b6ef4ef740f6070ce278.tar.gz sink-26816c21f60450e461a5b6ef4ef740f6070ce278.zip |
Ported to the kasync revamp
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r-- | common/changereplay.cpp | 58 |
1 files changed, 31 insertions, 27 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp index fbd556f..e3b7158 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp | |||
@@ -72,7 +72,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
72 | { | 72 | { |
73 | auto lastReplayedRevision = QSharedPointer<qint64>::create(0); | 73 | auto lastReplayedRevision = QSharedPointer<qint64>::create(0); |
74 | auto topRevision = QSharedPointer<qint64>::create(0); | 74 | auto topRevision = QSharedPointer<qint64>::create(0); |
75 | return KAsync::start<void>([this, lastReplayedRevision, topRevision]() { | 75 | return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() { |
76 | mReplayInProgress = true; | 76 | mReplayInProgress = true; |
77 | mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | 77 | mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
78 | SinkWarning() << error.message; | 78 | SinkWarning() << error.message; |
@@ -90,11 +90,9 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
90 | SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; | 90 | SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; |
91 | }) | 91 | }) |
92 | .then(KAsync::dowhile( | 92 | .then(KAsync::dowhile( |
93 | [this, lastReplayedRevision, topRevision](KAsync::Future<bool> &future) { | 93 | [this, lastReplayedRevision, topRevision]() -> KAsync::Job<KAsync::ControlFlowFlag> { |
94 | if (*lastReplayedRevision >= *topRevision) { | 94 | if (*lastReplayedRevision >= *topRevision) { |
95 | future.setValue(false); | 95 | return KAsync::value(KAsync::Break); |
96 | future.setFinished(); | ||
97 | return; | ||
98 | } | 96 | } |
99 | 97 | ||
100 | qint64 revision = *lastReplayedRevision + 1; | 98 | qint64 revision = *lastReplayedRevision + 1; |
@@ -109,12 +107,15 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
109 | [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { | 107 | [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { |
110 | SinkTrace() << "Replaying " << key; | 108 | SinkTrace() << "Replaying " << key; |
111 | if (canReplay(type, key, value)) { | 109 | if (canReplay(type, key, value)) { |
112 | replayJob = replay(type, key, value).then<void>([this, revision, lastReplayedRevision]() { | 110 | replayJob = replay(type, key, value).then<void>([this, revision, lastReplayedRevision](const KAsync::Error &error) { |
113 | recordReplayedRevision(revision); | 111 | if (error) { |
114 | *lastReplayedRevision = revision; | 112 | SinkTrace() << "Change replay failed" << revision; |
115 | }, | 113 | return KAsync::error(error); |
116 | [revision](int, QString) { | 114 | } else { |
117 | SinkTrace() << "Change replay failed" << revision; | 115 | recordReplayedRevision(revision); |
116 | *lastReplayedRevision = revision; | ||
117 | } | ||
118 | return KAsync::null(); | ||
118 | }); | 119 | }); |
119 | exitLoop = true; | 120 | exitLoop = true; |
120 | } else { | 121 | } else { |
@@ -128,23 +129,26 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
128 | } | 129 | } |
129 | revision++; | 130 | revision++; |
130 | } | 131 | } |
131 | replayJob.then<void>([this, revision, lastReplayedRevision, topRevision, &future]() { | 132 | return replayJob.then<KAsync::ControlFlowFlag>([this, revision, lastReplayedRevision, topRevision](const KAsync::Error &error) ->KAsync::Job<KAsync::ControlFlowFlag> { |
132 | SinkTrace() << "Replayed until " << revision; | 133 | if (error) { |
133 | recordReplayedRevision(*lastReplayedRevision); | 134 | SinkTrace() << "Change replay failed" << revision; |
134 | QTimer::singleShot(0, [&future, lastReplayedRevision, topRevision]() { | 135 | //We're probably not online or so, so postpone retrying |
135 | future.setValue((*lastReplayedRevision < *topRevision)); | 136 | return KAsync::value(KAsync::Break); |
136 | future.setFinished(); | 137 | } else { |
137 | }); | 138 | SinkTrace() << "Replayed until " << revision; |
138 | }, | 139 | recordReplayedRevision(*lastReplayedRevision); |
139 | [this, revision, &future](int, QString) { | 140 | if (*lastReplayedRevision < *topRevision) { |
140 | SinkTrace() << "Change replay failed" << revision; | 141 | return KAsync::wait(0).then(KAsync::value(KAsync::Continue)); |
141 | //We're probably not online or so, so postpone retrying | 142 | } else { |
142 | future.setValue(false); | 143 | return KAsync::value(KAsync::Break); |
143 | future.setFinished(); | 144 | } |
144 | }).exec(); | 145 | } |
145 | 146 | //We shouldn't ever get here | |
147 | Q_ASSERT(false); | ||
148 | return KAsync::value(KAsync::Break); | ||
149 | }); | ||
146 | })) | 150 | })) |
147 | .then<void>([this, lastReplayedRevision]() { | 151 | .syncThen<void>([this, lastReplayedRevision]() { |
148 | recordReplayedRevision(*lastReplayedRevision); | 152 | recordReplayedRevision(*lastReplayedRevision); |
149 | mMainStoreTransaction.abort(); | 153 | mMainStoreTransaction.abort(); |
150 | if (allChangesReplayed()) { | 154 | if (allChangesReplayed()) { |