From c83c2ef64b5a1e4b1dc0102df36687caebb96ff0 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 24 Dec 2014 02:15:41 +0100 Subject: unifying buffer, and a better way to implement domain object adapters. --- common/CMakeLists.txt | 19 +++++++++++-------- common/clientapi.h | 38 +++++++++++++++++++++++++++----------- common/domain/event.fbs | 10 ++++++++++ common/entitybuffer.fbs | 9 +++++++++ common/metadata.fbs | 7 +++++++ common/pipeline.cpp | 40 ++++++++++++++++++++++++++++++++++++---- common/pipeline.h | 8 ++++---- common/storage.h | 2 +- common/storage_lmdb.cpp | 25 +++++++++++++++---------- common/test/clientapitest.cpp | 2 +- 10 files changed, 121 insertions(+), 39 deletions(-) create mode 100644 common/domain/event.fbs create mode 100644 common/entitybuffer.fbs create mode 100644 common/metadata.fbs (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 001dab5..ec13e07 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -1,13 +1,16 @@ project(akonadi2common) generate_flatbuffers( - commands/commandcompletion - commands/createentity - commands/deleteentity - commands/fetchentity - commands/handshake - commands/modifyentity - commands/revisionupdate - ) + commands/commandcompletion + commands/createentity + commands/deleteentity + commands/fetchentity + commands/handshake + commands/modifyentity + commands/revisionupdate + domain/event + entitybuffer + metadata +) if (STORAGE_unqlite) add_definitions(-DUNQLITE_ENABLE_THREADS -fpermissive) diff --git a/common/clientapi.h b/common/clientapi.h index d2757e7..ba0cb19 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -173,18 +173,36 @@ namespace Akonadi2 { */ namespace Domain { +/** + * This class has to be implemented by resources and can be used as generic interface to access the buffer properties + */ +class BufferAdaptor { +public: + virtual QVariant getProperty(const QString &key) { return QVariant(); } + virtual void setProperty(const QString &key, const QVariant &value) {} +}; + +/** + * The domain type interface has two purposes: + * * provide a unified interface to read buffers (for zero-copy reading) + * * record changes to generate changesets for modifications + */ class AkonadiDomainType { public: - AkonadiDomainType(const QString &resourceName, const QString &identifier, qint64 revision) - : mResourceName(resourceName), + AkonadiDomainType(const QString &resourceName, const QString &identifier, qint64 revision, const QSharedPointer &adaptor) + : mAdaptor(adaptor), + mResourceName(resourceName), mIdentifier(identifier), mRevision(revision) { } - virtual QVariant getProperty(const QString &key){ return QVariant(); } + virtual QVariant getProperty(const QString &key){ return mAdaptor->getProperty(key); } + virtual void setProperty(const QString &key, const QVariant &value){ mChangeSet.insert(key, value); } private: + QSharedPointer mAdaptor; + QHash mChangeSet; /* * Each domain object needs to store the resource, identifier, revision triple so we can link back to the storage location. */ @@ -193,21 +211,19 @@ private: qint64 mRevision; }; -class Event : public AkonadiDomainType { -public: +struct Event : public AkonadiDomainType { typedef QSharedPointer Ptr; - Event(const QString &resource, const QString &identifier, qint64 revision):AkonadiDomainType(resource, identifier, revision){}; - + using AkonadiDomainType::AkonadiDomainType; }; -class Todo : public AkonadiDomainType { -public: +struct Todo : public AkonadiDomainType { typedef QSharedPointer Ptr; + using AkonadiDomainType::AkonadiDomainType; }; -class Calendar : public AkonadiDomainType { -public: +struct Calendar : public AkonadiDomainType { typedef QSharedPointer Ptr; + using AkonadiDomainType::AkonadiDomainType; }; class Mail : public AkonadiDomainType { diff --git a/common/domain/event.fbs b/common/domain/event.fbs new file mode 100644 index 0000000..6865cc5 --- /dev/null +++ b/common/domain/event.fbs @@ -0,0 +1,10 @@ +namespace Akonadi2.Domain.Buffer; + +table Event { + summary:string; + description:string; + attachment:[ubyte]; +} + +root_type Event; +file_identifier "AKFB"; diff --git a/common/entitybuffer.fbs b/common/entitybuffer.fbs new file mode 100644 index 0000000..28c9b2a --- /dev/null +++ b/common/entitybuffer.fbs @@ -0,0 +1,9 @@ +namespace Akonadi2; + +table EntityBuffer { + metadata: [ubyte]; + resource: [ubyte]; + local: [ubyte]; +} + +root_type EntityBuffer; diff --git a/common/metadata.fbs b/common/metadata.fbs new file mode 100644 index 0000000..71684b6 --- /dev/null +++ b/common/metadata.fbs @@ -0,0 +1,7 @@ +namespace Akonadi2; + +table Metadata { + revision: ulong; +} + +root_type Metadata; diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 739909d..5ca8b95 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -23,6 +23,9 @@ #include #include #include +#include +#include "entitybuffer_generated.h" +#include "metadata_generated.h" namespace Akonadi2 { @@ -69,11 +72,40 @@ void Pipeline::null() state.step(); } -void Pipeline::newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity) +void Pipeline::newEntity(const QByteArray &key, void *resourceBufferData, size_t size) { const qint64 newRevision = storage().maxRevision() + 1; - //FIXME this should go into a preprocessor - storage().write(key, key.size(), reinterpret_cast(entity.GetBufferPointer()), entity.GetSize()); + + flatbuffers::FlatBufferBuilder fbb; + auto builder = Akonadi2::EntityBufferBuilder(fbb); + + //Add metadata buffer + { + flatbuffers::FlatBufferBuilder metadataFbb; + auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); + metadataBuilder.add_revision(newRevision); + auto metadataBuffer = metadataBuilder.Finish(); + Akonadi2::FinishMetadataBuffer(fbb, metadataBuffer); + //TODO use memcpy + auto metadata = fbb.CreateVector(metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); + builder.add_metadata(metadata); + } + + //Add resource buffer + { + //TODO use memcpy + auto resource = fbb.CreateVector(static_cast(resourceBufferData), size); + builder.add_resource(resource); + } + + //We don't have a local buffer yet + // builder.add_local(); + + auto buffer = builder.Finish(); + Akonadi2::FinishEntityBufferBuffer(fbb, buffer); + + qDebug() << "writing new entity" << key; + storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); storage().setMaxRevision(newRevision); PipelineState state(this, NewPipeline, key, d->newPipeline); @@ -81,7 +113,7 @@ void Pipeline::newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder & state.step(); } -void Pipeline::modifiedEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entityDelta) +void Pipeline::modifiedEntity(const QByteArray &key, void *data, size_t size) { PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline); d->activePipelines << state; diff --git a/common/pipeline.h b/common/pipeline.h index d7048ff..159cc1c 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -46,10 +46,11 @@ public: Storage &storage() const; - // domain objects needed here void null(); - void newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity); - void modifiedEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entityDelta); + //FIXME We should probably directly provide a DomainTypeAdapter here. The data has already been written and we only need to read it for processing. And we need to read all buffers. + void newEntity(const QByteArray &key, void *resourceBufferData, size_t size); + //TODO Send local buffer data as well? + void modifiedEntity(const QByteArray &key, void *data, size_t size); void deletedEntity(const QByteArray &key); Q_SIGNALS: @@ -72,7 +73,6 @@ private: class AKONADI2COMMON_EXPORT PipelineState { public: - // domain objects? PipelineState(); PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector &filters); PipelineState(const PipelineState &other); diff --git a/common/storage.h b/common/storage.h index 6cfa3d6..6ae7838 100644 --- a/common/storage.h +++ b/common/storage.h @@ -52,7 +52,7 @@ public: //TODO: row removal //TODO: cursor based read //TODO: query? - bool write(const char *key, size_t keySize, const char *value, size_t valueSize); + bool write(void const *key, size_t keySize, void const *value, size_t valueSize); bool write(const std::string &sKey, const std::string &sValue); void read(const std::string &sKey, const std::function &resultHandler); diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp index eeb0045..42e3d33 100644 --- a/common/storage_lmdb.cpp +++ b/common/storage_lmdb.cpp @@ -194,17 +194,17 @@ void Storage::abortTransaction() d->transaction = 0; } -bool Storage::write(const char *key, size_t keySize, const char *value, size_t valueSize) -{ - return write(std::string(key, keySize), std::string(value, valueSize)); -} - -bool Storage::write(const std::string &sKey, const std::string &sValue) +bool Storage::write(void const *keyPtr, size_t keySize, void const *valuePtr, size_t valueSize) { if (!d->env) { return false; } + if (d->mode == ReadOnly) { + std::cerr << "tried to write in read-only mode." << std::endl; + return false; + } + const bool implicitTransaction = !d->transaction || d->readTransaction; if (implicitTransaction) { // TODO: if this fails, still try the write below? @@ -215,10 +215,10 @@ bool Storage::write(const std::string &sKey, const std::string &sValue) int rc; MDB_val key, data; - key.mv_size = sKey.size(); - key.mv_data = (void*)sKey.data(); - data.mv_size = sValue.size(); - data.mv_data = (void*)sValue.data(); + key.mv_size = keySize; + key.mv_data = const_cast(keyPtr); + data.mv_size = valueSize; + data.mv_data = const_cast(valuePtr); rc = mdb_put(d->transaction, d->dbi, &key, &data, 0); if (rc) { @@ -236,6 +236,11 @@ bool Storage::write(const std::string &sKey, const std::string &sValue) return !rc; } +bool Storage::write(const std::string &sKey, const std::string &sValue) +{ + return write(const_cast(sKey.data()), sKey.size(), const_cast(sValue.data()), sValue.size()); +} + void Storage::read(const std::string &sKey, const std::function &resultHandler, const std::function &errorHandler) diff --git a/common/test/clientapitest.cpp b/common/test/clientapitest.cpp index dd634f1..2ba0dcf 100644 --- a/common/test/clientapitest.cpp +++ b/common/test/clientapitest.cpp @@ -31,7 +31,7 @@ private Q_SLOTS: void testLoad() { DummyResourceFacade facade; - facade.results << QSharedPointer::create("resource", "id", 0); + facade.results << QSharedPointer::create("resource", "id", 0, QSharedPointer()); Akonadi2::FacadeFactory::instance().registerFacade("dummyresource", [facade](){ return new DummyResourceFacade(facade); }); -- cgit v1.2.3