summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-22 14:27:37 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-22 14:27:37 +0100
commitc799105ca6b4107fa19de9845247f1f3322ea6ef (patch)
tree9a9da831861b5094936f8b62431fb5e20ce759f5
parentc6af4d6ed59235d8fec52aeddfcd9a65607139ff (diff)
downloadsink-c799105ca6b4107fa19de9845247f1f3322ea6ef.tar.gz
sink-c799105ca6b4107fa19de9845247f1f3322ea6ef.zip
Ensure we always process the full queue and that flushing works.
-rw-r--r--common/changereplay.cpp7
-rw-r--r--common/changereplay.h2
-rw-r--r--common/synchronizer.cpp15
-rw-r--r--common/synchronizer.h2
-rw-r--r--tests/pipelinetest.cpp4
5 files changed, 22 insertions, 8 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index a0796eb..13ed49e 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -154,9 +154,12 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
154 .syncThen<void>([this, lastReplayedRevision]() { 154 .syncThen<void>([this, lastReplayedRevision]() {
155 recordReplayedRevision(*lastReplayedRevision); 155 recordReplayedRevision(*lastReplayedRevision);
156 mMainStoreTransaction.abort(); 156 mMainStoreTransaction.abort();
157 if (allChangesReplayed()) { 157 if (ChangeReplay::allChangesReplayed()) {
158 mReplayInProgress = false; 158 mReplayInProgress = false;
159 emit changesReplayed(); 159 //In case we have a derived implementation
160 if (allChangesReplayed()) {
161 emit changesReplayed();
162 }
160 } else { 163 } else {
161 QTimer::singleShot(0, [this]() { 164 QTimer::singleShot(0, [this]() {
162 mReplayInProgress = false; 165 mReplayInProgress = false;
diff --git a/common/changereplay.h b/common/changereplay.h
index 4894806..da188bf 100644
--- a/common/changereplay.h
+++ b/common/changereplay.h
@@ -42,7 +42,7 @@ public:
42 ChangeReplay(const ResourceContext &resourceContext); 42 ChangeReplay(const ResourceContext &resourceContext);
43 43
44 qint64 getLastReplayedRevision(); 44 qint64 getLastReplayedRevision();
45 bool allChangesReplayed(); 45 virtual bool allChangesReplayed();
46 46
47signals: 47signals:
48 void changesReplayed(); 48 void changesReplayed();
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 8010689..5bde597 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -287,11 +287,15 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
287 mSyncStore.clear(); 287 mSyncStore.clear();
288 mMessageQueue->commit(); 288 mMessageQueue->commit();
289 mSyncInProgress = false; 289 mSyncInProgress = false;
290 if (allChangesReplayed()) {
291 emit changesReplayed();
292 }
290 if (error) { 293 if (error) {
291 SinkWarning() << "Error during sync: " << error.errorMessage; 294 SinkWarning() << "Error during sync: " << error.errorMessage;
292 return KAsync::error(error); 295 return KAsync::error(error);
293 } 296 }
294 return KAsync::null<void>(); 297 //In case we got more requests meanwhile.
298 return processSyncQueue();
295 }); 299 });
296} 300}
297 301
@@ -414,6 +418,15 @@ KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Folder &,
414 return KAsync::null<QByteArray>(); 418 return KAsync::null<QByteArray>();
415} 419}
416 420
421bool Synchronizer::allChangesReplayed()
422{
423 if (!mSyncRequestQueue.isEmpty()) {
424 SinkTrace() << "Queue is not empty";
425 return false;
426 }
427 return ChangeReplay::allChangesReplayed();
428}
429
417#define REGISTER_TYPE(T) \ 430#define REGISTER_TYPE(T) \
418 template void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const T &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria); \ 431 template void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const T &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria); \
419 template void Synchronizer::modify(const T &entity); 432 template void Synchronizer::modify(const T &entity);
diff --git a/common/synchronizer.h b/common/synchronizer.h
index 9a71869..4d5bdd5 100644
--- a/common/synchronizer.h
+++ b/common/synchronizer.h
@@ -55,6 +55,8 @@ public:
55 void commit(); 55 void commit();
56 Sink::Storage::DataStore::Transaction &syncTransaction(); 56 Sink::Storage::DataStore::Transaction &syncTransaction();
57 57
58 bool allChangesReplayed() Q_DECL_OVERRIDE;
59
58public slots: 60public slots:
59 virtual void revisionChanged() Q_DECL_OVERRIDE; 61 virtual void revisionChanged() Q_DECL_OVERRIDE;
60 62
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp
index 5c294bb..e68aa53 100644
--- a/tests/pipelinetest.cpp
+++ b/tests/pipelinetest.cpp
@@ -271,9 +271,7 @@ private slots:
271 QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 2); 271 QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 2);
272 272
273 // Cleanup old revisions 273 // Cleanup old revisions
274 pipeline.startTransaction();
275 pipeline.cleanupRevisions(2); 274 pipeline.cleanupRevisions(2);
276 pipeline.commit();
277 275
278 // And now only the latest revision is left 276 // And now only the latest revision is left
279 QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 1); 277 QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 1);
@@ -349,9 +347,7 @@ private slots:
349 QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 2); 347 QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 2);
350 348
351 // Cleanup old revisions 349 // Cleanup old revisions
352 pipeline.startTransaction();
353 pipeline.cleanupRevisions(2); 350 pipeline.cleanupRevisions(2);
354 pipeline.commit();
355 351
356 // And all revisions are gone 352 // And all revisions are gone
357 QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 0); 353 QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 0);