From bc2a95cad05e454a84c317f1078edb329bd3afd4 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 15 Jan 2015 01:56:09 +0100 Subject: Writing from facade. --- common/CMakeLists.txt | 1 + common/commands/createentity.fbs | 2 +- common/entitybuffer.cpp | 9 +++-- common/entitybuffer.h | 2 +- common/metadata.fbs | 1 + common/pipeline.cpp | 79 +++++++++++++++++++++++++++------------- common/pipeline.h | 41 +++++---------------- common/queuedcommand.fbs | 11 ++++++ common/resourceaccess.h | 2 +- 9 files changed, 85 insertions(+), 63 deletions(-) create mode 100644 common/queuedcommand.fbs (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 671d1cd..6f8fee3 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -10,6 +10,7 @@ generate_flatbuffers( domain/event entity metadata + queuedcommand ) if (STORAGE_unqlite) diff --git a/common/commands/createentity.fbs b/common/commands/createentity.fbs index 564c231..23eeff9 100644 --- a/common/commands/createentity.fbs +++ b/common/commands/createentity.fbs @@ -1,4 +1,4 @@ -namespace Akonadi2; +namespace Akonadi2.Commands; table CreateEntity { domainType: string; diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp index aa5847c..4af84ef 100644 --- a/common/entitybuffer.cpp +++ b/common/entitybuffer.cpp @@ -56,11 +56,12 @@ void EntityBuffer::extractResourceBuffer(void *dataValue, int dataSize, const st } } -void EntityBuffer::assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void *metadataData, size_t metadataSize, void *resourceData, size_t resourceSize, void *localData, size_t localSize) +void EntityBuffer::assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void const *metadataData, size_t metadataSize, void const *resourceData, size_t resourceSize, void const *localData, size_t localSize) { - auto metadata = fbb.CreateVector(static_cast(metadataData), metadataSize); - auto resource = fbb.CreateVector(static_cast(resourceData), resourceSize); - auto local = fbb.CreateVector(static_cast(localData), localSize); + qDebug() << "res size: " << resourceSize; + auto metadata = fbb.CreateVector(static_cast(metadataData), metadataSize); + auto resource = fbb.CreateVector(static_cast(resourceData), resourceSize); + auto local = fbb.CreateVector(static_cast(localData), localSize); auto builder = Akonadi2::EntityBuilder(fbb); builder.add_metadata(metadata); builder.add_resource(resource); diff --git a/common/entitybuffer.h b/common/entitybuffer.h index 600b04d..097b450 100644 --- a/common/entitybuffer.h +++ b/common/entitybuffer.h @@ -15,7 +15,7 @@ public: const Entity &entity(); static void extractResourceBuffer(void *dataValue, int dataSize, const std::function &handler); - static void assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void *metadataData, size_t metadataSize, void *resourceData, size_t resourceSize, void *localData, size_t localSize); + static void assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void const *metadataData, size_t metadataSize, void const *resourceData, size_t resourceSize, void const *localData, size_t localSize); private: const Entity *mEntity; diff --git a/common/metadata.fbs b/common/metadata.fbs index 34a8df2..bb1163d 100644 --- a/common/metadata.fbs +++ b/common/metadata.fbs @@ -3,6 +3,7 @@ namespace Akonadi2; table Metadata { revision: ulong; processed: bool = true; + processingProgress: [string]; } root_type Metadata; diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 10bae54..9cc7450 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -23,10 +23,13 @@ #include #include #include +#include #include #include "entity_generated.h" #include "metadata_generated.h" +#include "createentity_generated.h" #include "entitybuffer.h" +#include "async/src/async.h" namespace Akonadi2 { @@ -90,39 +93,59 @@ void Pipeline::null() // state.step(); } -void Pipeline::newEntity(const QString &entityType, const QByteArray &key, void *resourceBufferData, size_t size) +Async::Job Pipeline::newEntity(void const *command, size_t size) { - const qint64 newRevision = storage().maxRevision() + 1; - - //Add metadata buffer - flatbuffers::FlatBufferBuilder metadataFbb; - auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); - metadataBuilder.add_revision(newRevision); - metadataBuilder.add_processed(false); - auto metadataBuffer = metadataBuilder.Finish(); - Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); - - flatbuffers::FlatBufferBuilder fbb; - EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), resourceBufferData, size, 0, 0); - - storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); - storage().setMaxRevision(newRevision); + qDebug() << "new entity"; + Async::start([&](Async::Future future) { + + //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. + const auto key = QUuid::createUuid().toString().toUtf8(); + + //TODO figure out if we already have created a revision for the message? + const qint64 newRevision = storage().maxRevision() + 1; + + auto createEntity = Akonadi2::Commands::GetCreateEntity(command); + //TODO rename createEntitiy->domainType to bufferType + const QString entityType = QString::fromUtf8(reinterpret_cast(createEntity->domainType()->Data()), createEntity->domainType()->size()); + auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); + // + // const QString entityType; + // auto entity = Akonadi2::GetEntity(0); + + //Add metadata buffer + flatbuffers::FlatBufferBuilder metadataFbb; + auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); + metadataBuilder.add_revision(newRevision); + metadataBuilder.add_processed(false); + auto metadataBuffer = metadataBuilder.Finish(); + Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); + //TODO we should reserve some space in metadata for in-place updates + + flatbuffers::FlatBufferBuilder fbb; + EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); + + storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); + storage().setMaxRevision(newRevision); + + PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { + future.setFinished(); + }); + d->activePipelines << state; + state.step(); - PipelineState state(this, NewPipeline, key, d->newPipeline[entityType]); - d->activePipelines << state; - state.step(); + }); } void Pipeline::modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size) { - PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType]); + PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [](){}); d->activePipelines << state; state.step(); } void Pipeline::deletedEntity(const QString &entityType, const QByteArray &key) { - PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType]); + PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType], [](){}); d->activePipelines << state; state.step(); } @@ -160,6 +183,7 @@ void Pipeline::pipelineCompleted(const PipelineState &state) } if (state.type() != NullPipeline) { + //TODO what revision is finalized? emit revisionUpdated(); } scheduleStep(); @@ -172,12 +196,13 @@ void Pipeline::pipelineCompleted(const PipelineState &state) class PipelineState::Private : public QSharedData { public: - Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector filters) + Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector filters, const std::function &c) : pipeline(p), type(t), key(k), filterIt(filters), - idle(true) + idle(true), + callback(c) {} Private() @@ -191,6 +216,7 @@ public: QByteArray key; QVectorIterator filterIt; bool idle; + std::function callback; }; PipelineState::PipelineState() @@ -199,8 +225,8 @@ PipelineState::PipelineState() } -PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector &filters) - : d(new Private(pipeline, type, key, filters)) +PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector &filters, const std::function &callback) + : d(new Private(pipeline, type, key, filters, callback)) { } @@ -247,6 +273,7 @@ void PipelineState::step() d->idle = false; if (d->filterIt.hasNext()) { + //TODO skip step if already processed d->pipeline->storage().scan(d->key.toStdString(), [this](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { auto entity = Akonadi2::GetEntity(dataValue); d->filterIt.next()->process(*this, *entity); @@ -254,11 +281,13 @@ void PipelineState::step() }); } else { d->pipeline->pipelineCompleted(*this); + d->callback(); } } void PipelineState::processingCompleted(Preprocessor *filter) { + //TODO record processing progress if (d->pipeline && filter == d->filterIt.peekPrevious()) { d->idle = true; d->pipeline->pipelineStepped(*this); diff --git a/common/pipeline.h b/common/pipeline.h index 6b847f5..918d21e 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -27,7 +27,7 @@ #include #include -#include //For domain types +#include "async/src/async.h" #include "entity_generated.h" @@ -49,34 +49,13 @@ public: Storage &storage() const; - // template - // Storage &storage() const; - - template - void setPreprocessors(Type type, const QVector &preprocessors) - { - setPreprocessors(Akonadi2::Domain::getTypeName(), type, preprocessors); - } + void setPreprocessors(const QString &entityType, Type pipelineType, const QVector &preprocessors); void null(); - template - void newEntity(const QByteArray &key, void *resourceBufferData, size_t size) - { - newEntity(Akonadi2::Domain::getTypeName(), key, resourceBufferData, size); - } - - template - void modifiedEntity(const QByteArray &key, void *data, size_t size) - { - modifiedEntity(Akonadi2::Domain::getTypeName(), key, data, size); - } - - template - void deletedEntity(const QByteArray &key) - { - deletedEntity(Akonadi2::Domain::getTypeName(), key); - } + Async::Job newEntity(void const *command, size_t size); + void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size); + void deletedEntity(const QString &entityType, const QByteArray &key); Q_SIGNALS: void revisionUpdated(); @@ -86,10 +65,6 @@ private Q_SLOTS: void stepPipelines(); private: - void setPreprocessors(const QString &entityType, Type pipelineType, const QVector &preprocessors); - void newEntity(const QString &entityType, const QByteArray &key, void *resourceBufferData, size_t size); - void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size); - void deletedEntity(const QString &entityType, const QByteArray &key); void pipelineStepped(const PipelineState &state); void pipelineCompleted(const PipelineState &state); void scheduleStep(); @@ -104,7 +79,7 @@ class AKONADI2COMMON_EXPORT PipelineState { public: PipelineState(); - PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector &filters); + PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector &filters, const std::function &callback); PipelineState(const PipelineState &other); ~PipelineState(); @@ -114,6 +89,7 @@ public: bool isIdle() const; QByteArray key() const; Pipeline::Type type() const; + //TODO expose command void step(); void processingCompleted(Preprocessor *filter); @@ -129,7 +105,10 @@ public: Preprocessor(); virtual ~Preprocessor(); + //TODO pass actual command as well, for changerecording virtual void process(PipelineState state, const Akonadi2::Entity &); + //TODO to record progress + // virtual QString id(); protected: void processingCompleted(PipelineState state); diff --git a/common/queuedcommand.fbs b/common/queuedcommand.fbs new file mode 100644 index 0000000..0ca899f --- /dev/null +++ b/common/queuedcommand.fbs @@ -0,0 +1,11 @@ +namespace Akonadi2; + +table QueuedCommand { + commandId: int; + command: [ubyte]; + // entityId: string; + // sourceRevision: ulong; + // targetRevision: [ubyte]; +} + +root_type QueuedCommand; diff --git a/common/resourceaccess.h b/common/resourceaccess.h index d45ebde..0a333f6 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h @@ -43,7 +43,7 @@ public: //TODO use jobs void sendCommand(int commandId, const std::function &callback = std::function()); - void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function &callback); + void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function &callback = std::function()); Async::Job synchronizeResource(); public Q_SLOTS: -- cgit v1.2.3