From e05306d22bc994bcfae869dcd857ec76027495d1 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 28 Dec 2015 10:19:27 +0100 Subject: Changereplay for maildir folders. The sync and changereplay can not run at the same time, or would have to share the transaction otherwise. --- common/genericresource.cpp | 36 +++++++++++++++++++++++++++++++++--- common/genericresource.h | 1 + 2 files changed, 34 insertions(+), 3 deletions(-) (limited to 'common') diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 5e6764a..afe3900 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -46,6 +46,13 @@ public: return lastReplayedRevision; } + bool allChangesReplayed() + { + const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly)); + const qint64 lastReplayedRevision = getLastReplayedRevision(); + return (lastReplayedRevision >= topRevision); + } + Q_SIGNALS: void changesReplayed(); @@ -62,7 +69,8 @@ public Q_SLOTS: }); const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); - if (lastReplayedRevision < topRevision) { + Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; + if (lastReplayedRevision <= topRevision) { qint64 revision = lastReplayedRevision; for (;revision <= topRevision; revision++) { const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); @@ -82,6 +90,7 @@ public Q_SLOTS: replayStoreTransaction.commit(); Trace() << "Replayed until " << revision; } + emit changesReplayed(); } private: @@ -269,8 +278,7 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { return this->replay(type, key, value); }); - QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); - QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); + enableChangeReplay(true); mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); @@ -285,6 +293,18 @@ GenericResource::~GenericResource() delete mSourceChangeReplay; } +void GenericResource::enableChangeReplay(bool enable) +{ + if (enable) { + QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); + QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); + mSourceChangeReplay->revisionChanged(); + } else { + QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); + QObject::disconnect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); + } +} + void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector &preprocessors) { mPipeline->setPreprocessors(type, preprocessors); @@ -380,6 +400,16 @@ KAsync::Job GenericResource::processAllMessages() waitForDrained(f, mSynchronizerQueue); }).then([this](KAsync::Future &f) { waitForDrained(f, mUserQueue); + }).then([this](KAsync::Future &f) { + if (mSourceChangeReplay->allChangesReplayed()) { + f.setFinished(); + } else { + auto context = new QObject; + QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, context, [&f, context]() { + delete context; + f.setFinished(); + }); + } }); } diff --git a/common/genericresource.h b/common/genericresource.h index 1aa4206..a58a7c3 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -56,6 +56,7 @@ private Q_SLOTS: void updateLowerBoundRevision(); protected: + void enableChangeReplay(bool); void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector &preprocessors); virtual KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value); void onProcessorError(int errorCode, const QString &errorMessage); -- cgit v1.2.3