From 885f185f55249a2e97e9c7c238f89a5d0d99d1df Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 28 Nov 2016 16:56:28 +0100 Subject: Move the commandprocessor to a separate file. --- common/commandprocessor.cpp | 193 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 193 insertions(+) create mode 100644 common/commandprocessor.cpp (limited to 'common/commandprocessor.cpp') diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp new file mode 100644 index 0000000..c9fca37 --- /dev/null +++ b/common/commandprocessor.cpp @@ -0,0 +1,193 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#include "commandprocessor.h" + +#include "commands.h" +#include "messagequeue.h" +#include "queuedcommand_generated.h" + +#include "pipeline.h" + +static int sBatchSize = 100; + +using namespace Sink; + +CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, QList commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false), mLowerBoundRevision(0) +{ + for (auto queue : mCommandQueues) { + const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); + Q_UNUSED(ret); + } +} + +void CommandProcessor::setOldestUsedRevision(qint64 revision) +{ + mLowerBoundRevision = revision; +} + +void CommandProcessor::setInspectionCommand(const InspectionFunction &f) +{ + mInspect = f; +} + +void CommandProcessor::setFlushCommand(const FlushFunction &f) +{ + mFlush = f; +} + +bool CommandProcessor::messagesToProcessAvailable() +{ + for (auto queue : mCommandQueues) { + if (!queue->isEmpty()) { + return true; + } + } + return false; +} + +void CommandProcessor::process() +{ + if (mProcessingLock) { + return; + } + mProcessingLock = true; + auto job = processPipeline() + .syncThen([this]() { + mProcessingLock = false; + if (messagesToProcessAvailable()) { + process(); + } + }) + .exec(); +} + +KAsync::Job CommandProcessor::processQueuedCommand(const Sink::QueuedCommand *queuedCommand) +{ + SinkTrace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); + const auto data = queuedCommand->command()->Data(); + const auto size = queuedCommand->command()->size(); + switch (queuedCommand->commandId()) { + case Sink::Commands::DeleteEntityCommand: + return mPipeline->deletedEntity(data, size); + case Sink::Commands::ModifyEntityCommand: + return mPipeline->modifiedEntity(data, size); + case Sink::Commands::CreateEntityCommand: + return mPipeline->newEntity(data, size); + case Sink::Commands::InspectionCommand: + if (mInspect) { + return mInspect(data, size) + .syncThen([]() { return -1; }); + } else { + return KAsync::error(-1, "Missing inspection command."); + } + case Sink::Commands::FlushCommand: + if (mFlush) { + return mFlush(data, size) + .syncThen([]() { return -1; }); + } else { + return KAsync::error(-1, "Missing inspection command."); + } + default: + return KAsync::error(-1, "Unhandled command"); + } +} + +KAsync::Job CommandProcessor::processQueuedCommand(const QByteArray &data) +{ + flatbuffers::Verifier verifyer(reinterpret_cast(data.constData()), data.size()); + if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { + SinkWarning() << "invalid buffer"; + // return KAsync::error(1, "Invalid Buffer"); + } + auto queuedCommand = Sink::GetQueuedCommand(data.constData()); + const auto commandId = queuedCommand->commandId(); + SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId); + return processQueuedCommand(queuedCommand) + .then( + [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job { + if (error) { + SinkWarning() << "Error while processing queue command: " << error.errorMessage; + return KAsync::error(error); + } + SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId); + return KAsync::value(createdRevision); + }); +} + +// Process all messages of this queue +KAsync::Job CommandProcessor::processQueue(MessageQueue *queue) +{ + auto time = QSharedPointer::create(); + return KAsync::syncStart([this]() { mPipeline->startTransaction(); }) + .then(KAsync::dowhile( + [this, queue, time]() -> KAsync::Job { + return queue->dequeueBatch(sBatchSize, + [this, time](const QByteArray &data) -> KAsync::Job { + time->start(); + return processQueuedCommand(data) + .syncThen([this, time](qint64 createdRevision) { + SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); + }); + }) + .then([queue](const KAsync::Error &error) { + if (error) { + if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) { + SinkWarning() << "Error while getting message from messagequeue: " << error.errorMessage; + } + } + if (queue->isEmpty()) { + return KAsync::value(KAsync::Break); + } else { + return KAsync::value(KAsync::Continue); + } + }); + })) + .syncThen([this](const KAsync::Error &) { mPipeline->commit(); }); +} + +KAsync::Job CommandProcessor::processPipeline() +{ + auto time = QSharedPointer::create(); + time->start(); + mPipeline->cleanupRevisions(mLowerBoundRevision); + SinkTrace() << "Cleanup done." << Log::TraceTime(time->elapsed()); + + // Go through all message queues + if (mCommandQueues.isEmpty()) { + return KAsync::null(); + } + auto it = QSharedPointer>::create(mCommandQueues); + return KAsync::dowhile( + [it, this]() { + auto time = QSharedPointer::create(); + time->start(); + + auto queue = it->next(); + return processQueue(queue) + .syncThen([this, time, it]() { + SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed()); + if (it->hasNext()) { + return KAsync::Continue; + } + return KAsync::Break; + }); + }); +} + -- cgit v1.2.3