From 01adeefb24bf72f1015e93aa5f075f93f56d94da Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 30 Jul 2015 13:43:31 +0200 Subject: Made the pipeline an implementation detail of the resource. This removes one dependency from the Listener and will allow us to test the Listener better. --- common/genericresource.cpp | 17 ++++++----------- common/genericresource.h | 9 +++++---- common/listener.cpp | 8 +++----- common/listener.h | 1 - common/resource.cpp | 15 +++------------ common/resource.h | 8 +++----- examples/dummyresource/resourcefactory.cpp | 17 ++++++----------- examples/dummyresource/resourcefactory.h | 5 ++--- tests/dummyresourcebenchmark.cpp | 11 ++++++----- tests/dummyresourcetest.cpp | 8 ++++---- tests/genericresourcetest.cpp | 22 ++++++++-------------- 11 files changed, 46 insertions(+), 75 deletions(-) diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 99d1aaa..4dd73b3 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -152,26 +152,21 @@ private: }; -GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier) +GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer &pipeline) : Akonadi2::Resource(), mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceInstanceIdentifier + ".userqueue"), mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceInstanceIdentifier + ".synchronizerqueue"), mResourceInstanceIdentifier(resourceInstanceIdentifier), + mPipeline(pipeline ? pipeline : QSharedPointer::create(resourceInstanceIdentifier)), mError(0) { + mProcessor = new Processor(mPipeline.data(), QList() << &mUserQueue << &mSynchronizerQueue); + QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); + QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); } GenericResource::~GenericResource() { - -} - -void GenericResource::configurePipeline(Akonadi2::Pipeline *pipeline) -{ - //TODO figure out lifetime of the processor - mProcessor = new Processor(pipeline, QList() << &mUserQueue << &mSynchronizerQueue); - QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); - QObject::connect(pipeline, &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); } void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) @@ -195,7 +190,7 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); } -void GenericResource::processCommand(int commandId, const QByteArray &data, Akonadi2::Pipeline *pipeline) +void GenericResource::processCommand(int commandId, const QByteArray &data) { //TODO instead of copying the command including the full entity first into the command queue, we could directly //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). diff --git a/common/genericresource.h b/common/genericresource.h index e9d5d59..4a285ea 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -22,6 +22,7 @@ #include #include #include +#include class Processor; @@ -34,14 +35,13 @@ namespace Akonadi2 class AKONADI2COMMON_EXPORT GenericResource : public Resource { public: - GenericResource(const QByteArray &resourceInstanceIdentifier); + GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer &pipeline = QSharedPointer()); virtual ~GenericResource(); - virtual void processCommand(int commandId, const QByteArray &data, Pipeline *pipeline) Q_DECL_OVERRIDE; - virtual KAsync::Job synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0; + virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; + virtual KAsync::Job synchronizeWithSource() Q_DECL_OVERRIDE = 0; virtual KAsync::Job processAllMessages() Q_DECL_OVERRIDE; - virtual void configurePipeline(Pipeline *pipeline) Q_DECL_OVERRIDE; int error() const; protected: @@ -51,6 +51,7 @@ protected: MessageQueue mUserQueue; MessageQueue mSynchronizerQueue; QByteArray mResourceInstanceIdentifier; + QSharedPointer mPipeline; private: Processor *mProcessor; diff --git a/common/listener.cpp b/common/listener.cpp index 2e2e98e..8ec9b3e 100644 --- a/common/listener.cpp +++ b/common/listener.cpp @@ -41,7 +41,6 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, QObject *parent m_resourceName(Akonadi2::Store::resourceName(resourceInstanceIdentifier)), m_resourceInstanceIdentifier(resourceInstanceIdentifier), m_resource(0), - m_pipeline(new Akonadi2::Pipeline(resourceInstanceIdentifier, parent)), m_clientBufferProcessesTimer(new QTimer(this)), m_messageId(0) { @@ -226,7 +225,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c } auto job = KAsync::null(); if (buffer->sourceSync()) { - job = m_resource->synchronizeWithSource(m_pipeline); + job = m_resource->synchronizeWithSource(); } if (buffer->localSync()) { job = job.then(m_resource->processAllMessages()); @@ -247,7 +246,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c Log() << "\tCommand id " << messageId << " of type \"" << Akonadi2::Commands::name(commandId) << "\" from " << client.name; loadResource(); if (m_resource) { - m_resource->processCommand(commandId, commandBuffer, m_pipeline); + m_resource->processCommand(commandId, commandBuffer); } break; case Akonadi2::Commands::ShutdownCommand: @@ -261,7 +260,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; loadResource(); if (m_resource) { - m_resource->processCommand(commandId, commandBuffer, m_pipeline); + m_resource->processCommand(commandId, commandBuffer); } } else { Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; @@ -367,7 +366,6 @@ void Listener::loadResource() m_resource = resourceFactory->createResource(m_resourceInstanceIdentifier); Log() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); Log() << QString("\tResource: %1").arg((qlonglong)m_resource); - m_resource->configurePipeline(m_pipeline); connect(m_resource, &Akonadi2::Resource::revisionUpdated, this, &Listener::refreshRevision); } else { diff --git a/common/listener.h b/common/listener.h index 649c3ed..0d19823 100644 --- a/common/listener.h +++ b/common/listener.h @@ -92,7 +92,6 @@ private: const QByteArray m_resourceName; const QByteArray m_resourceInstanceIdentifier; Akonadi2::Resource *m_resource; - Akonadi2::Pipeline *m_pipeline; QTimer *m_clientBufferProcessesTimer; QTimer *m_checkConnectionsTimer; int m_messageId; diff --git a/common/resource.cpp b/common/resource.cpp index 68a237c..2a86df5 100644 --- a/common/resource.cpp +++ b/common/resource.cpp @@ -40,24 +40,15 @@ Resource::~Resource() //delete d; } -void Resource::configurePipeline(Pipeline *pipeline) -{ - -} - -void Resource::processCommand(int commandId, const QByteArray &data, Pipeline *pipeline) +void Resource::processCommand(int commandId, const QByteArray &data) { Q_UNUSED(commandId) Q_UNUSED(data) - Q_UNUSED(pipeline) - pipeline->null(); } -KAsync::Job Resource::synchronizeWithSource(Pipeline *pipeline) +KAsync::Job Resource::synchronizeWithSource() { - return KAsync::start([pipeline](KAsync::Future &f) { - pipeline->null(); - }); + return KAsync::null(); } KAsync::Job Resource::processAllMessages() diff --git a/common/resource.h b/common/resource.h index 9f657f7..a51e12d 100644 --- a/common/resource.h +++ b/common/resource.h @@ -20,12 +20,12 @@ #include #include -#include #include namespace Akonadi2 { +class Pipeline; /** * Resource interface @@ -37,12 +37,10 @@ public: Resource(); virtual ~Resource(); - virtual void processCommand(int commandId, const QByteArray &data, Pipeline *pipeline); - virtual KAsync::Job synchronizeWithSource(Pipeline *pipeline); + virtual void processCommand(int commandId, const QByteArray &data); + virtual KAsync::Job synchronizeWithSource(); virtual KAsync::Job processAllMessages(); - virtual void configurePipeline(Pipeline *pipeline); - Q_SIGNALS: void revisionUpdated(qint64); diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp index c7a3eef..e9bf6cd 100644 --- a/examples/dummyresource/resourcefactory.cpp +++ b/examples/dummyresource/resourcefactory.cpp @@ -35,12 +35,8 @@ //This is the resources entity type, and not the domain type #define ENTITY_TYPE_EVENT "event" -DummyResource::DummyResource(const QByteArray &instanceIdentifier) - : Akonadi2::GenericResource(instanceIdentifier) -{ -} - -void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) +DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer &pipeline) + : Akonadi2::GenericResource(instanceIdentifier, pipeline) { auto eventFactory = QSharedPointer::create(); const auto resourceIdentifier = mResourceInstanceIdentifier; @@ -57,15 +53,14 @@ void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) } }); - pipeline->setPreprocessors(ENTITY_TYPE_EVENT, Akonadi2::Pipeline::NewPipeline, QVector() << eventIndexer); - pipeline->setAdaptorFactory(ENTITY_TYPE_EVENT, eventFactory); + mPipeline->setPreprocessors(ENTITY_TYPE_EVENT, Akonadi2::Pipeline::NewPipeline, QVector() << eventIndexer); + mPipeline->setAdaptorFactory(ENTITY_TYPE_EVENT, eventFactory); //TODO cleanup indexes during removal - GenericResource::configurePipeline(pipeline); } -KAsync::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) +KAsync::Job DummyResource::synchronizeWithSource() { - return KAsync::start([this, pipeline](KAsync::Future &f) { + return KAsync::start([this](KAsync::Future &f) { //TODO start transaction on index Index uidIndex(Akonadi2::Store::storageLocation(), mResourceInstanceIdentifier + ".index.uid", Akonadi2::Storage::ReadOnly); diff --git a/examples/dummyresource/resourcefactory.h b/examples/dummyresource/resourcefactory.h index f2362bc..4baafa7 100644 --- a/examples/dummyresource/resourcefactory.h +++ b/examples/dummyresource/resourcefactory.h @@ -32,9 +32,8 @@ class DummyResource : public Akonadi2::GenericResource { public: - DummyResource(const QByteArray &instanceIdentifier); - KAsync::Job synchronizeWithSource(Akonadi2::Pipeline *pipeline) Q_DECL_OVERRIDE; - void configurePipeline(Akonadi2::Pipeline *pipeline) Q_DECL_OVERRIDE; + DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer &pipeline = QSharedPointer()); + KAsync::Job synchronizeWithSource() Q_DECL_OVERRIDE; }; class DummyResourceFactory : public Akonadi2::ResourceFactory diff --git a/tests/dummyresourcebenchmark.cpp b/tests/dummyresourcebenchmark.cpp index 7d40779..4c649a9 100644 --- a/tests/dummyresourcebenchmark.cpp +++ b/tests/dummyresourcebenchmark.cpp @@ -8,11 +8,13 @@ #include "commands.h" #include "entitybuffer.h" #include "synclistresult.h" +#include "pipeline.h" #include "event_generated.h" #include "entity_generated.h" #include "metadata_generated.h" #include "createentity_generated.h" + #include static void removeFromDisk(const QString &name) @@ -94,10 +96,9 @@ private Q_SLOTS: time.start(); int num = 10000; - Akonadi2::Pipeline pipeline("org.kde.dummy.instance1"); - QSignalSpy revisionSpy(&pipeline, SIGNAL(revisionUpdated())); - DummyResource resource("org.kde.dummy.instance1"); - resource.configurePipeline(&pipeline); + auto pipeline = QSharedPointer::create("org.kde.dummy.instance1"); + QSignalSpy revisionSpy(pipeline.data(), SIGNAL(revisionUpdated())); + DummyResource resource("org.kde.dummy.instance1", pipeline); flatbuffers::FlatBufferBuilder eventFbb; eventFbb.Clear(); @@ -133,7 +134,7 @@ private Q_SLOTS: const QByteArray command(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); for (int i = 0; i < num; i++) { - resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline); + resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command); } auto appendTime = time.elapsed(); diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index 10cd7e3..1a4d6ca 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp @@ -12,6 +12,7 @@ #include "commands.h" #include "entitybuffer.h" #include "resourceconfig.h" +#include "pipeline.h" static void removeFromDisk(const QString &name) { @@ -133,10 +134,9 @@ private Q_SLOTS: void testResourceSync() { - Akonadi2::Pipeline pipeline("org.kde.dummy.instance1"); - DummyResource resource("org.kde.dummy.instance1"); - resource.configurePipeline(&pipeline); - auto job = resource.synchronizeWithSource(&pipeline); + auto pipeline = QSharedPointer::create("org.kde.dummy.instance1"); + DummyResource resource("org.kde.dummy.instance1", pipeline); + auto job = resource.synchronizeWithSource(); //TODO pass in optional timeout? auto future = job.exec(); future.waitForFinished(); diff --git a/tests/genericresourcetest.cpp b/tests/genericresourcetest.cpp index 0b9a5c1..b6f629a 100644 --- a/tests/genericresourcetest.cpp +++ b/tests/genericresourcetest.cpp @@ -14,20 +14,15 @@ class TestResource : public Akonadi2::GenericResource { public: - TestResource(const QByteArray &instanceIdentifier) - : Akonadi2::GenericResource(instanceIdentifier) + TestResource(const QByteArray &instanceIdentifier, QSharedPointer pipeline) + : Akonadi2::GenericResource(instanceIdentifier, pipeline) { } - KAsync::Job synchronizeWithSource(Akonadi2::Pipeline *pipeline) Q_DECL_OVERRIDE + KAsync::Job synchronizeWithSource() Q_DECL_OVERRIDE { return KAsync::null(); } - - void configurePipeline(Akonadi2::Pipeline *pipeline) Q_DECL_OVERRIDE - { - GenericResource::configurePipeline(pipeline); - } }; @@ -89,12 +84,11 @@ private Q_SLOTS: } //Actual test - Akonadi2::Pipeline pipeline("org.kde.test.instance1"); - QSignalSpy revisionSpy(&pipeline, SIGNAL(revisionUpdated(qint64))); - TestResource resource("org.kde.test.instance1"); - resource.configurePipeline(&pipeline); - resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline); - resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline); + auto pipeline = QSharedPointer::create("org.kde.test.instance1"); + QSignalSpy revisionSpy(pipeline.data(), SIGNAL(revisionUpdated(qint64))); + TestResource resource("org.kde.test.instance1", pipeline); + resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command); + resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command); QVERIFY(revisionSpy.isValid()); QTRY_COMPARE(revisionSpy.count(), 2); -- cgit v1.2.3