From 91d915a09b7d52c10edb1d4c1298fc2885b8a257 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 2 Jan 2015 22:39:25 +0100 Subject: DomainTypeAdaptor factory, per type preprocessor pipeline configuration. --- common/entitybuffer.cpp | 5 ++ common/entitybuffer.h | 1 + common/metadata.fbs | 1 + common/pipeline.cpp | 44 ++++++++---- common/pipeline.h | 37 ++++++++-- common/resource.cpp | 5 ++ common/resource.h | 2 + dummyresource/facade.cpp | 138 ++++++++++++++++++-------------------- dummyresource/facade.h | 53 +++++++++++++++ dummyresource/resourcefactory.cpp | 56 +++++++++++++++- dummyresource/resourcefactory.h | 1 + synchronizer/listener.cpp | 2 + 12 files changed, 253 insertions(+), 92 deletions(-) diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp index b9c9d76..c5d6bce 100644 --- a/common/entitybuffer.cpp +++ b/common/entitybuffer.cpp @@ -18,6 +18,11 @@ EntityBuffer::EntityBuffer(void *dataValue, int dataSize) } } +const Akonadi2::Entity &EntityBuffer::entity() +{ + return *mEntity; +} + const flatbuffers::Vector* EntityBuffer::resourceBuffer() { if (!mEntity) { diff --git a/common/entitybuffer.h b/common/entitybuffer.h index c072777..bd9360d 100644 --- a/common/entitybuffer.h +++ b/common/entitybuffer.h @@ -12,6 +12,7 @@ public: const flatbuffers::Vector *resourceBuffer(); const flatbuffers::Vector *metadataBuffer(); const flatbuffers::Vector *localBuffer(); + const Entity &entity(); static void extractResourceBuffer(void *dataValue, int dataSize, const std::function *)> &handler); static void assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void *metadataData, size_t metadataSize, void *resourceData, size_t resourceSize, void *localData, size_t localSize); diff --git a/common/metadata.fbs b/common/metadata.fbs index 71684b6..34a8df2 100644 --- a/common/metadata.fbs +++ b/common/metadata.fbs @@ -2,6 +2,7 @@ namespace Akonadi2; table Metadata { revision: ulong; + processed: bool = true; } root_type Metadata; diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 04954ac..8d00480 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -41,10 +41,10 @@ public: } Storage storage; - QVector nullPipeline; - QVector newPipeline; - QVector modifiedPipeline; - QVector deletedPipeline; + QHash > nullPipeline; + QHash > newPipeline; + QHash > modifiedPipeline; + QHash > deletedPipeline; QVector activePipelines; bool stepScheduled; }; @@ -60,6 +60,23 @@ Pipeline::~Pipeline() delete d; } +void Pipeline::setPreprocessors(const QString &entityType, Type pipelineType, const QVector &preprocessors) +{ + switch (pipelineType) { + case NewPipeline: + d->newPipeline[entityType] = preprocessors; + break; + case ModifiedPipeline: + d->modifiedPipeline[entityType] = preprocessors; + break; + case DeletedPipeline: + d->deletedPipeline[entityType] = preprocessors; + break; + default: + break; + }; +} + Storage &Pipeline::storage() const { return d->storage; @@ -68,12 +85,12 @@ Storage &Pipeline::storage() const void Pipeline::null() { //TODO: is there really any need for the null pipeline? if so, it should be doing something ;) - PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline); - d->activePipelines << state; - state.step(); + // PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline); + // d->activePipelines << state; + // state.step(); } -void Pipeline::newEntity(const QByteArray &key, void *resourceBufferData, size_t size) +void Pipeline::newEntity(const QString &entityType, const QByteArray &key, void *resourceBufferData, size_t size) { const qint64 newRevision = storage().maxRevision() + 1; @@ -81,6 +98,7 @@ void Pipeline::newEntity(const QByteArray &key, void *resourceBufferData, size_t flatbuffers::FlatBufferBuilder metadataFbb; auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); metadataBuilder.add_revision(newRevision); + metadataBuilder.add_processed(false); auto metadataBuffer = metadataBuilder.Finish(); Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); @@ -90,21 +108,21 @@ void Pipeline::newEntity(const QByteArray &key, void *resourceBufferData, size_t storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); storage().setMaxRevision(newRevision); - PipelineState state(this, NewPipeline, key, d->newPipeline); + PipelineState state(this, NewPipeline, key, d->newPipeline[entityType]); d->activePipelines << state; state.step(); } -void Pipeline::modifiedEntity(const QByteArray &key, void *data, size_t size) +void Pipeline::modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size) { - PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline); + PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType]); d->activePipelines << state; state.step(); } -void Pipeline::deletedEntity(const QByteArray &key) +void Pipeline::deletedEntity(const QString &entityType, const QByteArray &key) { - PipelineState state(this, DeletedPipeline, key, d->deletedPipeline); + PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType]); d->activePipelines << state; state.step(); } diff --git a/common/pipeline.h b/common/pipeline.h index 6ef8703..8373899 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -27,6 +27,7 @@ #include #include +#include //For domain types namespace Akonadi2 { @@ -46,12 +47,34 @@ public: Storage &storage() const; + // template + // Storage &storage() const; + + template + void setPreprocessors(Type type, const QVector &preprocessors) + { + setPreprocessors(Akonadi2::Domain::getTypeName(), type, preprocessors); + } + void null(); - //FIXME We should probably directly provide a DomainTypeAdapter here. The data has already been written and we only need to read it for processing. And we need to read all buffers. - void newEntity(const QByteArray &key, void *resourceBufferData, size_t size); - //TODO Send local buffer data as well? - void modifiedEntity(const QByteArray &key, void *data, size_t size); - void deletedEntity(const QByteArray &key); + + template + void newEntity(const QByteArray &key, void *resourceBufferData, size_t size) + { + newEntity(Akonadi2::Domain::getTypeName(), key, resourceBufferData, size); + } + + template + void modifiedEntity(const QByteArray &key, void *data, size_t size) + { + modifiedEntity(Akonadi2::Domain::getTypeName(), key, data, size); + } + + template + void deletedEntity(const QByteArray &key) + { + deletedEntity(Akonadi2::Domain::getTypeName(), key); + } Q_SIGNALS: void revisionUpdated(); @@ -61,6 +84,10 @@ private Q_SLOTS: void stepPipelines(); private: + void setPreprocessors(const QString &entityType, Type pipelineType, const QVector &preprocessors); + void newEntity(const QString &entityType, const QByteArray &key, void *resourceBufferData, size_t size); + void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size); + void deletedEntity(const QString &entityType, const QByteArray &key); void pipelineStepped(const PipelineState &state); void pipelineCompleted(const PipelineState &state); void scheduleStep(); diff --git a/common/resource.cpp b/common/resource.cpp index bba6609..db08c4f 100644 --- a/common/resource.cpp +++ b/common/resource.cpp @@ -39,6 +39,11 @@ Resource::~Resource() //delete d; } +void Resource::configurePipeline(Pipeline *pipeline) +{ + +} + void Resource::processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) { Q_UNUSED(commandId) diff --git a/common/resource.h b/common/resource.h index fb42c1b..52a28a6 100644 --- a/common/resource.h +++ b/common/resource.h @@ -36,6 +36,8 @@ public: virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); virtual Async::Job synchronizeWithSource(Pipeline *pipeline); + virtual void configurePipeline(Pipeline *pipeline); + private: class Private; Private * const d; diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp index d3974e9..c167297 100644 --- a/dummyresource/facade.cpp +++ b/dummyresource/facade.cpp @@ -33,44 +33,87 @@ using namespace DummyCalendar; using namespace flatbuffers; -/** - * The property mapper holds accessor functions for all properties. - * - * It is by default initialized with accessors that access the local-only buffer, - * and resource simply have to overwrite those accessors. - */ -template -class PropertyMapper +//This will become a generic implementation that simply takes the resource buffer and local buffer pointer +class DummyEventAdaptor : public Akonadi2::Domain::BufferAdaptor { public: - void setProperty(const QString &key, const QVariant &value, BufferType *buffer) + DummyEventAdaptor() + : BufferAdaptor() { - if (mWriteAccessors.contains(key)) { - auto accessor = mWriteAccessors.value(key); - return accessor(value, buffer); + + } + + void setProperty(const QString &key, const QVariant &value) + { + if (mResourceMapper->mWriteAccessors.contains(key)) { + // mResourceMapper.setProperty(key, value, mResourceBuffer); + } else { + // mLocalMapper.; } } - virtual QVariant getProperty(const QString &key, BufferType const *buffer) const + virtual QVariant getProperty(const QString &key) const { - if (mReadAccessors.contains(key)) { - auto accessor = mReadAccessors.value(key); - return accessor(buffer); + if (mResourceBuffer && mResourceMapper->mReadAccessors.contains(key)) { + return mResourceMapper->getProperty(key, mResourceBuffer); + } else if (mLocalBuffer) { + return mLocalMapper->getProperty(key, mLocalBuffer); } return QVariant(); } - QHash > mReadAccessors; - QHash > mWriteAccessors; + + Akonadi2::Domain::Buffer::Event const *mLocalBuffer; + DummyEvent const *mResourceBuffer; + + QSharedPointer > mLocalMapper; + QSharedPointer > mResourceMapper; }; +template<> +QSharedPointer DomainTypeAdaptorFactory::createAdaptor(const Akonadi2::Entity &entity) +{ + DummyEvent const *resourceBuffer = 0; + if (auto resourceData = entity.resource()) { + flatbuffers::Verifier verifyer(resourceData->Data(), resourceData->size()); + if (VerifyDummyEventBuffer(verifyer)) { + resourceBuffer = GetDummyEvent(resourceData); + } + } + + Akonadi2::Metadata const *metadataBuffer = 0; + if (auto metadataData = entity.metadata()) { + flatbuffers::Verifier verifyer(metadataData->Data(), metadataData->size()); + if (Akonadi2::VerifyMetadataBuffer(verifyer)) { + metadataBuffer = Akonadi2::GetMetadata(metadataData); + } + } + + Akonadi2::Domain::Buffer::Event const *localBuffer = 0; + if (auto localData = entity.local()) { + flatbuffers::Verifier verifyer(localData->Data(), localData->size()); + if (Akonadi2::Domain::Buffer::VerifyEventBuffer(verifyer)) { + localBuffer = Akonadi2::Domain::Buffer::GetEvent(localData); + } + } + + auto adaptor = QSharedPointer::create(); + adaptor->mLocalBuffer = localBuffer; + adaptor->mResourceBuffer = resourceBuffer; + adaptor->mResourceMapper = mResourceMapper; + adaptor->mLocalMapper = mLocalMapper; + return adaptor; +} + DummyResourceFacade::DummyResourceFacade() : Akonadi2::StoreFacade(), - mResourceAccess(new Akonadi2::ResourceAccess("org.kde.dummy")) + mResourceAccess(new Akonadi2::ResourceAccess("org.kde.dummy")), + mFactory(new DomainTypeAdaptorFactory()) { - PropertyMapper mapper; - mapper.mReadAccessors.insert("summary", [](DummyEvent const *buffer) -> QVariant { + auto mapper = QSharedPointer >::create(); + mapper->mReadAccessors.insert("summary", [](DummyEvent const *buffer) -> QVariant { return QString::fromStdString(buffer->summary()->c_str()); }); + mFactory->mResourceMapper = mapper; } DummyResourceFacade::~DummyResourceFacade() @@ -105,45 +148,6 @@ void DummyResourceFacade::remove(const Akonadi2::Domain::Event &domainObject) // -//This will become a generic implementation that simply takes the resource buffer and local buffer pointer -class DummyEventAdaptor : public Akonadi2::Domain::BufferAdaptor -{ -public: - DummyEventAdaptor() - : BufferAdaptor() - { - - } - - void setProperty(const QString &key, const QVariant &value) - { - if (mResourceMapper.mWriteAccessors.contains(key)) { - // mResourceMapper.setProperty(key, value, mResourceBuffer); - } else { - // mLocalMapper.; - } - } - - virtual QVariant getProperty(const QString &key) const - { - if (mResourceBuffer && mResourceMapper.mReadAccessors.contains(key)) { - return mResourceMapper.getProperty(key, mResourceBuffer); - } else if (mLocalBuffer) { - return mLocalMapper.getProperty(key, mLocalBuffer); - } - return QVariant(); - } - - Akonadi2::Domain::Buffer::Event const *mLocalBuffer; - DummyEvent const *mResourceBuffer; - - PropertyMapper mLocalMapper; - PropertyMapper mResourceMapper; - - //Keep query alive so values remain valid - QSharedPointer storage; -}; - static std::function prepareQuery(const Akonadi2::Query &query) { //Compose some functions to make query matching fast. @@ -225,26 +229,16 @@ void DummyResourceFacade::load(const Akonadi2::Query &query, const std::function } } - Akonadi2::Domain::Buffer::Event const *localBuffer = 0; - if (auto localData = buffer.localBuffer()) { - flatbuffers::Verifier verifyer(localData->Data(), localData->size()); - if (Akonadi2::Domain::Buffer::VerifyEventBuffer(verifyer)) { - localBuffer = Akonadi2::Domain::Buffer::GetEvent(localData); - } - } - if (!resourceBuffer || !metadataBuffer) { qWarning() << "invalid buffer " << QString::fromStdString(std::string(static_cast(keyValue), keySize)); return true; } //We probably only want to create all buffers after the scan + //TODO use adapter for query and scan? if (preparedQuery && preparedQuery(std::string(static_cast(keyValue), keySize), resourceBuffer)) { qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - auto adaptor = QSharedPointer::create(); - adaptor->mLocalBuffer = localBuffer; - adaptor->mResourceBuffer = resourceBuffer; - adaptor->storage = storage; + auto adaptor = mFactory->createAdaptor(buffer.entity()); auto event = QSharedPointer::create("org.kde.dummy", QString::fromUtf8(static_cast(keyValue), keySize), revision, adaptor); resultCallback(event); } diff --git a/dummyresource/facade.h b/dummyresource/facade.h index c76e62c..46b27ef 100644 --- a/dummyresource/facade.h +++ b/dummyresource/facade.h @@ -21,11 +21,63 @@ #include "common/clientapi.h" #include "common/storage.h" +#include "entity_generated.h" +#include "event_generated.h" +#include "dummycalendar_generated.h" namespace Akonadi2 { class ResourceAccess; } +/** + * The property mapper holds accessor functions for all properties. + * + * It is by default initialized with accessors that access the local-only buffer, + * and resource simply have to overwrite those accessors. + */ +template +class PropertyMapper +{ +public: + void setProperty(const QString &key, const QVariant &value, BufferType *buffer) + { + if (mWriteAccessors.contains(key)) { + auto accessor = mWriteAccessors.value(key); + return accessor(value, buffer); + } + } + + virtual QVariant getProperty(const QString &key, BufferType const *buffer) const + { + if (mReadAccessors.contains(key)) { + auto accessor = mReadAccessors.value(key); + return accessor(buffer); + } + return QVariant(); + } + QHash > mReadAccessors; + QHash > mWriteAccessors; +}; + +//The factory should define how to go from an entitybuffer (local + resource buffer), to a domain type adapter. +//It defines how values are split accross local and resource buffer. +//This is required by the facade the read the value, and by the pipeline preprocessors to access the domain values in a generic way. +template +class DomainTypeAdaptorFactory +{ +}; + +template +class DomainTypeAdaptorFactory +{ +public: + QSharedPointer createAdaptor(const Akonadi2::Entity &entity); + +// private: + QSharedPointer > mLocalMapper; + QSharedPointer > mResourceMapper; +}; + class DummyResourceFacade : public Akonadi2::StoreFacade { public: @@ -39,4 +91,5 @@ public: private: void synchronizeResource(const std::function &continuation); QSharedPointer mResourceAccess; + QSharedPointer > mFactory; }; diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index c9e4d7a..08222c0 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp @@ -20,10 +20,48 @@ #include "resourcefactory.h" #include "facade.h" #include "entitybuffer.h" +#include "pipeline.h" #include "dummycalendar_generated.h" #include "metadata_generated.h" #include +/* + * Figure out how to implement various classes of processors: + * * read-only (index and such) => domain adapter + * * filter => provide means to move entity elsewhere, and also reflect change in source (I guess?) + * * flag extractors? => like read-only? Or write to local portion of buffer? + * ** $ISSPAM should become part of domain object and is written to the local part of the mail. + * ** => value could be calculated by the server directly + */ +// template +class SimpleProcessor : public Akonadi2::Preprocessor +{ +public: + SimpleProcessor(const std::function &f) + : Akonadi2::Preprocessor(), + mFunction(f) + { + } + + void process(const Akonadi2::PipelineState &state) { + mFunction(state); + } + +protected: + std::function mFunction; +}; + +// template +// class SimpleReadOnlyProcessor : public SimpleProcessor +// { +// public: +// using SimpleProcessor::SimpleProcessor; +// void process(Akonadi2::PipelineState state) { +// mFunction(); +// } +// }; + + static std::string createEvent() { static const size_t attachmentSize = 1024*2; // 2KB @@ -61,6 +99,20 @@ DummyResource::DummyResource() { } +void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) +{ + //TODO setup preprocessors for each domain type and pipeline type allowing full customization + //Eventually the order should be self configuring, for now it's hardcoded. + auto eventIndexer = new SimpleProcessor([](const Akonadi2::PipelineState &state) { + //FIXME + // auto adaptor = QSharedPointer::create(); + // adaptor->mLocalBuffer = localBuffer; + // adaptor->mResourceBuffer = resourceBuffer; + // adaptor->storage = storage; + }); + pipeline->setPreprocessors(Akonadi2::Pipeline::NewPipeline, QVector() << eventIndexer); +} + void findByRemoteId(QSharedPointer storage, const QString &rid, std::function callback) { //TODO lookup in rid index instead of doing a full scan @@ -119,7 +171,7 @@ Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. const auto key = QUuid::createUuid().toString().toUtf8(); - pipeline->newEntity(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); + pipeline->newEntity(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); } else { //modification //TODO diff and create modification if necessary } @@ -139,7 +191,7 @@ void DummyResource::processCommand(int commandId, const QByteArray &data, uint s builder .add_summary(m_fbb.CreateString("summary summary!")); auto buffer = builder.Finish(); DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); - pipeline->newEntity("fakekey", m_fbb.GetBufferPointer(), m_fbb.GetSize()); + pipeline->newEntity("fakekey", m_fbb.GetBufferPointer(), m_fbb.GetSize()); m_fbb.Clear(); } diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h index dba674f..427fcc6 100644 --- a/dummyresource/resourcefactory.h +++ b/dummyresource/resourcefactory.h @@ -33,6 +33,7 @@ public: DummyResource(); Async::Job synchronizeWithSource(Akonadi2::Pipeline *pipeline); void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); + void configurePipeline(Akonadi2::Pipeline *pipeline); private: flatbuffers::FlatBufferBuilder m_fbb; diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 51794ed..8b5a19a 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp @@ -37,10 +37,12 @@ Listener::Listener(const QString &resourceName, QObject *parent) m_server(new QLocalServer(this)), m_resourceName(resourceName), m_resource(0), + //TODO move pipeline(s) to resource m_pipeline(new Akonadi2::Pipeline(resourceName, parent)), m_clientBufferProcessesTimer(new QTimer(this)), m_messageId(0) { + m_resource->configurePipeline(m_pipeline); connect(m_pipeline, &Akonadi2::Pipeline::revisionUpdated, this, &Listener::refreshRevision); connect(m_server, &QLocalServer::newConnection, -- cgit v1.2.3