diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-22 14:27:37 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-22 14:27:37 +0100 |
commit | c799105ca6b4107fa19de9845247f1f3322ea6ef (patch) | |
tree | 9a9da831861b5094936f8b62431fb5e20ce759f5 | |
parent | c6af4d6ed59235d8fec52aeddfcd9a65607139ff (diff) | |
download | sink-c799105ca6b4107fa19de9845247f1f3322ea6ef.tar.gz sink-c799105ca6b4107fa19de9845247f1f3322ea6ef.zip |
Ensure we always process the full queue and that flushing works.
-rw-r--r-- | common/changereplay.cpp | 7 | ||||
-rw-r--r-- | common/changereplay.h | 2 | ||||
-rw-r--r-- | common/synchronizer.cpp | 15 | ||||
-rw-r--r-- | common/synchronizer.h | 2 | ||||
-rw-r--r-- | tests/pipelinetest.cpp | 4 |
5 files changed, 22 insertions, 8 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp index a0796eb..13ed49e 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp | |||
@@ -154,9 +154,12 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
154 | .syncThen<void>([this, lastReplayedRevision]() { | 154 | .syncThen<void>([this, lastReplayedRevision]() { |
155 | recordReplayedRevision(*lastReplayedRevision); | 155 | recordReplayedRevision(*lastReplayedRevision); |
156 | mMainStoreTransaction.abort(); | 156 | mMainStoreTransaction.abort(); |
157 | if (allChangesReplayed()) { | 157 | if (ChangeReplay::allChangesReplayed()) { |
158 | mReplayInProgress = false; | 158 | mReplayInProgress = false; |
159 | emit changesReplayed(); | 159 | //In case we have a derived implementation |
160 | if (allChangesReplayed()) { | ||
161 | emit changesReplayed(); | ||
162 | } | ||
160 | } else { | 163 | } else { |
161 | QTimer::singleShot(0, [this]() { | 164 | QTimer::singleShot(0, [this]() { |
162 | mReplayInProgress = false; | 165 | mReplayInProgress = false; |
diff --git a/common/changereplay.h b/common/changereplay.h index 4894806..da188bf 100644 --- a/common/changereplay.h +++ b/common/changereplay.h | |||
@@ -42,7 +42,7 @@ public: | |||
42 | ChangeReplay(const ResourceContext &resourceContext); | 42 | ChangeReplay(const ResourceContext &resourceContext); |
43 | 43 | ||
44 | qint64 getLastReplayedRevision(); | 44 | qint64 getLastReplayedRevision(); |
45 | bool allChangesReplayed(); | 45 | virtual bool allChangesReplayed(); |
46 | 46 | ||
47 | signals: | 47 | signals: |
48 | void changesReplayed(); | 48 | void changesReplayed(); |
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 8010689..5bde597 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -287,11 +287,15 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
287 | mSyncStore.clear(); | 287 | mSyncStore.clear(); |
288 | mMessageQueue->commit(); | 288 | mMessageQueue->commit(); |
289 | mSyncInProgress = false; | 289 | mSyncInProgress = false; |
290 | if (allChangesReplayed()) { | ||
291 | emit changesReplayed(); | ||
292 | } | ||
290 | if (error) { | 293 | if (error) { |
291 | SinkWarning() << "Error during sync: " << error.errorMessage; | 294 | SinkWarning() << "Error during sync: " << error.errorMessage; |
292 | return KAsync::error(error); | 295 | return KAsync::error(error); |
293 | } | 296 | } |
294 | return KAsync::null<void>(); | 297 | //In case we got more requests meanwhile. |
298 | return processSyncQueue(); | ||
295 | }); | 299 | }); |
296 | } | 300 | } |
297 | 301 | ||
@@ -414,6 +418,15 @@ KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Folder &, | |||
414 | return KAsync::null<QByteArray>(); | 418 | return KAsync::null<QByteArray>(); |
415 | } | 419 | } |
416 | 420 | ||
421 | bool Synchronizer::allChangesReplayed() | ||
422 | { | ||
423 | if (!mSyncRequestQueue.isEmpty()) { | ||
424 | SinkTrace() << "Queue is not empty"; | ||
425 | return false; | ||
426 | } | ||
427 | return ChangeReplay::allChangesReplayed(); | ||
428 | } | ||
429 | |||
417 | #define REGISTER_TYPE(T) \ | 430 | #define REGISTER_TYPE(T) \ |
418 | template void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const T &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria); \ | 431 | template void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const T &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria); \ |
419 | template void Synchronizer::modify(const T &entity); | 432 | template void Synchronizer::modify(const T &entity); |
diff --git a/common/synchronizer.h b/common/synchronizer.h index 9a71869..4d5bdd5 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h | |||
@@ -55,6 +55,8 @@ public: | |||
55 | void commit(); | 55 | void commit(); |
56 | Sink::Storage::DataStore::Transaction &syncTransaction(); | 56 | Sink::Storage::DataStore::Transaction &syncTransaction(); |
57 | 57 | ||
58 | bool allChangesReplayed() Q_DECL_OVERRIDE; | ||
59 | |||
58 | public slots: | 60 | public slots: |
59 | virtual void revisionChanged() Q_DECL_OVERRIDE; | 61 | virtual void revisionChanged() Q_DECL_OVERRIDE; |
60 | 62 | ||
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp index 5c294bb..e68aa53 100644 --- a/tests/pipelinetest.cpp +++ b/tests/pipelinetest.cpp | |||
@@ -271,9 +271,7 @@ private slots: | |||
271 | QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 2); | 271 | QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 2); |
272 | 272 | ||
273 | // Cleanup old revisions | 273 | // Cleanup old revisions |
274 | pipeline.startTransaction(); | ||
275 | pipeline.cleanupRevisions(2); | 274 | pipeline.cleanupRevisions(2); |
276 | pipeline.commit(); | ||
277 | 275 | ||
278 | // And now only the latest revision is left | 276 | // And now only the latest revision is left |
279 | QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 1); | 277 | QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 1); |
@@ -349,9 +347,7 @@ private slots: | |||
349 | QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 2); | 347 | QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 2); |
350 | 348 | ||
351 | // Cleanup old revisions | 349 | // Cleanup old revisions |
352 | pipeline.startTransaction(); | ||
353 | pipeline.cleanupRevisions(2); | 350 | pipeline.cleanupRevisions(2); |
354 | pipeline.commit(); | ||
355 | 351 | ||
356 | // And all revisions are gone | 352 | // And all revisions are gone |
357 | QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 0); | 353 | QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 0); |