From c83c2ef64b5a1e4b1dc0102df36687caebb96ff0 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 24 Dec 2014 02:15:41 +0100 Subject: unifying buffer, and a better way to implement domain object adapters. --- common/CMakeLists.txt | 19 ++++---- common/clientapi.h | 38 ++++++++++----- common/domain/event.fbs | 10 ++++ common/entitybuffer.fbs | 9 ++++ common/metadata.fbs | 7 +++ common/pipeline.cpp | 40 ++++++++++++++-- common/pipeline.h | 8 ++-- common/storage.h | 2 +- common/storage_lmdb.cpp | 25 ++++++---- common/test/clientapitest.cpp | 2 +- dummyresource/facade.cpp | 98 ++++++++++++++++++++++++++++++--------- dummyresource/resourcefactory.cpp | 4 +- 12 files changed, 198 insertions(+), 64 deletions(-) create mode 100644 common/domain/event.fbs create mode 100644 common/entitybuffer.fbs create mode 100644 common/metadata.fbs diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 001dab5..ec13e07 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -1,13 +1,16 @@ project(akonadi2common) generate_flatbuffers( - commands/commandcompletion - commands/createentity - commands/deleteentity - commands/fetchentity - commands/handshake - commands/modifyentity - commands/revisionupdate - ) + commands/commandcompletion + commands/createentity + commands/deleteentity + commands/fetchentity + commands/handshake + commands/modifyentity + commands/revisionupdate + domain/event + entitybuffer + metadata +) if (STORAGE_unqlite) add_definitions(-DUNQLITE_ENABLE_THREADS -fpermissive) diff --git a/common/clientapi.h b/common/clientapi.h index d2757e7..ba0cb19 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -173,18 +173,36 @@ namespace Akonadi2 { */ namespace Domain { +/** + * This class has to be implemented by resources and can be used as generic interface to access the buffer properties + */ +class BufferAdaptor { +public: + virtual QVariant getProperty(const QString &key) { return QVariant(); } + virtual void setProperty(const QString &key, const QVariant &value) {} +}; + +/** + * The domain type interface has two purposes: + * * provide a unified interface to read buffers (for zero-copy reading) + * * record changes to generate changesets for modifications + */ class AkonadiDomainType { public: - AkonadiDomainType(const QString &resourceName, const QString &identifier, qint64 revision) - : mResourceName(resourceName), + AkonadiDomainType(const QString &resourceName, const QString &identifier, qint64 revision, const QSharedPointer &adaptor) + : mAdaptor(adaptor), + mResourceName(resourceName), mIdentifier(identifier), mRevision(revision) { } - virtual QVariant getProperty(const QString &key){ return QVariant(); } + virtual QVariant getProperty(const QString &key){ return mAdaptor->getProperty(key); } + virtual void setProperty(const QString &key, const QVariant &value){ mChangeSet.insert(key, value); } private: + QSharedPointer mAdaptor; + QHash mChangeSet; /* * Each domain object needs to store the resource, identifier, revision triple so we can link back to the storage location. */ @@ -193,21 +211,19 @@ private: qint64 mRevision; }; -class Event : public AkonadiDomainType { -public: +struct Event : public AkonadiDomainType { typedef QSharedPointer Ptr; - Event(const QString &resource, const QString &identifier, qint64 revision):AkonadiDomainType(resource, identifier, revision){}; - + using AkonadiDomainType::AkonadiDomainType; }; -class Todo : public AkonadiDomainType { -public: +struct Todo : public AkonadiDomainType { typedef QSharedPointer Ptr; + using AkonadiDomainType::AkonadiDomainType; }; -class Calendar : public AkonadiDomainType { -public: +struct Calendar : public AkonadiDomainType { typedef QSharedPointer Ptr; + using AkonadiDomainType::AkonadiDomainType; }; class Mail : public AkonadiDomainType { diff --git a/common/domain/event.fbs b/common/domain/event.fbs new file mode 100644 index 0000000..6865cc5 --- /dev/null +++ b/common/domain/event.fbs @@ -0,0 +1,10 @@ +namespace Akonadi2.Domain.Buffer; + +table Event { + summary:string; + description:string; + attachment:[ubyte]; +} + +root_type Event; +file_identifier "AKFB"; diff --git a/common/entitybuffer.fbs b/common/entitybuffer.fbs new file mode 100644 index 0000000..28c9b2a --- /dev/null +++ b/common/entitybuffer.fbs @@ -0,0 +1,9 @@ +namespace Akonadi2; + +table EntityBuffer { + metadata: [ubyte]; + resource: [ubyte]; + local: [ubyte]; +} + +root_type EntityBuffer; diff --git a/common/metadata.fbs b/common/metadata.fbs new file mode 100644 index 0000000..71684b6 --- /dev/null +++ b/common/metadata.fbs @@ -0,0 +1,7 @@ +namespace Akonadi2; + +table Metadata { + revision: ulong; +} + +root_type Metadata; diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 739909d..5ca8b95 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -23,6 +23,9 @@ #include #include #include +#include +#include "entitybuffer_generated.h" +#include "metadata_generated.h" namespace Akonadi2 { @@ -69,11 +72,40 @@ void Pipeline::null() state.step(); } -void Pipeline::newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity) +void Pipeline::newEntity(const QByteArray &key, void *resourceBufferData, size_t size) { const qint64 newRevision = storage().maxRevision() + 1; - //FIXME this should go into a preprocessor - storage().write(key, key.size(), reinterpret_cast(entity.GetBufferPointer()), entity.GetSize()); + + flatbuffers::FlatBufferBuilder fbb; + auto builder = Akonadi2::EntityBufferBuilder(fbb); + + //Add metadata buffer + { + flatbuffers::FlatBufferBuilder metadataFbb; + auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); + metadataBuilder.add_revision(newRevision); + auto metadataBuffer = metadataBuilder.Finish(); + Akonadi2::FinishMetadataBuffer(fbb, metadataBuffer); + //TODO use memcpy + auto metadata = fbb.CreateVector(metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); + builder.add_metadata(metadata); + } + + //Add resource buffer + { + //TODO use memcpy + auto resource = fbb.CreateVector(static_cast(resourceBufferData), size); + builder.add_resource(resource); + } + + //We don't have a local buffer yet + // builder.add_local(); + + auto buffer = builder.Finish(); + Akonadi2::FinishEntityBufferBuffer(fbb, buffer); + + qDebug() << "writing new entity" << key; + storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); storage().setMaxRevision(newRevision); PipelineState state(this, NewPipeline, key, d->newPipeline); @@ -81,7 +113,7 @@ void Pipeline::newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder & state.step(); } -void Pipeline::modifiedEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entityDelta) +void Pipeline::modifiedEntity(const QByteArray &key, void *data, size_t size) { PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline); d->activePipelines << state; diff --git a/common/pipeline.h b/common/pipeline.h index d7048ff..159cc1c 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -46,10 +46,11 @@ public: Storage &storage() const; - // domain objects needed here void null(); - void newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity); - void modifiedEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entityDelta); + //FIXME We should probably directly provide a DomainTypeAdapter here. The data has already been written and we only need to read it for processing. And we need to read all buffers. + void newEntity(const QByteArray &key, void *resourceBufferData, size_t size); + //TODO Send local buffer data as well? + void modifiedEntity(const QByteArray &key, void *data, size_t size); void deletedEntity(const QByteArray &key); Q_SIGNALS: @@ -72,7 +73,6 @@ private: class AKONADI2COMMON_EXPORT PipelineState { public: - // domain objects? PipelineState(); PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector &filters); PipelineState(const PipelineState &other); diff --git a/common/storage.h b/common/storage.h index 6cfa3d6..6ae7838 100644 --- a/common/storage.h +++ b/common/storage.h @@ -52,7 +52,7 @@ public: //TODO: row removal //TODO: cursor based read //TODO: query? - bool write(const char *key, size_t keySize, const char *value, size_t valueSize); + bool write(void const *key, size_t keySize, void const *value, size_t valueSize); bool write(const std::string &sKey, const std::string &sValue); void read(const std::string &sKey, const std::function &resultHandler); diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp index eeb0045..42e3d33 100644 --- a/common/storage_lmdb.cpp +++ b/common/storage_lmdb.cpp @@ -194,17 +194,17 @@ void Storage::abortTransaction() d->transaction = 0; } -bool Storage::write(const char *key, size_t keySize, const char *value, size_t valueSize) -{ - return write(std::string(key, keySize), std::string(value, valueSize)); -} - -bool Storage::write(const std::string &sKey, const std::string &sValue) +bool Storage::write(void const *keyPtr, size_t keySize, void const *valuePtr, size_t valueSize) { if (!d->env) { return false; } + if (d->mode == ReadOnly) { + std::cerr << "tried to write in read-only mode." << std::endl; + return false; + } + const bool implicitTransaction = !d->transaction || d->readTransaction; if (implicitTransaction) { // TODO: if this fails, still try the write below? @@ -215,10 +215,10 @@ bool Storage::write(const std::string &sKey, const std::string &sValue) int rc; MDB_val key, data; - key.mv_size = sKey.size(); - key.mv_data = (void*)sKey.data(); - data.mv_size = sValue.size(); - data.mv_data = (void*)sValue.data(); + key.mv_size = keySize; + key.mv_data = const_cast(keyPtr); + data.mv_size = valueSize; + data.mv_data = const_cast(valuePtr); rc = mdb_put(d->transaction, d->dbi, &key, &data, 0); if (rc) { @@ -236,6 +236,11 @@ bool Storage::write(const std::string &sKey, const std::string &sValue) return !rc; } +bool Storage::write(const std::string &sKey, const std::string &sValue) +{ + return write(const_cast(sKey.data()), sKey.size(), const_cast(sValue.data()), sValue.size()); +} + void Storage::read(const std::string &sKey, const std::function &resultHandler, const std::function &errorHandler) diff --git a/common/test/clientapitest.cpp b/common/test/clientapitest.cpp index dd634f1..2ba0dcf 100644 --- a/common/test/clientapitest.cpp +++ b/common/test/clientapitest.cpp @@ -31,7 +31,7 @@ private Q_SLOTS: void testLoad() { DummyResourceFacade facade; - facade.results << QSharedPointer::create("resource", "id", 0); + facade.results << QSharedPointer::create("resource", "id", 0, QSharedPointer()); Akonadi2::FacadeFactory::instance().registerFacade("dummyresource", [facade](){ return new DummyResourceFacade(facade); }); diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp index c2871bb..458aba6 100644 --- a/dummyresource/facade.cpp +++ b/dummyresource/facade.cpp @@ -25,15 +25,51 @@ #include "common/resourceaccess.h" #include "common/commands.h" #include "dummycalendar_generated.h" +#include "event_generated.h" +#include "entitybuffer_generated.h" +#include "metadata_generated.h" using namespace DummyCalendar; using namespace flatbuffers; +/** + * The property mapper holds accessor functions for all properties. + * + * It is by default initialized with accessors that access the local-only buffer, + * and resource simply have to overwrite those accessors. + */ +template +class PropertyMapper +{ +public: + void setProperty(const QString &key, const QVariant &value, BufferType *buffer) + { + if (mWriteAccessors.contains(key)) { + auto accessor = mWriteAccessors.value(key); + return accessor(value, buffer); + } + } + + virtual QVariant getProperty(const QString &key, BufferType const *buffer) const + { + if (mReadAccessors.contains(key)) { + auto accessor = mReadAccessors.value(key); + return accessor(buffer); + } + return QVariant(); + } + QHash > mReadAccessors; + QHash > mWriteAccessors; +}; + DummyResourceFacade::DummyResourceFacade() : Akonadi2::StoreFacade(), mResourceAccess(new Akonadi2::ResourceAccess("org.kde.dummy")) { - // connect(mResourceAccess.data(), &ResourceAccess::ready, this, onReadyChanged); + PropertyMapper mapper; + mapper.mReadAccessors.insert("summary", [](DummyEvent const *buffer) -> QVariant { + return QString::fromStdString(buffer->summary()->c_str()); + }); } DummyResourceFacade::~DummyResourceFacade() @@ -65,32 +101,43 @@ void DummyResourceFacade::remove(const Akonadi2::Domain::Event &domainObject) //-how do we free/munmap the data if we don't know when no one references it any longer? => no munmap needed, but read transaction to keep pointer alive //-we could bind the lifetime to the query //=> perhaps do heap allocate and use smart pointer? -class DummyEventAdaptor : public Akonadi2::Domain::Event +// + + +//This will become a generic implementation that simply takes the resource buffer and local buffer pointer +class DummyEventAdaptor : public Akonadi2::Domain::BufferAdaptor { public: - DummyEventAdaptor(const QString &resource, const QString &identifier, qint64 revision) - :Akonadi2::Domain::Event(resource, identifier, revision) + DummyEventAdaptor() + : BufferAdaptor() { + } - //TODO - // void setProperty(const QString &key, const QVariant &value) - // { - // //Record changes to send to resource? - // //The buffer is readonly - // } + void setProperty(const QString &key, const QVariant &value) + { + if (mResourceMapper.mWriteAccessors.contains(key)) { + // mResourceMapper.setProperty(key, value, mResourceBuffer); + } else { + // mLocalMapper.; + } + } virtual QVariant getProperty(const QString &key) const { - if (key == "summary") { - //FIXME how do we check availability for on-demand request? - return QString::fromStdString(buffer->summary()->c_str()); + if (mResourceBuffer && mResourceMapper.mReadAccessors.contains(key)) { + return mResourceMapper.getProperty(key, mResourceBuffer); + } else if (mLocalBuffer) { + return mLocalMapper.getProperty(key, mLocalBuffer); } return QVariant(); } - //Data is read-only - DummyEvent const *buffer; + Akonadi2::Domain::Buffer::Event const *mLocalBuffer; + DummyEvent const *mResourceBuffer; + + PropertyMapper mLocalMapper; + PropertyMapper mResourceMapper; //Keep query alive so values remain valid QSharedPointer storage; @@ -140,6 +187,7 @@ void DummyResourceFacade::load(const Akonadi2::Query &query, const std::function qDebug() << "load called"; synchronizeResource([=]() { + qDebug() << "sync complete"; //Now that the sync is complete we can execute the query const auto preparedQuery = prepareQuery(query); @@ -151,15 +199,19 @@ void DummyResourceFacade::load(const Akonadi2::Query &query, const std::function storage->startTransaction(Akonadi2::Storage::ReadOnly); //Because we have no indexes yet, we always do a full scan storage->scan("", [=](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { - //TODO read the three buffers qDebug() << QString::fromStdString(std::string(static_cast(keyValue), keySize)); - auto eventBuffer = GetDummyEvent(dataValue); - if (preparedQuery && preparedQuery(std::string(static_cast(keyValue), keySize), eventBuffer)) { - //TODO set proper revision - qint64 revision = 0; - auto event = QSharedPointer::create("org.kde.dummy", QString::fromUtf8(static_cast(keyValue), keySize), revision); - event->buffer = eventBuffer; - event->storage = storage; + auto buffer = Akonadi2::GetEntityBuffer(dataValue); + auto resourceBuffer = GetDummyEvent(buffer->resource()); + auto metadataBuffer = Akonadi2::GetMetadata(buffer->resource()); + auto localBuffer = Akonadi2::Domain::Buffer::GetEvent(buffer->local()); + //We probably only want to create all buffers after the scan + if (preparedQuery && preparedQuery(std::string(static_cast(keyValue), keySize), resourceBuffer)) { + qint64 revision = metadataBuffer->revision(); + auto adaptor = QSharedPointer::create(); + adaptor->mLocalBuffer = localBuffer; + adaptor->mResourceBuffer = resourceBuffer; + adaptor->storage = storage; + auto event = QSharedPointer::create("org.kde.dummy", QString::fromUtf8(static_cast(keyValue), keySize), revision, adaptor); resultCallback(event); } return true; diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index 2c43981..6b93985 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp @@ -103,7 +103,7 @@ void DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) const auto key = QUuid::createUuid().toString().toUtf8(); //TODO can we really just start populating the buffer and pass the buffer builder? qDebug() << "new event"; - pipeline->newEntity(key, m_fbb); + pipeline->newEntity(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); } else { //modification //TODO diff and create modification if necessary } @@ -121,7 +121,7 @@ void DummyResource::processCommand(int commandId, const QByteArray &data, uint s builder .add_summary(m_fbb.CreateString("summary summary!")); auto buffer = builder.Finish(); DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); - pipeline->newEntity("fakekey", m_fbb); + pipeline->newEntity("fakekey", m_fbb.GetBufferPointer(), m_fbb.GetSize()); m_fbb.Clear(); } -- cgit v1.2.3