summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp29
1 files changed, 12 insertions, 17 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index c4c8bc6..746fa33 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -292,14 +292,14 @@ KAsync::Job<void> GenericResource::inspect(
292 292
293void GenericResource::enableChangeReplay(bool enable) 293void GenericResource::enableChangeReplay(bool enable)
294{ 294{
295 Q_ASSERT(mChangeReplay); 295 Q_ASSERT(mSynchronizer);
296 if (enable) { 296 if (enable) {
297 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); 297 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection);
298 QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); 298 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
299 QMetaObject::invokeMethod(mChangeReplay.data(), "revisionChanged", Qt::QueuedConnection); 299 QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection);
300 } else { 300 } else {
301 QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged); 301 QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged);
302 QObject::disconnect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); 302 QObject::disconnect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
303 } 303 }
304} 304}
305 305
@@ -314,13 +314,8 @@ void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &sync
314 mSynchronizer->setup([this](int commandId, const QByteArray &data) { 314 mSynchronizer->setup([this](int commandId, const QByteArray &data) {
315 enqueueCommand(mSynchronizerQueue, commandId, data); 315 enqueueCommand(mSynchronizerQueue, commandId, data);
316 }, mSynchronizerQueue); 316 }, mSynchronizerQueue);
317}
318
319void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &changeReplay)
320{
321 mChangeReplay = changeReplay;
322 { 317 {
323 auto ret = QObject::connect(mChangeReplay.data(), &ChangeReplay::replayingChanges, [this]() { 318 auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() {
324 Sink::Notification n; 319 Sink::Notification n;
325 n.id = "changereplay"; 320 n.id = "changereplay";
326 n.type = Sink::Notification::Status; 321 n.type = Sink::Notification::Status;
@@ -331,7 +326,7 @@ void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &chan
331 Q_ASSERT(ret); 326 Q_ASSERT(ret);
332 } 327 }
333 { 328 {
334 auto ret = QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, [this]() { 329 auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() {
335 Sink::Notification n; 330 Sink::Notification n;
336 n.id = "changereplay"; 331 n.id = "changereplay";
337 n.type = Sink::Notification::Status; 332 n.type = Sink::Notification::Status;
@@ -342,7 +337,7 @@ void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &chan
342 Q_ASSERT(ret); 337 Q_ASSERT(ret);
343 } 338 }
344 339
345 mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision()); 340 mProcessor->setOldestUsedRevision(mSynchronizer->getLastReplayedRevision());
346 enableChangeReplay(true); 341 enableChangeReplay(true);
347} 342}
348 343
@@ -459,11 +454,11 @@ KAsync::Job<void> GenericResource::processAllMessages()
459 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) 454 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); })
460 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) 455 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); })
461 .then<void>([this](KAsync::Future<void> &f) { 456 .then<void>([this](KAsync::Future<void> &f) {
462 if (mChangeReplay->allChangesReplayed()) { 457 if (mSynchronizer->allChangesReplayed()) {
463 f.setFinished(); 458 f.setFinished();
464 } else { 459 } else {
465 auto context = new QObject; 460 auto context = new QObject;
466 QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { 461 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() {
467 delete context; 462 delete context;
468 f.setFinished(); 463 f.setFinished();
469 }); 464 });
@@ -473,7 +468,7 @@ KAsync::Job<void> GenericResource::processAllMessages()
473 468
474void GenericResource::updateLowerBoundRevision() 469void GenericResource::updateLowerBoundRevision()
475{ 470{
476 mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mChangeReplay->getLastReplayedRevision())); 471 mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSynchronizer->getLastReplayedRevision()));
477} 472}
478 473
479void GenericResource::setLowerBoundRevision(qint64 revision) 474void GenericResource::setLowerBoundRevision(qint64 revision)