diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-01-18 15:59:31 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-01-18 15:59:31 +0100 |
commit | 62d222f20de7558ebb266efdcadf458e3807e406 (patch) | |
tree | ec2f3bbe552bce2e8be8a79d02cf4c63eaa1edeb /common/changereplay.cpp | |
parent | 47412280397d66950abd9582048ce8e465fc2844 (diff) | |
download | sink-62d222f20de7558ebb266efdcadf458e3807e406.tar.gz sink-62d222f20de7558ebb266efdcadf458e3807e406.zip |
Refactored the changereplay
* use a log context
* clearer and simpler control flow
* No infinite recursive calling
Diffstat (limited to 'common/changereplay.cpp')
-rw-r--r-- | common/changereplay.cpp | 132 |
1 files changed, 72 insertions, 60 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp index 224fb25..8ed0532 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp | |||
@@ -29,12 +29,10 @@ | |||
29 | using namespace Sink; | 29 | using namespace Sink; |
30 | using namespace Sink::Storage; | 30 | using namespace Sink::Storage; |
31 | 31 | ||
32 | SINK_DEBUG_AREA("changereplay"); | ||
33 | |||
34 | ChangeReplay::ChangeReplay(const ResourceContext &resourceContext) | 32 | ChangeReplay::ChangeReplay(const ResourceContext &resourceContext) |
35 | : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false) | 33 | : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false), mLogCtx{resourceContext.instanceId() + ".changereplay"} |
36 | { | 34 | { |
37 | SinkTrace() << "Created change replay: " << resourceContext.instanceId(); | 35 | SinkTraceCtx(mLogCtx) << "Created change replay: " << resourceContext.instanceId(); |
38 | } | 36 | } |
39 | 37 | ||
40 | qint64 ChangeReplay::getLastReplayedRevision() | 38 | qint64 ChangeReplay::getLastReplayedRevision() |
@@ -52,18 +50,18 @@ qint64 ChangeReplay::getLastReplayedRevision() | |||
52 | 50 | ||
53 | bool ChangeReplay::allChangesReplayed() | 51 | bool ChangeReplay::allChangesReplayed() |
54 | { | 52 | { |
55 | const qint64 topRevision = DataStore::maxRevision(mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { | 53 | const qint64 topRevision = DataStore::maxRevision(mStorage.createTransaction(DataStore::ReadOnly, [this](const Sink::Storage::DataStore::Error &error) { |
56 | SinkWarning() << error.message; | 54 | SinkWarningCtx(mLogCtx) << error.message; |
57 | })); | 55 | })); |
58 | const qint64 lastReplayedRevision = getLastReplayedRevision(); | 56 | const qint64 lastReplayedRevision = getLastReplayedRevision(); |
59 | SinkTrace() << "All changes replayed " << topRevision << lastReplayedRevision; | 57 | SinkTraceCtx(mLogCtx) << "Checking if all replayed. Top revision:" << topRevision << "Last replayed:" << lastReplayedRevision; |
60 | return (lastReplayedRevision >= topRevision); | 58 | return (lastReplayedRevision >= topRevision); |
61 | } | 59 | } |
62 | 60 | ||
63 | void ChangeReplay::recordReplayedRevision(qint64 revision) | 61 | void ChangeReplay::recordReplayedRevision(qint64 revision) |
64 | { | 62 | { |
65 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadWrite, [](const Sink::Storage::DataStore::Error &error) { | 63 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadWrite, [this](const Sink::Storage::DataStore::Error &error) { |
66 | SinkWarning() << error.message; | 64 | SinkWarningCtx(mLogCtx) << error.message; |
67 | }); | 65 | }); |
68 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); | 66 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); |
69 | replayStoreTransaction.commit(); | 67 | replayStoreTransaction.commit(); |
@@ -72,16 +70,20 @@ void ChangeReplay::recordReplayedRevision(qint64 revision) | |||
72 | KAsync::Job<void> ChangeReplay::replayNextRevision() | 70 | KAsync::Job<void> ChangeReplay::replayNextRevision() |
73 | { | 71 | { |
74 | Q_ASSERT(!mReplayInProgress); | 72 | Q_ASSERT(!mReplayInProgress); |
75 | auto lastReplayedRevision = QSharedPointer<qint64>::create(0); | 73 | return KAsync::start<void>([this]() { |
76 | auto topRevision = QSharedPointer<qint64>::create(0); | 74 | if (mReplayInProgress) { |
77 | emit replayingChanges(); | 75 | SinkErrorCtx(mLogCtx) << "Replay still in progress!!!!!"; |
78 | return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() { | 76 | return KAsync::null<void>(); |
77 | } | ||
78 | auto lastReplayedRevision = QSharedPointer<qint64>::create(0); | ||
79 | auto topRevision = QSharedPointer<qint64>::create(0); | ||
80 | emit replayingChanges(); | ||
79 | mReplayInProgress = true; | 81 | mReplayInProgress = true; |
80 | mMainStoreTransaction = mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { | 82 | mMainStoreTransaction = mStorage.createTransaction(DataStore::ReadOnly, [this](const DataStore::Error &error) { |
81 | SinkWarning() << error.message; | 83 | SinkWarningCtx(mLogCtx) << error.message; |
82 | }); | 84 | }); |
83 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { | 85 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [this](const DataStore::Error &error) { |
84 | SinkWarning() << error.message; | 86 | SinkWarningCtx(mLogCtx) << error.message; |
85 | }); | 87 | }); |
86 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | 88 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", |
87 | [lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | 89 | [lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { |
@@ -90,57 +92,67 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
90 | }, | 92 | }, |
91 | [](const DataStore::Error &) {}); | 93 | [](const DataStore::Error &) {}); |
92 | *topRevision = DataStore::maxRevision(mMainStoreTransaction); | 94 | *topRevision = DataStore::maxRevision(mMainStoreTransaction); |
93 | SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; | 95 | if (*lastReplayedRevision >= *topRevision) { |
94 | }) | 96 | SinkTraceCtx(mLogCtx) << "Nothing to replay"; |
95 | .then(KAsync::doWhile( | 97 | return KAsync::null(); |
96 | [this, lastReplayedRevision, topRevision]() -> KAsync::Job<KAsync::ControlFlowFlag> { | 98 | } |
99 | SinkTraceCtx(mLogCtx) << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; | ||
100 | return KAsync::doWhile( | ||
101 | [this, lastReplayedRevision, topRevision]() -> KAsync::Job<KAsync::ControlFlowFlag> { | ||
97 | if (*lastReplayedRevision >= *topRevision) { | 102 | if (*lastReplayedRevision >= *topRevision) { |
103 | SinkTraceCtx(mLogCtx) << "Done replaying"; | ||
98 | return KAsync::value(KAsync::Break); | 104 | return KAsync::value(KAsync::Break); |
99 | } | 105 | } |
100 | 106 | ||
101 | qint64 revision = *lastReplayedRevision + 1; | ||
102 | KAsync::Job<void> replayJob = KAsync::null<void>(); | 107 | KAsync::Job<void> replayJob = KAsync::null<void>(); |
108 | qint64 revision = *lastReplayedRevision + 1; | ||
103 | while (revision <= *topRevision) { | 109 | while (revision <= *topRevision) { |
104 | const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); | 110 | const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); |
105 | const auto type = DataStore::getTypeFromRevision(mMainStoreTransaction, revision); | 111 | const auto type = DataStore::getTypeFromRevision(mMainStoreTransaction, revision); |
106 | const auto key = DataStore::assembleKey(uid, revision); | 112 | if (uid.isEmpty() || type.isEmpty()) { |
107 | bool exitLoop = false; | 113 | SinkErrorCtx(mLogCtx) << "Failed to read uid or type for revison: " << revision << uid << type; |
108 | DataStore::mainDatabase(mMainStoreTransaction, type) | 114 | } else { |
109 | .scan(key, | 115 | const auto key = DataStore::assembleKey(uid, revision); |
110 | [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { | 116 | QByteArray entityBuffer; |
111 | SinkTrace() << "Replaying " << key; | 117 | DataStore::mainDatabase(mMainStoreTransaction, type) |
112 | if (canReplay(type, key, value)) { | 118 | .scan(key, |
113 | replayJob = replay(type, key, value).then<void>([this, revision, lastReplayedRevision](const KAsync::Error &error) { | 119 | [&entityBuffer](const QByteArray &key, const QByteArray &value) -> bool { |
114 | if (error) { | 120 | entityBuffer = value; |
115 | SinkTrace() << "Change replay failed" << revision; | 121 | return false; |
116 | return KAsync::error(error); | 122 | }, |
117 | } else { | 123 | [this, key](const DataStore::Error &) { SinkErrorCtx(mLogCtx) << "Failed to read the entity buffer " << key; }); |
118 | recordReplayedRevision(revision); | 124 | |
119 | *lastReplayedRevision = revision; | 125 | if (entityBuffer.isEmpty()) { |
120 | } | 126 | SinkErrorCtx(mLogCtx) << "Failed to replay change " << key; |
121 | return KAsync::null(); | 127 | } else { |
122 | }); | 128 | if (canReplay(type, key, entityBuffer)) { |
123 | exitLoop = true; | 129 | SinkTraceCtx(mLogCtx) << "Replaying " << key; |
124 | } else { | 130 | replayJob = replay(type, key, entityBuffer); |
125 | *lastReplayedRevision = revision; | 131 | } else { |
126 | } | 132 | SinkTraceCtx(mLogCtx) << "Cannot replay " << key; |
127 | return false; | 133 | //We silently skip over revisions that cannot be replayed, as this is not an error. |
128 | }, | 134 | replayJob = KAsync::null(); |
129 | [key](const DataStore::Error &) { SinkError() << "Failed to replay change " << key; }); | 135 | } |
130 | if (exitLoop) { | 136 | //Set the last revision we tried to replay |
131 | break; | 137 | *lastReplayedRevision = revision; |
138 | break; | ||
139 | } | ||
132 | } | 140 | } |
141 | //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations. | ||
142 | *lastReplayedRevision = revision; | ||
133 | revision++; | 143 | revision++; |
134 | } | 144 | } |
135 | return replayJob.then<KAsync::ControlFlowFlag>([this, revision, lastReplayedRevision, topRevision](const KAsync::Error &error) ->KAsync::Job<KAsync::ControlFlowFlag> { | 145 | return replayJob.then([=](const KAsync::Error &error) { |
136 | if (error) { | 146 | if (error) { |
137 | SinkTrace() << "Change replay failed" << revision; | 147 | SinkWarningCtx(mLogCtx) << "Change replay failed: " << error << "Last replayed revision: " << *lastReplayedRevision; |
138 | //We're probably not online or so, so postpone retrying | 148 | //We're probably not online or so, so postpone retrying |
139 | return KAsync::value(KAsync::Break); | 149 | return KAsync::value(KAsync::Break); |
140 | } else { | 150 | } else { |
141 | SinkTrace() << "Replayed until " << revision; | 151 | SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision; |
142 | recordReplayedRevision(*lastReplayedRevision); | 152 | recordReplayedRevision(*lastReplayedRevision); |
143 | if (*lastReplayedRevision < *topRevision) { | 153 | if (*lastReplayedRevision < *topRevision) { |
154 | SinkTraceCtx(mLogCtx) << "Replaying some more..."; | ||
155 | //Replay more if we have more | ||
144 | return KAsync::wait(0).then(KAsync::value(KAsync::Continue)); | 156 | return KAsync::wait(0).then(KAsync::value(KAsync::Continue)); |
145 | } else { | 157 | } else { |
146 | return KAsync::value(KAsync::Break); | 158 | return KAsync::value(KAsync::Break); |
@@ -150,21 +162,21 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
150 | Q_ASSERT(false); | 162 | Q_ASSERT(false); |
151 | return KAsync::value(KAsync::Break); | 163 | return KAsync::value(KAsync::Break); |
152 | }); | 164 | }); |
153 | })) | 165 | }); |
154 | .then([this, lastReplayedRevision]() { | 166 | }) |
155 | recordReplayedRevision(*lastReplayedRevision); | 167 | .then([this](const KAsync::Error &error) { |
168 | SinkTraceCtx(mLogCtx) << "Change replay complete."; | ||
169 | if (error) { | ||
170 | SinkWarningCtx(mLogCtx) << "Error during change replay: " << error; | ||
171 | } | ||
156 | mMainStoreTransaction.abort(); | 172 | mMainStoreTransaction.abort(); |
173 | mReplayInProgress = false; | ||
157 | if (ChangeReplay::allChangesReplayed()) { | 174 | if (ChangeReplay::allChangesReplayed()) { |
158 | mReplayInProgress = false; | ||
159 | //In case we have a derived implementation | 175 | //In case we have a derived implementation |
160 | if (allChangesReplayed()) { | 176 | if (allChangesReplayed()) { |
177 | SinkTraceCtx(mLogCtx) << "All changes replayed"; | ||
161 | emit changesReplayed(); | 178 | emit changesReplayed(); |
162 | } | 179 | } |
163 | } else { | ||
164 | QTimer::singleShot(0, [this]() { | ||
165 | mReplayInProgress = false; | ||
166 | replayNextRevision().exec(); | ||
167 | }); | ||
168 | } | 180 | } |
169 | }); | 181 | }); |
170 | } | 182 | } |