From f89e43b3603976bc0e6eb885b3b9a43a6caff1c2 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 19 Apr 2015 14:11:02 +0200 Subject: Moved client and dummyresource to examples/ --- CMakeLists.txt | 7 +- client/CMakeLists.txt | 8 - client/main.cpp | 52 ---- dummyresource/CMakeLists.txt | 12 - dummyresource/domainadaptor.cpp | 60 ---- dummyresource/domainadaptor.h | 14 - dummyresource/dummycalendar.fbs | 13 - dummyresource/facade.cpp | 189 ------------- dummyresource/facade.h | 45 --- dummyresource/resourcefactory.cpp | 428 ----------------------------- dummyresource/resourcefactory.h | 65 ----- examples/CMakeLists.txt | 5 + examples/client/CMakeLists.txt | 8 + examples/client/main.cpp | 52 ++++ examples/dummyresource/CMakeLists.txt | 12 + examples/dummyresource/domainadaptor.cpp | 60 ++++ examples/dummyresource/domainadaptor.h | 14 + examples/dummyresource/dummycalendar.fbs | 13 + examples/dummyresource/facade.cpp | 189 +++++++++++++ examples/dummyresource/facade.h | 45 +++ examples/dummyresource/resourcefactory.cpp | 428 +++++++++++++++++++++++++++++ examples/dummyresource/resourcefactory.h | 65 +++++ tests/CMakeLists.txt | 8 +- 23 files changed, 899 insertions(+), 893 deletions(-) delete mode 100644 client/CMakeLists.txt delete mode 100644 client/main.cpp delete mode 100644 dummyresource/CMakeLists.txt delete mode 100644 dummyresource/domainadaptor.cpp delete mode 100644 dummyresource/domainadaptor.h delete mode 100644 dummyresource/dummycalendar.fbs delete mode 100644 dummyresource/facade.cpp delete mode 100644 dummyresource/facade.h delete mode 100644 dummyresource/resourcefactory.cpp delete mode 100644 dummyresource/resourcefactory.h create mode 100644 examples/CMakeLists.txt create mode 100644 examples/client/CMakeLists.txt create mode 100644 examples/client/main.cpp create mode 100644 examples/dummyresource/CMakeLists.txt create mode 100644 examples/dummyresource/domainadaptor.cpp create mode 100644 examples/dummyresource/domainadaptor.h create mode 100644 examples/dummyresource/dummycalendar.fbs create mode 100644 examples/dummyresource/facade.cpp create mode 100644 examples/dummyresource/facade.h create mode 100644 examples/dummyresource/resourcefactory.cpp create mode 100644 examples/dummyresource/resourcefactory.h diff --git a/CMakeLists.txt b/CMakeLists.txt index bc16184..bf7f7ae 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -49,14 +49,11 @@ set(AKONADI2_RESOURCE_PLUGINS_PATH ${QT_PLUGIN_INSTALL_DIR}/akonadi2/resources) # common, eventually a lib but right now just the command buffers add_subdirectory(common) -# the client -add_subdirectory(client) - # the synchronizer add_subdirectory(synchronizer) -# a simple dummy resource implementation -add_subdirectory(dummyresource) +# example implementations +add_subdirectory(examples) # some tests add_subdirectory(tests) diff --git a/client/CMakeLists.txt b/client/CMakeLists.txt deleted file mode 100644 index 3555b3e..0000000 --- a/client/CMakeLists.txt +++ /dev/null @@ -1,8 +0,0 @@ -project(akonadi2_client) - -include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) - -add_executable(${PROJECT_NAME} main.cpp) -target_link_libraries(${PROJECT_NAME} akonadi2common) -qt5_use_modules(${PROJECT_NAME} Widgets Network) -install(TARGETS ${PROJECT_NAME} ${KDE_INSTALL_TARGETS_DEFAULT_ARGS}) diff --git a/client/main.cpp b/client/main.cpp deleted file mode 100644 index b4cb081..0000000 --- a/client/main.cpp +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (C) 2014 Aaron Seigo - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the - * Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#include -#include - -#include "common/commands.h" -#include "common/console.h" -#include "common/resourceaccess.h" - -int main(int argc, char *argv[]) -{ - QApplication app(argc, argv); - - new Akonadi2::Console("Akonadi2 Client"); - Akonadi2::Console::main()->log(QString("PID: %1").arg(QCoreApplication::applicationPid())); - - QCommandLineParser cliOptions; - cliOptions.addPositionalArgument(QObject::tr("[resource]"), - QObject::tr("A resource to connect to")); - cliOptions.process(app); - QStringList resources = cliOptions.positionalArguments(); - if (resources.isEmpty()) { - resources << "org.kde.dummy"; - } - - for (const QString &resource: resources) { - Akonadi2::ResourceAccess *resAccess = new Akonadi2::ResourceAccess(resource.toLatin1()); - QObject::connect(&app, &QCoreApplication::aboutToQuit, - resAccess, &Akonadi2::ResourceAccess::close); - resAccess->sendCommand(Akonadi2::Commands::SynchronizeCommand); - resAccess->open(); - } - - return app.exec(); -} diff --git a/dummyresource/CMakeLists.txt b/dummyresource/CMakeLists.txt deleted file mode 100644 index abd315f..0000000 --- a/dummyresource/CMakeLists.txt +++ /dev/null @@ -1,12 +0,0 @@ -project(akonadi2_resource_dummy) - -add_definitions(-DQT_PLUGIN) -include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) - -generate_flatbuffers(dummycalendar) - -add_library(${PROJECT_NAME} SHARED facade.cpp resourcefactory.cpp domainadaptor.cpp) -qt5_use_modules(${PROJECT_NAME} Core Network) -target_link_libraries(${PROJECT_NAME} akonadi2common) - -install(TARGETS ${PROJECT_NAME} LIBRARY DESTINATION ${AKONADI2_RESOURCE_PLUGINS_PATH}) diff --git a/dummyresource/domainadaptor.cpp b/dummyresource/domainadaptor.cpp deleted file mode 100644 index 8649bc3..0000000 --- a/dummyresource/domainadaptor.cpp +++ /dev/null @@ -1,60 +0,0 @@ - -#include "domainadaptor.h" - -#include -#include - -#include "dummycalendar_generated.h" -#include "event_generated.h" -#include "entity_generated.h" -#include "metadata_generated.h" -#include "domainadaptor.h" -#include "log.h" -#include - -using namespace DummyCalendar; -using namespace flatbuffers; - - - - -DummyEventAdaptorFactory::DummyEventAdaptorFactory() - : DomainTypeAdaptorFactory() -{ - //TODO turn this into initializeReadPropertyMapper as well? - mResourceMapper->addMapping("summary", [](DummyEvent const *buffer) -> QVariant { - return propertyToVariant(buffer->summary()); - }); - - mResourceWriteMapper->addMapping("summary", [](const QVariant &value, flatbuffers::FlatBufferBuilder &fbb) -> std::function { - auto offset = variantToProperty(value, fbb); - return [offset](DummyEventBuilder &builder) { builder.add_summary(offset); }; - }); -} - - -void DummyEventAdaptorFactory::createBuffer(const Akonadi2::ApplicationDomain::Event &event, flatbuffers::FlatBufferBuilder &fbb) -{ - flatbuffers::FlatBufferBuilder localFbb; - if (mLocalWriteMapper) { - auto pos = createBufferPart(event, localFbb, *mLocalWriteMapper); - Akonadi2::ApplicationDomain::Buffer::FinishEventBuffer(localFbb, pos); - flatbuffers::Verifier verifier(localFbb.GetBufferPointer(), localFbb.GetSize()); - if (!verifier.VerifyBuffer()) { - Warning() << "Created invalid local buffer"; - } - } - - flatbuffers::FlatBufferBuilder resFbb; - if (mResourceWriteMapper) { - auto pos = createBufferPart(event, resFbb, *mResourceWriteMapper); - DummyCalendar::FinishDummyEventBuffer(resFbb, pos); - flatbuffers::Verifier verifier(resFbb.GetBufferPointer(), resFbb.GetSize()); - if (!verifier.VerifyBuffer()) { - Warning() << "Created invalid resource buffer"; - } - } - - Akonadi2::EntityBuffer::assembleEntityBuffer(fbb, 0, 0, resFbb.GetBufferPointer(), resFbb.GetSize(), localFbb.GetBufferPointer(), localFbb.GetSize()); -} - diff --git a/dummyresource/domainadaptor.h b/dummyresource/domainadaptor.h deleted file mode 100644 index 9d351e7..0000000 --- a/dummyresource/domainadaptor.h +++ /dev/null @@ -1,14 +0,0 @@ -#pragma once - -#include "common/domainadaptor.h" -#include "event_generated.h" -#include "dummycalendar_generated.h" -#include "entity_generated.h" - -class DummyEventAdaptorFactory : public DomainTypeAdaptorFactory -{ -public: - DummyEventAdaptorFactory(); - virtual ~DummyEventAdaptorFactory() {}; - virtual void createBuffer(const Akonadi2::ApplicationDomain::Event &event, flatbuffers::FlatBufferBuilder &fbb); -}; diff --git a/dummyresource/dummycalendar.fbs b/dummyresource/dummycalendar.fbs deleted file mode 100644 index 643c9b2..0000000 --- a/dummyresource/dummycalendar.fbs +++ /dev/null @@ -1,13 +0,0 @@ -// example IDL file - -namespace DummyCalendar; - -table DummyEvent { - summary:string; - description:string; - attachment:[ubyte]; - remoteId:string; -} - -root_type DummyEvent; -file_identifier "AKFB"; diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp deleted file mode 100644 index e50e4f3..0000000 --- a/dummyresource/facade.cpp +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Copyright (C) 2014 Christian Mollekopf - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the - * Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#include "facade.h" - -#include -#include - -#include "common/resourceaccess.h" -#include "common/commands.h" -#include "dummycalendar_generated.h" -#include "event_generated.h" -#include "entity_generated.h" -#include "metadata_generated.h" -#include "domainadaptor.h" -#include -#include -#include - -using namespace DummyCalendar; -using namespace flatbuffers; - - -DummyResourceFacade::DummyResourceFacade() - : Akonadi2::GenericFacade("org.kde.dummy"), - mFactory(new DummyEventAdaptorFactory) -{ -} - -DummyResourceFacade::~DummyResourceFacade() -{ -} - -Async::Job DummyResourceFacade::create(const Akonadi2::ApplicationDomain::Event &domainObject) -{ - flatbuffers::FlatBufferBuilder entityFbb; - mFactory->createBuffer(domainObject, entityFbb); - return sendCreateCommand("event", QByteArray::fromRawData(reinterpret_cast(entityFbb.GetBufferPointer()), entityFbb.GetSize())); -} - -Async::Job DummyResourceFacade::modify(const Akonadi2::ApplicationDomain::Event &domainObject) -{ - //Create message buffer and send to resource - return Async::null(); -} - -Async::Job DummyResourceFacade::remove(const Akonadi2::ApplicationDomain::Event &domainObject) -{ - //Create message buffer and send to resource - return Async::null(); -} - -static std::function prepareQuery(const Akonadi2::Query &query) -{ - //Compose some functions to make query matching fast. - //This way we can process the query once, and convert all values into something that can be compared quickly - std::function preparedQuery; - if (!query.ids.isEmpty()) { - //Match by id - //TODO: for id's a direct lookup would be way faster - - //We convert the id's to std::string so we don't have to convert each key during the scan. (This runs only once, and the query will be run for every key) - //Probably a premature optimization, but perhaps a useful technique to be investigated. - QVector ids; - for (const auto &id : query.ids) { - ids << id.toStdString(); - } - preparedQuery = [ids](const std::string &key, DummyEvent const *buffer, Akonadi2::ApplicationDomain::Buffer::Event const *local) { - if (ids.contains(key)) { - return true; - } - return false; - }; - } else if (!query.propertyFilter.isEmpty()) { - if (query.propertyFilter.contains("uid")) { - const QByteArray uid = query.propertyFilter.value("uid").toByteArray(); - preparedQuery = [uid](const std::string &key, DummyEvent const *buffer, Akonadi2::ApplicationDomain::Buffer::Event const *local) { - if (local && local->uid() && (QByteArray::fromRawData(local->uid()->c_str(), local->uid()->size()) == uid)) { - return true; - } - return false; - }; - } - } else { - //Match everything - preparedQuery = [](const std::string &key, DummyEvent const *buffer, Akonadi2::ApplicationDomain::Buffer::Event const *local) { - return true; - }; - } - return preparedQuery; -} - -void DummyResourceFacade::readValue(QSharedPointer storage, const QByteArray &key, const std::function &resultCallback, std::function preparedQuery) -{ - storage->scan(key, [=](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { - - //Skip internals - if (Akonadi2::Storage::isInternalKey(keyValue, keySize)) { - return true; - } - - //Extract buffers - Akonadi2::EntityBuffer buffer(dataValue, dataSize); - - const auto resourceBuffer = Akonadi2::EntityBuffer::readBuffer(buffer.entity().resource()); - const auto localBuffer = Akonadi2::EntityBuffer::readBuffer(buffer.entity().local()); - const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(buffer.entity().metadata()); - - if ((!resourceBuffer && !localBuffer) || !metadataBuffer) { - qWarning() << "invalid buffer " << QByteArray::fromRawData(static_cast(keyValue), keySize); - return true; - } - - //We probably only want to create all buffers after the scan - //TODO use adapter for query and scan? - if (preparedQuery && preparedQuery(std::string(static_cast(keyValue), keySize), resourceBuffer, localBuffer)) { - qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - //This only works for a 1:1 mapping of resource to domain types. - //Not i.e. for tags that are stored as flags in each entity of an imap store. - auto adaptor = mFactory->createAdaptor(buffer.entity()); - //TODO only copy requested properties - auto memoryAdaptor = QSharedPointer::create(*adaptor); - // here we could copy additional properties that don't have a 1:1 mapping, such as separately stored tags. - auto event = QSharedPointer::create("org.kde.dummy", QByteArray::fromRawData(static_cast(keyValue), keySize), revision, memoryAdaptor); - resultCallback(event); - } - return true; - }, - [](const Akonadi2::Storage::Error &error) { - qWarning() << "Error during query: " << error.message; - }); -} - -Async::Job DummyResourceFacade::load(const Akonadi2::Query &query, const std::function &resultCallback) -{ - return Async::start([=](Async::Future &future) { - //Now that the sync is complete we can execute the query - const auto preparedQuery = prepareQuery(query); - - auto storage = QSharedPointer::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); - storage->setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { - Warning() << "Error during query: " << error.store << error.message; - }); - - storage->startTransaction(Akonadi2::Storage::ReadOnly); - const qint64 revision = storage->maxRevision(); - - //Index lookups - QVector keys; - if (query.propertyFilter.contains("uid")) { - static Index uidIndex(Akonadi2::Store::storageLocation(), "org.kde.dummy.index.uid", Akonadi2::Storage::ReadOnly); - uidIndex.lookup(query.propertyFilter.value("uid").toByteArray(), [&](const QByteArray &value) { - keys << value; - }, - [](const Index::Error &error) { - Warning() << "Error in index: " << error.message; - }); - } - - if (keys.isEmpty()) { - Log() << "Executing a full scan"; - readValue(storage, QByteArray(), resultCallback, preparedQuery); - } else { - for (const auto &key : keys) { - readValue(storage, key, resultCallback, preparedQuery); - } - } - storage->abortTransaction(); - future.setValue(revision); - future.setFinished(); - }); -} - diff --git a/dummyresource/facade.h b/dummyresource/facade.h deleted file mode 100644 index 91ae351..0000000 --- a/dummyresource/facade.h +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (C) 2014 Christian Mollekopf - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the - * Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#pragma once - -#include "common/facade.h" - -#include "common/clientapi.h" -#include "common/storage.h" -#include "resourcefactory.h" -#include "entity_generated.h" -#include "event_generated.h" -#include "dummycalendar_generated.h" -#include "common/domainadaptor.h" - -class DummyResourceFacade : public Akonadi2::GenericFacade -{ -public: - DummyResourceFacade(); - virtual ~DummyResourceFacade(); - Async::Job create(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE; - Async::Job modify(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE; - Async::Job remove(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE; - Async::Job load(const Akonadi2::Query &query, const std::function &resultCallback) Q_DECL_OVERRIDE; - -private: - void readValue(QSharedPointer storage, const QByteArray &key, const std::function &resultCallback, std::function); - QSharedPointer > mFactory; -}; diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp deleted file mode 100644 index d5765e2..0000000 --- a/dummyresource/resourcefactory.cpp +++ /dev/null @@ -1,428 +0,0 @@ -/* - * Copyright (C) 2014 Aaron Seigo - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the - * Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#include "resourcefactory.h" -#include "facade.h" -#include "entitybuffer.h" -#include "pipeline.h" -#include "dummycalendar_generated.h" -#include "metadata_generated.h" -#include "queuedcommand_generated.h" -#include "createentity_generated.h" -#include "domainadaptor.h" -#include "commands.h" -#include "clientapi.h" -#include "index.h" -#include "log.h" -#include -#include - - -/* - * Figure out how to implement various classes of processors: - * * read-only (index and such) => extractor function, probably using domain adaptor - * * filter => provide means to move entity elsewhere, and also reflect change in source (I guess?) - * * flag extractors? => like read-only? Or write to local portion of buffer? - * ** $ISSPAM should become part of domain object and is written to the local part of the mail. - * ** => value could be calculated by the server directly - */ -class SimpleProcessor : public Akonadi2::Preprocessor -{ -public: - SimpleProcessor(const QString &id, const std::function &f) - : Akonadi2::Preprocessor(), - mFunction(f), - mId(id) - { - } - - void process(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e) Q_DECL_OVERRIDE - { - mFunction(state, e); - processingCompleted(state); - } - - QString id() const - { - return mId; - } - -protected: - std::function mFunction; - QString mId; -}; - - - -static std::string createEvent() -{ - static const size_t attachmentSize = 1024*2; // 2KB - static uint8_t rawData[attachmentSize]; - static flatbuffers::FlatBufferBuilder fbb; - fbb.Clear(); - { - uint8_t *rawDataPtr = Q_NULLPTR; - auto summary = fbb.CreateString("summary"); - auto data = fbb.CreateUninitializedVector(attachmentSize, &rawDataPtr); - DummyCalendar::DummyEventBuilder eventBuilder(fbb); - eventBuilder.add_summary(summary); - eventBuilder.add_attachment(data); - auto eventLocation = eventBuilder.Finish(); - DummyCalendar::FinishDummyEventBuffer(fbb, eventLocation); - memcpy((void*)rawDataPtr, rawData, attachmentSize); - } - - return std::string(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); -} - -QMap populate() -{ - QMap content; - for (int i = 0; i < 2; i++) { - auto event = createEvent(); - content.insert(QString("key%1").arg(i), QString::fromStdString(event)); - } - return content; -} - -static QMap s_dataSource = populate(); - -//Drives the pipeline using the output from all command queues -class Processor : public QObject -{ - Q_OBJECT -public: - Processor(Akonadi2::Pipeline *pipeline, QList commandQueues) - : QObject(), - mPipeline(pipeline), - mCommandQueues(commandQueues), - mProcessingLock(false) - { - for (auto queue : mCommandQueues) { - const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); - Q_UNUSED(ret); - } - } - -signals: - void error(int errorCode, const QString &errorMessage); - -private: - bool messagesToProcessAvailable() - { - for (auto queue : mCommandQueues) { - if (!queue->isEmpty()) { - return true; - } - } - return false; - } - -private slots: - void process() - { - if (mProcessingLock) { - return; - } - mProcessingLock = true; - auto job = processPipeline().then([this]() { - mProcessingLock = false; - if (messagesToProcessAvailable()) { - process(); - } - }).exec(); - } - - Async::Job processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) - { - Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); - //Throw command into appropriate pipeline - switch (queuedCommand->commandId()) { - case Akonadi2::Commands::DeleteEntityCommand: - //mPipeline->removedEntity - return Async::null(); - case Akonadi2::Commands::ModifyEntityCommand: - //mPipeline->modifiedEntity - return Async::null(); - case Akonadi2::Commands::CreateEntityCommand: - return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); - default: - return Async::error(-1, "Unhandled command"); - } - return Async::null(); - } - - //Process all messages of this queue - Async::Job processQueue(MessageQueue *queue) - { - //TODO use something like: - //Async::foreach("pass iterator here").each("process value here").join(); - //Async::foreach("pass iterator here").parallel("process value here").join(); - return Async::dowhile( - [this, queue](Async::Future &future) { - if (queue->isEmpty()) { - future.setValue(false); - future.setFinished(); - return; - } - queue->dequeue( - [this, &future](void *ptr, int size, std::function messageQueueCallback) { - auto callback = [messageQueueCallback, &future](bool success) { - messageQueueCallback(success); - future.setValue(!success); - future.setFinished(); - }; - - flatbuffers::Verifier verifyer(reinterpret_cast(ptr), size); - if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { - Warning() << "invalid buffer"; - callback(false); - return; - } - auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); - Trace() << "Dequeued Command: " << Akonadi2::Commands::name(queuedCommand->commandId()); - //TODO JOBAPI: job lifetime management - //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete - //themselves once done. In other cases we'd like jobs that only live as long as their handle though. - //FIXME this job is stack allocated and thus simply dies.... - processQueuedCommand(queuedCommand).then( - [callback]() { - callback(true); - }, - [callback](int errorCode, QString errorMessage) { - Warning() << "Error while processing queue command: " << errorMessage; - callback(false); - } - ).exec(); - }, - [&future](const MessageQueue::Error &error) { - Warning() << "Error while getting message from messagequeue: " << error.message; - future.setValue(false); - future.setFinished(); - } - ); - } - ); - } - - Async::Job processPipeline() - { - //Go through all message queues - auto it = QSharedPointer >::create(mCommandQueues); - return Async::dowhile( - [it]() { return it->hasNext(); }, - [it, this](Async::Future &future) { - auto queue = it->next(); - processQueue(queue).then([&future]() { - Trace() << "Queue processed"; - future.setFinished(); - }).exec(); - } - ); - } - -private: - Akonadi2::Pipeline *mPipeline; - //Ordered by priority - QList mCommandQueues; - bool mProcessingLock; -}; - -DummyResource::DummyResource() - : Akonadi2::Resource(), - mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"), - mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue"), - mError(0) -{ -} - -void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) -{ - auto eventFactory = QSharedPointer::create(); - //FIXME we should setup for each resource entity type, not for each domain type - //i.e. If a resource stores tags as part of each message it needs to update the tag index - //TODO setup preprocessors for each resource entity type and pipeline type allowing full customization - //Eventually the order should be self configuring, for now it's hardcoded. - auto eventIndexer = new SimpleProcessor("summaryprocessor", [eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { - auto adaptor = eventFactory->createAdaptor(entity); - // Log() << "Summary preprocessor: " << adaptor->getProperty("summary").toString(); - }); - - auto uidIndexer = new SimpleProcessor("uidIndexer", [eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { - static Index uidIndex(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.index.uid", Akonadi2::Storage::ReadWrite); - - //TODO: Benchmark if this is performance wise acceptable, or if we have to access the buffer directly - auto adaptor = eventFactory->createAdaptor(entity); - const auto uid = adaptor->getProperty("uid"); - if (uid.isValid()) { - uidIndex.add(uid.toByteArray(), state.key()); - } - }); - - //event is the entitytype and not the domain type - pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector() << eventIndexer << uidIndexer); - mProcessor = new Processor(pipeline, QList() << &mUserQueue << &mSynchronizerQueue); - QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); -} - -void DummyResource::onProcessorError(int errorCode, const QString &errorMessage) -{ - Warning() << "Received error from Processor: " << errorCode << errorMessage; - mError = errorCode; -} - -int DummyResource::error() const -{ - return mError; -} - -void findByRemoteId(QSharedPointer storage, const QString &rid, std::function callback) -{ - //TODO lookup in rid index instead of doing a full scan - const std::string ridString = rid.toStdString(); - storage->scan("", [&](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { - if (Akonadi2::Storage::isInternalKey(keyValue, keySize)) { - return true; - } - - Akonadi2::EntityBuffer::extractResourceBuffer(dataValue, dataSize, [&](const uint8_t *buffer, size_t size) { - flatbuffers::Verifier verifier(buffer, size); - if (DummyCalendar::VerifyDummyEventBuffer(verifier)) { - DummyCalendar::DummyEvent const *resourceBuffer = DummyCalendar::GetDummyEvent(buffer); - if (resourceBuffer && resourceBuffer->remoteId()) { - if (std::string(resourceBuffer->remoteId()->c_str(), resourceBuffer->remoteId()->size()) == ridString) { - callback(keyValue, keySize, dataValue, dataSize); - } - } - } - }); - return true; - }); -} - -void DummyResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) -{ - m_fbb.Clear(); - auto commandData = Akonadi2::EntityBuffer::appendAsVector(m_fbb, data.constData(), data.size()); - auto buffer = Akonadi2::CreateQueuedCommand(m_fbb, commandId, commandData); - Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); - mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); -} - -Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) -{ - return Async::start([this, pipeline](Async::Future &f) { - //TODO use a read-only transaction during the complete sync to sync against a defined revision - auto storage = QSharedPointer::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); - for (auto it = s_dataSource.constBegin(); it != s_dataSource.constEnd(); it++) { - bool isNew = true; - if (storage->exists()) { - findByRemoteId(storage, it.key(), [&](void *keyValue, int keySize, void *dataValue, int dataSize) { - isNew = false; - }); - } - if (isNew) { - m_fbb.Clear(); - - const QByteArray data = it.value().toUtf8(); - auto eventBuffer = DummyCalendar::GetDummyEvent(data.data()); - - //Map the source format to the buffer format (which happens to be an exact copy here) - auto summary = m_fbb.CreateString(eventBuffer->summary()->c_str()); - auto rid = m_fbb.CreateString(it.key().toStdString().c_str()); - auto description = m_fbb.CreateString(it.key().toStdString().c_str()); - static uint8_t rawData[100]; - auto attachment = Akonadi2::EntityBuffer::appendAsVector(m_fbb, rawData, 100); - - auto builder = DummyCalendar::DummyEventBuilder(m_fbb); - builder.add_summary(summary); - builder.add_remoteId(rid); - builder.add_description(description); - builder.add_attachment(attachment); - auto buffer = builder.Finish(); - DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); - flatbuffers::FlatBufferBuilder entityFbb; - Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, m_fbb.GetBufferPointer(), m_fbb.GetSize(), 0, 0); - - flatbuffers::FlatBufferBuilder fbb; - //This is the resource type and not the domain type - auto type = fbb.CreateString("event"); - auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); - auto location = Akonadi2::Commands::CreateCreateEntity(fbb, type, delta); - Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); - - enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize())); - } else { //modification - //TODO diff and create modification if necessary - } - } - //TODO find items to remove - f.setFinished(); - }); -} - -Async::Job DummyResource::processAllMessages() -{ - //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. - //TODO: report errors while processing sync? - //TODO JOBAPI: A helper that waits for n events and then continues? - return Async::start([this](Async::Future &f) { - if (mSynchronizerQueue.isEmpty()) { - f.setFinished(); - } else { - QObject::connect(&mSynchronizerQueue, &MessageQueue::drained, [&f]() { - f.setFinished(); - }); - } - }).then([this](Async::Future &f) { - if (mUserQueue.isEmpty()) { - f.setFinished(); - } else { - QObject::connect(&mUserQueue, &MessageQueue::drained, [&f]() { - f.setFinished(); - }); - } - }); -} - -void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) -{ - //TODO instead of copying the command including the full entity first into the command queue, we could directly - //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). - //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). - enqueueCommand(mUserQueue, commandId, data); -} - -DummyResourceFactory::DummyResourceFactory(QObject *parent) - : Akonadi2::ResourceFactory(parent) -{ - -} - -Akonadi2::Resource *DummyResourceFactory::createResource() -{ - return new DummyResource(); -} - -void DummyResourceFactory::registerFacades(Akonadi2::FacadeFactory &factory) -{ - factory.registerFacade(PLUGIN_NAME); -} - -#include "resourcefactory.moc" diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h deleted file mode 100644 index 3b99d5e..0000000 --- a/dummyresource/resourcefactory.h +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (C) 2014 Aaron Seigo - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the - * Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#pragma once - -#include "common/resource.h" -#include "async/src/async.h" -#include "common/messagequeue.h" - -#include - -//TODO: a little ugly to have this in two places, once here and once in Q_PLUGIN_METADATA -#define PLUGIN_NAME "org.kde.dummy" - -class Processor; - -class DummyResource : public Akonadi2::Resource -{ -public: - DummyResource(); - Async::Job synchronizeWithSource(Akonadi2::Pipeline *pipeline); - Async::Job processAllMessages(); - void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); - void configurePipeline(Akonadi2::Pipeline *pipeline); - int error() const; - -private: - void onProcessorError(int errorCode, const QString &errorMessage); - void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); - flatbuffers::FlatBufferBuilder m_fbb; - MessageQueue mUserQueue; - MessageQueue mSynchronizerQueue; - Processor *mProcessor; - int mError; -}; - -class DummyResourceFactory : public Akonadi2::ResourceFactory -{ - Q_OBJECT - Q_PLUGIN_METADATA(IID "org.kde.dummy") - Q_INTERFACES(Akonadi2::ResourceFactory) - -public: - DummyResourceFactory(QObject *parent = 0); - - Akonadi2::Resource *createResource(); - void registerFacades(Akonadi2::FacadeFactory &factory); -}; - diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt new file mode 100644 index 0000000..ea2b0ce --- /dev/null +++ b/examples/CMakeLists.txt @@ -0,0 +1,5 @@ +# the client +add_subdirectory(client) + +# a simple dummy resource implementation +add_subdirectory(dummyresource) diff --git a/examples/client/CMakeLists.txt b/examples/client/CMakeLists.txt new file mode 100644 index 0000000..3555b3e --- /dev/null +++ b/examples/client/CMakeLists.txt @@ -0,0 +1,8 @@ +project(akonadi2_client) + +include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) + +add_executable(${PROJECT_NAME} main.cpp) +target_link_libraries(${PROJECT_NAME} akonadi2common) +qt5_use_modules(${PROJECT_NAME} Widgets Network) +install(TARGETS ${PROJECT_NAME} ${KDE_INSTALL_TARGETS_DEFAULT_ARGS}) diff --git a/examples/client/main.cpp b/examples/client/main.cpp new file mode 100644 index 0000000..b4cb081 --- /dev/null +++ b/examples/client/main.cpp @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2014 Aaron Seigo + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include +#include + +#include "common/commands.h" +#include "common/console.h" +#include "common/resourceaccess.h" + +int main(int argc, char *argv[]) +{ + QApplication app(argc, argv); + + new Akonadi2::Console("Akonadi2 Client"); + Akonadi2::Console::main()->log(QString("PID: %1").arg(QCoreApplication::applicationPid())); + + QCommandLineParser cliOptions; + cliOptions.addPositionalArgument(QObject::tr("[resource]"), + QObject::tr("A resource to connect to")); + cliOptions.process(app); + QStringList resources = cliOptions.positionalArguments(); + if (resources.isEmpty()) { + resources << "org.kde.dummy"; + } + + for (const QString &resource: resources) { + Akonadi2::ResourceAccess *resAccess = new Akonadi2::ResourceAccess(resource.toLatin1()); + QObject::connect(&app, &QCoreApplication::aboutToQuit, + resAccess, &Akonadi2::ResourceAccess::close); + resAccess->sendCommand(Akonadi2::Commands::SynchronizeCommand); + resAccess->open(); + } + + return app.exec(); +} diff --git a/examples/dummyresource/CMakeLists.txt b/examples/dummyresource/CMakeLists.txt new file mode 100644 index 0000000..abd315f --- /dev/null +++ b/examples/dummyresource/CMakeLists.txt @@ -0,0 +1,12 @@ +project(akonadi2_resource_dummy) + +add_definitions(-DQT_PLUGIN) +include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) + +generate_flatbuffers(dummycalendar) + +add_library(${PROJECT_NAME} SHARED facade.cpp resourcefactory.cpp domainadaptor.cpp) +qt5_use_modules(${PROJECT_NAME} Core Network) +target_link_libraries(${PROJECT_NAME} akonadi2common) + +install(TARGETS ${PROJECT_NAME} LIBRARY DESTINATION ${AKONADI2_RESOURCE_PLUGINS_PATH}) diff --git a/examples/dummyresource/domainadaptor.cpp b/examples/dummyresource/domainadaptor.cpp new file mode 100644 index 0000000..8649bc3 --- /dev/null +++ b/examples/dummyresource/domainadaptor.cpp @@ -0,0 +1,60 @@ + +#include "domainadaptor.h" + +#include +#include + +#include "dummycalendar_generated.h" +#include "event_generated.h" +#include "entity_generated.h" +#include "metadata_generated.h" +#include "domainadaptor.h" +#include "log.h" +#include + +using namespace DummyCalendar; +using namespace flatbuffers; + + + + +DummyEventAdaptorFactory::DummyEventAdaptorFactory() + : DomainTypeAdaptorFactory() +{ + //TODO turn this into initializeReadPropertyMapper as well? + mResourceMapper->addMapping("summary", [](DummyEvent const *buffer) -> QVariant { + return propertyToVariant(buffer->summary()); + }); + + mResourceWriteMapper->addMapping("summary", [](const QVariant &value, flatbuffers::FlatBufferBuilder &fbb) -> std::function { + auto offset = variantToProperty(value, fbb); + return [offset](DummyEventBuilder &builder) { builder.add_summary(offset); }; + }); +} + + +void DummyEventAdaptorFactory::createBuffer(const Akonadi2::ApplicationDomain::Event &event, flatbuffers::FlatBufferBuilder &fbb) +{ + flatbuffers::FlatBufferBuilder localFbb; + if (mLocalWriteMapper) { + auto pos = createBufferPart(event, localFbb, *mLocalWriteMapper); + Akonadi2::ApplicationDomain::Buffer::FinishEventBuffer(localFbb, pos); + flatbuffers::Verifier verifier(localFbb.GetBufferPointer(), localFbb.GetSize()); + if (!verifier.VerifyBuffer()) { + Warning() << "Created invalid local buffer"; + } + } + + flatbuffers::FlatBufferBuilder resFbb; + if (mResourceWriteMapper) { + auto pos = createBufferPart(event, resFbb, *mResourceWriteMapper); + DummyCalendar::FinishDummyEventBuffer(resFbb, pos); + flatbuffers::Verifier verifier(resFbb.GetBufferPointer(), resFbb.GetSize()); + if (!verifier.VerifyBuffer()) { + Warning() << "Created invalid resource buffer"; + } + } + + Akonadi2::EntityBuffer::assembleEntityBuffer(fbb, 0, 0, resFbb.GetBufferPointer(), resFbb.GetSize(), localFbb.GetBufferPointer(), localFbb.GetSize()); +} + diff --git a/examples/dummyresource/domainadaptor.h b/examples/dummyresource/domainadaptor.h new file mode 100644 index 0000000..9d351e7 --- /dev/null +++ b/examples/dummyresource/domainadaptor.h @@ -0,0 +1,14 @@ +#pragma once + +#include "common/domainadaptor.h" +#include "event_generated.h" +#include "dummycalendar_generated.h" +#include "entity_generated.h" + +class DummyEventAdaptorFactory : public DomainTypeAdaptorFactory +{ +public: + DummyEventAdaptorFactory(); + virtual ~DummyEventAdaptorFactory() {}; + virtual void createBuffer(const Akonadi2::ApplicationDomain::Event &event, flatbuffers::FlatBufferBuilder &fbb); +}; diff --git a/examples/dummyresource/dummycalendar.fbs b/examples/dummyresource/dummycalendar.fbs new file mode 100644 index 0000000..643c9b2 --- /dev/null +++ b/examples/dummyresource/dummycalendar.fbs @@ -0,0 +1,13 @@ +// example IDL file + +namespace DummyCalendar; + +table DummyEvent { + summary:string; + description:string; + attachment:[ubyte]; + remoteId:string; +} + +root_type DummyEvent; +file_identifier "AKFB"; diff --git a/examples/dummyresource/facade.cpp b/examples/dummyresource/facade.cpp new file mode 100644 index 0000000..e50e4f3 --- /dev/null +++ b/examples/dummyresource/facade.cpp @@ -0,0 +1,189 @@ +/* + * Copyright (C) 2014 Christian Mollekopf + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "facade.h" + +#include +#include + +#include "common/resourceaccess.h" +#include "common/commands.h" +#include "dummycalendar_generated.h" +#include "event_generated.h" +#include "entity_generated.h" +#include "metadata_generated.h" +#include "domainadaptor.h" +#include +#include +#include + +using namespace DummyCalendar; +using namespace flatbuffers; + + +DummyResourceFacade::DummyResourceFacade() + : Akonadi2::GenericFacade("org.kde.dummy"), + mFactory(new DummyEventAdaptorFactory) +{ +} + +DummyResourceFacade::~DummyResourceFacade() +{ +} + +Async::Job DummyResourceFacade::create(const Akonadi2::ApplicationDomain::Event &domainObject) +{ + flatbuffers::FlatBufferBuilder entityFbb; + mFactory->createBuffer(domainObject, entityFbb); + return sendCreateCommand("event", QByteArray::fromRawData(reinterpret_cast(entityFbb.GetBufferPointer()), entityFbb.GetSize())); +} + +Async::Job DummyResourceFacade::modify(const Akonadi2::ApplicationDomain::Event &domainObject) +{ + //Create message buffer and send to resource + return Async::null(); +} + +Async::Job DummyResourceFacade::remove(const Akonadi2::ApplicationDomain::Event &domainObject) +{ + //Create message buffer and send to resource + return Async::null(); +} + +static std::function prepareQuery(const Akonadi2::Query &query) +{ + //Compose some functions to make query matching fast. + //This way we can process the query once, and convert all values into something that can be compared quickly + std::function preparedQuery; + if (!query.ids.isEmpty()) { + //Match by id + //TODO: for id's a direct lookup would be way faster + + //We convert the id's to std::string so we don't have to convert each key during the scan. (This runs only once, and the query will be run for every key) + //Probably a premature optimization, but perhaps a useful technique to be investigated. + QVector ids; + for (const auto &id : query.ids) { + ids << id.toStdString(); + } + preparedQuery = [ids](const std::string &key, DummyEvent const *buffer, Akonadi2::ApplicationDomain::Buffer::Event const *local) { + if (ids.contains(key)) { + return true; + } + return false; + }; + } else if (!query.propertyFilter.isEmpty()) { + if (query.propertyFilter.contains("uid")) { + const QByteArray uid = query.propertyFilter.value("uid").toByteArray(); + preparedQuery = [uid](const std::string &key, DummyEvent const *buffer, Akonadi2::ApplicationDomain::Buffer::Event const *local) { + if (local && local->uid() && (QByteArray::fromRawData(local->uid()->c_str(), local->uid()->size()) == uid)) { + return true; + } + return false; + }; + } + } else { + //Match everything + preparedQuery = [](const std::string &key, DummyEvent const *buffer, Akonadi2::ApplicationDomain::Buffer::Event const *local) { + return true; + }; + } + return preparedQuery; +} + +void DummyResourceFacade::readValue(QSharedPointer storage, const QByteArray &key, const std::function &resultCallback, std::function preparedQuery) +{ + storage->scan(key, [=](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { + + //Skip internals + if (Akonadi2::Storage::isInternalKey(keyValue, keySize)) { + return true; + } + + //Extract buffers + Akonadi2::EntityBuffer buffer(dataValue, dataSize); + + const auto resourceBuffer = Akonadi2::EntityBuffer::readBuffer(buffer.entity().resource()); + const auto localBuffer = Akonadi2::EntityBuffer::readBuffer(buffer.entity().local()); + const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(buffer.entity().metadata()); + + if ((!resourceBuffer && !localBuffer) || !metadataBuffer) { + qWarning() << "invalid buffer " << QByteArray::fromRawData(static_cast(keyValue), keySize); + return true; + } + + //We probably only want to create all buffers after the scan + //TODO use adapter for query and scan? + if (preparedQuery && preparedQuery(std::string(static_cast(keyValue), keySize), resourceBuffer, localBuffer)) { + qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; + //This only works for a 1:1 mapping of resource to domain types. + //Not i.e. for tags that are stored as flags in each entity of an imap store. + auto adaptor = mFactory->createAdaptor(buffer.entity()); + //TODO only copy requested properties + auto memoryAdaptor = QSharedPointer::create(*adaptor); + // here we could copy additional properties that don't have a 1:1 mapping, such as separately stored tags. + auto event = QSharedPointer::create("org.kde.dummy", QByteArray::fromRawData(static_cast(keyValue), keySize), revision, memoryAdaptor); + resultCallback(event); + } + return true; + }, + [](const Akonadi2::Storage::Error &error) { + qWarning() << "Error during query: " << error.message; + }); +} + +Async::Job DummyResourceFacade::load(const Akonadi2::Query &query, const std::function &resultCallback) +{ + return Async::start([=](Async::Future &future) { + //Now that the sync is complete we can execute the query + const auto preparedQuery = prepareQuery(query); + + auto storage = QSharedPointer::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); + storage->setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { + Warning() << "Error during query: " << error.store << error.message; + }); + + storage->startTransaction(Akonadi2::Storage::ReadOnly); + const qint64 revision = storage->maxRevision(); + + //Index lookups + QVector keys; + if (query.propertyFilter.contains("uid")) { + static Index uidIndex(Akonadi2::Store::storageLocation(), "org.kde.dummy.index.uid", Akonadi2::Storage::ReadOnly); + uidIndex.lookup(query.propertyFilter.value("uid").toByteArray(), [&](const QByteArray &value) { + keys << value; + }, + [](const Index::Error &error) { + Warning() << "Error in index: " << error.message; + }); + } + + if (keys.isEmpty()) { + Log() << "Executing a full scan"; + readValue(storage, QByteArray(), resultCallback, preparedQuery); + } else { + for (const auto &key : keys) { + readValue(storage, key, resultCallback, preparedQuery); + } + } + storage->abortTransaction(); + future.setValue(revision); + future.setFinished(); + }); +} + diff --git a/examples/dummyresource/facade.h b/examples/dummyresource/facade.h new file mode 100644 index 0000000..91ae351 --- /dev/null +++ b/examples/dummyresource/facade.h @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2014 Christian Mollekopf + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#pragma once + +#include "common/facade.h" + +#include "common/clientapi.h" +#include "common/storage.h" +#include "resourcefactory.h" +#include "entity_generated.h" +#include "event_generated.h" +#include "dummycalendar_generated.h" +#include "common/domainadaptor.h" + +class DummyResourceFacade : public Akonadi2::GenericFacade +{ +public: + DummyResourceFacade(); + virtual ~DummyResourceFacade(); + Async::Job create(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE; + Async::Job modify(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE; + Async::Job remove(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE; + Async::Job load(const Akonadi2::Query &query, const std::function &resultCallback) Q_DECL_OVERRIDE; + +private: + void readValue(QSharedPointer storage, const QByteArray &key, const std::function &resultCallback, std::function); + QSharedPointer > mFactory; +}; diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp new file mode 100644 index 0000000..d5765e2 --- /dev/null +++ b/examples/dummyresource/resourcefactory.cpp @@ -0,0 +1,428 @@ +/* + * Copyright (C) 2014 Aaron Seigo + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "resourcefactory.h" +#include "facade.h" +#include "entitybuffer.h" +#include "pipeline.h" +#include "dummycalendar_generated.h" +#include "metadata_generated.h" +#include "queuedcommand_generated.h" +#include "createentity_generated.h" +#include "domainadaptor.h" +#include "commands.h" +#include "clientapi.h" +#include "index.h" +#include "log.h" +#include +#include + + +/* + * Figure out how to implement various classes of processors: + * * read-only (index and such) => extractor function, probably using domain adaptor + * * filter => provide means to move entity elsewhere, and also reflect change in source (I guess?) + * * flag extractors? => like read-only? Or write to local portion of buffer? + * ** $ISSPAM should become part of domain object and is written to the local part of the mail. + * ** => value could be calculated by the server directly + */ +class SimpleProcessor : public Akonadi2::Preprocessor +{ +public: + SimpleProcessor(const QString &id, const std::function &f) + : Akonadi2::Preprocessor(), + mFunction(f), + mId(id) + { + } + + void process(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e) Q_DECL_OVERRIDE + { + mFunction(state, e); + processingCompleted(state); + } + + QString id() const + { + return mId; + } + +protected: + std::function mFunction; + QString mId; +}; + + + +static std::string createEvent() +{ + static const size_t attachmentSize = 1024*2; // 2KB + static uint8_t rawData[attachmentSize]; + static flatbuffers::FlatBufferBuilder fbb; + fbb.Clear(); + { + uint8_t *rawDataPtr = Q_NULLPTR; + auto summary = fbb.CreateString("summary"); + auto data = fbb.CreateUninitializedVector(attachmentSize, &rawDataPtr); + DummyCalendar::DummyEventBuilder eventBuilder(fbb); + eventBuilder.add_summary(summary); + eventBuilder.add_attachment(data); + auto eventLocation = eventBuilder.Finish(); + DummyCalendar::FinishDummyEventBuffer(fbb, eventLocation); + memcpy((void*)rawDataPtr, rawData, attachmentSize); + } + + return std::string(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); +} + +QMap populate() +{ + QMap content; + for (int i = 0; i < 2; i++) { + auto event = createEvent(); + content.insert(QString("key%1").arg(i), QString::fromStdString(event)); + } + return content; +} + +static QMap s_dataSource = populate(); + +//Drives the pipeline using the output from all command queues +class Processor : public QObject +{ + Q_OBJECT +public: + Processor(Akonadi2::Pipeline *pipeline, QList commandQueues) + : QObject(), + mPipeline(pipeline), + mCommandQueues(commandQueues), + mProcessingLock(false) + { + for (auto queue : mCommandQueues) { + const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); + Q_UNUSED(ret); + } + } + +signals: + void error(int errorCode, const QString &errorMessage); + +private: + bool messagesToProcessAvailable() + { + for (auto queue : mCommandQueues) { + if (!queue->isEmpty()) { + return true; + } + } + return false; + } + +private slots: + void process() + { + if (mProcessingLock) { + return; + } + mProcessingLock = true; + auto job = processPipeline().then([this]() { + mProcessingLock = false; + if (messagesToProcessAvailable()) { + process(); + } + }).exec(); + } + + Async::Job processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) + { + Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); + //Throw command into appropriate pipeline + switch (queuedCommand->commandId()) { + case Akonadi2::Commands::DeleteEntityCommand: + //mPipeline->removedEntity + return Async::null(); + case Akonadi2::Commands::ModifyEntityCommand: + //mPipeline->modifiedEntity + return Async::null(); + case Akonadi2::Commands::CreateEntityCommand: + return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); + default: + return Async::error(-1, "Unhandled command"); + } + return Async::null(); + } + + //Process all messages of this queue + Async::Job processQueue(MessageQueue *queue) + { + //TODO use something like: + //Async::foreach("pass iterator here").each("process value here").join(); + //Async::foreach("pass iterator here").parallel("process value here").join(); + return Async::dowhile( + [this, queue](Async::Future &future) { + if (queue->isEmpty()) { + future.setValue(false); + future.setFinished(); + return; + } + queue->dequeue( + [this, &future](void *ptr, int size, std::function messageQueueCallback) { + auto callback = [messageQueueCallback, &future](bool success) { + messageQueueCallback(success); + future.setValue(!success); + future.setFinished(); + }; + + flatbuffers::Verifier verifyer(reinterpret_cast(ptr), size); + if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { + Warning() << "invalid buffer"; + callback(false); + return; + } + auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); + Trace() << "Dequeued Command: " << Akonadi2::Commands::name(queuedCommand->commandId()); + //TODO JOBAPI: job lifetime management + //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete + //themselves once done. In other cases we'd like jobs that only live as long as their handle though. + //FIXME this job is stack allocated and thus simply dies.... + processQueuedCommand(queuedCommand).then( + [callback]() { + callback(true); + }, + [callback](int errorCode, QString errorMessage) { + Warning() << "Error while processing queue command: " << errorMessage; + callback(false); + } + ).exec(); + }, + [&future](const MessageQueue::Error &error) { + Warning() << "Error while getting message from messagequeue: " << error.message; + future.setValue(false); + future.setFinished(); + } + ); + } + ); + } + + Async::Job processPipeline() + { + //Go through all message queues + auto it = QSharedPointer >::create(mCommandQueues); + return Async::dowhile( + [it]() { return it->hasNext(); }, + [it, this](Async::Future &future) { + auto queue = it->next(); + processQueue(queue).then([&future]() { + Trace() << "Queue processed"; + future.setFinished(); + }).exec(); + } + ); + } + +private: + Akonadi2::Pipeline *mPipeline; + //Ordered by priority + QList mCommandQueues; + bool mProcessingLock; +}; + +DummyResource::DummyResource() + : Akonadi2::Resource(), + mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"), + mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue"), + mError(0) +{ +} + +void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) +{ + auto eventFactory = QSharedPointer::create(); + //FIXME we should setup for each resource entity type, not for each domain type + //i.e. If a resource stores tags as part of each message it needs to update the tag index + //TODO setup preprocessors for each resource entity type and pipeline type allowing full customization + //Eventually the order should be self configuring, for now it's hardcoded. + auto eventIndexer = new SimpleProcessor("summaryprocessor", [eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { + auto adaptor = eventFactory->createAdaptor(entity); + // Log() << "Summary preprocessor: " << adaptor->getProperty("summary").toString(); + }); + + auto uidIndexer = new SimpleProcessor("uidIndexer", [eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { + static Index uidIndex(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.index.uid", Akonadi2::Storage::ReadWrite); + + //TODO: Benchmark if this is performance wise acceptable, or if we have to access the buffer directly + auto adaptor = eventFactory->createAdaptor(entity); + const auto uid = adaptor->getProperty("uid"); + if (uid.isValid()) { + uidIndex.add(uid.toByteArray(), state.key()); + } + }); + + //event is the entitytype and not the domain type + pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector() << eventIndexer << uidIndexer); + mProcessor = new Processor(pipeline, QList() << &mUserQueue << &mSynchronizerQueue); + QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); +} + +void DummyResource::onProcessorError(int errorCode, const QString &errorMessage) +{ + Warning() << "Received error from Processor: " << errorCode << errorMessage; + mError = errorCode; +} + +int DummyResource::error() const +{ + return mError; +} + +void findByRemoteId(QSharedPointer storage, const QString &rid, std::function callback) +{ + //TODO lookup in rid index instead of doing a full scan + const std::string ridString = rid.toStdString(); + storage->scan("", [&](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { + if (Akonadi2::Storage::isInternalKey(keyValue, keySize)) { + return true; + } + + Akonadi2::EntityBuffer::extractResourceBuffer(dataValue, dataSize, [&](const uint8_t *buffer, size_t size) { + flatbuffers::Verifier verifier(buffer, size); + if (DummyCalendar::VerifyDummyEventBuffer(verifier)) { + DummyCalendar::DummyEvent const *resourceBuffer = DummyCalendar::GetDummyEvent(buffer); + if (resourceBuffer && resourceBuffer->remoteId()) { + if (std::string(resourceBuffer->remoteId()->c_str(), resourceBuffer->remoteId()->size()) == ridString) { + callback(keyValue, keySize, dataValue, dataSize); + } + } + } + }); + return true; + }); +} + +void DummyResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) +{ + m_fbb.Clear(); + auto commandData = Akonadi2::EntityBuffer::appendAsVector(m_fbb, data.constData(), data.size()); + auto buffer = Akonadi2::CreateQueuedCommand(m_fbb, commandId, commandData); + Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); + mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); +} + +Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) +{ + return Async::start([this, pipeline](Async::Future &f) { + //TODO use a read-only transaction during the complete sync to sync against a defined revision + auto storage = QSharedPointer::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); + for (auto it = s_dataSource.constBegin(); it != s_dataSource.constEnd(); it++) { + bool isNew = true; + if (storage->exists()) { + findByRemoteId(storage, it.key(), [&](void *keyValue, int keySize, void *dataValue, int dataSize) { + isNew = false; + }); + } + if (isNew) { + m_fbb.Clear(); + + const QByteArray data = it.value().toUtf8(); + auto eventBuffer = DummyCalendar::GetDummyEvent(data.data()); + + //Map the source format to the buffer format (which happens to be an exact copy here) + auto summary = m_fbb.CreateString(eventBuffer->summary()->c_str()); + auto rid = m_fbb.CreateString(it.key().toStdString().c_str()); + auto description = m_fbb.CreateString(it.key().toStdString().c_str()); + static uint8_t rawData[100]; + auto attachment = Akonadi2::EntityBuffer::appendAsVector(m_fbb, rawData, 100); + + auto builder = DummyCalendar::DummyEventBuilder(m_fbb); + builder.add_summary(summary); + builder.add_remoteId(rid); + builder.add_description(description); + builder.add_attachment(attachment); + auto buffer = builder.Finish(); + DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); + flatbuffers::FlatBufferBuilder entityFbb; + Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, m_fbb.GetBufferPointer(), m_fbb.GetSize(), 0, 0); + + flatbuffers::FlatBufferBuilder fbb; + //This is the resource type and not the domain type + auto type = fbb.CreateString("event"); + auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); + auto location = Akonadi2::Commands::CreateCreateEntity(fbb, type, delta); + Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); + + enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize())); + } else { //modification + //TODO diff and create modification if necessary + } + } + //TODO find items to remove + f.setFinished(); + }); +} + +Async::Job DummyResource::processAllMessages() +{ + //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. + //TODO: report errors while processing sync? + //TODO JOBAPI: A helper that waits for n events and then continues? + return Async::start([this](Async::Future &f) { + if (mSynchronizerQueue.isEmpty()) { + f.setFinished(); + } else { + QObject::connect(&mSynchronizerQueue, &MessageQueue::drained, [&f]() { + f.setFinished(); + }); + } + }).then([this](Async::Future &f) { + if (mUserQueue.isEmpty()) { + f.setFinished(); + } else { + QObject::connect(&mUserQueue, &MessageQueue::drained, [&f]() { + f.setFinished(); + }); + } + }); +} + +void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) +{ + //TODO instead of copying the command including the full entity first into the command queue, we could directly + //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). + //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). + enqueueCommand(mUserQueue, commandId, data); +} + +DummyResourceFactory::DummyResourceFactory(QObject *parent) + : Akonadi2::ResourceFactory(parent) +{ + +} + +Akonadi2::Resource *DummyResourceFactory::createResource() +{ + return new DummyResource(); +} + +void DummyResourceFactory::registerFacades(Akonadi2::FacadeFactory &factory) +{ + factory.registerFacade(PLUGIN_NAME); +} + +#include "resourcefactory.moc" diff --git a/examples/dummyresource/resourcefactory.h b/examples/dummyresource/resourcefactory.h new file mode 100644 index 0000000..3b99d5e --- /dev/null +++ b/examples/dummyresource/resourcefactory.h @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2014 Aaron Seigo + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#pragma once + +#include "common/resource.h" +#include "async/src/async.h" +#include "common/messagequeue.h" + +#include + +//TODO: a little ugly to have this in two places, once here and once in Q_PLUGIN_METADATA +#define PLUGIN_NAME "org.kde.dummy" + +class Processor; + +class DummyResource : public Akonadi2::Resource +{ +public: + DummyResource(); + Async::Job synchronizeWithSource(Akonadi2::Pipeline *pipeline); + Async::Job processAllMessages(); + void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); + void configurePipeline(Akonadi2::Pipeline *pipeline); + int error() const; + +private: + void onProcessorError(int errorCode, const QString &errorMessage); + void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); + flatbuffers::FlatBufferBuilder m_fbb; + MessageQueue mUserQueue; + MessageQueue mSynchronizerQueue; + Processor *mProcessor; + int mError; +}; + +class DummyResourceFactory : public Akonadi2::ResourceFactory +{ + Q_OBJECT + Q_PLUGIN_METADATA(IID "org.kde.dummy") + Q_INTERFACES(Akonadi2::ResourceFactory) + +public: + DummyResourceFactory(QObject *parent = 0); + + Akonadi2::Resource *createResource(); + void registerFacades(Akonadi2::FacadeFactory &factory); +}; + diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 24912f5..7b97264 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -1,8 +1,12 @@ add_subdirectory(hawd) set(CMAKE_AUTOMOC ON) -include_directories(${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_BINARY_DIR}/hawd) -include_directories (${CMAKE_CURRENT_BINARY_DIR}/../dummyresource) +include_directories( + ${CMAKE_CURRENT_BINARY_DIR} + ${CMAKE_CURRENT_BINARY_DIR}/hawd + ${CMAKE_CURRENT_BINARY_DIR}/../examples/dummyresource + ${CMAKE_CURRENT_SOURCE_DIR}/../examples/ + ) generate_flatbuffers(calendar) -- cgit v1.2.3