From b6981d94fb5fb12024738b639f9e389dd04578da Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 22 Nov 2016 10:26:11 +0100 Subject: Process change replays as part of the synchronization queue --- common/changereplay.cpp | 4 +++- common/changereplay.h | 4 ++-- common/genericresource.cpp | 20 +++----------------- common/genericresource.h | 2 -- common/synchronizer.cpp | 22 +++++++++++++++++----- common/synchronizer.h | 16 ++++++++++++++++ 6 files changed, 41 insertions(+), 27 deletions(-) (limited to 'common') diff --git a/common/changereplay.cpp b/common/changereplay.cpp index 6e58564..a0796eb 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp @@ -71,8 +71,10 @@ void ChangeReplay::recordReplayedRevision(qint64 revision) KAsync::Job ChangeReplay::replayNextRevision() { + Q_ASSERT(!mReplayInProgress); auto lastReplayedRevision = QSharedPointer::create(0); auto topRevision = QSharedPointer::create(0); + emit replayingChanges(); return KAsync::syncStart([this, lastReplayedRevision, topRevision]() { mReplayInProgress = true; mMainStoreTransaction = mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { @@ -157,6 +159,7 @@ KAsync::Job ChangeReplay::replayNextRevision() emit changesReplayed(); } else { QTimer::singleShot(0, [this]() { + mReplayInProgress = false; replayNextRevision().exec(); }); } @@ -166,7 +169,6 @@ KAsync::Job ChangeReplay::replayNextRevision() void ChangeReplay::revisionChanged() { if (!mReplayInProgress) { - emit replayingChanges(); replayNextRevision().exec(); } } diff --git a/common/changereplay.h b/common/changereplay.h index e86c4f2..4894806 100644 --- a/common/changereplay.h +++ b/common/changereplay.h @@ -49,16 +49,16 @@ signals: void replayingChanges(); public slots: - void revisionChanged(); + virtual void revisionChanged(); protected: virtual KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; Sink::Storage::DataStore mStorage; + KAsync::Job replayNextRevision(); private: void recordReplayedRevision(qint64 revision); - KAsync::Job replayNextRevision(); Sink::Storage::DataStore mChangeReplayStore; bool mReplayInProgress; Sink::Storage::DataStore::Transaction mMainStoreTransaction; diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 746fa33..e36b750 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -290,19 +290,6 @@ KAsync::Job GenericResource::inspect( return KAsync::null(); } -void GenericResource::enableChangeReplay(bool enable) -{ - Q_ASSERT(mSynchronizer); - if (enable) { - QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); - QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); - QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); - } else { - QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged); - QObject::disconnect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); - } -} - void GenericResource::setupPreprocessors(const QByteArray &type, const QVector &preprocessors) { mPipeline->setPreprocessors(type, preprocessors); @@ -338,7 +325,9 @@ void GenericResource::setupSynchronizer(const QSharedPointer &sync } mProcessor->setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); - enableChangeReplay(true); + QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); + QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); + QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); } void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) @@ -406,11 +395,8 @@ KAsync::Job GenericResource::synchronizeWithSource(const Sink::QueryBase & emit notify(n); SinkLog() << " Synchronizing"; - // Changereplay would deadlock otherwise when trying to open the synchronization store - enableChangeReplay(false); return mSynchronizer->synchronize(query) .then([this](const KAsync::Error &error) { - enableChangeReplay(true); if (!error) { SinkLog() << "Done Synchronizing"; Sink::Notification n; diff --git a/common/genericresource.h b/common/genericresource.h index 7e0f5ad..3f92e93 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -62,8 +62,6 @@ private slots: void updateLowerBoundRevision(); protected: - void enableChangeReplay(bool); - void setupPreprocessors(const QByteArray &type, const QVector &preprocessors); void setupSynchronizer(const QSharedPointer &synchronizer); diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 10acefc..8010689 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -274,10 +274,14 @@ KAsync::Job Synchronizer::processSyncQueue() auto job = KAsync::null(); while (!mSyncRequestQueue.isEmpty()) { auto request = mSyncRequestQueue.takeFirst(); - job = job.then(synchronizeWithSource(request.query)).syncThen([this] { - //Commit after every request, so implementations only have to commit more if they add a lot of data. - commit(); - }); + if (request.requestType == Synchronizer::SyncRequest::Synchronization) { + job = job.then(synchronizeWithSource(request.query)).syncThen([this] { + //Commit after every request, so implementations only have to commit more if they add a lot of data. + commit(); + }); + } else { + job = replayNextRevision(); + } } return job.then([this](const KAsync::Error &error) { mSyncStore.clear(); @@ -311,6 +315,12 @@ Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction( return mSyncTransaction; } +void Synchronizer::revisionChanged() +{ + mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::ChangeReplay}; + processSyncQueue().exec(); +} + bool Synchronizer::canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) { Sink::EntityBuffer buffer(value); @@ -334,7 +344,6 @@ KAsync::Job Synchronizer::replay(const QByteArray &type, const QByteArray Q_ASSERT(!mSyncStore); Q_ASSERT(!mSyncTransaction); mEntityStore->startTransaction(Storage::DataStore::ReadOnly); - mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::ReadWrite); const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; const auto uid = Sink::Storage::DataStore::uidFromKey(key); @@ -345,6 +354,9 @@ KAsync::Job Synchronizer::replay(const QByteArray &type, const QByteArray oldRemoteId = syncStore().resolveLocalId(type, uid); if (oldRemoteId.isEmpty()) { SinkWarning() << "Couldn't find the remote id for: " << type << uid; + mSyncStore.clear(); + mSyncTransaction.abort(); + mEntityStore->abortTransaction(); return KAsync::error(1, "Couldn't find the remote id."); } } diff --git a/common/synchronizer.h b/common/synchronizer.h index 0a51f54..9a71869 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h @@ -55,6 +55,9 @@ public: void commit(); Sink::Storage::DataStore::Transaction &syncTransaction(); +public slots: + virtual void revisionChanged() Q_DECL_OVERRIDE; + protected: ///Base implementation calls the replay$Type calls virtual KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; @@ -108,13 +111,26 @@ protected: virtual KAsync::Job synchronizeWithSource(const Sink::QueryBase &query) = 0; struct SyncRequest { + enum RequestType { + Synchronization, + ChangeReplay + }; + SyncRequest(const Sink::QueryBase &q) : flushQueue(false), + requestType(Synchronization), query(q) { } + SyncRequest(RequestType type) + : flushQueue(false), + requestType(type) + { + } + bool flushQueue; + RequestType requestType; Sink::QueryBase query; }; -- cgit v1.2.3