summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp36
1 files changed, 33 insertions, 3 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 5e6764a..afe3900 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -46,6 +46,13 @@ public:
46 return lastReplayedRevision; 46 return lastReplayedRevision;
47 } 47 }
48 48
49 bool allChangesReplayed()
50 {
51 const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly));
52 const qint64 lastReplayedRevision = getLastReplayedRevision();
53 return (lastReplayedRevision >= topRevision);
54 }
55
49Q_SIGNALS: 56Q_SIGNALS:
50 void changesReplayed(); 57 void changesReplayed();
51 58
@@ -62,7 +69,8 @@ public Q_SLOTS:
62 }); 69 });
63 const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); 70 const qint64 topRevision = Storage::maxRevision(mainStoreTransaction);
64 71
65 if (lastReplayedRevision < topRevision) { 72 Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision;
73 if (lastReplayedRevision <= topRevision) {
66 qint64 revision = lastReplayedRevision; 74 qint64 revision = lastReplayedRevision;
67 for (;revision <= topRevision; revision++) { 75 for (;revision <= topRevision; revision++) {
68 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); 76 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision);
@@ -82,6 +90,7 @@ public Q_SLOTS:
82 replayStoreTransaction.commit(); 90 replayStoreTransaction.commit();
83 Trace() << "Replayed until " << revision; 91 Trace() << "Replayed until " << revision;
84 } 92 }
93 emit changesReplayed();
85 } 94 }
86 95
87private: 96private:
@@ -269,8 +278,7 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
269 mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { 278 mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) {
270 return this->replay(type, key, value); 279 return this->replay(type, key, value);
271 }); 280 });
272 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); 281 enableChangeReplay(true);
273 QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
274 mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); 282 mClientLowerBoundRevision = mPipeline->cleanedUpRevision();
275 mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); 283 mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision());
276 284
@@ -285,6 +293,18 @@ GenericResource::~GenericResource()
285 delete mSourceChangeReplay; 293 delete mSourceChangeReplay;
286} 294}
287 295
296void GenericResource::enableChangeReplay(bool enable)
297{
298 if (enable) {
299 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged);
300 QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
301 mSourceChangeReplay->revisionChanged();
302 } else {
303 QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged);
304 QObject::disconnect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
305 }
306}
307
288void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Akonadi2::Preprocessor*> &preprocessors) 308void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Akonadi2::Preprocessor*> &preprocessors)
289{ 309{
290 mPipeline->setPreprocessors(type, preprocessors); 310 mPipeline->setPreprocessors(type, preprocessors);
@@ -380,6 +400,16 @@ KAsync::Job<void> GenericResource::processAllMessages()
380 waitForDrained(f, mSynchronizerQueue); 400 waitForDrained(f, mSynchronizerQueue);
381 }).then<void>([this](KAsync::Future<void> &f) { 401 }).then<void>([this](KAsync::Future<void> &f) {
382 waitForDrained(f, mUserQueue); 402 waitForDrained(f, mUserQueue);
403 }).then<void>([this](KAsync::Future<void> &f) {
404 if (mSourceChangeReplay->allChangesReplayed()) {
405 f.setFinished();
406 } else {
407 auto context = new QObject;
408 QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, context, [&f, context]() {
409 delete context;
410 f.setFinished();
411 });
412 }
383 }); 413 });
384} 414}
385 415