From bc2a95cad05e454a84c317f1078edb329bd3afd4 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 15 Jan 2015 01:56:09 +0100 Subject: Writing from facade. --- dummyresource/resourcefactory.cpp | 123 +++++++++++++++++++++++++++++++++----- 1 file changed, 109 insertions(+), 14 deletions(-) (limited to 'dummyresource/resourcefactory.cpp') diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index e4f7e41..e14aa01 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp @@ -23,18 +23,20 @@ #include "pipeline.h" #include "dummycalendar_generated.h" #include "metadata_generated.h" +#include "queuedcommand_generated.h" #include "domainadaptor.h" +#include "commands.h" +#include "clientapi.h" #include /* * Figure out how to implement various classes of processors: - * * read-only (index and such) => domain adapter + * * read-only (index and such) => extractor function, probably using domain adaptor * * filter => provide means to move entity elsewhere, and also reflect change in source (I guess?) * * flag extractors? => like read-only? Or write to local portion of buffer? * ** $ISSPAM should become part of domain object and is written to the local part of the mail. * ** => value could be calculated by the server directly */ -// template class SimpleProcessor : public Akonadi2::Preprocessor { public: @@ -96,21 +98,108 @@ QMap populate() static QMap s_dataSource = populate(); +//Drives the pipeline using the output from all command queues +class Processor : public QObject +{ + Q_OBJECT +public: + Processor(Akonadi2::Pipeline *pipeline, QList commandQueues) + : QObject(), + mPipeline(pipeline), + mCommandQueues(commandQueues), + mProcessingLock(false) + { + for (auto queue : mCommandQueues) { + bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); + Q_ASSERT(ret); + } + } + +private slots: + void process() + { + if (mProcessingLock) { + return; + } + mProcessingLock = true; + //Empty queue after queue + //FIXME the for and while loops should be async, otherwise we process all messages in parallel + for (auto queue : mCommandQueues) { + qDebug() << "processing queue"; + bool processedMessage = false; + while (processedMessage) { + qDebug() << "process"; + processedMessage = false; + queue->dequeue([this, &processedMessage](void *ptr, int size, std::function messageQueueCallback) { + flatbuffers::Verifier verifyer(reinterpret_cast(ptr), size); + if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { + qWarning() << "invalid buffer"; + processedMessage = false; + messageQueueCallback(false); + return; + } + auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); + qDebug() << "Dequeued: " << queuedCommand->commandId(); + //Throw command into appropriate pipeline + switch (queuedCommand->commandId()) { + case Akonadi2::Commands::DeleteEntityCommand: + //mPipeline->removedEntity + break; + case Akonadi2::Commands::ModifyEntityCommand: + //mPipeline->modifiedEntity + break; + case Akonadi2::Commands::CreateEntityCommand: { + //TODO job lifetime management + auto job = mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then([&messageQueueCallback](Async::Future future) { + messageQueueCallback(true); + }); + job.exec(); + } + break; + default: + //Unhandled command + qWarning() << "Unhandled command"; + messageQueueCallback(true); + break; + } + processedMessage = true; + }, + [&processedMessage](const MessageQueue::Error &error) { + processedMessage = false; + }); + } + } + mProcessingLock = false; + } + +private: + Akonadi2::Pipeline *mPipeline; + //Ordered by priority + QList mCommandQueues; + bool mProcessingLock; +}; + DummyResource::DummyResource() - : Akonadi2::Resource() + : Akonadi2::Resource(), + mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"), + mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue") { } void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) { auto eventFactory = QSharedPointer::create(); + //FIXME we should setup for each resource entity type, not for each domain type + //i.e. If a resource stores tags as part of each message it needs to update the tag index //TODO setup preprocessors for each domain type and pipeline type allowing full customization //Eventually the order should be self configuring, for now it's hardcoded. auto eventIndexer = new SimpleProcessor([eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { auto adaptor = eventFactory->createAdaptor(entity); qDebug() << "Summary preprocessor: " << adaptor->getProperty("summary").toString(); }); - pipeline->setPreprocessors(Akonadi2::Pipeline::NewPipeline, QVector() << eventIndexer); + //event is the entitytype and not the domain type + pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector() << eventIndexer); + mProcessor = new Processor(pipeline, QList() << &mUserQueue << &mSynchronizerQueue); } void findByRemoteId(QSharedPointer storage, const QString &rid, std::function callback) @@ -139,6 +228,7 @@ void findByRemoteId(QSharedPointer storage, const QString &ri Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) { + qDebug() << "synchronizeWithSource"; return Async::start([this, pipeline](Async::Future &f) { //TODO use a read-only transaction during the complete sync to sync against a defined revision auto storage = QSharedPointer::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); @@ -171,7 +261,9 @@ Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. const auto key = QUuid::createUuid().toString().toUtf8(); - pipeline->newEntity(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); + //TODO Create queuedcommand and push into synchronizerQueue + //* create message in store directly, add command to messagequeue waiting for processing. + // pipeline->newEntity(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); } else { //modification //TODO diff and create modification if necessary } @@ -183,16 +275,18 @@ Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) { - Q_UNUSED(commandId) - Q_UNUSED(data) - Q_UNUSED(size) - //TODO reallly process the commands :) - auto builder = DummyCalendar::DummyEventBuilder(m_fbb); - builder .add_summary(m_fbb.CreateString("summary summary!")); - auto buffer = builder.Finish(); - DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); - pipeline->newEntity("fakekey", m_fbb.GetBufferPointer(), m_fbb.GetSize()); + qDebug() << "processCommand"; + //TODO instead of copying the command including the full entity first into the command queue, we could directly + //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). + //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). m_fbb.Clear(); + auto commandData = m_fbb.CreateVector(reinterpret_cast(data.data()), data.size()); + auto builder = Akonadi2::QueuedCommandBuilder(m_fbb); + builder.add_commandId(commandId); + builder.add_command(commandData); + auto buffer = builder.Finish(); + Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); + mUserQueue.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); } DummyResourceFactory::DummyResourceFactory(QObject *parent) @@ -211,3 +305,4 @@ void DummyResourceFactory::registerFacades(Akonadi2::FacadeFactory &factory) factory.registerFacade(PLUGIN_NAME); } +#include "resourcefactory.moc" -- cgit v1.2.3