From 9b2257d680a5e4fa2fda8cf3302f25054a06710e Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 28 Dec 2014 14:44:50 +0100 Subject: Buffers wrapped into entity buffer, async command progress tracking. --- common/CMakeLists.txt | 7 ++-- common/entity.fbs | 9 +++++ common/entitybuffer.cpp | 53 ++++++++++++++++++++++++++ common/entitybuffer.fbs | 9 ----- common/entitybuffer.h | 22 +++++++++++ common/pipeline.cpp | 29 +++++++------- common/pipeline.h | 1 + common/resource.cpp | 6 ++- common/resource.h | 3 +- common/resourceaccess.cpp | 96 ++++++++++++++++++++++++++++------------------- common/resourceaccess.h | 8 +++- 11 files changed, 174 insertions(+), 69 deletions(-) create mode 100644 common/entity.fbs create mode 100644 common/entitybuffer.cpp delete mode 100644 common/entitybuffer.fbs create mode 100644 common/entitybuffer.h (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index ec13e07..1a9a812 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -8,7 +8,7 @@ generate_flatbuffers( commands/modifyentity commands/revisionupdate domain/event - entitybuffer + entity metadata ) @@ -17,10 +17,11 @@ if (STORAGE_unqlite) set(storage_SRCS unqlite/unqlite.c storage_unqlite.cpp) else (STORAGE_unqlite) set(storage_SRCS storage_lmdb.cpp) - set(storage_LIBS lmdb) + set(storage_LIBS ${lmdb}) endif (STORAGE_unqlite) set(command_SRCS + entitybuffer.cpp clientapi.cpp commands.cpp console.cpp @@ -35,7 +36,7 @@ add_library(${PROJECT_NAME} SHARED ${command_SRCS}) generate_export_header(${PROJECT_NAME} BASE_NAME Akonadi2Common EXPORT_FILE_NAME akonadi2common_export.h) SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX) qt5_use_modules(${PROJECT_NAME} Widgets Network) -target_link_libraries(${PROJECT_NAME} ${storage_LIBS}) +target_link_libraries(${PROJECT_NAME} ${storage_LIBS} akonadi2async) install(TARGETS ${PROJECT_NAME} DESTINATION lib) add_subdirectory(test) diff --git a/common/entity.fbs b/common/entity.fbs new file mode 100644 index 0000000..565b1a7 --- /dev/null +++ b/common/entity.fbs @@ -0,0 +1,9 @@ +namespace Akonadi2; + +table Entity { + metadata: [ubyte]; + resource: [ubyte]; + local: [ubyte]; +} + +root_type Entity; diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp new file mode 100644 index 0000000..a78e91d --- /dev/null +++ b/common/entitybuffer.cpp @@ -0,0 +1,53 @@ +#include "entitybuffer.h" + +#include "entity_generated.h" +#include "metadata_generated.h" +#include + +using namespace Akonadi2; + +EntityBuffer::EntityBuffer(void *dataValue, int dataSize) + : mEntity(nullptr) +{ + flatbuffers::Verifier verifyer(reinterpret_cast(dataValue), dataSize); + // Q_ASSERT(Akonadi2::VerifyEntity(verifyer)); + if (!Akonadi2::VerifyEntityBuffer(verifyer)) { + qWarning() << "invalid buffer"; + } else { + mEntity = Akonadi2::GetEntity(dataValue); + } +} + +const flatbuffers::Vector* EntityBuffer::resourceBuffer() +{ + if (!mEntity) { + qDebug() << "no buffer"; + return nullptr; + } + return mEntity->resource(); +} + +const flatbuffers::Vector* EntityBuffer::metadataBuffer() +{ + if (!mEntity) { + return nullptr; + } + return mEntity->metadata(); +} + +const flatbuffers::Vector* EntityBuffer::localBuffer() +{ + if (!mEntity) { + return nullptr; + } + return mEntity->local(); +} + +void EntityBuffer::extractResourceBuffer(void *dataValue, int dataSize, const std::function *)> &handler) +{ + Akonadi2::EntityBuffer buffer(dataValue, dataSize); + if (auto resourceData = buffer.resourceBuffer()) { + handler(resourceData); + } +} + diff --git a/common/entitybuffer.fbs b/common/entitybuffer.fbs deleted file mode 100644 index 28c9b2a..0000000 --- a/common/entitybuffer.fbs +++ /dev/null @@ -1,9 +0,0 @@ -namespace Akonadi2; - -table EntityBuffer { - metadata: [ubyte]; - resource: [ubyte]; - local: [ubyte]; -} - -root_type EntityBuffer; diff --git a/common/entitybuffer.h b/common/entitybuffer.h new file mode 100644 index 0000000..2a7150e --- /dev/null +++ b/common/entitybuffer.h @@ -0,0 +1,22 @@ +#pragma once + +#include +#include + +namespace Akonadi2 { +class Entity; + +class EntityBuffer { +public: + EntityBuffer(void *dataValue, int size); + const flatbuffers::Vector *resourceBuffer(); + const flatbuffers::Vector *metadataBuffer(); + const flatbuffers::Vector *localBuffer(); + + static void extractResourceBuffer(void *dataValue, int dataSize, const std::function *)> &handler); +private: + const Entity *mEntity; +}; + +} + diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 5ca8b95..dc6d389 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -24,7 +24,7 @@ #include #include #include -#include "entitybuffer_generated.h" +#include "entity_generated.h" #include "metadata_generated.h" namespace Akonadi2 @@ -76,33 +76,31 @@ void Pipeline::newEntity(const QByteArray &key, void *resourceBufferData, size_t { const qint64 newRevision = storage().maxRevision() + 1; - flatbuffers::FlatBufferBuilder fbb; - auto builder = Akonadi2::EntityBufferBuilder(fbb); + std::vector metadataData; //Add metadata buffer { flatbuffers::FlatBufferBuilder metadataFbb; auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); metadataBuilder.add_revision(newRevision); auto metadataBuffer = metadataBuilder.Finish(); - Akonadi2::FinishMetadataBuffer(fbb, metadataBuffer); - //TODO use memcpy - auto metadata = fbb.CreateVector(metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); - builder.add_metadata(metadata); + Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); + metadataData.resize(metadataFbb.GetSize()); + std::copy_n(metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), back_inserter(metadataData)); } - //Add resource buffer - { - //TODO use memcpy - auto resource = fbb.CreateVector(static_cast(resourceBufferData), size); - builder.add_resource(resource); - } + flatbuffers::FlatBufferBuilder fbb; + auto metadata = fbb.CreateVector(metadataData.data(), metadataData.size()); + auto resource = fbb.CreateVector(static_cast(resourceBufferData), size); + auto builder = Akonadi2::EntityBuilder(fbb); + builder.add_metadata(metadata); + builder.add_resource(resource); //We don't have a local buffer yet // builder.add_local(); auto buffer = builder.Finish(); - Akonadi2::FinishEntityBufferBuffer(fbb, buffer); + Akonadi2::FinishEntityBuffer(fbb, buffer); qDebug() << "writing new entity" << key; storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); @@ -163,6 +161,9 @@ void Pipeline::pipelineCompleted(const PipelineState &state) emit revisionUpdated(); } scheduleStep(); + if (d->activePipelines.isEmpty()) { + emit pipelinesDrained(); + } } diff --git a/common/pipeline.h b/common/pipeline.h index 159cc1c..6ef8703 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -55,6 +55,7 @@ public: Q_SIGNALS: void revisionUpdated(); + void pipelinesDrained(); private Q_SLOTS: void stepPipelines(); diff --git a/common/resource.cpp b/common/resource.cpp index ae28485..bba6609 100644 --- a/common/resource.cpp +++ b/common/resource.cpp @@ -48,9 +48,11 @@ void Resource::processCommand(int commandId, const QByteArray &data, uint size, pipeline->null(); } -void Resource::synchronizeWithSource(Pipeline *pipeline) +Async::Job Resource::synchronizeWithSource(Pipeline *pipeline) { - pipeline->null(); + return Async::start([pipeline](Async::Future &f) { + pipeline->null(); + }); } class ResourceFactory::Private diff --git a/common/resource.h b/common/resource.h index 0f65e1f..fb42c1b 100644 --- a/common/resource.h +++ b/common/resource.h @@ -21,6 +21,7 @@ #include #include #include +#include namespace Akonadi2 { @@ -33,7 +34,7 @@ public: virtual ~Resource(); virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); - virtual void synchronizeWithSource(Pipeline *pipeline); + virtual Async::Job synchronizeWithSource(Pipeline *pipeline); private: class Private; diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 1706ac4..31b9e79 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -37,38 +37,36 @@ namespace Akonadi2 class QueuedCommand { public: - QueuedCommand(int commandId) - : m_commandId(commandId), - m_bufferSize(0), - m_buffer(0) + QueuedCommand(int commandId, const std::function &callback) + : commandId(commandId), + bufferSize(0), + buffer(0), + callback(callback) {} - QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) - : m_commandId(commandId), - m_bufferSize(fbb.GetSize()), - m_buffer(new char[m_bufferSize]) + QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function &callback) + : commandId(commandId), + bufferSize(fbb.GetSize()), + buffer(new char[bufferSize]), + callback(callback) { - memcpy(m_buffer, fbb.GetBufferPointer(), m_bufferSize); + memcpy(buffer, fbb.GetBufferPointer(), bufferSize); } ~QueuedCommand() { - delete[] m_buffer; - } - - void write(QIODevice *device, uint messageId) - { - // Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); - Commands::write(device, messageId, m_commandId, m_buffer, m_bufferSize); + delete[] buffer; } private: QueuedCommand(const QueuedCommand &other); QueuedCommand &operator=(const QueuedCommand &rhs); - const int m_commandId; - const uint m_bufferSize; - char *m_buffer; +public: + const int commandId; + const uint bufferSize; + char *buffer; + std::function callback; }; class ResourceAccess::Private @@ -82,7 +80,7 @@ public: QByteArray partialMessageBuffer; flatbuffers::FlatBufferBuilder fbb; QVector commandQueue; - QVector > synchronizeResultHandler; + QMultiMap > resultHandler; uint messageId; }; @@ -130,31 +128,42 @@ bool ResourceAccess::isReady() const return d->socket->isValid(); } -void ResourceAccess::sendCommand(int commandId) +void ResourceAccess::registerCallback(uint messageId, const std::function &callback) +{ + d->resultHandler.insert(messageId, callback); +} + +void ResourceAccess::sendCommand(int commandId, const std::function &callback) { if (isReady()) { log(QString("Sending command %1").arg(commandId)); - Commands::write(d->socket, ++d->messageId, commandId); + d->messageId++; + if (callback) { + registerCallback(d->messageId, callback); + } + Commands::write(d->socket, d->messageId, commandId); } else { - d->commandQueue << new QueuedCommand(commandId); + d->commandQueue << new QueuedCommand(commandId, callback); } } -void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) +void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function &callback) { if (isReady()) { log(QString("Sending command %1").arg(commandId)); - Commands::write(d->socket, ++d->messageId, commandId, fbb); + d->messageId++; + if (callback) { + registerCallback(d->messageId, callback); + } + Commands::write(d->socket, d->messageId, commandId, fbb); } else { - d->commandQueue << new QueuedCommand(commandId, fbb); + d->commandQueue << new QueuedCommand(commandId, fbb, callback); } } void ResourceAccess::synchronizeResource(const std::function &resultHandler) { - sendCommand(Commands::SynchronizeCommand); - //TODO: this should be implemented as a job, so we don't need to store the result handler as member - d->synchronizeResultHandler << resultHandler; + sendCommand(Commands::SynchronizeCommand, resultHandler); } void ResourceAccess::open() @@ -200,7 +209,12 @@ void ResourceAccess::connected() //TODO: serialize instead of blast them all through the socket? log(QString("We have %1 queued commands").arg(d->commandQueue.size())); for (QueuedCommand *command: d->commandQueue) { - command->write(d->socket, ++d->messageId); + d->messageId++; + log(QString("Sending command %1").arg(command->commandId)); + if (command->callback) { + registerCallback(d->messageId, command->callback); + } + Commands::write(d->socket, d->messageId, command->commandId, command->buffer, command->bufferSize); delete command; } d->commandQueue.clear(); @@ -234,6 +248,8 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) if (!d->tryOpenTimer->isActive()) { d->tryOpenTimer->start(); } + } else { + qWarning() << "Failed to start resource"; } } @@ -256,8 +272,7 @@ bool ResourceAccess::processMessageBuffer() return false; } - //messageId is unused, so commented out - //const uint messageId = *(int*)(d->partialMessageBuffer.constData()); + const uint messageId = *(int*)(d->partialMessageBuffer.constData()); const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint)); const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); @@ -271,18 +286,15 @@ bool ResourceAccess::processMessageBuffer() log(QString("Revision updated to: %1").arg(buffer->revision())); emit revisionChanged(buffer->revision()); - //FIXME: The result handler should be called on completion of the synchronize command, and not upon arbitrary revision updates. - for(auto handler : d->synchronizeResultHandler) { - //FIXME: we should associate the handler with a buffer->id() to avoid prematurely triggering the result handler from a delayed synchronized response (this is relevant for on-demand syncing). - handler(); - } - d->synchronizeResultHandler.clear(); break; } case Commands::CommandCompletion: { auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc + + //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first + QMetaObject::invokeMethod(this, "callCallbacks", Qt::QueuedConnection, QGenericReturnArgument(), Q_ARG(int, buffer->id())); break; } default: @@ -293,6 +305,14 @@ bool ResourceAccess::processMessageBuffer() return d->partialMessageBuffer.size() >= headerSize; } +void ResourceAccess::callCallbacks(int id) +{ + for(auto handler : d->resultHandler.values(id)) { + handler(); + } + d->resultHandler.remove(id); +} + void ResourceAccess::log(const QString &message) { qDebug() << d->resourceName + ": " + message; diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 7416b25..d79c993 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h @@ -40,8 +40,9 @@ public: QString resourceName() const; bool isReady() const; - void sendCommand(int commandId); - void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); + //TODO use jobs + void sendCommand(int commandId, const std::function &callback = std::function()); + void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function &callback); void synchronizeResource(const std::function &resultHandler); public Q_SLOTS: @@ -51,6 +52,7 @@ public Q_SLOTS: Q_SIGNALS: void ready(bool isReady); void revisionChanged(unsigned long long revision); + void commandCompleted(); private Q_SLOTS: //TODO: move these to the Private class @@ -59,9 +61,11 @@ private Q_SLOTS: void connectionError(QLocalSocket::LocalSocketError error); void readResourceMessage(); bool processMessageBuffer(); + void callCallbacks(int id); private: void log(const QString &message); + void registerCallback(uint messageId, const std::function &callback); class Private; Private * const d; -- cgit v1.2.3