From f89e43b3603976bc0e6eb885b3b9a43a6caff1c2 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 19 Apr 2015 14:11:02 +0200 Subject: Moved client and dummyresource to examples/ --- examples/CMakeLists.txt | 5 + examples/client/CMakeLists.txt | 8 + examples/client/main.cpp | 52 ++++ examples/dummyresource/CMakeLists.txt | 12 + examples/dummyresource/domainadaptor.cpp | 60 ++++ examples/dummyresource/domainadaptor.h | 14 + examples/dummyresource/dummycalendar.fbs | 13 + examples/dummyresource/facade.cpp | 189 +++++++++++++ examples/dummyresource/facade.h | 45 +++ examples/dummyresource/resourcefactory.cpp | 428 +++++++++++++++++++++++++++++ examples/dummyresource/resourcefactory.h | 65 +++++ 11 files changed, 891 insertions(+) create mode 100644 examples/CMakeLists.txt create mode 100644 examples/client/CMakeLists.txt create mode 100644 examples/client/main.cpp create mode 100644 examples/dummyresource/CMakeLists.txt create mode 100644 examples/dummyresource/domainadaptor.cpp create mode 100644 examples/dummyresource/domainadaptor.h create mode 100644 examples/dummyresource/dummycalendar.fbs create mode 100644 examples/dummyresource/facade.cpp create mode 100644 examples/dummyresource/facade.h create mode 100644 examples/dummyresource/resourcefactory.cpp create mode 100644 examples/dummyresource/resourcefactory.h (limited to 'examples') diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt new file mode 100644 index 0000000..ea2b0ce --- /dev/null +++ b/examples/CMakeLists.txt @@ -0,0 +1,5 @@ +# the client +add_subdirectory(client) + +# a simple dummy resource implementation +add_subdirectory(dummyresource) diff --git a/examples/client/CMakeLists.txt b/examples/client/CMakeLists.txt new file mode 100644 index 0000000..3555b3e --- /dev/null +++ b/examples/client/CMakeLists.txt @@ -0,0 +1,8 @@ +project(akonadi2_client) + +include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) + +add_executable(${PROJECT_NAME} main.cpp) +target_link_libraries(${PROJECT_NAME} akonadi2common) +qt5_use_modules(${PROJECT_NAME} Widgets Network) +install(TARGETS ${PROJECT_NAME} ${KDE_INSTALL_TARGETS_DEFAULT_ARGS}) diff --git a/examples/client/main.cpp b/examples/client/main.cpp new file mode 100644 index 0000000..b4cb081 --- /dev/null +++ b/examples/client/main.cpp @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2014 Aaron Seigo + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include +#include + +#include "common/commands.h" +#include "common/console.h" +#include "common/resourceaccess.h" + +int main(int argc, char *argv[]) +{ + QApplication app(argc, argv); + + new Akonadi2::Console("Akonadi2 Client"); + Akonadi2::Console::main()->log(QString("PID: %1").arg(QCoreApplication::applicationPid())); + + QCommandLineParser cliOptions; + cliOptions.addPositionalArgument(QObject::tr("[resource]"), + QObject::tr("A resource to connect to")); + cliOptions.process(app); + QStringList resources = cliOptions.positionalArguments(); + if (resources.isEmpty()) { + resources << "org.kde.dummy"; + } + + for (const QString &resource: resources) { + Akonadi2::ResourceAccess *resAccess = new Akonadi2::ResourceAccess(resource.toLatin1()); + QObject::connect(&app, &QCoreApplication::aboutToQuit, + resAccess, &Akonadi2::ResourceAccess::close); + resAccess->sendCommand(Akonadi2::Commands::SynchronizeCommand); + resAccess->open(); + } + + return app.exec(); +} diff --git a/examples/dummyresource/CMakeLists.txt b/examples/dummyresource/CMakeLists.txt new file mode 100644 index 0000000..abd315f --- /dev/null +++ b/examples/dummyresource/CMakeLists.txt @@ -0,0 +1,12 @@ +project(akonadi2_resource_dummy) + +add_definitions(-DQT_PLUGIN) +include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) + +generate_flatbuffers(dummycalendar) + +add_library(${PROJECT_NAME} SHARED facade.cpp resourcefactory.cpp domainadaptor.cpp) +qt5_use_modules(${PROJECT_NAME} Core Network) +target_link_libraries(${PROJECT_NAME} akonadi2common) + +install(TARGETS ${PROJECT_NAME} LIBRARY DESTINATION ${AKONADI2_RESOURCE_PLUGINS_PATH}) diff --git a/examples/dummyresource/domainadaptor.cpp b/examples/dummyresource/domainadaptor.cpp new file mode 100644 index 0000000..8649bc3 --- /dev/null +++ b/examples/dummyresource/domainadaptor.cpp @@ -0,0 +1,60 @@ + +#include "domainadaptor.h" + +#include +#include + +#include "dummycalendar_generated.h" +#include "event_generated.h" +#include "entity_generated.h" +#include "metadata_generated.h" +#include "domainadaptor.h" +#include "log.h" +#include + +using namespace DummyCalendar; +using namespace flatbuffers; + + + + +DummyEventAdaptorFactory::DummyEventAdaptorFactory() + : DomainTypeAdaptorFactory() +{ + //TODO turn this into initializeReadPropertyMapper as well? + mResourceMapper->addMapping("summary", [](DummyEvent const *buffer) -> QVariant { + return propertyToVariant(buffer->summary()); + }); + + mResourceWriteMapper->addMapping("summary", [](const QVariant &value, flatbuffers::FlatBufferBuilder &fbb) -> std::function { + auto offset = variantToProperty(value, fbb); + return [offset](DummyEventBuilder &builder) { builder.add_summary(offset); }; + }); +} + + +void DummyEventAdaptorFactory::createBuffer(const Akonadi2::ApplicationDomain::Event &event, flatbuffers::FlatBufferBuilder &fbb) +{ + flatbuffers::FlatBufferBuilder localFbb; + if (mLocalWriteMapper) { + auto pos = createBufferPart(event, localFbb, *mLocalWriteMapper); + Akonadi2::ApplicationDomain::Buffer::FinishEventBuffer(localFbb, pos); + flatbuffers::Verifier verifier(localFbb.GetBufferPointer(), localFbb.GetSize()); + if (!verifier.VerifyBuffer()) { + Warning() << "Created invalid local buffer"; + } + } + + flatbuffers::FlatBufferBuilder resFbb; + if (mResourceWriteMapper) { + auto pos = createBufferPart(event, resFbb, *mResourceWriteMapper); + DummyCalendar::FinishDummyEventBuffer(resFbb, pos); + flatbuffers::Verifier verifier(resFbb.GetBufferPointer(), resFbb.GetSize()); + if (!verifier.VerifyBuffer()) { + Warning() << "Created invalid resource buffer"; + } + } + + Akonadi2::EntityBuffer::assembleEntityBuffer(fbb, 0, 0, resFbb.GetBufferPointer(), resFbb.GetSize(), localFbb.GetBufferPointer(), localFbb.GetSize()); +} + diff --git a/examples/dummyresource/domainadaptor.h b/examples/dummyresource/domainadaptor.h new file mode 100644 index 0000000..9d351e7 --- /dev/null +++ b/examples/dummyresource/domainadaptor.h @@ -0,0 +1,14 @@ +#pragma once + +#include "common/domainadaptor.h" +#include "event_generated.h" +#include "dummycalendar_generated.h" +#include "entity_generated.h" + +class DummyEventAdaptorFactory : public DomainTypeAdaptorFactory +{ +public: + DummyEventAdaptorFactory(); + virtual ~DummyEventAdaptorFactory() {}; + virtual void createBuffer(const Akonadi2::ApplicationDomain::Event &event, flatbuffers::FlatBufferBuilder &fbb); +}; diff --git a/examples/dummyresource/dummycalendar.fbs b/examples/dummyresource/dummycalendar.fbs new file mode 100644 index 0000000..643c9b2 --- /dev/null +++ b/examples/dummyresource/dummycalendar.fbs @@ -0,0 +1,13 @@ +// example IDL file + +namespace DummyCalendar; + +table DummyEvent { + summary:string; + description:string; + attachment:[ubyte]; + remoteId:string; +} + +root_type DummyEvent; +file_identifier "AKFB"; diff --git a/examples/dummyresource/facade.cpp b/examples/dummyresource/facade.cpp new file mode 100644 index 0000000..e50e4f3 --- /dev/null +++ b/examples/dummyresource/facade.cpp @@ -0,0 +1,189 @@ +/* + * Copyright (C) 2014 Christian Mollekopf + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "facade.h" + +#include +#include + +#include "common/resourceaccess.h" +#include "common/commands.h" +#include "dummycalendar_generated.h" +#include "event_generated.h" +#include "entity_generated.h" +#include "metadata_generated.h" +#include "domainadaptor.h" +#include +#include +#include + +using namespace DummyCalendar; +using namespace flatbuffers; + + +DummyResourceFacade::DummyResourceFacade() + : Akonadi2::GenericFacade("org.kde.dummy"), + mFactory(new DummyEventAdaptorFactory) +{ +} + +DummyResourceFacade::~DummyResourceFacade() +{ +} + +Async::Job DummyResourceFacade::create(const Akonadi2::ApplicationDomain::Event &domainObject) +{ + flatbuffers::FlatBufferBuilder entityFbb; + mFactory->createBuffer(domainObject, entityFbb); + return sendCreateCommand("event", QByteArray::fromRawData(reinterpret_cast(entityFbb.GetBufferPointer()), entityFbb.GetSize())); +} + +Async::Job DummyResourceFacade::modify(const Akonadi2::ApplicationDomain::Event &domainObject) +{ + //Create message buffer and send to resource + return Async::null(); +} + +Async::Job DummyResourceFacade::remove(const Akonadi2::ApplicationDomain::Event &domainObject) +{ + //Create message buffer and send to resource + return Async::null(); +} + +static std::function prepareQuery(const Akonadi2::Query &query) +{ + //Compose some functions to make query matching fast. + //This way we can process the query once, and convert all values into something that can be compared quickly + std::function preparedQuery; + if (!query.ids.isEmpty()) { + //Match by id + //TODO: for id's a direct lookup would be way faster + + //We convert the id's to std::string so we don't have to convert each key during the scan. (This runs only once, and the query will be run for every key) + //Probably a premature optimization, but perhaps a useful technique to be investigated. + QVector ids; + for (const auto &id : query.ids) { + ids << id.toStdString(); + } + preparedQuery = [ids](const std::string &key, DummyEvent const *buffer, Akonadi2::ApplicationDomain::Buffer::Event const *local) { + if (ids.contains(key)) { + return true; + } + return false; + }; + } else if (!query.propertyFilter.isEmpty()) { + if (query.propertyFilter.contains("uid")) { + const QByteArray uid = query.propertyFilter.value("uid").toByteArray(); + preparedQuery = [uid](const std::string &key, DummyEvent const *buffer, Akonadi2::ApplicationDomain::Buffer::Event const *local) { + if (local && local->uid() && (QByteArray::fromRawData(local->uid()->c_str(), local->uid()->size()) == uid)) { + return true; + } + return false; + }; + } + } else { + //Match everything + preparedQuery = [](const std::string &key, DummyEvent const *buffer, Akonadi2::ApplicationDomain::Buffer::Event const *local) { + return true; + }; + } + return preparedQuery; +} + +void DummyResourceFacade::readValue(QSharedPointer storage, const QByteArray &key, const std::function &resultCallback, std::function preparedQuery) +{ + storage->scan(key, [=](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { + + //Skip internals + if (Akonadi2::Storage::isInternalKey(keyValue, keySize)) { + return true; + } + + //Extract buffers + Akonadi2::EntityBuffer buffer(dataValue, dataSize); + + const auto resourceBuffer = Akonadi2::EntityBuffer::readBuffer(buffer.entity().resource()); + const auto localBuffer = Akonadi2::EntityBuffer::readBuffer(buffer.entity().local()); + const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(buffer.entity().metadata()); + + if ((!resourceBuffer && !localBuffer) || !metadataBuffer) { + qWarning() << "invalid buffer " << QByteArray::fromRawData(static_cast(keyValue), keySize); + return true; + } + + //We probably only want to create all buffers after the scan + //TODO use adapter for query and scan? + if (preparedQuery && preparedQuery(std::string(static_cast(keyValue), keySize), resourceBuffer, localBuffer)) { + qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; + //This only works for a 1:1 mapping of resource to domain types. + //Not i.e. for tags that are stored as flags in each entity of an imap store. + auto adaptor = mFactory->createAdaptor(buffer.entity()); + //TODO only copy requested properties + auto memoryAdaptor = QSharedPointer::create(*adaptor); + // here we could copy additional properties that don't have a 1:1 mapping, such as separately stored tags. + auto event = QSharedPointer::create("org.kde.dummy", QByteArray::fromRawData(static_cast(keyValue), keySize), revision, memoryAdaptor); + resultCallback(event); + } + return true; + }, + [](const Akonadi2::Storage::Error &error) { + qWarning() << "Error during query: " << error.message; + }); +} + +Async::Job DummyResourceFacade::load(const Akonadi2::Query &query, const std::function &resultCallback) +{ + return Async::start([=](Async::Future &future) { + //Now that the sync is complete we can execute the query + const auto preparedQuery = prepareQuery(query); + + auto storage = QSharedPointer::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); + storage->setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { + Warning() << "Error during query: " << error.store << error.message; + }); + + storage->startTransaction(Akonadi2::Storage::ReadOnly); + const qint64 revision = storage->maxRevision(); + + //Index lookups + QVector keys; + if (query.propertyFilter.contains("uid")) { + static Index uidIndex(Akonadi2::Store::storageLocation(), "org.kde.dummy.index.uid", Akonadi2::Storage::ReadOnly); + uidIndex.lookup(query.propertyFilter.value("uid").toByteArray(), [&](const QByteArray &value) { + keys << value; + }, + [](const Index::Error &error) { + Warning() << "Error in index: " << error.message; + }); + } + + if (keys.isEmpty()) { + Log() << "Executing a full scan"; + readValue(storage, QByteArray(), resultCallback, preparedQuery); + } else { + for (const auto &key : keys) { + readValue(storage, key, resultCallback, preparedQuery); + } + } + storage->abortTransaction(); + future.setValue(revision); + future.setFinished(); + }); +} + diff --git a/examples/dummyresource/facade.h b/examples/dummyresource/facade.h new file mode 100644 index 0000000..91ae351 --- /dev/null +++ b/examples/dummyresource/facade.h @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2014 Christian Mollekopf + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#pragma once + +#include "common/facade.h" + +#include "common/clientapi.h" +#include "common/storage.h" +#include "resourcefactory.h" +#include "entity_generated.h" +#include "event_generated.h" +#include "dummycalendar_generated.h" +#include "common/domainadaptor.h" + +class DummyResourceFacade : public Akonadi2::GenericFacade +{ +public: + DummyResourceFacade(); + virtual ~DummyResourceFacade(); + Async::Job create(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE; + Async::Job modify(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE; + Async::Job remove(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE; + Async::Job load(const Akonadi2::Query &query, const std::function &resultCallback) Q_DECL_OVERRIDE; + +private: + void readValue(QSharedPointer storage, const QByteArray &key, const std::function &resultCallback, std::function); + QSharedPointer > mFactory; +}; diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp new file mode 100644 index 0000000..d5765e2 --- /dev/null +++ b/examples/dummyresource/resourcefactory.cpp @@ -0,0 +1,428 @@ +/* + * Copyright (C) 2014 Aaron Seigo + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "resourcefactory.h" +#include "facade.h" +#include "entitybuffer.h" +#include "pipeline.h" +#include "dummycalendar_generated.h" +#include "metadata_generated.h" +#include "queuedcommand_generated.h" +#include "createentity_generated.h" +#include "domainadaptor.h" +#include "commands.h" +#include "clientapi.h" +#include "index.h" +#include "log.h" +#include +#include + + +/* + * Figure out how to implement various classes of processors: + * * read-only (index and such) => extractor function, probably using domain adaptor + * * filter => provide means to move entity elsewhere, and also reflect change in source (I guess?) + * * flag extractors? => like read-only? Or write to local portion of buffer? + * ** $ISSPAM should become part of domain object and is written to the local part of the mail. + * ** => value could be calculated by the server directly + */ +class SimpleProcessor : public Akonadi2::Preprocessor +{ +public: + SimpleProcessor(const QString &id, const std::function &f) + : Akonadi2::Preprocessor(), + mFunction(f), + mId(id) + { + } + + void process(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e) Q_DECL_OVERRIDE + { + mFunction(state, e); + processingCompleted(state); + } + + QString id() const + { + return mId; + } + +protected: + std::function mFunction; + QString mId; +}; + + + +static std::string createEvent() +{ + static const size_t attachmentSize = 1024*2; // 2KB + static uint8_t rawData[attachmentSize]; + static flatbuffers::FlatBufferBuilder fbb; + fbb.Clear(); + { + uint8_t *rawDataPtr = Q_NULLPTR; + auto summary = fbb.CreateString("summary"); + auto data = fbb.CreateUninitializedVector(attachmentSize, &rawDataPtr); + DummyCalendar::DummyEventBuilder eventBuilder(fbb); + eventBuilder.add_summary(summary); + eventBuilder.add_attachment(data); + auto eventLocation = eventBuilder.Finish(); + DummyCalendar::FinishDummyEventBuffer(fbb, eventLocation); + memcpy((void*)rawDataPtr, rawData, attachmentSize); + } + + return std::string(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); +} + +QMap populate() +{ + QMap content; + for (int i = 0; i < 2; i++) { + auto event = createEvent(); + content.insert(QString("key%1").arg(i), QString::fromStdString(event)); + } + return content; +} + +static QMap s_dataSource = populate(); + +//Drives the pipeline using the output from all command queues +class Processor : public QObject +{ + Q_OBJECT +public: + Processor(Akonadi2::Pipeline *pipeline, QList commandQueues) + : QObject(), + mPipeline(pipeline), + mCommandQueues(commandQueues), + mProcessingLock(false) + { + for (auto queue : mCommandQueues) { + const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); + Q_UNUSED(ret); + } + } + +signals: + void error(int errorCode, const QString &errorMessage); + +private: + bool messagesToProcessAvailable() + { + for (auto queue : mCommandQueues) { + if (!queue->isEmpty()) { + return true; + } + } + return false; + } + +private slots: + void process() + { + if (mProcessingLock) { + return; + } + mProcessingLock = true; + auto job = processPipeline().then([this]() { + mProcessingLock = false; + if (messagesToProcessAvailable()) { + process(); + } + }).exec(); + } + + Async::Job processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) + { + Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); + //Throw command into appropriate pipeline + switch (queuedCommand->commandId()) { + case Akonadi2::Commands::DeleteEntityCommand: + //mPipeline->removedEntity + return Async::null(); + case Akonadi2::Commands::ModifyEntityCommand: + //mPipeline->modifiedEntity + return Async::null(); + case Akonadi2::Commands::CreateEntityCommand: + return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); + default: + return Async::error(-1, "Unhandled command"); + } + return Async::null(); + } + + //Process all messages of this queue + Async::Job processQueue(MessageQueue *queue) + { + //TODO use something like: + //Async::foreach("pass iterator here").each("process value here").join(); + //Async::foreach("pass iterator here").parallel("process value here").join(); + return Async::dowhile( + [this, queue](Async::Future &future) { + if (queue->isEmpty()) { + future.setValue(false); + future.setFinished(); + return; + } + queue->dequeue( + [this, &future](void *ptr, int size, std::function messageQueueCallback) { + auto callback = [messageQueueCallback, &future](bool success) { + messageQueueCallback(success); + future.setValue(!success); + future.setFinished(); + }; + + flatbuffers::Verifier verifyer(reinterpret_cast(ptr), size); + if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { + Warning() << "invalid buffer"; + callback(false); + return; + } + auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); + Trace() << "Dequeued Command: " << Akonadi2::Commands::name(queuedCommand->commandId()); + //TODO JOBAPI: job lifetime management + //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete + //themselves once done. In other cases we'd like jobs that only live as long as their handle though. + //FIXME this job is stack allocated and thus simply dies.... + processQueuedCommand(queuedCommand).then( + [callback]() { + callback(true); + }, + [callback](int errorCode, QString errorMessage) { + Warning() << "Error while processing queue command: " << errorMessage; + callback(false); + } + ).exec(); + }, + [&future](const MessageQueue::Error &error) { + Warning() << "Error while getting message from messagequeue: " << error.message; + future.setValue(false); + future.setFinished(); + } + ); + } + ); + } + + Async::Job processPipeline() + { + //Go through all message queues + auto it = QSharedPointer >::create(mCommandQueues); + return Async::dowhile( + [it]() { return it->hasNext(); }, + [it, this](Async::Future &future) { + auto queue = it->next(); + processQueue(queue).then([&future]() { + Trace() << "Queue processed"; + future.setFinished(); + }).exec(); + } + ); + } + +private: + Akonadi2::Pipeline *mPipeline; + //Ordered by priority + QList mCommandQueues; + bool mProcessingLock; +}; + +DummyResource::DummyResource() + : Akonadi2::Resource(), + mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"), + mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue"), + mError(0) +{ +} + +void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) +{ + auto eventFactory = QSharedPointer::create(); + //FIXME we should setup for each resource entity type, not for each domain type + //i.e. If a resource stores tags as part of each message it needs to update the tag index + //TODO setup preprocessors for each resource entity type and pipeline type allowing full customization + //Eventually the order should be self configuring, for now it's hardcoded. + auto eventIndexer = new SimpleProcessor("summaryprocessor", [eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { + auto adaptor = eventFactory->createAdaptor(entity); + // Log() << "Summary preprocessor: " << adaptor->getProperty("summary").toString(); + }); + + auto uidIndexer = new SimpleProcessor("uidIndexer", [eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { + static Index uidIndex(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.index.uid", Akonadi2::Storage::ReadWrite); + + //TODO: Benchmark if this is performance wise acceptable, or if we have to access the buffer directly + auto adaptor = eventFactory->createAdaptor(entity); + const auto uid = adaptor->getProperty("uid"); + if (uid.isValid()) { + uidIndex.add(uid.toByteArray(), state.key()); + } + }); + + //event is the entitytype and not the domain type + pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector() << eventIndexer << uidIndexer); + mProcessor = new Processor(pipeline, QList() << &mUserQueue << &mSynchronizerQueue); + QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); +} + +void DummyResource::onProcessorError(int errorCode, const QString &errorMessage) +{ + Warning() << "Received error from Processor: " << errorCode << errorMessage; + mError = errorCode; +} + +int DummyResource::error() const +{ + return mError; +} + +void findByRemoteId(QSharedPointer storage, const QString &rid, std::function callback) +{ + //TODO lookup in rid index instead of doing a full scan + const std::string ridString = rid.toStdString(); + storage->scan("", [&](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { + if (Akonadi2::Storage::isInternalKey(keyValue, keySize)) { + return true; + } + + Akonadi2::EntityBuffer::extractResourceBuffer(dataValue, dataSize, [&](const uint8_t *buffer, size_t size) { + flatbuffers::Verifier verifier(buffer, size); + if (DummyCalendar::VerifyDummyEventBuffer(verifier)) { + DummyCalendar::DummyEvent const *resourceBuffer = DummyCalendar::GetDummyEvent(buffer); + if (resourceBuffer && resourceBuffer->remoteId()) { + if (std::string(resourceBuffer->remoteId()->c_str(), resourceBuffer->remoteId()->size()) == ridString) { + callback(keyValue, keySize, dataValue, dataSize); + } + } + } + }); + return true; + }); +} + +void DummyResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) +{ + m_fbb.Clear(); + auto commandData = Akonadi2::EntityBuffer::appendAsVector(m_fbb, data.constData(), data.size()); + auto buffer = Akonadi2::CreateQueuedCommand(m_fbb, commandId, commandData); + Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); + mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); +} + +Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) +{ + return Async::start([this, pipeline](Async::Future &f) { + //TODO use a read-only transaction during the complete sync to sync against a defined revision + auto storage = QSharedPointer::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); + for (auto it = s_dataSource.constBegin(); it != s_dataSource.constEnd(); it++) { + bool isNew = true; + if (storage->exists()) { + findByRemoteId(storage, it.key(), [&](void *keyValue, int keySize, void *dataValue, int dataSize) { + isNew = false; + }); + } + if (isNew) { + m_fbb.Clear(); + + const QByteArray data = it.value().toUtf8(); + auto eventBuffer = DummyCalendar::GetDummyEvent(data.data()); + + //Map the source format to the buffer format (which happens to be an exact copy here) + auto summary = m_fbb.CreateString(eventBuffer->summary()->c_str()); + auto rid = m_fbb.CreateString(it.key().toStdString().c_str()); + auto description = m_fbb.CreateString(it.key().toStdString().c_str()); + static uint8_t rawData[100]; + auto attachment = Akonadi2::EntityBuffer::appendAsVector(m_fbb, rawData, 100); + + auto builder = DummyCalendar::DummyEventBuilder(m_fbb); + builder.add_summary(summary); + builder.add_remoteId(rid); + builder.add_description(description); + builder.add_attachment(attachment); + auto buffer = builder.Finish(); + DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); + flatbuffers::FlatBufferBuilder entityFbb; + Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, m_fbb.GetBufferPointer(), m_fbb.GetSize(), 0, 0); + + flatbuffers::FlatBufferBuilder fbb; + //This is the resource type and not the domain type + auto type = fbb.CreateString("event"); + auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); + auto location = Akonadi2::Commands::CreateCreateEntity(fbb, type, delta); + Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); + + enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize())); + } else { //modification + //TODO diff and create modification if necessary + } + } + //TODO find items to remove + f.setFinished(); + }); +} + +Async::Job DummyResource::processAllMessages() +{ + //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. + //TODO: report errors while processing sync? + //TODO JOBAPI: A helper that waits for n events and then continues? + return Async::start([this](Async::Future &f) { + if (mSynchronizerQueue.isEmpty()) { + f.setFinished(); + } else { + QObject::connect(&mSynchronizerQueue, &MessageQueue::drained, [&f]() { + f.setFinished(); + }); + } + }).then([this](Async::Future &f) { + if (mUserQueue.isEmpty()) { + f.setFinished(); + } else { + QObject::connect(&mUserQueue, &MessageQueue::drained, [&f]() { + f.setFinished(); + }); + } + }); +} + +void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) +{ + //TODO instead of copying the command including the full entity first into the command queue, we could directly + //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). + //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). + enqueueCommand(mUserQueue, commandId, data); +} + +DummyResourceFactory::DummyResourceFactory(QObject *parent) + : Akonadi2::ResourceFactory(parent) +{ + +} + +Akonadi2::Resource *DummyResourceFactory::createResource() +{ + return new DummyResource(); +} + +void DummyResourceFactory::registerFacades(Akonadi2::FacadeFactory &factory) +{ + factory.registerFacade(PLUGIN_NAME); +} + +#include "resourcefactory.moc" diff --git a/examples/dummyresource/resourcefactory.h b/examples/dummyresource/resourcefactory.h new file mode 100644 index 0000000..3b99d5e --- /dev/null +++ b/examples/dummyresource/resourcefactory.h @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2014 Aaron Seigo + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#pragma once + +#include "common/resource.h" +#include "async/src/async.h" +#include "common/messagequeue.h" + +#include + +//TODO: a little ugly to have this in two places, once here and once in Q_PLUGIN_METADATA +#define PLUGIN_NAME "org.kde.dummy" + +class Processor; + +class DummyResource : public Akonadi2::Resource +{ +public: + DummyResource(); + Async::Job synchronizeWithSource(Akonadi2::Pipeline *pipeline); + Async::Job processAllMessages(); + void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); + void configurePipeline(Akonadi2::Pipeline *pipeline); + int error() const; + +private: + void onProcessorError(int errorCode, const QString &errorMessage); + void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); + flatbuffers::FlatBufferBuilder m_fbb; + MessageQueue mUserQueue; + MessageQueue mSynchronizerQueue; + Processor *mProcessor; + int mError; +}; + +class DummyResourceFactory : public Akonadi2::ResourceFactory +{ + Q_OBJECT + Q_PLUGIN_METADATA(IID "org.kde.dummy") + Q_INTERFACES(Akonadi2::ResourceFactory) + +public: + DummyResourceFactory(QObject *parent = 0); + + Akonadi2::Resource *createResource(); + void registerFacades(Akonadi2::FacadeFactory &factory); +}; + -- cgit v1.2.3