From c8587bca69546a3a5fd1b2f6c09aff89095a90f8 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 2 Jun 2016 13:31:33 +0200 Subject: Non blocking change-replay --- common/changereplay.cpp | 70 +++++++++++++++++++++++++++++++------------------ common/changereplay.h | 2 ++ 2 files changed, 46 insertions(+), 26 deletions(-) diff --git a/common/changereplay.cpp b/common/changereplay.cpp index 63c41c8..aebfdb0 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp @@ -30,7 +30,7 @@ using namespace Sink; #define DEBUG_AREA "resource.changereplay" ChangeReplay::ChangeReplay(const QByteArray &resourceName) - : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite) + : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayInProgress(false) { Trace() << "Created change replay: " << resourceName; } @@ -58,15 +58,16 @@ bool ChangeReplay::allChangesReplayed() return (lastReplayedRevision >= topRevision); } -void ChangeReplay::revisionChanged() +KAsync::Job ChangeReplay::replayNextRevision() { + mReplayInProgress = true; auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { Warning() << error.message; }); - auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { + auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { Warning() << error.message; }); - qint64 lastReplayedRevision = 1; + qint64 lastReplayedRevision = 0; replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { lastReplayedRevision = value.toLongLong(); @@ -75,28 +76,45 @@ void ChangeReplay::revisionChanged() [](const Storage::Error &) {}); const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); - Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; - if (lastReplayedRevision <= topRevision) { - qint64 revision = lastReplayedRevision; - for (; revision <= topRevision; revision++) { - const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); - const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); - const auto key = Storage::assembleKey(uid, revision); - Storage::mainDatabase(mainStoreTransaction, type) - .scan(key, - [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { - Trace() << "Replaying " << key; - replay(type, key, value).exec(); - // TODO make for loop async, and pass to async replay function together with type - return false; - }, - [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); - } - revision--; - replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); - replayStoreTransaction.commit(); - Trace() << "Replayed until " << revision; + if (lastReplayedRevision < topRevision) { + Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; + qint64 revision = lastReplayedRevision + 1; + const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); + const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); + const auto key = Storage::assembleKey(uid, revision); + KAsync::Job replayJob = KAsync::null(); + Storage::mainDatabase(mainStoreTransaction, type) + .scan(key, + [&lastReplayedRevision, type, this, &replayJob](const QByteArray &key, const QByteArray &value) -> bool { + Trace() << "Replaying " << key; + replayJob = replay(type, key, value); + // TODO make for loop async, and pass to async replay function together with type + return false; + }, + [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); + return replayJob.then([this, revision]() { + auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { + Warning() << error.message; + }); + replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); + replayStoreTransaction.commit(); + Trace() << "Replayed until " << revision; + }).then([this]() { + //replay until we're done + replayNextRevision().exec(); + }); + } else { + Trace() << "No changes to replay"; + mReplayInProgress = false; + emit changesReplayed(); + } + return KAsync::null(); +} + +void ChangeReplay::revisionChanged() +{ + if (!mReplayInProgress) { + replayNextRevision().exec(); } - emit changesReplayed(); } diff --git a/common/changereplay.h b/common/changereplay.h index 42736ca..aba3dd0 100644 --- a/common/changereplay.h +++ b/common/changereplay.h @@ -54,7 +54,9 @@ protected: Sink::Storage mStorage; private: + KAsync::Job replayNextRevision(); Sink::Storage mChangeReplayStore; + bool mReplayInProgress; }; class NullChangeReplay : public ChangeReplay -- cgit v1.2.3