From 47f105febcd17d6db1f998a99c6c6c423851573a Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 20 Apr 2015 22:22:19 +0200 Subject: Moved generic part of resource implementation to GenericResource --- common/CMakeLists.txt | 1 + common/genericresource.cpp | 234 +++++++++++++++++++++++++++++ common/genericresource.h | 59 ++++++++ examples/dummyresource/resourcefactory.cpp | 200 +----------------------- examples/dummyresource/resourcefactory.h | 22 +-- 5 files changed, 301 insertions(+), 215 deletions(-) create mode 100644 common/genericresource.cpp create mode 100644 common/genericresource.h diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index ce237c5..c18d98a 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -33,6 +33,7 @@ set(command_SRCS pipeline.cpp domainadaptor.cpp resource.cpp + genericresource.cpp resourceaccess.cpp storage_common.cpp threadboundary.cpp diff --git a/common/genericresource.cpp b/common/genericresource.cpp new file mode 100644 index 0000000..0b71500 --- /dev/null +++ b/common/genericresource.cpp @@ -0,0 +1,234 @@ +#include "genericresource.h" + +#include "facade.h" +#include "entitybuffer.h" +#include "pipeline.h" +#include "queuedcommand_generated.h" +#include "createentity_generated.h" +#include "domainadaptor.h" +#include "commands.h" +#include "clientapi.h" +#include "index.h" +#include "log.h" +#include + +using namespace Akonadi2; + +/** + * Drives the pipeline using the output from all command queues + */ +class Processor : public QObject +{ + Q_OBJECT +public: + Processor(Akonadi2::Pipeline *pipeline, QList commandQueues) + : QObject(), + mPipeline(pipeline), + mCommandQueues(commandQueues), + mProcessingLock(false) + { + for (auto queue : mCommandQueues) { + const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); + Q_UNUSED(ret); + } + } + +signals: + void error(int errorCode, const QString &errorMessage); + +private: + bool messagesToProcessAvailable() + { + for (auto queue : mCommandQueues) { + if (!queue->isEmpty()) { + return true; + } + } + return false; + } + +private slots: + void process() + { + if (mProcessingLock) { + return; + } + mProcessingLock = true; + auto job = processPipeline().then([this]() { + mProcessingLock = false; + if (messagesToProcessAvailable()) { + process(); + } + }).exec(); + } + + Async::Job processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) + { + Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); + //Throw command into appropriate pipeline + switch (queuedCommand->commandId()) { + case Akonadi2::Commands::DeleteEntityCommand: + //mPipeline->removedEntity + return Async::null(); + case Akonadi2::Commands::ModifyEntityCommand: + //mPipeline->modifiedEntity + return Async::null(); + case Akonadi2::Commands::CreateEntityCommand: + return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); + default: + return Async::error(-1, "Unhandled command"); + } + return Async::null(); + } + + //Process all messages of this queue + Async::Job processQueue(MessageQueue *queue) + { + //TODO use something like: + //Async::foreach("pass iterator here").each("process value here").join(); + //Async::foreach("pass iterator here").parallel("process value here").join(); + return Async::dowhile( + [this, queue](Async::Future &future) { + if (queue->isEmpty()) { + future.setValue(false); + future.setFinished(); + return; + } + queue->dequeue( + [this, &future](void *ptr, int size, std::function messageQueueCallback) { + auto callback = [messageQueueCallback, &future](bool success) { + messageQueueCallback(success); + future.setValue(!success); + future.setFinished(); + }; + + flatbuffers::Verifier verifyer(reinterpret_cast(ptr), size); + if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { + Warning() << "invalid buffer"; + callback(false); + return; + } + auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); + Trace() << "Dequeued Command: " << Akonadi2::Commands::name(queuedCommand->commandId()); + //TODO JOBAPI: job lifetime management + //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete + //themselves once done. In other cases we'd like jobs that only live as long as their handle though. + //FIXME this job is stack allocated and thus simply dies.... + processQueuedCommand(queuedCommand).then( + [callback]() { + callback(true); + }, + [callback](int errorCode, QString errorMessage) { + Warning() << "Error while processing queue command: " << errorMessage; + callback(false); + } + ).exec(); + }, + [&future](const MessageQueue::Error &error) { + Warning() << "Error while getting message from messagequeue: " << error.message; + future.setValue(false); + future.setFinished(); + } + ); + } + ); + } + + Async::Job processPipeline() + { + //Go through all message queues + auto it = QSharedPointer >::create(mCommandQueues); + return Async::dowhile( + [it]() { return it->hasNext(); }, + [it, this](Async::Future &future) { + auto queue = it->next(); + processQueue(queue).then([&future]() { + Trace() << "Queue processed"; + future.setFinished(); + }).exec(); + } + ); + } + +private: + Akonadi2::Pipeline *mPipeline; + //Ordered by priority + QList mCommandQueues; + bool mProcessingLock; +}; + + +GenericResource::GenericResource(const QByteArray &resourceIdentifier) + : Akonadi2::Resource(), + mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde." + resourceIdentifier + ".userqueue"), + mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde." + resourceIdentifier + ".synchronizerqueue"), + mError(0) +{ +} + +GenericResource::~GenericResource() +{ + +} + +void GenericResource::configurePipeline(Akonadi2::Pipeline *pipeline) +{ + //TODO figure out lifetime of the processor + mProcessor = new Processor(pipeline, QList() << &mUserQueue << &mSynchronizerQueue); + QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); +} + +void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) +{ + Warning() << "Received error from Processor: " << errorCode << errorMessage; + mError = errorCode; +} + +int GenericResource::error() const +{ + return mError; +} + +void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) +{ + //TODO get rid of m_fbb member variable + m_fbb.Clear(); + auto commandData = Akonadi2::EntityBuffer::appendAsVector(m_fbb, data.constData(), data.size()); + auto buffer = Akonadi2::CreateQueuedCommand(m_fbb, commandId, commandData); + Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); + mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); +} + +void GenericResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) +{ + //TODO instead of copying the command including the full entity first into the command queue, we could directly + //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). + //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). + enqueueCommand(mUserQueue, commandId, data); +} + +Async::Job GenericResource::processAllMessages() +{ + //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. + //TODO: report errors while processing sync? + //TODO JOBAPI: A helper that waits for n events and then continues? + return Async::start([this](Async::Future &f) { + if (mSynchronizerQueue.isEmpty()) { + f.setFinished(); + } else { + QObject::connect(&mSynchronizerQueue, &MessageQueue::drained, [&f]() { + f.setFinished(); + }); + } + }).then([this](Async::Future &f) { + if (mUserQueue.isEmpty()) { + f.setFinished(); + } else { + QObject::connect(&mUserQueue, &MessageQueue::drained, [&f]() { + f.setFinished(); + }); + } + }); +} + +#include "genericresource.moc" diff --git a/common/genericresource.h b/common/genericresource.h new file mode 100644 index 0000000..36fa567 --- /dev/null +++ b/common/genericresource.h @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2015 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#pragma once + +#include +#include +#include + +class Processor; + +namespace Akonadi2 +{ + +/** + * Generic Resource implementation. + */ +class AKONADI2COMMON_EXPORT GenericResource : public Resource +{ +public: + GenericResource(const QByteArray &resourceIdentifier); + virtual ~GenericResource(); + + virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) Q_DECL_OVERRIDE; + virtual Async::Job synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0; + virtual Async::Job processAllMessages() Q_DECL_OVERRIDE; + + virtual void configurePipeline(Pipeline *pipeline) Q_DECL_OVERRIDE; + int error() const; + +protected: + void onProcessorError(int errorCode, const QString &errorMessage); + void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); + flatbuffers::FlatBufferBuilder m_fbb; + MessageQueue mUserQueue; + MessageQueue mSynchronizerQueue; + +private: + Processor *mProcessor; + int mError; +}; + +} diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp index d5765e2..a4cd68d 100644 --- a/examples/dummyresource/resourcefactory.cpp +++ b/examples/dummyresource/resourcefactory.cpp @@ -103,152 +103,10 @@ QMap populate() static QMap s_dataSource = populate(); -//Drives the pipeline using the output from all command queues -class Processor : public QObject -{ - Q_OBJECT -public: - Processor(Akonadi2::Pipeline *pipeline, QList commandQueues) - : QObject(), - mPipeline(pipeline), - mCommandQueues(commandQueues), - mProcessingLock(false) - { - for (auto queue : mCommandQueues) { - const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); - Q_UNUSED(ret); - } - } - -signals: - void error(int errorCode, const QString &errorMessage); - -private: - bool messagesToProcessAvailable() - { - for (auto queue : mCommandQueues) { - if (!queue->isEmpty()) { - return true; - } - } - return false; - } - -private slots: - void process() - { - if (mProcessingLock) { - return; - } - mProcessingLock = true; - auto job = processPipeline().then([this]() { - mProcessingLock = false; - if (messagesToProcessAvailable()) { - process(); - } - }).exec(); - } - - Async::Job processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) - { - Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); - //Throw command into appropriate pipeline - switch (queuedCommand->commandId()) { - case Akonadi2::Commands::DeleteEntityCommand: - //mPipeline->removedEntity - return Async::null(); - case Akonadi2::Commands::ModifyEntityCommand: - //mPipeline->modifiedEntity - return Async::null(); - case Akonadi2::Commands::CreateEntityCommand: - return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); - default: - return Async::error(-1, "Unhandled command"); - } - return Async::null(); - } - - //Process all messages of this queue - Async::Job processQueue(MessageQueue *queue) - { - //TODO use something like: - //Async::foreach("pass iterator here").each("process value here").join(); - //Async::foreach("pass iterator here").parallel("process value here").join(); - return Async::dowhile( - [this, queue](Async::Future &future) { - if (queue->isEmpty()) { - future.setValue(false); - future.setFinished(); - return; - } - queue->dequeue( - [this, &future](void *ptr, int size, std::function messageQueueCallback) { - auto callback = [messageQueueCallback, &future](bool success) { - messageQueueCallback(success); - future.setValue(!success); - future.setFinished(); - }; - - flatbuffers::Verifier verifyer(reinterpret_cast(ptr), size); - if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { - Warning() << "invalid buffer"; - callback(false); - return; - } - auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); - Trace() << "Dequeued Command: " << Akonadi2::Commands::name(queuedCommand->commandId()); - //TODO JOBAPI: job lifetime management - //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete - //themselves once done. In other cases we'd like jobs that only live as long as their handle though. - //FIXME this job is stack allocated and thus simply dies.... - processQueuedCommand(queuedCommand).then( - [callback]() { - callback(true); - }, - [callback](int errorCode, QString errorMessage) { - Warning() << "Error while processing queue command: " << errorMessage; - callback(false); - } - ).exec(); - }, - [&future](const MessageQueue::Error &error) { - Warning() << "Error while getting message from messagequeue: " << error.message; - future.setValue(false); - future.setFinished(); - } - ); - } - ); - } - - Async::Job processPipeline() - { - //Go through all message queues - auto it = QSharedPointer >::create(mCommandQueues); - return Async::dowhile( - [it]() { return it->hasNext(); }, - [it, this](Async::Future &future) { - auto queue = it->next(); - processQueue(queue).then([&future]() { - Trace() << "Queue processed"; - future.setFinished(); - }).exec(); - } - ); - } - -private: - Akonadi2::Pipeline *mPipeline; - //Ordered by priority - QList mCommandQueues; - bool mProcessingLock; -}; +//FIXME We need to pass the resource-instance name to generic resource, not the plugin name DummyResource::DummyResource() - : Akonadi2::Resource(), - mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"), - mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue"), - mError(0) + : Akonadi2::GenericResource(PLUGIN_NAME) { } @@ -277,19 +135,7 @@ void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) //event is the entitytype and not the domain type pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector() << eventIndexer << uidIndexer); - mProcessor = new Processor(pipeline, QList() << &mUserQueue << &mSynchronizerQueue); - QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); -} - -void DummyResource::onProcessorError(int errorCode, const QString &errorMessage) -{ - Warning() << "Received error from Processor: " << errorCode << errorMessage; - mError = errorCode; -} - -int DummyResource::error() const -{ - return mError; + GenericResource::configurePipeline(pipeline); } void findByRemoteId(QSharedPointer storage, const QString &rid, std::function callback) @@ -316,15 +162,6 @@ void findByRemoteId(QSharedPointer storage, const QString &ri }); } -void DummyResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) -{ - m_fbb.Clear(); - auto commandData = Akonadi2::EntityBuffer::appendAsVector(m_fbb, data.constData(), data.size()); - auto buffer = Akonadi2::CreateQueuedCommand(m_fbb, commandId, commandData); - Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); - mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); -} - Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) { return Async::start([this, pipeline](Async::Future &f) { @@ -377,37 +214,6 @@ Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli }); } -Async::Job DummyResource::processAllMessages() -{ - //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. - //TODO: report errors while processing sync? - //TODO JOBAPI: A helper that waits for n events and then continues? - return Async::start([this](Async::Future &f) { - if (mSynchronizerQueue.isEmpty()) { - f.setFinished(); - } else { - QObject::connect(&mSynchronizerQueue, &MessageQueue::drained, [&f]() { - f.setFinished(); - }); - } - }).then([this](Async::Future &f) { - if (mUserQueue.isEmpty()) { - f.setFinished(); - } else { - QObject::connect(&mUserQueue, &MessageQueue::drained, [&f]() { - f.setFinished(); - }); - } - }); -} - -void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) -{ - //TODO instead of copying the command including the full entity first into the command queue, we could directly - //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). - //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). - enqueueCommand(mUserQueue, commandId, data); -} DummyResourceFactory::DummyResourceFactory(QObject *parent) : Akonadi2::ResourceFactory(parent) diff --git a/examples/dummyresource/resourcefactory.h b/examples/dummyresource/resourcefactory.h index 3b99d5e..7b7783e 100644 --- a/examples/dummyresource/resourcefactory.h +++ b/examples/dummyresource/resourcefactory.h @@ -19,7 +19,7 @@ #pragma once -#include "common/resource.h" +#include "common/genericresource.h" #include "async/src/async.h" #include "common/messagequeue.h" @@ -28,26 +28,12 @@ //TODO: a little ugly to have this in two places, once here and once in Q_PLUGIN_METADATA #define PLUGIN_NAME "org.kde.dummy" -class Processor; - -class DummyResource : public Akonadi2::Resource +class DummyResource : public Akonadi2::GenericResource { public: DummyResource(); - Async::Job synchronizeWithSource(Akonadi2::Pipeline *pipeline); - Async::Job processAllMessages(); - void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); - void configurePipeline(Akonadi2::Pipeline *pipeline); - int error() const; - -private: - void onProcessorError(int errorCode, const QString &errorMessage); - void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); - flatbuffers::FlatBufferBuilder m_fbb; - MessageQueue mUserQueue; - MessageQueue mSynchronizerQueue; - Processor *mProcessor; - int mError; + Async::Job synchronizeWithSource(Akonadi2::Pipeline *pipeline) Q_DECL_OVERRIDE; + void configurePipeline(Akonadi2::Pipeline *pipeline) Q_DECL_OVERRIDE; }; class DummyResourceFactory : public Akonadi2::ResourceFactory -- cgit v1.2.3