From f89e43b3603976bc0e6eb885b3b9a43a6caff1c2 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 19 Apr 2015 14:11:02 +0200 Subject: Moved client and dummyresource to examples/ --- dummyresource/CMakeLists.txt | 12 -- dummyresource/domainadaptor.cpp | 60 ------ dummyresource/domainadaptor.h | 14 -- dummyresource/dummycalendar.fbs | 13 -- dummyresource/facade.cpp | 189 ----------------- dummyresource/facade.h | 45 ---- dummyresource/resourcefactory.cpp | 428 -------------------------------------- dummyresource/resourcefactory.h | 65 ------ 8 files changed, 826 deletions(-) delete mode 100644 dummyresource/CMakeLists.txt delete mode 100644 dummyresource/domainadaptor.cpp delete mode 100644 dummyresource/domainadaptor.h delete mode 100644 dummyresource/dummycalendar.fbs delete mode 100644 dummyresource/facade.cpp delete mode 100644 dummyresource/facade.h delete mode 100644 dummyresource/resourcefactory.cpp delete mode 100644 dummyresource/resourcefactory.h (limited to 'dummyresource') diff --git a/dummyresource/CMakeLists.txt b/dummyresource/CMakeLists.txt deleted file mode 100644 index abd315f..0000000 --- a/dummyresource/CMakeLists.txt +++ /dev/null @@ -1,12 +0,0 @@ -project(akonadi2_resource_dummy) - -add_definitions(-DQT_PLUGIN) -include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) - -generate_flatbuffers(dummycalendar) - -add_library(${PROJECT_NAME} SHARED facade.cpp resourcefactory.cpp domainadaptor.cpp) -qt5_use_modules(${PROJECT_NAME} Core Network) -target_link_libraries(${PROJECT_NAME} akonadi2common) - -install(TARGETS ${PROJECT_NAME} LIBRARY DESTINATION ${AKONADI2_RESOURCE_PLUGINS_PATH}) diff --git a/dummyresource/domainadaptor.cpp b/dummyresource/domainadaptor.cpp deleted file mode 100644 index 8649bc3..0000000 --- a/dummyresource/domainadaptor.cpp +++ /dev/null @@ -1,60 +0,0 @@ - -#include "domainadaptor.h" - -#include -#include - -#include "dummycalendar_generated.h" -#include "event_generated.h" -#include "entity_generated.h" -#include "metadata_generated.h" -#include "domainadaptor.h" -#include "log.h" -#include - -using namespace DummyCalendar; -using namespace flatbuffers; - - - - -DummyEventAdaptorFactory::DummyEventAdaptorFactory() - : DomainTypeAdaptorFactory() -{ - //TODO turn this into initializeReadPropertyMapper as well? - mResourceMapper->addMapping("summary", [](DummyEvent const *buffer) -> QVariant { - return propertyToVariant(buffer->summary()); - }); - - mResourceWriteMapper->addMapping("summary", [](const QVariant &value, flatbuffers::FlatBufferBuilder &fbb) -> std::function { - auto offset = variantToProperty(value, fbb); - return [offset](DummyEventBuilder &builder) { builder.add_summary(offset); }; - }); -} - - -void DummyEventAdaptorFactory::createBuffer(const Akonadi2::ApplicationDomain::Event &event, flatbuffers::FlatBufferBuilder &fbb) -{ - flatbuffers::FlatBufferBuilder localFbb; - if (mLocalWriteMapper) { - auto pos = createBufferPart(event, localFbb, *mLocalWriteMapper); - Akonadi2::ApplicationDomain::Buffer::FinishEventBuffer(localFbb, pos); - flatbuffers::Verifier verifier(localFbb.GetBufferPointer(), localFbb.GetSize()); - if (!verifier.VerifyBuffer()) { - Warning() << "Created invalid local buffer"; - } - } - - flatbuffers::FlatBufferBuilder resFbb; - if (mResourceWriteMapper) { - auto pos = createBufferPart(event, resFbb, *mResourceWriteMapper); - DummyCalendar::FinishDummyEventBuffer(resFbb, pos); - flatbuffers::Verifier verifier(resFbb.GetBufferPointer(), resFbb.GetSize()); - if (!verifier.VerifyBuffer()) { - Warning() << "Created invalid resource buffer"; - } - } - - Akonadi2::EntityBuffer::assembleEntityBuffer(fbb, 0, 0, resFbb.GetBufferPointer(), resFbb.GetSize(), localFbb.GetBufferPointer(), localFbb.GetSize()); -} - diff --git a/dummyresource/domainadaptor.h b/dummyresource/domainadaptor.h deleted file mode 100644 index 9d351e7..0000000 --- a/dummyresource/domainadaptor.h +++ /dev/null @@ -1,14 +0,0 @@ -#pragma once - -#include "common/domainadaptor.h" -#include "event_generated.h" -#include "dummycalendar_generated.h" -#include "entity_generated.h" - -class DummyEventAdaptorFactory : public DomainTypeAdaptorFactory -{ -public: - DummyEventAdaptorFactory(); - virtual ~DummyEventAdaptorFactory() {}; - virtual void createBuffer(const Akonadi2::ApplicationDomain::Event &event, flatbuffers::FlatBufferBuilder &fbb); -}; diff --git a/dummyresource/dummycalendar.fbs b/dummyresource/dummycalendar.fbs deleted file mode 100644 index 643c9b2..0000000 --- a/dummyresource/dummycalendar.fbs +++ /dev/null @@ -1,13 +0,0 @@ -// example IDL file - -namespace DummyCalendar; - -table DummyEvent { - summary:string; - description:string; - attachment:[ubyte]; - remoteId:string; -} - -root_type DummyEvent; -file_identifier "AKFB"; diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp deleted file mode 100644 index e50e4f3..0000000 --- a/dummyresource/facade.cpp +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Copyright (C) 2014 Christian Mollekopf - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the - * Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#include "facade.h" - -#include -#include - -#include "common/resourceaccess.h" -#include "common/commands.h" -#include "dummycalendar_generated.h" -#include "event_generated.h" -#include "entity_generated.h" -#include "metadata_generated.h" -#include "domainadaptor.h" -#include -#include -#include - -using namespace DummyCalendar; -using namespace flatbuffers; - - -DummyResourceFacade::DummyResourceFacade() - : Akonadi2::GenericFacade("org.kde.dummy"), - mFactory(new DummyEventAdaptorFactory) -{ -} - -DummyResourceFacade::~DummyResourceFacade() -{ -} - -Async::Job DummyResourceFacade::create(const Akonadi2::ApplicationDomain::Event &domainObject) -{ - flatbuffers::FlatBufferBuilder entityFbb; - mFactory->createBuffer(domainObject, entityFbb); - return sendCreateCommand("event", QByteArray::fromRawData(reinterpret_cast(entityFbb.GetBufferPointer()), entityFbb.GetSize())); -} - -Async::Job DummyResourceFacade::modify(const Akonadi2::ApplicationDomain::Event &domainObject) -{ - //Create message buffer and send to resource - return Async::null(); -} - -Async::Job DummyResourceFacade::remove(const Akonadi2::ApplicationDomain::Event &domainObject) -{ - //Create message buffer and send to resource - return Async::null(); -} - -static std::function prepareQuery(const Akonadi2::Query &query) -{ - //Compose some functions to make query matching fast. - //This way we can process the query once, and convert all values into something that can be compared quickly - std::function preparedQuery; - if (!query.ids.isEmpty()) { - //Match by id - //TODO: for id's a direct lookup would be way faster - - //We convert the id's to std::string so we don't have to convert each key during the scan. (This runs only once, and the query will be run for every key) - //Probably a premature optimization, but perhaps a useful technique to be investigated. - QVector ids; - for (const auto &id : query.ids) { - ids << id.toStdString(); - } - preparedQuery = [ids](const std::string &key, DummyEvent const *buffer, Akonadi2::ApplicationDomain::Buffer::Event const *local) { - if (ids.contains(key)) { - return true; - } - return false; - }; - } else if (!query.propertyFilter.isEmpty()) { - if (query.propertyFilter.contains("uid")) { - const QByteArray uid = query.propertyFilter.value("uid").toByteArray(); - preparedQuery = [uid](const std::string &key, DummyEvent const *buffer, Akonadi2::ApplicationDomain::Buffer::Event const *local) { - if (local && local->uid() && (QByteArray::fromRawData(local->uid()->c_str(), local->uid()->size()) == uid)) { - return true; - } - return false; - }; - } - } else { - //Match everything - preparedQuery = [](const std::string &key, DummyEvent const *buffer, Akonadi2::ApplicationDomain::Buffer::Event const *local) { - return true; - }; - } - return preparedQuery; -} - -void DummyResourceFacade::readValue(QSharedPointer storage, const QByteArray &key, const std::function &resultCallback, std::function preparedQuery) -{ - storage->scan(key, [=](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { - - //Skip internals - if (Akonadi2::Storage::isInternalKey(keyValue, keySize)) { - return true; - } - - //Extract buffers - Akonadi2::EntityBuffer buffer(dataValue, dataSize); - - const auto resourceBuffer = Akonadi2::EntityBuffer::readBuffer(buffer.entity().resource()); - const auto localBuffer = Akonadi2::EntityBuffer::readBuffer(buffer.entity().local()); - const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(buffer.entity().metadata()); - - if ((!resourceBuffer && !localBuffer) || !metadataBuffer) { - qWarning() << "invalid buffer " << QByteArray::fromRawData(static_cast(keyValue), keySize); - return true; - } - - //We probably only want to create all buffers after the scan - //TODO use adapter for query and scan? - if (preparedQuery && preparedQuery(std::string(static_cast(keyValue), keySize), resourceBuffer, localBuffer)) { - qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - //This only works for a 1:1 mapping of resource to domain types. - //Not i.e. for tags that are stored as flags in each entity of an imap store. - auto adaptor = mFactory->createAdaptor(buffer.entity()); - //TODO only copy requested properties - auto memoryAdaptor = QSharedPointer::create(*adaptor); - // here we could copy additional properties that don't have a 1:1 mapping, such as separately stored tags. - auto event = QSharedPointer::create("org.kde.dummy", QByteArray::fromRawData(static_cast(keyValue), keySize), revision, memoryAdaptor); - resultCallback(event); - } - return true; - }, - [](const Akonadi2::Storage::Error &error) { - qWarning() << "Error during query: " << error.message; - }); -} - -Async::Job DummyResourceFacade::load(const Akonadi2::Query &query, const std::function &resultCallback) -{ - return Async::start([=](Async::Future &future) { - //Now that the sync is complete we can execute the query - const auto preparedQuery = prepareQuery(query); - - auto storage = QSharedPointer::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); - storage->setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { - Warning() << "Error during query: " << error.store << error.message; - }); - - storage->startTransaction(Akonadi2::Storage::ReadOnly); - const qint64 revision = storage->maxRevision(); - - //Index lookups - QVector keys; - if (query.propertyFilter.contains("uid")) { - static Index uidIndex(Akonadi2::Store::storageLocation(), "org.kde.dummy.index.uid", Akonadi2::Storage::ReadOnly); - uidIndex.lookup(query.propertyFilter.value("uid").toByteArray(), [&](const QByteArray &value) { - keys << value; - }, - [](const Index::Error &error) { - Warning() << "Error in index: " << error.message; - }); - } - - if (keys.isEmpty()) { - Log() << "Executing a full scan"; - readValue(storage, QByteArray(), resultCallback, preparedQuery); - } else { - for (const auto &key : keys) { - readValue(storage, key, resultCallback, preparedQuery); - } - } - storage->abortTransaction(); - future.setValue(revision); - future.setFinished(); - }); -} - diff --git a/dummyresource/facade.h b/dummyresource/facade.h deleted file mode 100644 index 91ae351..0000000 --- a/dummyresource/facade.h +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (C) 2014 Christian Mollekopf - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the - * Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#pragma once - -#include "common/facade.h" - -#include "common/clientapi.h" -#include "common/storage.h" -#include "resourcefactory.h" -#include "entity_generated.h" -#include "event_generated.h" -#include "dummycalendar_generated.h" -#include "common/domainadaptor.h" - -class DummyResourceFacade : public Akonadi2::GenericFacade -{ -public: - DummyResourceFacade(); - virtual ~DummyResourceFacade(); - Async::Job create(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE; - Async::Job modify(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE; - Async::Job remove(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE; - Async::Job load(const Akonadi2::Query &query, const std::function &resultCallback) Q_DECL_OVERRIDE; - -private: - void readValue(QSharedPointer storage, const QByteArray &key, const std::function &resultCallback, std::function); - QSharedPointer > mFactory; -}; diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp deleted file mode 100644 index d5765e2..0000000 --- a/dummyresource/resourcefactory.cpp +++ /dev/null @@ -1,428 +0,0 @@ -/* - * Copyright (C) 2014 Aaron Seigo - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the - * Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#include "resourcefactory.h" -#include "facade.h" -#include "entitybuffer.h" -#include "pipeline.h" -#include "dummycalendar_generated.h" -#include "metadata_generated.h" -#include "queuedcommand_generated.h" -#include "createentity_generated.h" -#include "domainadaptor.h" -#include "commands.h" -#include "clientapi.h" -#include "index.h" -#include "log.h" -#include -#include - - -/* - * Figure out how to implement various classes of processors: - * * read-only (index and such) => extractor function, probably using domain adaptor - * * filter => provide means to move entity elsewhere, and also reflect change in source (I guess?) - * * flag extractors? => like read-only? Or write to local portion of buffer? - * ** $ISSPAM should become part of domain object and is written to the local part of the mail. - * ** => value could be calculated by the server directly - */ -class SimpleProcessor : public Akonadi2::Preprocessor -{ -public: - SimpleProcessor(const QString &id, const std::function &f) - : Akonadi2::Preprocessor(), - mFunction(f), - mId(id) - { - } - - void process(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e) Q_DECL_OVERRIDE - { - mFunction(state, e); - processingCompleted(state); - } - - QString id() const - { - return mId; - } - -protected: - std::function mFunction; - QString mId; -}; - - - -static std::string createEvent() -{ - static const size_t attachmentSize = 1024*2; // 2KB - static uint8_t rawData[attachmentSize]; - static flatbuffers::FlatBufferBuilder fbb; - fbb.Clear(); - { - uint8_t *rawDataPtr = Q_NULLPTR; - auto summary = fbb.CreateString("summary"); - auto data = fbb.CreateUninitializedVector(attachmentSize, &rawDataPtr); - DummyCalendar::DummyEventBuilder eventBuilder(fbb); - eventBuilder.add_summary(summary); - eventBuilder.add_attachment(data); - auto eventLocation = eventBuilder.Finish(); - DummyCalendar::FinishDummyEventBuffer(fbb, eventLocation); - memcpy((void*)rawDataPtr, rawData, attachmentSize); - } - - return std::string(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); -} - -QMap populate() -{ - QMap content; - for (int i = 0; i < 2; i++) { - auto event = createEvent(); - content.insert(QString("key%1").arg(i), QString::fromStdString(event)); - } - return content; -} - -static QMap s_dataSource = populate(); - -//Drives the pipeline using the output from all command queues -class Processor : public QObject -{ - Q_OBJECT -public: - Processor(Akonadi2::Pipeline *pipeline, QList commandQueues) - : QObject(), - mPipeline(pipeline), - mCommandQueues(commandQueues), - mProcessingLock(false) - { - for (auto queue : mCommandQueues) { - const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); - Q_UNUSED(ret); - } - } - -signals: - void error(int errorCode, const QString &errorMessage); - -private: - bool messagesToProcessAvailable() - { - for (auto queue : mCommandQueues) { - if (!queue->isEmpty()) { - return true; - } - } - return false; - } - -private slots: - void process() - { - if (mProcessingLock) { - return; - } - mProcessingLock = true; - auto job = processPipeline().then([this]() { - mProcessingLock = false; - if (messagesToProcessAvailable()) { - process(); - } - }).exec(); - } - - Async::Job processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) - { - Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); - //Throw command into appropriate pipeline - switch (queuedCommand->commandId()) { - case Akonadi2::Commands::DeleteEntityCommand: - //mPipeline->removedEntity - return Async::null(); - case Akonadi2::Commands::ModifyEntityCommand: - //mPipeline->modifiedEntity - return Async::null(); - case Akonadi2::Commands::CreateEntityCommand: - return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); - default: - return Async::error(-1, "Unhandled command"); - } - return Async::null(); - } - - //Process all messages of this queue - Async::Job processQueue(MessageQueue *queue) - { - //TODO use something like: - //Async::foreach("pass iterator here").each("process value here").join(); - //Async::foreach("pass iterator here").parallel("process value here").join(); - return Async::dowhile( - [this, queue](Async::Future &future) { - if (queue->isEmpty()) { - future.setValue(false); - future.setFinished(); - return; - } - queue->dequeue( - [this, &future](void *ptr, int size, std::function messageQueueCallback) { - auto callback = [messageQueueCallback, &future](bool success) { - messageQueueCallback(success); - future.setValue(!success); - future.setFinished(); - }; - - flatbuffers::Verifier verifyer(reinterpret_cast(ptr), size); - if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { - Warning() << "invalid buffer"; - callback(false); - return; - } - auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); - Trace() << "Dequeued Command: " << Akonadi2::Commands::name(queuedCommand->commandId()); - //TODO JOBAPI: job lifetime management - //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete - //themselves once done. In other cases we'd like jobs that only live as long as their handle though. - //FIXME this job is stack allocated and thus simply dies.... - processQueuedCommand(queuedCommand).then( - [callback]() { - callback(true); - }, - [callback](int errorCode, QString errorMessage) { - Warning() << "Error while processing queue command: " << errorMessage; - callback(false); - } - ).exec(); - }, - [&future](const MessageQueue::Error &error) { - Warning() << "Error while getting message from messagequeue: " << error.message; - future.setValue(false); - future.setFinished(); - } - ); - } - ); - } - - Async::Job processPipeline() - { - //Go through all message queues - auto it = QSharedPointer >::create(mCommandQueues); - return Async::dowhile( - [it]() { return it->hasNext(); }, - [it, this](Async::Future &future) { - auto queue = it->next(); - processQueue(queue).then([&future]() { - Trace() << "Queue processed"; - future.setFinished(); - }).exec(); - } - ); - } - -private: - Akonadi2::Pipeline *mPipeline; - //Ordered by priority - QList mCommandQueues; - bool mProcessingLock; -}; - -DummyResource::DummyResource() - : Akonadi2::Resource(), - mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"), - mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue"), - mError(0) -{ -} - -void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) -{ - auto eventFactory = QSharedPointer::create(); - //FIXME we should setup for each resource entity type, not for each domain type - //i.e. If a resource stores tags as part of each message it needs to update the tag index - //TODO setup preprocessors for each resource entity type and pipeline type allowing full customization - //Eventually the order should be self configuring, for now it's hardcoded. - auto eventIndexer = new SimpleProcessor("summaryprocessor", [eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { - auto adaptor = eventFactory->createAdaptor(entity); - // Log() << "Summary preprocessor: " << adaptor->getProperty("summary").toString(); - }); - - auto uidIndexer = new SimpleProcessor("uidIndexer", [eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { - static Index uidIndex(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.index.uid", Akonadi2::Storage::ReadWrite); - - //TODO: Benchmark if this is performance wise acceptable, or if we have to access the buffer directly - auto adaptor = eventFactory->createAdaptor(entity); - const auto uid = adaptor->getProperty("uid"); - if (uid.isValid()) { - uidIndex.add(uid.toByteArray(), state.key()); - } - }); - - //event is the entitytype and not the domain type - pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector() << eventIndexer << uidIndexer); - mProcessor = new Processor(pipeline, QList() << &mUserQueue << &mSynchronizerQueue); - QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); -} - -void DummyResource::onProcessorError(int errorCode, const QString &errorMessage) -{ - Warning() << "Received error from Processor: " << errorCode << errorMessage; - mError = errorCode; -} - -int DummyResource::error() const -{ - return mError; -} - -void findByRemoteId(QSharedPointer storage, const QString &rid, std::function callback) -{ - //TODO lookup in rid index instead of doing a full scan - const std::string ridString = rid.toStdString(); - storage->scan("", [&](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { - if (Akonadi2::Storage::isInternalKey(keyValue, keySize)) { - return true; - } - - Akonadi2::EntityBuffer::extractResourceBuffer(dataValue, dataSize, [&](const uint8_t *buffer, size_t size) { - flatbuffers::Verifier verifier(buffer, size); - if (DummyCalendar::VerifyDummyEventBuffer(verifier)) { - DummyCalendar::DummyEvent const *resourceBuffer = DummyCalendar::GetDummyEvent(buffer); - if (resourceBuffer && resourceBuffer->remoteId()) { - if (std::string(resourceBuffer->remoteId()->c_str(), resourceBuffer->remoteId()->size()) == ridString) { - callback(keyValue, keySize, dataValue, dataSize); - } - } - } - }); - return true; - }); -} - -void DummyResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) -{ - m_fbb.Clear(); - auto commandData = Akonadi2::EntityBuffer::appendAsVector(m_fbb, data.constData(), data.size()); - auto buffer = Akonadi2::CreateQueuedCommand(m_fbb, commandId, commandData); - Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); - mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); -} - -Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) -{ - return Async::start([this, pipeline](Async::Future &f) { - //TODO use a read-only transaction during the complete sync to sync against a defined revision - auto storage = QSharedPointer::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); - for (auto it = s_dataSource.constBegin(); it != s_dataSource.constEnd(); it++) { - bool isNew = true; - if (storage->exists()) { - findByRemoteId(storage, it.key(), [&](void *keyValue, int keySize, void *dataValue, int dataSize) { - isNew = false; - }); - } - if (isNew) { - m_fbb.Clear(); - - const QByteArray data = it.value().toUtf8(); - auto eventBuffer = DummyCalendar::GetDummyEvent(data.data()); - - //Map the source format to the buffer format (which happens to be an exact copy here) - auto summary = m_fbb.CreateString(eventBuffer->summary()->c_str()); - auto rid = m_fbb.CreateString(it.key().toStdString().c_str()); - auto description = m_fbb.CreateString(it.key().toStdString().c_str()); - static uint8_t rawData[100]; - auto attachment = Akonadi2::EntityBuffer::appendAsVector(m_fbb, rawData, 100); - - auto builder = DummyCalendar::DummyEventBuilder(m_fbb); - builder.add_summary(summary); - builder.add_remoteId(rid); - builder.add_description(description); - builder.add_attachment(attachment); - auto buffer = builder.Finish(); - DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); - flatbuffers::FlatBufferBuilder entityFbb; - Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, m_fbb.GetBufferPointer(), m_fbb.GetSize(), 0, 0); - - flatbuffers::FlatBufferBuilder fbb; - //This is the resource type and not the domain type - auto type = fbb.CreateString("event"); - auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); - auto location = Akonadi2::Commands::CreateCreateEntity(fbb, type, delta); - Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); - - enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize())); - } else { //modification - //TODO diff and create modification if necessary - } - } - //TODO find items to remove - f.setFinished(); - }); -} - -Async::Job DummyResource::processAllMessages() -{ - //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. - //TODO: report errors while processing sync? - //TODO JOBAPI: A helper that waits for n events and then continues? - return Async::start([this](Async::Future &f) { - if (mSynchronizerQueue.isEmpty()) { - f.setFinished(); - } else { - QObject::connect(&mSynchronizerQueue, &MessageQueue::drained, [&f]() { - f.setFinished(); - }); - } - }).then([this](Async::Future &f) { - if (mUserQueue.isEmpty()) { - f.setFinished(); - } else { - QObject::connect(&mUserQueue, &MessageQueue::drained, [&f]() { - f.setFinished(); - }); - } - }); -} - -void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) -{ - //TODO instead of copying the command including the full entity first into the command queue, we could directly - //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). - //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). - enqueueCommand(mUserQueue, commandId, data); -} - -DummyResourceFactory::DummyResourceFactory(QObject *parent) - : Akonadi2::ResourceFactory(parent) -{ - -} - -Akonadi2::Resource *DummyResourceFactory::createResource() -{ - return new DummyResource(); -} - -void DummyResourceFactory::registerFacades(Akonadi2::FacadeFactory &factory) -{ - factory.registerFacade(PLUGIN_NAME); -} - -#include "resourcefactory.moc" diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h deleted file mode 100644 index 3b99d5e..0000000 --- a/dummyresource/resourcefactory.h +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (C) 2014 Aaron Seigo - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the - * Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#pragma once - -#include "common/resource.h" -#include "async/src/async.h" -#include "common/messagequeue.h" - -#include - -//TODO: a little ugly to have this in two places, once here and once in Q_PLUGIN_METADATA -#define PLUGIN_NAME "org.kde.dummy" - -class Processor; - -class DummyResource : public Akonadi2::Resource -{ -public: - DummyResource(); - Async::Job synchronizeWithSource(Akonadi2::Pipeline *pipeline); - Async::Job processAllMessages(); - void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); - void configurePipeline(Akonadi2::Pipeline *pipeline); - int error() const; - -private: - void onProcessorError(int errorCode, const QString &errorMessage); - void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); - flatbuffers::FlatBufferBuilder m_fbb; - MessageQueue mUserQueue; - MessageQueue mSynchronizerQueue; - Processor *mProcessor; - int mError; -}; - -class DummyResourceFactory : public Akonadi2::ResourceFactory -{ - Q_OBJECT - Q_PLUGIN_METADATA(IID "org.kde.dummy") - Q_INTERFACES(Akonadi2::ResourceFactory) - -public: - DummyResourceFactory(QObject *parent = 0); - - Akonadi2::Resource *createResource(); - void registerFacades(Akonadi2::FacadeFactory &factory); -}; - -- cgit v1.2.3