From c799105ca6b4107fa19de9845247f1f3322ea6ef Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 22 Nov 2016 14:27:37 +0100 Subject: Ensure we always process the full queue and that flushing works. --- common/changereplay.cpp | 7 +++++-- common/changereplay.h | 2 +- common/synchronizer.cpp | 15 ++++++++++++++- common/synchronizer.h | 2 ++ tests/pipelinetest.cpp | 4 ---- 5 files changed, 22 insertions(+), 8 deletions(-) diff --git a/common/changereplay.cpp b/common/changereplay.cpp index a0796eb..13ed49e 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp @@ -154,9 +154,12 @@ KAsync::Job ChangeReplay::replayNextRevision() .syncThen([this, lastReplayedRevision]() { recordReplayedRevision(*lastReplayedRevision); mMainStoreTransaction.abort(); - if (allChangesReplayed()) { + if (ChangeReplay::allChangesReplayed()) { mReplayInProgress = false; - emit changesReplayed(); + //In case we have a derived implementation + if (allChangesReplayed()) { + emit changesReplayed(); + } } else { QTimer::singleShot(0, [this]() { mReplayInProgress = false; diff --git a/common/changereplay.h b/common/changereplay.h index 4894806..da188bf 100644 --- a/common/changereplay.h +++ b/common/changereplay.h @@ -42,7 +42,7 @@ public: ChangeReplay(const ResourceContext &resourceContext); qint64 getLastReplayedRevision(); - bool allChangesReplayed(); + virtual bool allChangesReplayed(); signals: void changesReplayed(); diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 8010689..5bde597 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -287,11 +287,15 @@ KAsync::Job Synchronizer::processSyncQueue() mSyncStore.clear(); mMessageQueue->commit(); mSyncInProgress = false; + if (allChangesReplayed()) { + emit changesReplayed(); + } if (error) { SinkWarning() << "Error during sync: " << error.errorMessage; return KAsync::error(error); } - return KAsync::null(); + //In case we got more requests meanwhile. + return processSyncQueue(); }); } @@ -414,6 +418,15 @@ KAsync::Job Synchronizer::replay(const ApplicationDomain::Folder &, return KAsync::null(); } +bool Synchronizer::allChangesReplayed() +{ + if (!mSyncRequestQueue.isEmpty()) { + SinkTrace() << "Queue is not empty"; + return false; + } + return ChangeReplay::allChangesReplayed(); +} + #define REGISTER_TYPE(T) \ template void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const T &entity, const QHash &mergeCriteria); \ template void Synchronizer::modify(const T &entity); diff --git a/common/synchronizer.h b/common/synchronizer.h index 9a71869..4d5bdd5 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h @@ -55,6 +55,8 @@ public: void commit(); Sink::Storage::DataStore::Transaction &syncTransaction(); + bool allChangesReplayed() Q_DECL_OVERRIDE; + public slots: virtual void revisionChanged() Q_DECL_OVERRIDE; diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp index 5c294bb..e68aa53 100644 --- a/tests/pipelinetest.cpp +++ b/tests/pipelinetest.cpp @@ -271,9 +271,7 @@ private slots: QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 2); // Cleanup old revisions - pipeline.startTransaction(); pipeline.cleanupRevisions(2); - pipeline.commit(); // And now only the latest revision is left QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 1); @@ -349,9 +347,7 @@ private slots: QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 2); // Cleanup old revisions - pipeline.startTransaction(); pipeline.cleanupRevisions(2); - pipeline.commit(); // And all revisions are gone QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 0); -- cgit v1.2.3