From d6c3e998dbb61eedf37773082bf58ffbd11663b0 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 20 Oct 2015 16:46:59 +0200 Subject: Draft of ChangeReplay --- common/genericresource.cpp | 103 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 101 insertions(+), 2 deletions(-) (limited to 'common/genericresource.cpp') diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 313d99c..8ae20ed 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -12,6 +12,84 @@ using namespace Akonadi2; +/** + * Replays changes from the storage one by one. + * + * Uses a local database to: + * * Remember what changes have been replayed already. + * * store a mapping of remote to local buffers + */ +class ChangeReplay : public QObject +{ + Q_OBJECT +public: + + typedef std::function(const QByteArray &type, const QByteArray &key, const QByteArray &value)> ReplayFunction; + + ChangeReplay(const QString &resourceName, const ReplayFunction replayFunction) + : mStorage(storageLocation(), resourceName, Storage::ReadOnly), + mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), + mReplayFunction(replayFunction) + { + + } + + qint64 getLastReplayedRevision() + { + qint64 lastReplayedRevision = 0; + auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); + replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { + lastReplayedRevision = value.toLongLong(); + return false; + }, [](const Storage::Error &) { + }); + return lastReplayedRevision; + } + +Q_SIGNALS: + void changesReplayed(); + +public Q_SLOTS: + void revisionChanged() + { + auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly); + auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite); + qint64 lastReplayedRevision = 0; + replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { + lastReplayedRevision = value.toLongLong(); + return false; + }, [](const Storage::Error &) { + }); + const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); + + if (lastReplayedRevision < topRevision) { + qint64 revision = lastReplayedRevision; + for (;revision <= topRevision; revision++) { + const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); + const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); + const auto key = Storage::assembleKey(uid, revision); + mainStoreTransaction.openDatabase(type + ".main").scan(key, [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { + mReplayFunction(type, key, value).exec(); + //TODO make for loop async, and pass to async replay function together with type + Trace() << "Replaying " << key; + return false; + }, [key](const Storage::Error &) { + ErrorMsg() << "Failed to replay change " << key; + }); + } + revision--; + replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); + replayStoreTransaction.commit(); + Trace() << "Replayed until " << revision; + } + } + +private: + Akonadi2::Storage mChangeReplayStore; + Akonadi2::Storage mStorage; + ReplayFunction mReplayFunction; +}; + /** * Drives the pipeline using the output from all command queues */ @@ -181,11 +259,19 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c mSynchronizerQueue(Akonadi2::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), mResourceInstanceIdentifier(resourceInstanceIdentifier), mPipeline(pipeline ? pipeline : QSharedPointer::create(resourceInstanceIdentifier)), - mError(0) + mError(0), + mClientLowerBoundRevision(std::numeric_limits::max()) { mProcessor = new Processor(mPipeline.data(), QList() << &mUserQueue << &mSynchronizerQueue); QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); + mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [](const QByteArray &type, const QByteArray &key, const QByteArray &value) { + return KAsync::null(); + }); + QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); + QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); + mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); + mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); mCommitQueueTimer.setInterval(100); mCommitQueueTimer.setSingleShot(true); @@ -194,8 +280,15 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c GenericResource::~GenericResource() { + delete mProcessor; + delete mSourceChangeReplay; } +// void GenericResource::revisionChanged() +// { +// //TODO replay revision +// } + void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) { Warning() << "Received error from Processor: " << errorCode << errorMessage; @@ -266,9 +359,15 @@ KAsync::Job GenericResource::processAllMessages() }); } +void GenericResource::updateLowerBoundRevision() +{ + mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSourceChangeReplay->getLastReplayedRevision())); +} + void GenericResource::setLowerBoundRevision(qint64 revision) { - mProcessor->setOldestUsedRevision(revision); + mClientLowerBoundRevision = revision; + updateLowerBoundRevision(); } #include "genericresource.moc" -- cgit v1.2.3