From 885f185f55249a2e97e9c7c238f89a5d0d99d1df Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 28 Nov 2016 16:56:28 +0100 Subject: Move the commandprocessor to a separate file. --- common/CMakeLists.txt | 1 + common/commandprocessor.cpp | 193 ++++++++++++++++++++++++++++++++++++++++++++ common/commandprocessor.h | 79 ++++++++++++++++++ common/genericresource.cpp | 189 +------------------------------------------ common/genericresource.h | 3 +- 5 files changed, 275 insertions(+), 190 deletions(-) create mode 100644 common/commandprocessor.cpp create mode 100644 common/commandprocessor.h (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 8a16af4..018fc22 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -77,6 +77,7 @@ set(command_SRCS indexer.cpp mail/threadindexer.cpp notification.cpp + commandprocessor.cpp ${storage_SRCS}) add_library(${PROJECT_NAME} SHARED ${command_SRCS}) diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp new file mode 100644 index 0000000..c9fca37 --- /dev/null +++ b/common/commandprocessor.cpp @@ -0,0 +1,193 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#include "commandprocessor.h" + +#include "commands.h" +#include "messagequeue.h" +#include "queuedcommand_generated.h" + +#include "pipeline.h" + +static int sBatchSize = 100; + +using namespace Sink; + +CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, QList commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false), mLowerBoundRevision(0) +{ + for (auto queue : mCommandQueues) { + const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); + Q_UNUSED(ret); + } +} + +void CommandProcessor::setOldestUsedRevision(qint64 revision) +{ + mLowerBoundRevision = revision; +} + +void CommandProcessor::setInspectionCommand(const InspectionFunction &f) +{ + mInspect = f; +} + +void CommandProcessor::setFlushCommand(const FlushFunction &f) +{ + mFlush = f; +} + +bool CommandProcessor::messagesToProcessAvailable() +{ + for (auto queue : mCommandQueues) { + if (!queue->isEmpty()) { + return true; + } + } + return false; +} + +void CommandProcessor::process() +{ + if (mProcessingLock) { + return; + } + mProcessingLock = true; + auto job = processPipeline() + .syncThen([this]() { + mProcessingLock = false; + if (messagesToProcessAvailable()) { + process(); + } + }) + .exec(); +} + +KAsync::Job CommandProcessor::processQueuedCommand(const Sink::QueuedCommand *queuedCommand) +{ + SinkTrace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); + const auto data = queuedCommand->command()->Data(); + const auto size = queuedCommand->command()->size(); + switch (queuedCommand->commandId()) { + case Sink::Commands::DeleteEntityCommand: + return mPipeline->deletedEntity(data, size); + case Sink::Commands::ModifyEntityCommand: + return mPipeline->modifiedEntity(data, size); + case Sink::Commands::CreateEntityCommand: + return mPipeline->newEntity(data, size); + case Sink::Commands::InspectionCommand: + if (mInspect) { + return mInspect(data, size) + .syncThen([]() { return -1; }); + } else { + return KAsync::error(-1, "Missing inspection command."); + } + case Sink::Commands::FlushCommand: + if (mFlush) { + return mFlush(data, size) + .syncThen([]() { return -1; }); + } else { + return KAsync::error(-1, "Missing inspection command."); + } + default: + return KAsync::error(-1, "Unhandled command"); + } +} + +KAsync::Job CommandProcessor::processQueuedCommand(const QByteArray &data) +{ + flatbuffers::Verifier verifyer(reinterpret_cast(data.constData()), data.size()); + if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { + SinkWarning() << "invalid buffer"; + // return KAsync::error(1, "Invalid Buffer"); + } + auto queuedCommand = Sink::GetQueuedCommand(data.constData()); + const auto commandId = queuedCommand->commandId(); + SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId); + return processQueuedCommand(queuedCommand) + .then( + [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job { + if (error) { + SinkWarning() << "Error while processing queue command: " << error.errorMessage; + return KAsync::error(error); + } + SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId); + return KAsync::value(createdRevision); + }); +} + +// Process all messages of this queue +KAsync::Job CommandProcessor::processQueue(MessageQueue *queue) +{ + auto time = QSharedPointer::create(); + return KAsync::syncStart([this]() { mPipeline->startTransaction(); }) + .then(KAsync::dowhile( + [this, queue, time]() -> KAsync::Job { + return queue->dequeueBatch(sBatchSize, + [this, time](const QByteArray &data) -> KAsync::Job { + time->start(); + return processQueuedCommand(data) + .syncThen([this, time](qint64 createdRevision) { + SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); + }); + }) + .then([queue](const KAsync::Error &error) { + if (error) { + if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) { + SinkWarning() << "Error while getting message from messagequeue: " << error.errorMessage; + } + } + if (queue->isEmpty()) { + return KAsync::value(KAsync::Break); + } else { + return KAsync::value(KAsync::Continue); + } + }); + })) + .syncThen([this](const KAsync::Error &) { mPipeline->commit(); }); +} + +KAsync::Job CommandProcessor::processPipeline() +{ + auto time = QSharedPointer::create(); + time->start(); + mPipeline->cleanupRevisions(mLowerBoundRevision); + SinkTrace() << "Cleanup done." << Log::TraceTime(time->elapsed()); + + // Go through all message queues + if (mCommandQueues.isEmpty()) { + return KAsync::null(); + } + auto it = QSharedPointer>::create(mCommandQueues); + return KAsync::dowhile( + [it, this]() { + auto time = QSharedPointer::create(); + time->start(); + + auto queue = it->next(); + return processQueue(queue) + .syncThen([this, time, it]() { + SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed()); + if (it->hasNext()) { + return KAsync::Continue; + } + return KAsync::Break; + }); + }); +} + diff --git a/common/commandprocessor.h b/common/commandprocessor.h new file mode 100644 index 0000000..51d845e --- /dev/null +++ b/common/commandprocessor.h @@ -0,0 +1,79 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#pragma once + +#include "sink_export.h" + +#include +#include +#include +#include "log.h" + +class MessageQueue; + +namespace Sink { + class Pipeline; + class QueuedCommand; + +/** + * Drives the pipeline using the output from all command queues + */ +class CommandProcessor : public QObject +{ + Q_OBJECT + typedef std::function(void const *, size_t)> InspectionFunction; + typedef std::function(void const *, size_t)> FlushFunction; + SINK_DEBUG_AREA("commandprocessor") + +public: + CommandProcessor(Sink::Pipeline *pipeline, QList commandQueues); + + void setOldestUsedRevision(qint64 revision); + + void setInspectionCommand(const InspectionFunction &f); + + void setFlushCommand(const FlushFunction &f); + +signals: + void error(int errorCode, const QString &errorMessage); + +private: + bool messagesToProcessAvailable(); + +private slots: + void process(); + KAsync::Job processQueuedCommand(const Sink::QueuedCommand *queuedCommand); + KAsync::Job processQueuedCommand(const QByteArray &data); + // Process all messages of this queue + KAsync::Job processQueue(MessageQueue *queue); + KAsync::Job processPipeline(); + +private: + Sink::Pipeline *mPipeline; + // Ordered by priority + QList mCommandQueues; + bool mProcessingLock; + // The lowest revision we no longer need + qint64 mLowerBoundRevision; + InspectionFunction mInspect; + FlushFunction mFlush; +}; + +}; diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 7b83957..3aa4fce 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -36,6 +36,7 @@ #include "bufferutils.h" #include "adaptorfactoryregistry.h" #include "synchronizer.h" +#include "commandprocessor.h" #include #include @@ -48,194 +49,6 @@ static int sCommitInterval = 10; using namespace Sink; using namespace Sink::Storage; -/** - * Drives the pipeline using the output from all command queues - */ -class CommandProcessor : public QObject -{ - Q_OBJECT - typedef std::function(void const *, size_t)> InspectionFunction; - typedef std::function(void const *, size_t)> FlushFunction; - SINK_DEBUG_AREA("commandprocessor") - -public: - CommandProcessor(Sink::Pipeline *pipeline, QList commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false), mLowerBoundRevision(0) - { - for (auto queue : mCommandQueues) { - const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); - Q_UNUSED(ret); - } - } - - void setOldestUsedRevision(qint64 revision) - { - mLowerBoundRevision = revision; - } - - void setInspectionCommand(const InspectionFunction &f) - { - mInspect = f; - } - - void setFlushCommand(const FlushFunction &f) - { - mFlush = f; - } - -signals: - void error(int errorCode, const QString &errorMessage); - -private: - bool messagesToProcessAvailable() - { - for (auto queue : mCommandQueues) { - if (!queue->isEmpty()) { - return true; - } - } - return false; - } - -private slots: - void process() - { - if (mProcessingLock) { - return; - } - mProcessingLock = true; - auto job = processPipeline() - .syncThen([this]() { - mProcessingLock = false; - if (messagesToProcessAvailable()) { - process(); - } - }) - .exec(); - } - - KAsync::Job processQueuedCommand(const Sink::QueuedCommand *queuedCommand) - { - SinkTrace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); - // Throw command into appropriate pipeline - switch (queuedCommand->commandId()) { - case Sink::Commands::DeleteEntityCommand: - return mPipeline->deletedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); - case Sink::Commands::ModifyEntityCommand: - return mPipeline->modifiedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); - case Sink::Commands::CreateEntityCommand: - return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); - case Sink::Commands::InspectionCommand: - if (mInspect) { - return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()) - .syncThen([]() { return -1; }); - } else { - return KAsync::error(-1, "Missing inspection command."); - } - case Sink::Commands::FlushCommand: - if (mFlush) { - return mFlush(queuedCommand->command()->Data(), queuedCommand->command()->size()) - .syncThen([]() { return -1; }); - } else { - return KAsync::error(-1, "Missing inspection command."); - } - default: - return KAsync::error(-1, "Unhandled command"); - } - } - - KAsync::Job processQueuedCommand(const QByteArray &data) - { - flatbuffers::Verifier verifyer(reinterpret_cast(data.constData()), data.size()); - if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { - SinkWarning() << "invalid buffer"; - // return KAsync::error(1, "Invalid Buffer"); - } - auto queuedCommand = Sink::GetQueuedCommand(data.constData()); - const auto commandId = queuedCommand->commandId(); - SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId); - return processQueuedCommand(queuedCommand) - .then( - [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job { - if (error) { - SinkWarning() << "Error while processing queue command: " << error.errorMessage; - return KAsync::error(error); - } - SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId); - return KAsync::value(createdRevision); - }); - } - - // Process all messages of this queue - KAsync::Job processQueue(MessageQueue *queue) - { - auto time = QSharedPointer::create(); - return KAsync::syncStart([this]() { mPipeline->startTransaction(); }) - .then(KAsync::dowhile( - [this, queue, time]() -> KAsync::Job { - return queue->dequeueBatch(sBatchSize, - [this, time](const QByteArray &data) -> KAsync::Job { - time->start(); - return processQueuedCommand(data) - .syncThen([this, time](qint64 createdRevision) { - SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); - }); - }) - .then([queue](const KAsync::Error &error) { - if (error) { - if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) { - SinkWarning() << "Error while getting message from messagequeue: " << error.errorMessage; - } - } - if (queue->isEmpty()) { - return KAsync::value(KAsync::Break); - } else { - return KAsync::value(KAsync::Continue); - } - }); - })) - .syncThen([this](const KAsync::Error &) { mPipeline->commit(); }); - } - - KAsync::Job processPipeline() - { - auto time = QSharedPointer::create(); - time->start(); - mPipeline->cleanupRevisions(mLowerBoundRevision); - SinkTrace() << "Cleanup done." << Log::TraceTime(time->elapsed()); - - // Go through all message queues - if (mCommandQueues.isEmpty()) { - return KAsync::null(); - } - auto it = QSharedPointer>::create(mCommandQueues); - return KAsync::dowhile( - [it, this]() { - auto time = QSharedPointer::create(); - time->start(); - - auto queue = it->next(); - return processQueue(queue) - .syncThen([this, time, it]() { - SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed()); - if (it->hasNext()) { - return KAsync::Continue; - } - return KAsync::Break; - }); - }); - } - -private: - Sink::Pipeline *mPipeline; - // Ordered by priority - QList mCommandQueues; - bool mProcessingLock; - // The lowest revision we no longer need - qint64 mLowerBoundRevision; - InspectionFunction mInspect; - FlushFunction mFlush; -}; - GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer &pipeline ) : Sink::Resource(), mResourceContext(resourceContext), diff --git a/common/genericresource.h b/common/genericresource.h index a3a58b9..12f15f3 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -28,12 +28,11 @@ #include -class CommandProcessor; - namespace Sink { class Pipeline; class Preprocessor; class Synchronizer; +class CommandProcessor; /** * Generic Resource implementation. -- cgit v1.2.3