diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-12-28 10:19:27 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-12-28 10:20:53 +0100 |
commit | e05306d22bc994bcfae869dcd857ec76027495d1 (patch) | |
tree | b5b7db5108947a210d893c76b5e75cfadca32f50 /common/genericresource.cpp | |
parent | ab16e5aa9e9d83b6aa9189a67d97a4d089952902 (diff) | |
download | sink-e05306d22bc994bcfae869dcd857ec76027495d1.tar.gz sink-e05306d22bc994bcfae869dcd857ec76027495d1.zip |
Changereplay for maildir folders.
The sync and changereplay can not run at the same time, or would have
to share the transaction otherwise.
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 36 |
1 files changed, 33 insertions, 3 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 5e6764a..afe3900 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -46,6 +46,13 @@ public: | |||
46 | return lastReplayedRevision; | 46 | return lastReplayedRevision; |
47 | } | 47 | } |
48 | 48 | ||
49 | bool allChangesReplayed() | ||
50 | { | ||
51 | const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly)); | ||
52 | const qint64 lastReplayedRevision = getLastReplayedRevision(); | ||
53 | return (lastReplayedRevision >= topRevision); | ||
54 | } | ||
55 | |||
49 | Q_SIGNALS: | 56 | Q_SIGNALS: |
50 | void changesReplayed(); | 57 | void changesReplayed(); |
51 | 58 | ||
@@ -62,7 +69,8 @@ public Q_SLOTS: | |||
62 | }); | 69 | }); |
63 | const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); | 70 | const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); |
64 | 71 | ||
65 | if (lastReplayedRevision < topRevision) { | 72 | Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; |
73 | if (lastReplayedRevision <= topRevision) { | ||
66 | qint64 revision = lastReplayedRevision; | 74 | qint64 revision = lastReplayedRevision; |
67 | for (;revision <= topRevision; revision++) { | 75 | for (;revision <= topRevision; revision++) { |
68 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); | 76 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); |
@@ -82,6 +90,7 @@ public Q_SLOTS: | |||
82 | replayStoreTransaction.commit(); | 90 | replayStoreTransaction.commit(); |
83 | Trace() << "Replayed until " << revision; | 91 | Trace() << "Replayed until " << revision; |
84 | } | 92 | } |
93 | emit changesReplayed(); | ||
85 | } | 94 | } |
86 | 95 | ||
87 | private: | 96 | private: |
@@ -269,8 +278,7 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
269 | mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { | 278 | mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { |
270 | return this->replay(type, key, value); | 279 | return this->replay(type, key, value); |
271 | }); | 280 | }); |
272 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); | 281 | enableChangeReplay(true); |
273 | QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | ||
274 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); | 282 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); |
275 | mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); | 283 | mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); |
276 | 284 | ||
@@ -285,6 +293,18 @@ GenericResource::~GenericResource() | |||
285 | delete mSourceChangeReplay; | 293 | delete mSourceChangeReplay; |
286 | } | 294 | } |
287 | 295 | ||
296 | void GenericResource::enableChangeReplay(bool enable) | ||
297 | { | ||
298 | if (enable) { | ||
299 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); | ||
300 | QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | ||
301 | mSourceChangeReplay->revisionChanged(); | ||
302 | } else { | ||
303 | QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); | ||
304 | QObject::disconnect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | ||
305 | } | ||
306 | } | ||
307 | |||
288 | void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Akonadi2::Preprocessor*> &preprocessors) | 308 | void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Akonadi2::Preprocessor*> &preprocessors) |
289 | { | 309 | { |
290 | mPipeline->setPreprocessors(type, preprocessors); | 310 | mPipeline->setPreprocessors(type, preprocessors); |
@@ -380,6 +400,16 @@ KAsync::Job<void> GenericResource::processAllMessages() | |||
380 | waitForDrained(f, mSynchronizerQueue); | 400 | waitForDrained(f, mSynchronizerQueue); |
381 | }).then<void>([this](KAsync::Future<void> &f) { | 401 | }).then<void>([this](KAsync::Future<void> &f) { |
382 | waitForDrained(f, mUserQueue); | 402 | waitForDrained(f, mUserQueue); |
403 | }).then<void>([this](KAsync::Future<void> &f) { | ||
404 | if (mSourceChangeReplay->allChangesReplayed()) { | ||
405 | f.setFinished(); | ||
406 | } else { | ||
407 | auto context = new QObject; | ||
408 | QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, context, [&f, context]() { | ||
409 | delete context; | ||
410 | f.setFinished(); | ||
411 | }); | ||
412 | } | ||
383 | }); | 413 | }); |
384 | } | 414 | } |
385 | 415 | ||