summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-05-08 21:40:13 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-05-08 21:40:13 +0200
commit205729e3ab9664c8b2d56cc478daac2c5afd1b28 (patch)
tree63306ba796052dafb5a1c73e80654a46dae93f75 /common
parentfd9a5b1ff3b31f80d72283d6011459127dead282 (diff)
downloadsink-205729e3ab9664c8b2d56cc478daac2c5afd1b28.tar.gz
sink-205729e3ab9664c8b2d56cc478daac2c5afd1b28.zip
Guard the changereplay callbacks using the new API
Diffstat (limited to 'common')
-rw-r--r--common/changereplay.cpp14
-rw-r--r--common/changereplay.h2
2 files changed, 7 insertions, 9 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index 7895b66..da36b3e 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -30,8 +30,7 @@ using namespace Sink;
30using namespace Sink::Storage; 30using namespace Sink::Storage;
31 31
32ChangeReplay::ChangeReplay(const ResourceContext &resourceContext, const Sink::Log::Context &ctx) 32ChangeReplay::ChangeReplay(const ResourceContext &resourceContext, const Sink::Log::Context &ctx)
33 : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false), mLogCtx{ctx.subContext("changereplay")}, 33 : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false), mLogCtx{ctx.subContext("changereplay")}
34 mGuard{new QObject}
35{ 34{
36} 35}
37 36
@@ -84,6 +83,8 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
84 auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [this](const DataStore::Error &error) { 83 auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [this](const DataStore::Error &error) {
85 SinkWarningCtx(mLogCtx) << error.message; 84 SinkWarningCtx(mLogCtx) << error.message;
86 }); 85 });
86 Q_ASSERT(mMainStoreTransaction);
87 Q_ASSERT(replayStoreTransaction);
87 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", 88 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
88 [lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { 89 [lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
89 *lastReplayedRevision = value.toLongLong(); 90 *lastReplayedRevision = value.toLongLong();
@@ -98,14 +99,11 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
98 SinkTraceCtx(mLogCtx) << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; 99 SinkTraceCtx(mLogCtx) << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision;
99 return KAsync::doWhile( 100 return KAsync::doWhile(
100 [this, lastReplayedRevision, topRevision]() -> KAsync::Job<KAsync::ControlFlowFlag> { 101 [this, lastReplayedRevision, topRevision]() -> KAsync::Job<KAsync::ControlFlowFlag> {
101 if (!mGuard) {
102 SinkTraceCtx(mLogCtx) << "Exit due to guard";
103 return KAsync::value(KAsync::Break);
104 }
105 if (*lastReplayedRevision >= *topRevision) { 102 if (*lastReplayedRevision >= *topRevision) {
106 SinkTraceCtx(mLogCtx) << "Done replaying" << *lastReplayedRevision << *topRevision; 103 SinkTraceCtx(mLogCtx) << "Done replaying" << *lastReplayedRevision << *topRevision;
107 return KAsync::value(KAsync::Break); 104 return KAsync::value(KAsync::Break);
108 } 105 }
106 Q_ASSERT(mMainStoreTransaction);
109 107
110 KAsync::Job<void> replayJob = KAsync::null<void>(); 108 KAsync::Job<void> replayJob = KAsync::null<void>();
111 qint64 revision = *lastReplayedRevision + 1; 109 qint64 revision = *lastReplayedRevision + 1;
@@ -164,7 +162,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
164 //We shouldn't ever get here 162 //We shouldn't ever get here
165 Q_ASSERT(false); 163 Q_ASSERT(false);
166 return KAsync::value(KAsync::Break); 164 return KAsync::value(KAsync::Break);
167 }); 165 }).guard(&mGuard);
168 }); 166 });
169 }) 167 })
170 .then([this](const KAsync::Error &error) { 168 .then([this](const KAsync::Error &error) {
@@ -181,7 +179,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
181 emit changesReplayed(); 179 emit changesReplayed();
182 } 180 }
183 } 181 }
184 }); 182 }).guard(&mGuard);
185} 183}
186 184
187void ChangeReplay::revisionChanged() 185void ChangeReplay::revisionChanged()
diff --git a/common/changereplay.h b/common/changereplay.h
index edc4462..ab2d857 100644
--- a/common/changereplay.h
+++ b/common/changereplay.h
@@ -63,7 +63,7 @@ private:
63 bool mReplayInProgress; 63 bool mReplayInProgress;
64 Sink::Storage::DataStore::Transaction mMainStoreTransaction; 64 Sink::Storage::DataStore::Transaction mMainStoreTransaction;
65 Sink::Log::Context mLogCtx; 65 Sink::Log::Context mLogCtx;
66 QSharedPointer<QObject> mGuard; 66 QObject mGuard;
67}; 67};
68 68
69class NullChangeReplay : public ChangeReplay 69class NullChangeReplay : public ChangeReplay