From 938554f267193b652478fc12343819fa45d76034 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 28 Nov 2016 19:33:01 +0100 Subject: Moved inspection commands to a separate inspector. --- common/CMakeLists.txt | 1 + common/commandprocessor.cpp | 27 ++++++++------ common/commandprocessor.h | 14 +++++--- common/genericresource.cpp | 58 +++++-------------------------- common/genericresource.h | 4 +-- common/inspector.cpp | 85 +++++++++++++++++++++++++++++++++++++++++++++ common/inspector.h | 52 +++++++++++++++++++++++++++ 7 files changed, 174 insertions(+), 67 deletions(-) create mode 100644 common/inspector.cpp create mode 100644 common/inspector.h (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 018fc22..df44ce5 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -78,6 +78,7 @@ set(command_SRCS mail/threadindexer.cpp notification.cpp commandprocessor.cpp + inspector.cpp ${storage_SRCS}) add_library(${PROJECT_NAME} SHARED ${command_SRCS}) diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index c9fca37..4ff352b 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp @@ -22,7 +22,8 @@ #include "commands.h" #include "messagequeue.h" #include "queuedcommand_generated.h" - +#include "inspector.h" +#include "synchronizer.h" #include "pipeline.h" static int sBatchSize = 100; @@ -42,11 +43,6 @@ void CommandProcessor::setOldestUsedRevision(qint64 revision) mLowerBoundRevision = revision; } -void CommandProcessor::setInspectionCommand(const InspectionFunction &f) -{ - mInspect = f; -} - void CommandProcessor::setFlushCommand(const FlushFunction &f) { mFlush = f; @@ -91,12 +87,9 @@ KAsync::Job CommandProcessor::processQueuedCommand(const Sink::QueuedCom case Sink::Commands::CreateEntityCommand: return mPipeline->newEntity(data, size); case Sink::Commands::InspectionCommand: - if (mInspect) { - return mInspect(data, size) + Q_ASSERT(mInspector); + return mInspector->processCommand(data, size) .syncThen([]() { return -1; }); - } else { - return KAsync::error(-1, "Missing inspection command."); - } case Sink::Commands::FlushCommand: if (mFlush) { return mFlush(data, size) @@ -191,3 +184,15 @@ KAsync::Job CommandProcessor::processPipeline() }); } +void CommandProcessor::setInspector(const QSharedPointer &inspector) +{ + mInspector = inspector; + QObject::connect(mInspector.data(), &Inspector::notify, this, &CommandProcessor::notify); +} + +void CommandProcessor::setSynchronizer(const QSharedPointer &synchronizer) +{ + mSynchronizer = synchronizer; + QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); +} + diff --git a/common/commandprocessor.h b/common/commandprocessor.h index 51d845e..75ae37a 100644 --- a/common/commandprocessor.h +++ b/common/commandprocessor.h @@ -24,12 +24,16 @@ #include #include #include + #include "log.h" +#include "notification.h" class MessageQueue; namespace Sink { class Pipeline; + class Inspector; + class Synchronizer; class QueuedCommand; /** @@ -38,7 +42,6 @@ namespace Sink { class CommandProcessor : public QObject { Q_OBJECT - typedef std::function(void const *, size_t)> InspectionFunction; typedef std::function(void const *, size_t)> FlushFunction; SINK_DEBUG_AREA("commandprocessor") @@ -47,11 +50,13 @@ public: void setOldestUsedRevision(qint64 revision); - void setInspectionCommand(const InspectionFunction &f); - void setFlushCommand(const FlushFunction &f); + void setInspector(const QSharedPointer &inspector); + void setSynchronizer(const QSharedPointer &synchronizer); + signals: + void notify(Notification); void error(int errorCode, const QString &errorMessage); private: @@ -72,8 +77,9 @@ private: bool mProcessingLock; // The lowest revision we no longer need qint64 mLowerBoundRevision; - InspectionFunction mInspect; FlushFunction mFlush; + QSharedPointer mSynchronizer; + QSharedPointer mInspector; }; }; diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 3aa4fce..80e59c9 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -59,41 +59,6 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q mClientLowerBoundRevision(std::numeric_limits::max()) { mProcessor = std::unique_ptr(new CommandProcessor(mPipeline.data(), QList() << &mUserQueue << &mSynchronizerQueue)); - mProcessor->setInspectionCommand([this](void const *command, size_t size) { - flatbuffers::Verifier verifier((const uint8_t *)command, size); - if (Sink::Commands::VerifyInspectionBuffer(verifier)) { - auto buffer = Sink::Commands::GetInspection(command); - int inspectionType = buffer->type(); - - QByteArray inspectionId = BufferUtils::extractBuffer(buffer->id()); - QByteArray entityId = BufferUtils::extractBuffer(buffer->entityId()); - QByteArray domainType = BufferUtils::extractBuffer(buffer->domainType()); - QByteArray property = BufferUtils::extractBuffer(buffer->property()); - QByteArray expectedValueString = BufferUtils::extractBuffer(buffer->expectedValue()); - QDataStream s(expectedValueString); - QVariant expectedValue; - s >> expectedValue; - inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue) - .then( - [=](const KAsync::Error &error) { - Sink::Notification n; - n.type = Sink::Notification::Inspection; - n.id = inspectionId; - if (error) { - Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << error.errorMessage; - n.code = Sink::Notification::Failure; - } else { - Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; - n.code = Sink::Notification::Success; - } - emit notify(n); - return KAsync::null(); - }) - .exec(); - return KAsync::null(); - } - return KAsync::error(-1, "Invalid inspection command."); - }); mProcessor->setFlushCommand([this](void const *command, size_t size) { flatbuffers::Verifier verifier((const uint8_t *)command, size); if (Sink::Commands::VerifyFlushBuffer(verifier)) { @@ -114,14 +79,9 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q } return KAsync::error(-1, "Invalid flush command."); }); - { - auto ret = QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); - Q_ASSERT(ret); - } - { - auto ret = QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); - Q_ASSERT(ret); - } + QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); + QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify); + QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); mCommitQueueTimer.setInterval(sCommitInterval); mCommitQueueTimer.setSingleShot(true); @@ -132,13 +92,6 @@ GenericResource::~GenericResource() { } -KAsync::Job GenericResource::inspect( - int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) -{ - SinkWarning() << "Inspection not implemented"; - return KAsync::null(); -} - void GenericResource::setupPreprocessors(const QByteArray &type, const QVector &preprocessors) { mPipeline->setPreprocessors(type, preprocessors); @@ -179,6 +132,11 @@ void GenericResource::setupSynchronizer(const QSharedPointer &sync QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); } +void GenericResource::setupInspector(const QSharedPointer &inspector) +{ + mProcessor->setInspector(inspector); +} + void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) { Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadWrite).removeFromDisk(); diff --git a/common/genericresource.h b/common/genericresource.h index 12f15f3..0bc47da 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -32,6 +32,7 @@ namespace Sink { class Pipeline; class Preprocessor; class Synchronizer; +class Inspector; class CommandProcessor; /** @@ -50,8 +51,6 @@ public: virtual KAsync::Job synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE; virtual KAsync::Job processAllMessages() Q_DECL_OVERRIDE; virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; - virtual KAsync::Job - inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue); int error() const; @@ -64,6 +63,7 @@ private slots: protected: void setupPreprocessors(const QByteArray &type, const QVector &preprocessors); void setupSynchronizer(const QSharedPointer &synchronizer); + void setupInspector(const QSharedPointer &inspector); void onProcessorError(int errorCode, const QString &errorMessage); void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); diff --git a/common/inspector.cpp b/common/inspector.cpp new file mode 100644 index 0000000..8b4c93a --- /dev/null +++ b/common/inspector.cpp @@ -0,0 +1,85 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#include "inspector.h" + +#include "resourcecontext.h" +#include "inspection_generated.h" +#include "bufferutils.h" + +#include + +using namespace Sink; + +Inspector::Inspector(const ResourceContext &context) + : QObject(), + mResourceContext(context) + // mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)), + // mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite) +{ + // SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); +} + +Inspector::~Inspector() +{ + +} + +KAsync::Job Inspector::processCommand(void const *command, size_t size) +{ + flatbuffers::Verifier verifier((const uint8_t *)command, size); + if (Sink::Commands::VerifyInspectionBuffer(verifier)) { + auto buffer = Sink::Commands::GetInspection(command); + int inspectionType = buffer->type(); + + QByteArray inspectionId = BufferUtils::extractBuffer(buffer->id()); + QByteArray entityId = BufferUtils::extractBuffer(buffer->entityId()); + QByteArray domainType = BufferUtils::extractBuffer(buffer->domainType()); + QByteArray property = BufferUtils::extractBuffer(buffer->property()); + QByteArray expectedValueString = BufferUtils::extractBuffer(buffer->expectedValue()); + QDataStream s(expectedValueString); + QVariant expectedValue; + s >> expectedValue; + inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue) + .then( + [=](const KAsync::Error &error) { + Sink::Notification n; + n.type = Sink::Notification::Inspection; + n.id = inspectionId; + if (error) { + Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << error.errorMessage; + n.code = Sink::Notification::Failure; + } else { + Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; + n.code = Sink::Notification::Success; + } + emit notify(n); + return KAsync::null(); + }) + .exec(); + return KAsync::null(); + } + return KAsync::error(-1, "Invalid inspection command."); +} + +KAsync::Job Inspector::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) +{ + return KAsync::error(-1, "Inspection not implemented."); +} + diff --git a/common/inspector.h b/common/inspector.h new file mode 100644 index 0000000..ff167b1 --- /dev/null +++ b/common/inspector.h @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#pragma once + +#include "sink_export.h" +#include +#include + +#include "notification.h" +#include "resourcecontext.h" + +namespace Sink { + +/** + * Synchronize and add what we don't already have to local queue + */ +class SINK_EXPORT Inspector : public QObject +{ + Q_OBJECT +public: + Inspector(const ResourceContext &resourceContext); + virtual ~Inspector(); + + KAsync::Job processCommand(void const *command, size_t size); + +signals: + void notify(Notification); + +protected: + virtual KAsync::Job inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue); + + Sink::ResourceContext mResourceContext; +}; + +} -- cgit v1.2.3