From 47f105febcd17d6db1f998a99c6c6c423851573a Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 20 Apr 2015 22:22:19 +0200 Subject: Moved generic part of resource implementation to GenericResource --- examples/dummyresource/resourcefactory.cpp | 200 +---------------------------- examples/dummyresource/resourcefactory.h | 22 +--- 2 files changed, 7 insertions(+), 215 deletions(-) (limited to 'examples/dummyresource') diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp index d5765e2..a4cd68d 100644 --- a/examples/dummyresource/resourcefactory.cpp +++ b/examples/dummyresource/resourcefactory.cpp @@ -103,152 +103,10 @@ QMap populate() static QMap s_dataSource = populate(); -//Drives the pipeline using the output from all command queues -class Processor : public QObject -{ - Q_OBJECT -public: - Processor(Akonadi2::Pipeline *pipeline, QList commandQueues) - : QObject(), - mPipeline(pipeline), - mCommandQueues(commandQueues), - mProcessingLock(false) - { - for (auto queue : mCommandQueues) { - const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); - Q_UNUSED(ret); - } - } - -signals: - void error(int errorCode, const QString &errorMessage); - -private: - bool messagesToProcessAvailable() - { - for (auto queue : mCommandQueues) { - if (!queue->isEmpty()) { - return true; - } - } - return false; - } - -private slots: - void process() - { - if (mProcessingLock) { - return; - } - mProcessingLock = true; - auto job = processPipeline().then([this]() { - mProcessingLock = false; - if (messagesToProcessAvailable()) { - process(); - } - }).exec(); - } - - Async::Job processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) - { - Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); - //Throw command into appropriate pipeline - switch (queuedCommand->commandId()) { - case Akonadi2::Commands::DeleteEntityCommand: - //mPipeline->removedEntity - return Async::null(); - case Akonadi2::Commands::ModifyEntityCommand: - //mPipeline->modifiedEntity - return Async::null(); - case Akonadi2::Commands::CreateEntityCommand: - return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); - default: - return Async::error(-1, "Unhandled command"); - } - return Async::null(); - } - - //Process all messages of this queue - Async::Job processQueue(MessageQueue *queue) - { - //TODO use something like: - //Async::foreach("pass iterator here").each("process value here").join(); - //Async::foreach("pass iterator here").parallel("process value here").join(); - return Async::dowhile( - [this, queue](Async::Future &future) { - if (queue->isEmpty()) { - future.setValue(false); - future.setFinished(); - return; - } - queue->dequeue( - [this, &future](void *ptr, int size, std::function messageQueueCallback) { - auto callback = [messageQueueCallback, &future](bool success) { - messageQueueCallback(success); - future.setValue(!success); - future.setFinished(); - }; - - flatbuffers::Verifier verifyer(reinterpret_cast(ptr), size); - if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { - Warning() << "invalid buffer"; - callback(false); - return; - } - auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); - Trace() << "Dequeued Command: " << Akonadi2::Commands::name(queuedCommand->commandId()); - //TODO JOBAPI: job lifetime management - //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete - //themselves once done. In other cases we'd like jobs that only live as long as their handle though. - //FIXME this job is stack allocated and thus simply dies.... - processQueuedCommand(queuedCommand).then( - [callback]() { - callback(true); - }, - [callback](int errorCode, QString errorMessage) { - Warning() << "Error while processing queue command: " << errorMessage; - callback(false); - } - ).exec(); - }, - [&future](const MessageQueue::Error &error) { - Warning() << "Error while getting message from messagequeue: " << error.message; - future.setValue(false); - future.setFinished(); - } - ); - } - ); - } - - Async::Job processPipeline() - { - //Go through all message queues - auto it = QSharedPointer >::create(mCommandQueues); - return Async::dowhile( - [it]() { return it->hasNext(); }, - [it, this](Async::Future &future) { - auto queue = it->next(); - processQueue(queue).then([&future]() { - Trace() << "Queue processed"; - future.setFinished(); - }).exec(); - } - ); - } - -private: - Akonadi2::Pipeline *mPipeline; - //Ordered by priority - QList mCommandQueues; - bool mProcessingLock; -}; +//FIXME We need to pass the resource-instance name to generic resource, not the plugin name DummyResource::DummyResource() - : Akonadi2::Resource(), - mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"), - mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue"), - mError(0) + : Akonadi2::GenericResource(PLUGIN_NAME) { } @@ -277,19 +135,7 @@ void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) //event is the entitytype and not the domain type pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector() << eventIndexer << uidIndexer); - mProcessor = new Processor(pipeline, QList() << &mUserQueue << &mSynchronizerQueue); - QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); -} - -void DummyResource::onProcessorError(int errorCode, const QString &errorMessage) -{ - Warning() << "Received error from Processor: " << errorCode << errorMessage; - mError = errorCode; -} - -int DummyResource::error() const -{ - return mError; + GenericResource::configurePipeline(pipeline); } void findByRemoteId(QSharedPointer storage, const QString &rid, std::function callback) @@ -316,15 +162,6 @@ void findByRemoteId(QSharedPointer storage, const QString &ri }); } -void DummyResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) -{ - m_fbb.Clear(); - auto commandData = Akonadi2::EntityBuffer::appendAsVector(m_fbb, data.constData(), data.size()); - auto buffer = Akonadi2::CreateQueuedCommand(m_fbb, commandId, commandData); - Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); - mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); -} - Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) { return Async::start([this, pipeline](Async::Future &f) { @@ -377,37 +214,6 @@ Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli }); } -Async::Job DummyResource::processAllMessages() -{ - //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. - //TODO: report errors while processing sync? - //TODO JOBAPI: A helper that waits for n events and then continues? - return Async::start([this](Async::Future &f) { - if (mSynchronizerQueue.isEmpty()) { - f.setFinished(); - } else { - QObject::connect(&mSynchronizerQueue, &MessageQueue::drained, [&f]() { - f.setFinished(); - }); - } - }).then([this](Async::Future &f) { - if (mUserQueue.isEmpty()) { - f.setFinished(); - } else { - QObject::connect(&mUserQueue, &MessageQueue::drained, [&f]() { - f.setFinished(); - }); - } - }); -} - -void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) -{ - //TODO instead of copying the command including the full entity first into the command queue, we could directly - //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). - //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). - enqueueCommand(mUserQueue, commandId, data); -} DummyResourceFactory::DummyResourceFactory(QObject *parent) : Akonadi2::ResourceFactory(parent) diff --git a/examples/dummyresource/resourcefactory.h b/examples/dummyresource/resourcefactory.h index 3b99d5e..7b7783e 100644 --- a/examples/dummyresource/resourcefactory.h +++ b/examples/dummyresource/resourcefactory.h @@ -19,7 +19,7 @@ #pragma once -#include "common/resource.h" +#include "common/genericresource.h" #include "async/src/async.h" #include "common/messagequeue.h" @@ -28,26 +28,12 @@ //TODO: a little ugly to have this in two places, once here and once in Q_PLUGIN_METADATA #define PLUGIN_NAME "org.kde.dummy" -class Processor; - -class DummyResource : public Akonadi2::Resource +class DummyResource : public Akonadi2::GenericResource { public: DummyResource(); - Async::Job synchronizeWithSource(Akonadi2::Pipeline *pipeline); - Async::Job processAllMessages(); - void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); - void configurePipeline(Akonadi2::Pipeline *pipeline); - int error() const; - -private: - void onProcessorError(int errorCode, const QString &errorMessage); - void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); - flatbuffers::FlatBufferBuilder m_fbb; - MessageQueue mUserQueue; - MessageQueue mSynchronizerQueue; - Processor *mProcessor; - int mError; + Async::Job synchronizeWithSource(Akonadi2::Pipeline *pipeline) Q_DECL_OVERRIDE; + void configurePipeline(Akonadi2::Pipeline *pipeline) Q_DECL_OVERRIDE; }; class DummyResourceFactory : public Akonadi2::ResourceFactory -- cgit v1.2.3