diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-21 23:13:38 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-21 23:51:24 +0100 |
commit | 1d713d9e2dbaf27de9da087f9270d260dfc40c31 (patch) | |
tree | 666d8edd42e44df3eaa674a35b6e938b99c2f4b4 /common/genericresource.cpp | |
parent | 0adba61a00491b96dadaa6d4719cb46831356222 (diff) | |
download | sink-1d713d9e2dbaf27de9da087f9270d260dfc40c31.tar.gz sink-1d713d9e2dbaf27de9da087f9270d260dfc40c31.zip |
Folded the SourceWriteback into the Synchronizer.
By concentrating all communication to the source in one place we get rid
of several oddities.
* Quite a bit of duplication since both need access to the
synchronizationStore and the source.
* We currently have an akward locking in place because both classes
access the ync store. This is not easier to resolve cleanly.
* The live of resource implementers becomes easier.
* An implementation could elect to not use changereplay and always do a
full sync... (maybe?)
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 29 |
1 files changed, 12 insertions, 17 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index c4c8bc6..746fa33 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -292,14 +292,14 @@ KAsync::Job<void> GenericResource::inspect( | |||
292 | 292 | ||
293 | void GenericResource::enableChangeReplay(bool enable) | 293 | void GenericResource::enableChangeReplay(bool enable) |
294 | { | 294 | { |
295 | Q_ASSERT(mChangeReplay); | 295 | Q_ASSERT(mSynchronizer); |
296 | if (enable) { | 296 | if (enable) { |
297 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); | 297 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); |
298 | QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | 298 | QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); |
299 | QMetaObject::invokeMethod(mChangeReplay.data(), "revisionChanged", Qt::QueuedConnection); | 299 | QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); |
300 | } else { | 300 | } else { |
301 | QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged); | 301 | QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged); |
302 | QObject::disconnect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | 302 | QObject::disconnect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); |
303 | } | 303 | } |
304 | } | 304 | } |
305 | 305 | ||
@@ -314,13 +314,8 @@ void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &sync | |||
314 | mSynchronizer->setup([this](int commandId, const QByteArray &data) { | 314 | mSynchronizer->setup([this](int commandId, const QByteArray &data) { |
315 | enqueueCommand(mSynchronizerQueue, commandId, data); | 315 | enqueueCommand(mSynchronizerQueue, commandId, data); |
316 | }, mSynchronizerQueue); | 316 | }, mSynchronizerQueue); |
317 | } | ||
318 | |||
319 | void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &changeReplay) | ||
320 | { | ||
321 | mChangeReplay = changeReplay; | ||
322 | { | 317 | { |
323 | auto ret = QObject::connect(mChangeReplay.data(), &ChangeReplay::replayingChanges, [this]() { | 318 | auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() { |
324 | Sink::Notification n; | 319 | Sink::Notification n; |
325 | n.id = "changereplay"; | 320 | n.id = "changereplay"; |
326 | n.type = Sink::Notification::Status; | 321 | n.type = Sink::Notification::Status; |
@@ -331,7 +326,7 @@ void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &chan | |||
331 | Q_ASSERT(ret); | 326 | Q_ASSERT(ret); |
332 | } | 327 | } |
333 | { | 328 | { |
334 | auto ret = QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, [this]() { | 329 | auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() { |
335 | Sink::Notification n; | 330 | Sink::Notification n; |
336 | n.id = "changereplay"; | 331 | n.id = "changereplay"; |
337 | n.type = Sink::Notification::Status; | 332 | n.type = Sink::Notification::Status; |
@@ -342,7 +337,7 @@ void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &chan | |||
342 | Q_ASSERT(ret); | 337 | Q_ASSERT(ret); |
343 | } | 338 | } |
344 | 339 | ||
345 | mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision()); | 340 | mProcessor->setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); |
346 | enableChangeReplay(true); | 341 | enableChangeReplay(true); |
347 | } | 342 | } |
348 | 343 | ||
@@ -459,11 +454,11 @@ KAsync::Job<void> GenericResource::processAllMessages() | |||
459 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) | 454 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) |
460 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) | 455 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) |
461 | .then<void>([this](KAsync::Future<void> &f) { | 456 | .then<void>([this](KAsync::Future<void> &f) { |
462 | if (mChangeReplay->allChangesReplayed()) { | 457 | if (mSynchronizer->allChangesReplayed()) { |
463 | f.setFinished(); | 458 | f.setFinished(); |
464 | } else { | 459 | } else { |
465 | auto context = new QObject; | 460 | auto context = new QObject; |
466 | QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { | 461 | QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { |
467 | delete context; | 462 | delete context; |
468 | f.setFinished(); | 463 | f.setFinished(); |
469 | }); | 464 | }); |
@@ -473,7 +468,7 @@ KAsync::Job<void> GenericResource::processAllMessages() | |||
473 | 468 | ||
474 | void GenericResource::updateLowerBoundRevision() | 469 | void GenericResource::updateLowerBoundRevision() |
475 | { | 470 | { |
476 | mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mChangeReplay->getLastReplayedRevision())); | 471 | mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSynchronizer->getLastReplayedRevision())); |
477 | } | 472 | } |
478 | 473 | ||
479 | void GenericResource::setLowerBoundRevision(qint64 revision) | 474 | void GenericResource::setLowerBoundRevision(qint64 revision) |