diff options
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r-- | common/changereplay.cpp | 144 |
1 files changed, 90 insertions, 54 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp index 638a30d..fbd556f 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp | |||
@@ -59,67 +59,103 @@ bool ChangeReplay::allChangesReplayed() | |||
59 | return (lastReplayedRevision >= topRevision); | 59 | return (lastReplayedRevision >= topRevision); |
60 | } | 60 | } |
61 | 61 | ||
62 | KAsync::Job<void> ChangeReplay::replayNextRevision() | 62 | void ChangeReplay::recordReplayedRevision(qint64 revision) |
63 | { | 63 | { |
64 | mReplayInProgress = true; | 64 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { |
65 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | ||
66 | SinkWarning() << error.message; | ||
67 | }); | ||
68 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | ||
69 | SinkWarning() << error.message; | 65 | SinkWarning() << error.message; |
70 | }); | 66 | }); |
71 | qint64 lastReplayedRevision = 0; | 67 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); |
72 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | 68 | replayStoreTransaction.commit(); |
73 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | 69 | }; |
74 | lastReplayedRevision = value.toLongLong(); | ||
75 | return false; | ||
76 | }, | ||
77 | [](const Storage::Error &) {}); | ||
78 | const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); | ||
79 | 70 | ||
80 | auto recordReplayedRevision = [this](qint64 revision) { | 71 | KAsync::Job<void> ChangeReplay::replayNextRevision() |
81 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { | 72 | { |
82 | SinkWarning() << error.message; | 73 | auto lastReplayedRevision = QSharedPointer<qint64>::create(0); |
83 | }); | 74 | auto topRevision = QSharedPointer<qint64>::create(0); |
84 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); | 75 | return KAsync::start<void>([this, lastReplayedRevision, topRevision]() { |
85 | replayStoreTransaction.commit(); | 76 | mReplayInProgress = true; |
86 | }; | 77 | mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
87 | 78 | SinkWarning() << error.message; | |
88 | if (lastReplayedRevision < topRevision) { | 79 | }); |
89 | SinkTrace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; | 80 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
90 | qint64 revision = lastReplayedRevision + 1; | 81 | SinkWarning() << error.message; |
91 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); | 82 | }); |
92 | const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); | 83 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", |
93 | const auto key = Storage::assembleKey(uid, revision); | 84 | [lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { |
94 | KAsync::Job<void> replayJob = KAsync::null<void>(); | 85 | *lastReplayedRevision = value.toLongLong(); |
95 | Storage::mainDatabase(mainStoreTransaction, type) | ||
96 | .scan(key, | ||
97 | [&lastReplayedRevision, type, this, &replayJob](const QByteArray &key, const QByteArray &value) -> bool { | ||
98 | SinkTrace() << "Replaying " << key; | ||
99 | replayJob = replay(type, key, value); | ||
100 | return false; | 86 | return false; |
101 | }, | 87 | }, |
102 | [key](const Storage::Error &) { SinkError() << "Failed to replay change " << key; }); | 88 | [](const Storage::Error &) {}); |
103 | return replayJob.then<void>([this, revision, recordReplayedRevision]() { | 89 | *topRevision = Storage::maxRevision(mMainStoreTransaction); |
104 | SinkTrace() << "Replayed until " << revision; | 90 | SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; |
105 | recordReplayedRevision(revision); | 91 | }) |
106 | //replay until we're done | 92 | .then(KAsync::dowhile( |
107 | QTimer::singleShot(0, this, [this]() { | 93 | [this, lastReplayedRevision, topRevision](KAsync::Future<bool> &future) { |
108 | replayNextRevision().exec(); | 94 | if (*lastReplayedRevision >= *topRevision) { |
109 | }); | 95 | future.setValue(false); |
110 | }, | 96 | future.setFinished(); |
111 | [this, revision, recordReplayedRevision](int, QString) { | 97 | return; |
112 | SinkTrace() << "Change replay failed" << revision; | 98 | } |
113 | //We're probably not online or so, so postpone retrying | 99 | |
114 | mReplayInProgress = false; | 100 | qint64 revision = *lastReplayedRevision + 1; |
115 | emit changesReplayed(); | 101 | KAsync::Job<void> replayJob = KAsync::null<void>(); |
102 | while (revision <= *topRevision) { | ||
103 | const auto uid = Storage::getUidFromRevision(mMainStoreTransaction, revision); | ||
104 | const auto type = Storage::getTypeFromRevision(mMainStoreTransaction, revision); | ||
105 | const auto key = Storage::assembleKey(uid, revision); | ||
106 | bool exitLoop = false; | ||
107 | Storage::mainDatabase(mMainStoreTransaction, type) | ||
108 | .scan(key, | ||
109 | [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { | ||
110 | SinkTrace() << "Replaying " << key; | ||
111 | if (canReplay(type, key, value)) { | ||
112 | replayJob = replay(type, key, value).then<void>([this, revision, lastReplayedRevision]() { | ||
113 | recordReplayedRevision(revision); | ||
114 | *lastReplayedRevision = revision; | ||
115 | }, | ||
116 | [revision](int, QString) { | ||
117 | SinkTrace() << "Change replay failed" << revision; | ||
118 | }); | ||
119 | exitLoop = true; | ||
120 | } else { | ||
121 | *lastReplayedRevision = revision; | ||
122 | } | ||
123 | return false; | ||
124 | }, | ||
125 | [key](const Storage::Error &) { SinkError() << "Failed to replay change " << key; }); | ||
126 | if (exitLoop) { | ||
127 | break; | ||
128 | } | ||
129 | revision++; | ||
130 | } | ||
131 | replayJob.then<void>([this, revision, lastReplayedRevision, topRevision, &future]() { | ||
132 | SinkTrace() << "Replayed until " << revision; | ||
133 | recordReplayedRevision(*lastReplayedRevision); | ||
134 | QTimer::singleShot(0, [&future, lastReplayedRevision, topRevision]() { | ||
135 | future.setValue((*lastReplayedRevision < *topRevision)); | ||
136 | future.setFinished(); | ||
137 | }); | ||
138 | }, | ||
139 | [this, revision, &future](int, QString) { | ||
140 | SinkTrace() << "Change replay failed" << revision; | ||
141 | //We're probably not online or so, so postpone retrying | ||
142 | future.setValue(false); | ||
143 | future.setFinished(); | ||
144 | }).exec(); | ||
145 | |||
146 | })) | ||
147 | .then<void>([this, lastReplayedRevision]() { | ||
148 | recordReplayedRevision(*lastReplayedRevision); | ||
149 | mMainStoreTransaction.abort(); | ||
150 | if (allChangesReplayed()) { | ||
151 | mReplayInProgress = false; | ||
152 | emit changesReplayed(); | ||
153 | } else { | ||
154 | QTimer::singleShot(0, [this]() { | ||
155 | replayNextRevision().exec(); | ||
156 | }); | ||
157 | } | ||
116 | }); | 158 | }); |
117 | } else { | ||
118 | SinkTrace() << "No changes to replay"; | ||
119 | mReplayInProgress = false; | ||
120 | emit changesReplayed(); | ||
121 | } | ||
122 | return KAsync::null<void>(); | ||
123 | } | 159 | } |
124 | 160 | ||
125 | void ChangeReplay::revisionChanged() | 161 | void ChangeReplay::revisionChanged() |