diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-05-08 21:40:13 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-05-08 21:40:13 +0200 |
commit | 205729e3ab9664c8b2d56cc478daac2c5afd1b28 (patch) | |
tree | 63306ba796052dafb5a1c73e80654a46dae93f75 | |
parent | fd9a5b1ff3b31f80d72283d6011459127dead282 (diff) | |
download | sink-205729e3ab9664c8b2d56cc478daac2c5afd1b28.tar.gz sink-205729e3ab9664c8b2d56cc478daac2c5afd1b28.zip |
Guard the changereplay callbacks using the new API
-rw-r--r-- | common/changereplay.cpp | 14 | ||||
-rw-r--r-- | common/changereplay.h | 2 |
2 files changed, 7 insertions, 9 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp index 7895b66..da36b3e 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp | |||
@@ -30,8 +30,7 @@ using namespace Sink; | |||
30 | using namespace Sink::Storage; | 30 | using namespace Sink::Storage; |
31 | 31 | ||
32 | ChangeReplay::ChangeReplay(const ResourceContext &resourceContext, const Sink::Log::Context &ctx) | 32 | ChangeReplay::ChangeReplay(const ResourceContext &resourceContext, const Sink::Log::Context &ctx) |
33 | : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false), mLogCtx{ctx.subContext("changereplay")}, | 33 | : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false), mLogCtx{ctx.subContext("changereplay")} |
34 | mGuard{new QObject} | ||
35 | { | 34 | { |
36 | } | 35 | } |
37 | 36 | ||
@@ -84,6 +83,8 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
84 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [this](const DataStore::Error &error) { | 83 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [this](const DataStore::Error &error) { |
85 | SinkWarningCtx(mLogCtx) << error.message; | 84 | SinkWarningCtx(mLogCtx) << error.message; |
86 | }); | 85 | }); |
86 | Q_ASSERT(mMainStoreTransaction); | ||
87 | Q_ASSERT(replayStoreTransaction); | ||
87 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | 88 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", |
88 | [lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | 89 | [lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { |
89 | *lastReplayedRevision = value.toLongLong(); | 90 | *lastReplayedRevision = value.toLongLong(); |
@@ -98,14 +99,11 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
98 | SinkTraceCtx(mLogCtx) << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; | 99 | SinkTraceCtx(mLogCtx) << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; |
99 | return KAsync::doWhile( | 100 | return KAsync::doWhile( |
100 | [this, lastReplayedRevision, topRevision]() -> KAsync::Job<KAsync::ControlFlowFlag> { | 101 | [this, lastReplayedRevision, topRevision]() -> KAsync::Job<KAsync::ControlFlowFlag> { |
101 | if (!mGuard) { | ||
102 | SinkTraceCtx(mLogCtx) << "Exit due to guard"; | ||
103 | return KAsync::value(KAsync::Break); | ||
104 | } | ||
105 | if (*lastReplayedRevision >= *topRevision) { | 102 | if (*lastReplayedRevision >= *topRevision) { |
106 | SinkTraceCtx(mLogCtx) << "Done replaying" << *lastReplayedRevision << *topRevision; | 103 | SinkTraceCtx(mLogCtx) << "Done replaying" << *lastReplayedRevision << *topRevision; |
107 | return KAsync::value(KAsync::Break); | 104 | return KAsync::value(KAsync::Break); |
108 | } | 105 | } |
106 | Q_ASSERT(mMainStoreTransaction); | ||
109 | 107 | ||
110 | KAsync::Job<void> replayJob = KAsync::null<void>(); | 108 | KAsync::Job<void> replayJob = KAsync::null<void>(); |
111 | qint64 revision = *lastReplayedRevision + 1; | 109 | qint64 revision = *lastReplayedRevision + 1; |
@@ -164,7 +162,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
164 | //We shouldn't ever get here | 162 | //We shouldn't ever get here |
165 | Q_ASSERT(false); | 163 | Q_ASSERT(false); |
166 | return KAsync::value(KAsync::Break); | 164 | return KAsync::value(KAsync::Break); |
167 | }); | 165 | }).guard(&mGuard); |
168 | }); | 166 | }); |
169 | }) | 167 | }) |
170 | .then([this](const KAsync::Error &error) { | 168 | .then([this](const KAsync::Error &error) { |
@@ -181,7 +179,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
181 | emit changesReplayed(); | 179 | emit changesReplayed(); |
182 | } | 180 | } |
183 | } | 181 | } |
184 | }); | 182 | }).guard(&mGuard); |
185 | } | 183 | } |
186 | 184 | ||
187 | void ChangeReplay::revisionChanged() | 185 | void ChangeReplay::revisionChanged() |
diff --git a/common/changereplay.h b/common/changereplay.h index edc4462..ab2d857 100644 --- a/common/changereplay.h +++ b/common/changereplay.h | |||
@@ -63,7 +63,7 @@ private: | |||
63 | bool mReplayInProgress; | 63 | bool mReplayInProgress; |
64 | Sink::Storage::DataStore::Transaction mMainStoreTransaction; | 64 | Sink::Storage::DataStore::Transaction mMainStoreTransaction; |
65 | Sink::Log::Context mLogCtx; | 65 | Sink::Log::Context mLogCtx; |
66 | QSharedPointer<QObject> mGuard; | 66 | QObject mGuard; |
67 | }; | 67 | }; |
68 | 68 | ||
69 | class NullChangeReplay : public ChangeReplay | 69 | class NullChangeReplay : public ChangeReplay |