From 1d713d9e2dbaf27de9da087f9270d260dfc40c31 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 21 Nov 2016 23:13:38 +0100 Subject: Folded the SourceWriteback into the Synchronizer. By concentrating all communication to the source in one place we get rid of several oddities. * Quite a bit of duplication since both need access to the synchronizationStore and the source. * We currently have an akward locking in place because both classes access the ync store. This is not easier to resolve cleanly. * The live of resource implementers becomes easier. * An implementation could elect to not use changereplay and always do a full sync... (maybe?) --- common/genericresource.cpp | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) (limited to 'common/genericresource.cpp') diff --git a/common/genericresource.cpp b/common/genericresource.cpp index c4c8bc6..746fa33 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -292,14 +292,14 @@ KAsync::Job GenericResource::inspect( void GenericResource::enableChangeReplay(bool enable) { - Q_ASSERT(mChangeReplay); + Q_ASSERT(mSynchronizer); if (enable) { - QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); - QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); - QMetaObject::invokeMethod(mChangeReplay.data(), "revisionChanged", Qt::QueuedConnection); + QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); + QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); + QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); } else { - QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged); - QObject::disconnect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); + QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged); + QObject::disconnect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); } } @@ -314,13 +314,8 @@ void GenericResource::setupSynchronizer(const QSharedPointer &sync mSynchronizer->setup([this](int commandId, const QByteArray &data) { enqueueCommand(mSynchronizerQueue, commandId, data); }, mSynchronizerQueue); -} - -void GenericResource::setupChangereplay(const QSharedPointer &changeReplay) -{ - mChangeReplay = changeReplay; { - auto ret = QObject::connect(mChangeReplay.data(), &ChangeReplay::replayingChanges, [this]() { + auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() { Sink::Notification n; n.id = "changereplay"; n.type = Sink::Notification::Status; @@ -331,7 +326,7 @@ void GenericResource::setupChangereplay(const QSharedPointer &chan Q_ASSERT(ret); } { - auto ret = QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, [this]() { + auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() { Sink::Notification n; n.id = "changereplay"; n.type = Sink::Notification::Status; @@ -342,7 +337,7 @@ void GenericResource::setupChangereplay(const QSharedPointer &chan Q_ASSERT(ret); } - mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision()); + mProcessor->setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); enableChangeReplay(true); } @@ -459,11 +454,11 @@ KAsync::Job GenericResource::processAllMessages() .then([this](KAsync::Future &f) { waitForDrained(f, mSynchronizerQueue); }) .then([this](KAsync::Future &f) { waitForDrained(f, mUserQueue); }) .then([this](KAsync::Future &f) { - if (mChangeReplay->allChangesReplayed()) { + if (mSynchronizer->allChangesReplayed()) { f.setFinished(); } else { auto context = new QObject; - QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { + QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { delete context; f.setFinished(); }); @@ -473,7 +468,7 @@ KAsync::Job GenericResource::processAllMessages() void GenericResource::updateLowerBoundRevision() { - mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mChangeReplay->getLastReplayedRevision())); + mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSynchronizer->getLastReplayedRevision())); } void GenericResource::setLowerBoundRevision(qint64 revision) -- cgit v1.2.3