summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-22 10:26:11 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-22 10:26:11 +0100
commitb6981d94fb5fb12024738b639f9e389dd04578da (patch)
treef13e16f84ba9cf33782af7a0b0dbd82c5d65afd4 /common/genericresource.cpp
parent1d713d9e2dbaf27de9da087f9270d260dfc40c31 (diff)
downloadsink-b6981d94fb5fb12024738b639f9e389dd04578da.tar.gz
sink-b6981d94fb5fb12024738b639f9e389dd04578da.zip
Process change replays as part of the synchronization queue
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp20
1 files changed, 3 insertions, 17 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 746fa33..e36b750 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -290,19 +290,6 @@ KAsync::Job<void> GenericResource::inspect(
290 return KAsync::null<void>(); 290 return KAsync::null<void>();
291} 291}
292 292
293void GenericResource::enableChangeReplay(bool enable)
294{
295 Q_ASSERT(mSynchronizer);
296 if (enable) {
297 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection);
298 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
299 QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection);
300 } else {
301 QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged);
302 QObject::disconnect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
303 }
304}
305
306void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors) 293void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors)
307{ 294{
308 mPipeline->setPreprocessors(type, preprocessors); 295 mPipeline->setPreprocessors(type, preprocessors);
@@ -338,7 +325,9 @@ void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &sync
338 } 325 }
339 326
340 mProcessor->setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); 327 mProcessor->setOldestUsedRevision(mSynchronizer->getLastReplayedRevision());
341 enableChangeReplay(true); 328 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection);
329 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
330 QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection);
342} 331}
343 332
344void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) 333void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier)
@@ -406,11 +395,8 @@ KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &
406 emit notify(n); 395 emit notify(n);
407 396
408 SinkLog() << " Synchronizing"; 397 SinkLog() << " Synchronizing";
409 // Changereplay would deadlock otherwise when trying to open the synchronization store
410 enableChangeReplay(false);
411 return mSynchronizer->synchronize(query) 398 return mSynchronizer->synchronize(query)
412 .then<void>([this](const KAsync::Error &error) { 399 .then<void>([this](const KAsync::Error &error) {
413 enableChangeReplay(true);
414 if (!error) { 400 if (!error) {
415 SinkLog() << "Done Synchronizing"; 401 SinkLog() << "Done Synchronizing";
416 Sink::Notification n; 402 Sink::Notification n;