From 885f185f55249a2e97e9c7c238f89a5d0d99d1df Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 28 Nov 2016 16:56:28 +0100 Subject: Move the commandprocessor to a separate file. --- common/commandprocessor.h | 79 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 common/commandprocessor.h (limited to 'common/commandprocessor.h') diff --git a/common/commandprocessor.h b/common/commandprocessor.h new file mode 100644 index 0000000..51d845e --- /dev/null +++ b/common/commandprocessor.h @@ -0,0 +1,79 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#pragma once + +#include "sink_export.h" + +#include +#include +#include +#include "log.h" + +class MessageQueue; + +namespace Sink { + class Pipeline; + class QueuedCommand; + +/** + * Drives the pipeline using the output from all command queues + */ +class CommandProcessor : public QObject +{ + Q_OBJECT + typedef std::function(void const *, size_t)> InspectionFunction; + typedef std::function(void const *, size_t)> FlushFunction; + SINK_DEBUG_AREA("commandprocessor") + +public: + CommandProcessor(Sink::Pipeline *pipeline, QList commandQueues); + + void setOldestUsedRevision(qint64 revision); + + void setInspectionCommand(const InspectionFunction &f); + + void setFlushCommand(const FlushFunction &f); + +signals: + void error(int errorCode, const QString &errorMessage); + +private: + bool messagesToProcessAvailable(); + +private slots: + void process(); + KAsync::Job processQueuedCommand(const Sink::QueuedCommand *queuedCommand); + KAsync::Job processQueuedCommand(const QByteArray &data); + // Process all messages of this queue + KAsync::Job processQueue(MessageQueue *queue); + KAsync::Job processPipeline(); + +private: + Sink::Pipeline *mPipeline; + // Ordered by priority + QList mCommandQueues; + bool mProcessingLock; + // The lowest revision we no longer need + qint64 mLowerBoundRevision; + InspectionFunction mInspect; + FlushFunction mFlush; +}; + +}; -- cgit v1.2.3