From 7fdcc36a1a352bb869020ade8a8aa697c3e8b80c Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 28 Nov 2016 19:43:23 +0100 Subject: Moved the flush command to the command processor. --- common/commandprocessor.cpp | 39 ++++++++++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 11 deletions(-) (limited to 'common/commandprocessor.cpp') diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index 4ff352b..57fe524 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp @@ -22,9 +22,11 @@ #include "commands.h" #include "messagequeue.h" #include "queuedcommand_generated.h" +#include "flush_generated.h" #include "inspector.h" #include "synchronizer.h" #include "pipeline.h" +#include "bufferutils.h" static int sBatchSize = 100; @@ -43,11 +45,6 @@ void CommandProcessor::setOldestUsedRevision(qint64 revision) mLowerBoundRevision = revision; } -void CommandProcessor::setFlushCommand(const FlushFunction &f) -{ - mFlush = f; -} - bool CommandProcessor::messagesToProcessAvailable() { for (auto queue : mCommandQueues) { @@ -91,12 +88,8 @@ KAsync::Job CommandProcessor::processQueuedCommand(const Sink::QueuedCom return mInspector->processCommand(data, size) .syncThen([]() { return -1; }); case Sink::Commands::FlushCommand: - if (mFlush) { - return mFlush(data, size) - .syncThen([]() { return -1; }); - } else { - return KAsync::error(-1, "Missing inspection command."); - } + return flush(data, size) + .syncThen([]() { return -1; }); default: return KAsync::error(-1, "Unhandled command"); } @@ -194,5 +187,29 @@ void CommandProcessor::setSynchronizer(const QSharedPointer &synch { mSynchronizer = synchronizer; QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); + setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); +} + +KAsync::Job CommandProcessor::flush(void const *command, size_t size) +{ + flatbuffers::Verifier verifier((const uint8_t *)command, size); + if (Sink::Commands::VerifyFlushBuffer(verifier)) { + auto buffer = Sink::Commands::GetFlush(command); + const auto flushType = buffer->type(); + const auto flushId = BufferUtils::extractBuffer(buffer->id()); + if (flushType == Sink::Flush::FlushReplayQueue) { + SinkTrace() << "Flushing synchronizer "; + Q_ASSERT(mSynchronizer); + mSynchronizer->flush(flushType, flushId); + } else { + SinkTrace() << "Emitting flush completion" << flushId; + Sink::Notification n; + n.type = Sink::Notification::FlushCompletion; + n.id = flushId; + emit notify(n); + } + return KAsync::null(); + } + return KAsync::error(-1, "Invalid flush command."); } -- cgit v1.2.3