From 7fdcc36a1a352bb869020ade8a8aa697c3e8b80c Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 28 Nov 2016 19:43:23 +0100 Subject: Moved the flush command to the command processor. --- common/genericresource.cpp | 22 +--------------------- 1 file changed, 1 insertion(+), 21 deletions(-) (limited to 'common/genericresource.cpp') diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 80e59c9..38da6bf 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -59,26 +59,6 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q mClientLowerBoundRevision(std::numeric_limits::max()) { mProcessor = std::unique_ptr(new CommandProcessor(mPipeline.data(), QList() << &mUserQueue << &mSynchronizerQueue)); - mProcessor->setFlushCommand([this](void const *command, size_t size) { - flatbuffers::Verifier verifier((const uint8_t *)command, size); - if (Sink::Commands::VerifyFlushBuffer(verifier)) { - auto buffer = Sink::Commands::GetFlush(command); - const auto flushType = buffer->type(); - const auto flushId = BufferUtils::extractBuffer(buffer->id()); - if (flushType == Sink::Flush::FlushReplayQueue) { - SinkTrace() << "Flushing synchronizer "; - mSynchronizer->flush(flushType, flushId); - } else { - SinkTrace() << "Emitting flush completion" << flushId; - Sink::Notification n; - n.type = Sink::Notification::FlushCompletion; - n.id = flushId; - emit notify(n); - } - return KAsync::null(); - } - return KAsync::error(-1, "Invalid flush command."); - }); QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify); QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); @@ -126,7 +106,7 @@ void GenericResource::setupSynchronizer(const QSharedPointer &sync Q_ASSERT(ret); } - mProcessor->setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); + mProcessor->setSynchronizer(synchronizer); QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); -- cgit v1.2.3