From 7fdcc36a1a352bb869020ade8a8aa697c3e8b80c Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 28 Nov 2016 19:43:23 +0100 Subject: Moved the flush command to the command processor. --- common/commandprocessor.cpp | 39 ++++++++++++++++++++++++++++----------- common/commandprocessor.h | 6 ++---- common/genericresource.cpp | 22 +--------------------- 3 files changed, 31 insertions(+), 36 deletions(-) (limited to 'common') diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index 4ff352b..57fe524 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp @@ -22,9 +22,11 @@ #include "commands.h" #include "messagequeue.h" #include "queuedcommand_generated.h" +#include "flush_generated.h" #include "inspector.h" #include "synchronizer.h" #include "pipeline.h" +#include "bufferutils.h" static int sBatchSize = 100; @@ -43,11 +45,6 @@ void CommandProcessor::setOldestUsedRevision(qint64 revision) mLowerBoundRevision = revision; } -void CommandProcessor::setFlushCommand(const FlushFunction &f) -{ - mFlush = f; -} - bool CommandProcessor::messagesToProcessAvailable() { for (auto queue : mCommandQueues) { @@ -91,12 +88,8 @@ KAsync::Job CommandProcessor::processQueuedCommand(const Sink::QueuedCom return mInspector->processCommand(data, size) .syncThen([]() { return -1; }); case Sink::Commands::FlushCommand: - if (mFlush) { - return mFlush(data, size) - .syncThen([]() { return -1; }); - } else { - return KAsync::error(-1, "Missing inspection command."); - } + return flush(data, size) + .syncThen([]() { return -1; }); default: return KAsync::error(-1, "Unhandled command"); } @@ -194,5 +187,29 @@ void CommandProcessor::setSynchronizer(const QSharedPointer &synch { mSynchronizer = synchronizer; QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); + setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); +} + +KAsync::Job CommandProcessor::flush(void const *command, size_t size) +{ + flatbuffers::Verifier verifier((const uint8_t *)command, size); + if (Sink::Commands::VerifyFlushBuffer(verifier)) { + auto buffer = Sink::Commands::GetFlush(command); + const auto flushType = buffer->type(); + const auto flushId = BufferUtils::extractBuffer(buffer->id()); + if (flushType == Sink::Flush::FlushReplayQueue) { + SinkTrace() << "Flushing synchronizer "; + Q_ASSERT(mSynchronizer); + mSynchronizer->flush(flushType, flushId); + } else { + SinkTrace() << "Emitting flush completion" << flushId; + Sink::Notification n; + n.type = Sink::Notification::FlushCompletion; + n.id = flushId; + emit notify(n); + } + return KAsync::null(); + } + return KAsync::error(-1, "Invalid flush command."); } diff --git a/common/commandprocessor.h b/common/commandprocessor.h index 75ae37a..d00cf43 100644 --- a/common/commandprocessor.h +++ b/common/commandprocessor.h @@ -42,7 +42,6 @@ namespace Sink { class CommandProcessor : public QObject { Q_OBJECT - typedef std::function(void const *, size_t)> FlushFunction; SINK_DEBUG_AREA("commandprocessor") public: @@ -50,8 +49,6 @@ public: void setOldestUsedRevision(qint64 revision); - void setFlushCommand(const FlushFunction &f); - void setInspector(const QSharedPointer &inspector); void setSynchronizer(const QSharedPointer &synchronizer); @@ -71,13 +68,14 @@ private slots: KAsync::Job processPipeline(); private: + KAsync::Job flush(void const *command, size_t size); + Sink::Pipeline *mPipeline; // Ordered by priority QList mCommandQueues; bool mProcessingLock; // The lowest revision we no longer need qint64 mLowerBoundRevision; - FlushFunction mFlush; QSharedPointer mSynchronizer; QSharedPointer mInspector; }; diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 80e59c9..38da6bf 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -59,26 +59,6 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q mClientLowerBoundRevision(std::numeric_limits::max()) { mProcessor = std::unique_ptr(new CommandProcessor(mPipeline.data(), QList() << &mUserQueue << &mSynchronizerQueue)); - mProcessor->setFlushCommand([this](void const *command, size_t size) { - flatbuffers::Verifier verifier((const uint8_t *)command, size); - if (Sink::Commands::VerifyFlushBuffer(verifier)) { - auto buffer = Sink::Commands::GetFlush(command); - const auto flushType = buffer->type(); - const auto flushId = BufferUtils::extractBuffer(buffer->id()); - if (flushType == Sink::Flush::FlushReplayQueue) { - SinkTrace() << "Flushing synchronizer "; - mSynchronizer->flush(flushType, flushId); - } else { - SinkTrace() << "Emitting flush completion" << flushId; - Sink::Notification n; - n.type = Sink::Notification::FlushCompletion; - n.id = flushId; - emit notify(n); - } - return KAsync::null(); - } - return KAsync::error(-1, "Invalid flush command."); - }); QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify); QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); @@ -126,7 +106,7 @@ void GenericResource::setupSynchronizer(const QSharedPointer &sync Q_ASSERT(ret); } - mProcessor->setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); + mProcessor->setSynchronizer(synchronizer); QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); -- cgit v1.2.3