From 62d222f20de7558ebb266efdcadf458e3807e406 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 18 Jan 2017 15:59:31 +0100 Subject: Refactored the changereplay * use a log context * clearer and simpler control flow * No infinite recursive calling --- common/changereplay.cpp | 132 ++++++++++++++++++++++++++---------------------- common/changereplay.h | 1 + 2 files changed, 73 insertions(+), 60 deletions(-) diff --git a/common/changereplay.cpp b/common/changereplay.cpp index 224fb25..8ed0532 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp @@ -29,12 +29,10 @@ using namespace Sink; using namespace Sink::Storage; -SINK_DEBUG_AREA("changereplay"); - ChangeReplay::ChangeReplay(const ResourceContext &resourceContext) - : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false) + : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false), mLogCtx{resourceContext.instanceId() + ".changereplay"} { - SinkTrace() << "Created change replay: " << resourceContext.instanceId(); + SinkTraceCtx(mLogCtx) << "Created change replay: " << resourceContext.instanceId(); } qint64 ChangeReplay::getLastReplayedRevision() @@ -52,18 +50,18 @@ qint64 ChangeReplay::getLastReplayedRevision() bool ChangeReplay::allChangesReplayed() { - const qint64 topRevision = DataStore::maxRevision(mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { - SinkWarning() << error.message; + const qint64 topRevision = DataStore::maxRevision(mStorage.createTransaction(DataStore::ReadOnly, [this](const Sink::Storage::DataStore::Error &error) { + SinkWarningCtx(mLogCtx) << error.message; })); const qint64 lastReplayedRevision = getLastReplayedRevision(); - SinkTrace() << "All changes replayed " << topRevision << lastReplayedRevision; + SinkTraceCtx(mLogCtx) << "Checking if all replayed. Top revision:" << topRevision << "Last replayed:" << lastReplayedRevision; return (lastReplayedRevision >= topRevision); } void ChangeReplay::recordReplayedRevision(qint64 revision) { - auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadWrite, [](const Sink::Storage::DataStore::Error &error) { - SinkWarning() << error.message; + auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadWrite, [this](const Sink::Storage::DataStore::Error &error) { + SinkWarningCtx(mLogCtx) << error.message; }); replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); replayStoreTransaction.commit(); @@ -72,16 +70,20 @@ void ChangeReplay::recordReplayedRevision(qint64 revision) KAsync::Job ChangeReplay::replayNextRevision() { Q_ASSERT(!mReplayInProgress); - auto lastReplayedRevision = QSharedPointer::create(0); - auto topRevision = QSharedPointer::create(0); - emit replayingChanges(); - return KAsync::syncStart([this, lastReplayedRevision, topRevision]() { + return KAsync::start([this]() { + if (mReplayInProgress) { + SinkErrorCtx(mLogCtx) << "Replay still in progress!!!!!"; + return KAsync::null(); + } + auto lastReplayedRevision = QSharedPointer::create(0); + auto topRevision = QSharedPointer::create(0); + emit replayingChanges(); mReplayInProgress = true; - mMainStoreTransaction = mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { - SinkWarning() << error.message; + mMainStoreTransaction = mStorage.createTransaction(DataStore::ReadOnly, [this](const DataStore::Error &error) { + SinkWarningCtx(mLogCtx) << error.message; }); - auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { - SinkWarning() << error.message; + auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [this](const DataStore::Error &error) { + SinkWarningCtx(mLogCtx) << error.message; }); replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { @@ -90,57 +92,67 @@ KAsync::Job ChangeReplay::replayNextRevision() }, [](const DataStore::Error &) {}); *topRevision = DataStore::maxRevision(mMainStoreTransaction); - SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; - }) - .then(KAsync::doWhile( - [this, lastReplayedRevision, topRevision]() -> KAsync::Job { + if (*lastReplayedRevision >= *topRevision) { + SinkTraceCtx(mLogCtx) << "Nothing to replay"; + return KAsync::null(); + } + SinkTraceCtx(mLogCtx) << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; + return KAsync::doWhile( + [this, lastReplayedRevision, topRevision]() -> KAsync::Job { if (*lastReplayedRevision >= *topRevision) { + SinkTraceCtx(mLogCtx) << "Done replaying"; return KAsync::value(KAsync::Break); } - qint64 revision = *lastReplayedRevision + 1; KAsync::Job replayJob = KAsync::null(); + qint64 revision = *lastReplayedRevision + 1; while (revision <= *topRevision) { const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); const auto type = DataStore::getTypeFromRevision(mMainStoreTransaction, revision); - const auto key = DataStore::assembleKey(uid, revision); - bool exitLoop = false; - DataStore::mainDatabase(mMainStoreTransaction, type) - .scan(key, - [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { - SinkTrace() << "Replaying " << key; - if (canReplay(type, key, value)) { - replayJob = replay(type, key, value).then([this, revision, lastReplayedRevision](const KAsync::Error &error) { - if (error) { - SinkTrace() << "Change replay failed" << revision; - return KAsync::error(error); - } else { - recordReplayedRevision(revision); - *lastReplayedRevision = revision; - } - return KAsync::null(); - }); - exitLoop = true; - } else { - *lastReplayedRevision = revision; - } - return false; - }, - [key](const DataStore::Error &) { SinkError() << "Failed to replay change " << key; }); - if (exitLoop) { - break; + if (uid.isEmpty() || type.isEmpty()) { + SinkErrorCtx(mLogCtx) << "Failed to read uid or type for revison: " << revision << uid << type; + } else { + const auto key = DataStore::assembleKey(uid, revision); + QByteArray entityBuffer; + DataStore::mainDatabase(mMainStoreTransaction, type) + .scan(key, + [&entityBuffer](const QByteArray &key, const QByteArray &value) -> bool { + entityBuffer = value; + return false; + }, + [this, key](const DataStore::Error &) { SinkErrorCtx(mLogCtx) << "Failed to read the entity buffer " << key; }); + + if (entityBuffer.isEmpty()) { + SinkErrorCtx(mLogCtx) << "Failed to replay change " << key; + } else { + if (canReplay(type, key, entityBuffer)) { + SinkTraceCtx(mLogCtx) << "Replaying " << key; + replayJob = replay(type, key, entityBuffer); + } else { + SinkTraceCtx(mLogCtx) << "Cannot replay " << key; + //We silently skip over revisions that cannot be replayed, as this is not an error. + replayJob = KAsync::null(); + } + //Set the last revision we tried to replay + *lastReplayedRevision = revision; + break; + } } + //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations. + *lastReplayedRevision = revision; revision++; } - return replayJob.then([this, revision, lastReplayedRevision, topRevision](const KAsync::Error &error) ->KAsync::Job { + return replayJob.then([=](const KAsync::Error &error) { if (error) { - SinkTrace() << "Change replay failed" << revision; + SinkWarningCtx(mLogCtx) << "Change replay failed: " << error << "Last replayed revision: " << *lastReplayedRevision; //We're probably not online or so, so postpone retrying return KAsync::value(KAsync::Break); } else { - SinkTrace() << "Replayed until " << revision; + SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision; recordReplayedRevision(*lastReplayedRevision); if (*lastReplayedRevision < *topRevision) { + SinkTraceCtx(mLogCtx) << "Replaying some more..."; + //Replay more if we have more return KAsync::wait(0).then(KAsync::value(KAsync::Continue)); } else { return KAsync::value(KAsync::Break); @@ -150,21 +162,21 @@ KAsync::Job ChangeReplay::replayNextRevision() Q_ASSERT(false); return KAsync::value(KAsync::Break); }); - })) - .then([this, lastReplayedRevision]() { - recordReplayedRevision(*lastReplayedRevision); + }); + }) + .then([this](const KAsync::Error &error) { + SinkTraceCtx(mLogCtx) << "Change replay complete."; + if (error) { + SinkWarningCtx(mLogCtx) << "Error during change replay: " << error; + } mMainStoreTransaction.abort(); + mReplayInProgress = false; if (ChangeReplay::allChangesReplayed()) { - mReplayInProgress = false; //In case we have a derived implementation if (allChangesReplayed()) { + SinkTraceCtx(mLogCtx) << "All changes replayed"; emit changesReplayed(); } - } else { - QTimer::singleShot(0, [this]() { - mReplayInProgress = false; - replayNextRevision().exec(); - }); } }); } diff --git a/common/changereplay.h b/common/changereplay.h index da188bf..4ec115c 100644 --- a/common/changereplay.h +++ b/common/changereplay.h @@ -62,6 +62,7 @@ private: Sink::Storage::DataStore mChangeReplayStore; bool mReplayInProgress; Sink::Storage::DataStore::Transaction mMainStoreTransaction; + Sink::Log::Context mLogCtx; }; class NullChangeReplay : public ChangeReplay -- cgit v1.2.3