From 01adeefb24bf72f1015e93aa5f075f93f56d94da Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 30 Jul 2015 13:43:31 +0200 Subject: Made the pipeline an implementation detail of the resource. This removes one dependency from the Listener and will allow us to test the Listener better. --- common/genericresource.cpp | 17 ++++++----------- common/genericresource.h | 9 +++++---- common/listener.cpp | 8 +++----- common/listener.h | 1 - common/resource.cpp | 15 +++------------ common/resource.h | 8 +++----- 6 files changed, 20 insertions(+), 38 deletions(-) (limited to 'common') diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 99d1aaa..4dd73b3 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -152,26 +152,21 @@ private: }; -GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier) +GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer &pipeline) : Akonadi2::Resource(), mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceInstanceIdentifier + ".userqueue"), mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceInstanceIdentifier + ".synchronizerqueue"), mResourceInstanceIdentifier(resourceInstanceIdentifier), + mPipeline(pipeline ? pipeline : QSharedPointer::create(resourceInstanceIdentifier)), mError(0) { + mProcessor = new Processor(mPipeline.data(), QList() << &mUserQueue << &mSynchronizerQueue); + QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); + QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); } GenericResource::~GenericResource() { - -} - -void GenericResource::configurePipeline(Akonadi2::Pipeline *pipeline) -{ - //TODO figure out lifetime of the processor - mProcessor = new Processor(pipeline, QList() << &mUserQueue << &mSynchronizerQueue); - QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); - QObject::connect(pipeline, &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); } void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) @@ -195,7 +190,7 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); } -void GenericResource::processCommand(int commandId, const QByteArray &data, Akonadi2::Pipeline *pipeline) +void GenericResource::processCommand(int commandId, const QByteArray &data) { //TODO instead of copying the command including the full entity first into the command queue, we could directly //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). diff --git a/common/genericresource.h b/common/genericresource.h index e9d5d59..4a285ea 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -22,6 +22,7 @@ #include #include #include +#include class Processor; @@ -34,14 +35,13 @@ namespace Akonadi2 class AKONADI2COMMON_EXPORT GenericResource : public Resource { public: - GenericResource(const QByteArray &resourceInstanceIdentifier); + GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer &pipeline = QSharedPointer()); virtual ~GenericResource(); - virtual void processCommand(int commandId, const QByteArray &data, Pipeline *pipeline) Q_DECL_OVERRIDE; - virtual KAsync::Job synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0; + virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; + virtual KAsync::Job synchronizeWithSource() Q_DECL_OVERRIDE = 0; virtual KAsync::Job processAllMessages() Q_DECL_OVERRIDE; - virtual void configurePipeline(Pipeline *pipeline) Q_DECL_OVERRIDE; int error() const; protected: @@ -51,6 +51,7 @@ protected: MessageQueue mUserQueue; MessageQueue mSynchronizerQueue; QByteArray mResourceInstanceIdentifier; + QSharedPointer mPipeline; private: Processor *mProcessor; diff --git a/common/listener.cpp b/common/listener.cpp index 2e2e98e..8ec9b3e 100644 --- a/common/listener.cpp +++ b/common/listener.cpp @@ -41,7 +41,6 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, QObject *parent m_resourceName(Akonadi2::Store::resourceName(resourceInstanceIdentifier)), m_resourceInstanceIdentifier(resourceInstanceIdentifier), m_resource(0), - m_pipeline(new Akonadi2::Pipeline(resourceInstanceIdentifier, parent)), m_clientBufferProcessesTimer(new QTimer(this)), m_messageId(0) { @@ -226,7 +225,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c } auto job = KAsync::null(); if (buffer->sourceSync()) { - job = m_resource->synchronizeWithSource(m_pipeline); + job = m_resource->synchronizeWithSource(); } if (buffer->localSync()) { job = job.then(m_resource->processAllMessages()); @@ -247,7 +246,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c Log() << "\tCommand id " << messageId << " of type \"" << Akonadi2::Commands::name(commandId) << "\" from " << client.name; loadResource(); if (m_resource) { - m_resource->processCommand(commandId, commandBuffer, m_pipeline); + m_resource->processCommand(commandId, commandBuffer); } break; case Akonadi2::Commands::ShutdownCommand: @@ -261,7 +260,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; loadResource(); if (m_resource) { - m_resource->processCommand(commandId, commandBuffer, m_pipeline); + m_resource->processCommand(commandId, commandBuffer); } } else { Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; @@ -367,7 +366,6 @@ void Listener::loadResource() m_resource = resourceFactory->createResource(m_resourceInstanceIdentifier); Log() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); Log() << QString("\tResource: %1").arg((qlonglong)m_resource); - m_resource->configurePipeline(m_pipeline); connect(m_resource, &Akonadi2::Resource::revisionUpdated, this, &Listener::refreshRevision); } else { diff --git a/common/listener.h b/common/listener.h index 649c3ed..0d19823 100644 --- a/common/listener.h +++ b/common/listener.h @@ -92,7 +92,6 @@ private: const QByteArray m_resourceName; const QByteArray m_resourceInstanceIdentifier; Akonadi2::Resource *m_resource; - Akonadi2::Pipeline *m_pipeline; QTimer *m_clientBufferProcessesTimer; QTimer *m_checkConnectionsTimer; int m_messageId; diff --git a/common/resource.cpp b/common/resource.cpp index 68a237c..2a86df5 100644 --- a/common/resource.cpp +++ b/common/resource.cpp @@ -40,24 +40,15 @@ Resource::~Resource() //delete d; } -void Resource::configurePipeline(Pipeline *pipeline) -{ - -} - -void Resource::processCommand(int commandId, const QByteArray &data, Pipeline *pipeline) +void Resource::processCommand(int commandId, const QByteArray &data) { Q_UNUSED(commandId) Q_UNUSED(data) - Q_UNUSED(pipeline) - pipeline->null(); } -KAsync::Job Resource::synchronizeWithSource(Pipeline *pipeline) +KAsync::Job Resource::synchronizeWithSource() { - return KAsync::start([pipeline](KAsync::Future &f) { - pipeline->null(); - }); + return KAsync::null(); } KAsync::Job Resource::processAllMessages() diff --git a/common/resource.h b/common/resource.h index 9f657f7..a51e12d 100644 --- a/common/resource.h +++ b/common/resource.h @@ -20,12 +20,12 @@ #include #include -#include #include namespace Akonadi2 { +class Pipeline; /** * Resource interface @@ -37,12 +37,10 @@ public: Resource(); virtual ~Resource(); - virtual void processCommand(int commandId, const QByteArray &data, Pipeline *pipeline); - virtual KAsync::Job synchronizeWithSource(Pipeline *pipeline); + virtual void processCommand(int commandId, const QByteArray &data); + virtual KAsync::Job synchronizeWithSource(); virtual KAsync::Job processAllMessages(); - virtual void configurePipeline(Pipeline *pipeline); - Q_SIGNALS: void revisionUpdated(qint64); -- cgit v1.2.3