From 77944384d24b5005d6b8648572a31a3ae84dd946 Mon Sep 17 00:00:00 2001 From: Aaron Seigo Date: Tue, 16 Dec 2014 22:40:44 +0100 Subject: add pipelines (as a sketch only), message ids and message responses --- CMakeLists.txt | 3 +- common/CMakeLists.txt | 2 + common/commands.cpp | 15 +++++-- common/commands.h | 9 +++-- common/commands/commandcompletion.fbs | 6 +-- common/pipeline.cpp | 75 +++++++++++++++++++++++++++++++++++ common/pipeline.h | 47 ++++++++++++++++++++++ common/resource.cpp | 4 ++ common/resource.h | 8 +++- common/resourceaccess.cpp | 35 ++++++++++------ synchronizer/listener.cpp | 70 ++++++++++++++++++++++++-------- synchronizer/listener.h | 5 +++ 12 files changed, 237 insertions(+), 42 deletions(-) create mode 100644 common/pipeline.cpp create mode 100644 common/pipeline.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 4096fa4..8d33281 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -36,7 +36,8 @@ endfunction(generate_flatbuffers) set(CMAKE_AUTOMOC ON) add_definitions("-Wall -std=c++0x") -include_directories(${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR} ${CMAKE_BINARY_DIR}/common ${FLATBUFFERS_INCLUDE_DIR}) +include_directories(${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR} ${FLATBUFFERS_INCLUDE_DIR} ${CMAKE_BINARY_DIR}/common) +include_directories(SYSTEM ${CMAKE_SOURCE_DIR}/common) configure_file(hawd.conf hawd.conf) diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 7de4aa9..001dab5 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -1,5 +1,6 @@ project(akonadi2common) generate_flatbuffers( + commands/commandcompletion commands/createentity commands/deleteentity commands/fetchentity @@ -20,6 +21,7 @@ set(command_SRCS clientapi.cpp commands.cpp console.cpp + pipeline.cpp resource.cpp resourceaccess.cpp storage_common.cpp diff --git a/common/commands.cpp b/common/commands.cpp index ecbbfdb..1dfeabe 100644 --- a/common/commands.cpp +++ b/common/commands.cpp @@ -28,17 +28,23 @@ namespace Akonadi2 namespace Commands { -void write(QIODevice *device, int commandId) +int headerSize() { - write(device, commandId, 0, 0); + return sizeof(int) + (sizeof(uint) * 2); } -void write(QIODevice *device, int commandId, const char *buffer, uint size) +void write(QIODevice *device, int messageId, int commandId) +{ + write(device, messageId, commandId, 0, 0); +} + +void write(QIODevice *device, int messageId, int commandId, const char *buffer, uint size) { if (size > 0 && !buffer) { size = 0; } + device->write((const char*)&messageId, sizeof(int)); device->write((const char*)&commandId, sizeof(int)); device->write((const char*)&size, sizeof(uint)); if (buffer) { @@ -46,9 +52,10 @@ void write(QIODevice *device, int commandId, const char *buffer, uint size) } } -void write(QIODevice *device, int commandId, flatbuffers::FlatBufferBuilder &fbb) +void write(QIODevice *device, int messageId, int commandId, flatbuffers::FlatBufferBuilder &fbb) { const int dataSize = fbb.GetSize(); + device->write((const char*)&messageId, sizeof(int)); device->write((const char*)&commandId, sizeof(int)); device->write((const char*)&dataSize, sizeof(int)); device->write((const char*)fbb.GetBufferPointer(), dataSize); diff --git a/common/commands.h b/common/commands.h index 874db73..c63bb47 100644 --- a/common/commands.h +++ b/common/commands.h @@ -33,8 +33,8 @@ namespace Commands enum CommandIds { UnknownCommand = 0, - HandshakeCommand, CommandCompletion, + HandshakeCommand, RevisionUpdateCommand, SynchronizeCommand, FetchEntityCommand, @@ -45,9 +45,10 @@ enum CommandIds { CustomCommand = 0xffff }; -void AKONADI2COMMON_EXPORT write(QIODevice *device, int commandId); -void AKONADI2COMMON_EXPORT write(QIODevice *device, int commandId, const char *buffer, uint size); -void AKONADI2COMMON_EXPORT write(QIODevice *device, int commandId, flatbuffers::FlatBufferBuilder &fbb); +int AKONADI2COMMON_EXPORT headerSize(); +void AKONADI2COMMON_EXPORT write(QIODevice *device, int messageId, int commandId); +void AKONADI2COMMON_EXPORT write(QIODevice *device, int messageId, int commandId, const char *buffer, uint size); +void AKONADI2COMMON_EXPORT write(QIODevice *device, int messageId, int commandId, flatbuffers::FlatBufferBuilder &fbb); } diff --git a/common/commands/commandcompletion.fbs b/common/commands/commandcompletion.fbs index 9583108..5330b4f 100644 --- a/common/commands/commandcompletion.fbs +++ b/common/commands/commandcompletion.fbs @@ -1,9 +1,9 @@ namespace Akonadi2; table CommandCompletion { - id: ulong - success: bool - log: ulong = 0 + id: ulong; + success: bool = true; + log: ulong = 0; } root_type CommandCompletion; diff --git a/common/pipeline.cpp b/common/pipeline.cpp new file mode 100644 index 0000000..41def7c --- /dev/null +++ b/common/pipeline.cpp @@ -0,0 +1,75 @@ +/* + * Copyright (C) 2014 Aaron Seigo + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ + +#include "pipeline.h" + +#include + +namespace Akonadi2 +{ + +class Pipeline::Private +{ +public: + Private(const QString &storageName) + : storage(QStandardPaths::writableLocation(QStandardPaths::QStandardPaths::GenericDataLocation) + "/akonadi2", storageName, Akonadi2::Storage::ReadWrite) + { + + } + + Akonadi2::Storage storage; +}; + +Pipeline::Pipeline(const QString &storageName) + : d(new Private(storageName)) +{ +} + +Pipeline::~Pipeline() +{ +} + +Storage &Pipeline::storage() +{ + return d->storage; +} + +void Pipeline::null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) +{ + d->storage.write(key, keySize, buffer, bufferSize); +} + +void Pipeline::newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) +{ + d->storage.write(key, keySize, buffer, bufferSize); +} + +void Pipeline::modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) +{ + d->storage.write(key, keySize, buffer, bufferSize); +} + +void Pipeline::deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) +{ + d->storage.write(key, keySize, buffer, bufferSize); +} + +} // namespace Akonadi2 + diff --git a/common/pipeline.h b/common/pipeline.h new file mode 100644 index 0000000..635e630 --- /dev/null +++ b/common/pipeline.h @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2014 Aaron Seigo + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ + +#pragma once + +#include +#include + +namespace Akonadi2 +{ + +class AKONADI2COMMON_EXPORT Pipeline +{ +public: + Pipeline(const QString &storagePath); + ~Pipeline(); + + Storage &storage(); + void null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); + void newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); + void modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); + void deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); + +private: + class Private; + Private * const d; +}; + +} // namespace Akonadi2 + diff --git a/common/resource.cpp b/common/resource.cpp index 7120d1c..11a03ca 100644 --- a/common/resource.cpp +++ b/common/resource.cpp @@ -39,6 +39,10 @@ Resource::~Resource() //delete d; } +void Resource::processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline) +{ +} + void Resource::synchronizeWithSource() { } diff --git a/common/resource.h b/common/resource.h index 2ecff03..53c0bc1 100644 --- a/common/resource.h +++ b/common/resource.h @@ -18,18 +18,22 @@ * License along with this library. If not, see . */ -#include "clientapi.h" +#include +#include +#include namespace Akonadi2 { -class Resource +class AKONADI2COMMON_EXPORT Resource { public: //TODO: configuration Resource(); virtual ~Resource(); + //TODO: this will need to be async + virtual void processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline); virtual void synchronizeWithSource(); private: diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index a4f3c94..2f7d207 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -22,6 +22,7 @@ #include "common/console.h" #include "common/commands.h" +#include "common/commandcompletion_generated.h" #include "common/handshake_generated.h" #include "common/revisionupdate_generated.h" @@ -54,10 +55,10 @@ public: delete[] m_buffer; } - void write(QIODevice *device) + void write(QIODevice *device, uint messageId) { Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); - Commands::write(device, m_commandId, m_buffer, m_bufferSize); + Commands::write(device, messageId, m_commandId, m_buffer, m_bufferSize); } private: @@ -80,13 +81,15 @@ public: QByteArray partialMessageBuffer; flatbuffers::FlatBufferBuilder fbb; QVector commandQueue; + uint messageId; }; ResourceAccess::Private::Private(const QString &name, ResourceAccess *q) : resourceName(name), socket(new QLocalSocket(q)), tryOpenTimer(new QTimer(q)), - startingProcess(false) + startingProcess(false), + messageId(0) { } @@ -129,7 +132,7 @@ void ResourceAccess::sendCommand(int commandId) { if (isReady()) { log(QString("Sending command %1").arg(commandId)); - Commands::write(d->socket, commandId); + Commands::write(d->socket, ++d->messageId, commandId); } else { d->commandQueue << new QueuedCommand(commandId); } @@ -139,7 +142,7 @@ void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder & { if (isReady()) { log(QString("Sending command %1").arg(commandId)); - Commands::write(d->socket, commandId, fbb); + Commands::write(d->socket, ++d->messageId, commandId, fbb); } else { d->commandQueue << new QueuedCommand(commandId, fbb); } @@ -180,7 +183,7 @@ void ResourceAccess::connected() auto name = d->fbb.CreateString(QString::number((long long)this).toLatin1()); auto command = Akonadi2::CreateHandshake(d->fbb, name); Akonadi2::FinishHandshakeBuffer(d->fbb, command); - Commands::write(d->socket, Commands::HandshakeCommand, d->fbb); + Commands::write(d->socket, ++d->messageId, Commands::HandshakeCommand, d->fbb); d->fbb.Clear(); } @@ -188,7 +191,7 @@ void ResourceAccess::connected() //TODO: serialize instead of blast them all through the socket? log(QString("We have %1 queued commands").arg(d->commandQueue.size())); for (QueuedCommand *command: d->commandQueue) { - command->write(d->socket); + command->write(d->socket, ++d->messageId); delete command; } d->commandQueue.clear(); @@ -239,25 +242,33 @@ void ResourceAccess::readResourceMessage() bool ResourceAccess::processMessageBuffer() { - static const int headerSize = (sizeof(int) * 2); + static const int headerSize = Commands::headerSize(); if (d->partialMessageBuffer.size() < headerSize) { return false; } - const int commandId = *(int*)d->partialMessageBuffer.constData(); - const int size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int)); + //messageId is unused, so commented out + //const uint messageId = *(int*)(d->partialMessageBuffer.constData()); + const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint)); + const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); - if (size > d->partialMessageBuffer.size() - headerSize) { + if (size > (uint)(d->partialMessageBuffer.size() - headerSize)) { return false; } switch (commandId) { case Commands::RevisionUpdateCommand: { - auto buffer = Akonadi2::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); + auto buffer = GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); log(QString("Revision updated to: %1").arg(buffer->revision())); emit revisionChanged(buffer->revision()); break; } + case Commands::CommandCompletion: { + auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); + log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); + //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc + break; + } default: break; } diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 368dae5..18442e7 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp @@ -22,8 +22,11 @@ #include "common/clientapi.h" #include "common/console.h" #include "common/commands.h" -#include "common/handshake_generated.h" #include "common/resource.h" + +// commands +#include "common/commandcompletion_generated.h" +#include "common/handshake_generated.h" #include "common/revisionupdate_generated.h" #include @@ -35,7 +38,9 @@ Listener::Listener(const QString &resourceName, QObject *parent) m_revision(0), m_resourceName(resourceName), m_resource(0), - m_clientBufferProcessesTimer(new QTimer(this)) + m_pipeline(new Akonadi2::Pipeline(resourceName)), + m_clientBufferProcessesTimer(new QTimer(this)), + m_messageId(0) { connect(m_server, &QLocalServer::newConnection, this, &Listener::acceptConnection); @@ -64,6 +69,7 @@ Listener::Listener(const QString &resourceName, QObject *parent) Listener::~Listener() { + delete m_pipeline; } void Listener::setRevision(unsigned long long revision) @@ -183,43 +189,62 @@ void Listener::processClientBuffers() bool Listener::processClientBuffer(Client &client) { - static const int headerSize = (sizeof(int) + sizeof(uint)); + static const int headerSize = Akonadi2::Commands::headerSize(); if (client.commandBuffer.size() < headerSize) { return false; } int commandId; - uint size; - commandId = *(int*)client.commandBuffer.constData(); - size = *(uint*)(client.commandBuffer.constData() + sizeof(uint)); + uint messageId, size; + messageId = *(uint*)client.commandBuffer.constData(); + commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint)); + size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); //TODO: reject messages above a certain size? if (size <= uint(client.commandBuffer.size() - headerSize)) { - QByteArray data = client.commandBuffer.mid(headerSize, size); - client.commandBuffer.remove(0, headerSize + size); + client.commandBuffer.remove(0, headerSize); switch (commandId) { case Akonadi2::Commands::HandshakeCommand: { - auto buffer = Akonadi2::GetHandshake(data.constData()); - Akonadi2::Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str())); - client.name = buffer->name()->c_str(); - sendCurrentRevision(client); + flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); + if (Akonadi2::VerifyHandshakeBuffer(verifier)) { + auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData()); + client.name = buffer->name()->c_str(); + sendCurrentRevision(client); + } break; } case Akonadi2::Commands::SynchronizeCommand: { - Akonadi2::Console::main()->log(QString(" Synchronize request from %1").arg(client.name)); + Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); loadResource(); - //TODO: on failure ... what? if (m_resource) { m_resource->synchronizeWithSource(); } break; } + case Akonadi2::Commands::FetchEntityCommand: + case Akonadi2::Commands::DeleteEntityCommand: + case Akonadi2::Commands::ModifyEntityCommand: + case Akonadi2::Commands::CreateEntityCommand: + Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %1").arg(messageId).arg(commandId).arg(client.name)); + m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); + break; default: + if (commandId > Akonadi2::Commands::CustomCommand) { + loadResource(); + if (m_resource) { + m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); + } + } else { + //TODO: handle error: we don't know wtf this command is + } break; } + //TODO: async commands == async sendCommandCompleted + sendCommandCompleted(client, messageId); + client.commandBuffer.remove(0, size); return client.commandBuffer.size() >= headerSize; } @@ -234,7 +259,19 @@ void Listener::sendCurrentRevision(Client &client) auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); - Akonadi2::Commands::write(client.socket, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); + Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); + m_fbb.Clear(); +} + +void Listener::sendCommandCompleted(Client &client, uint messageId) +{ + if (!client.socket || !client.socket->isValid()) { + return; + } + + auto command = Akonadi2::CreateCommandCompletion(m_fbb, messageId); + Akonadi2::FinishCommandCompletionBuffer(m_fbb, command); + Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::CommandCompletion, m_fbb); m_fbb.Clear(); } @@ -248,7 +285,7 @@ void Listener::updateClientsWithRevision() continue; } - Akonadi2::Commands::write(client.socket, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); + Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); } m_fbb.Clear(); } @@ -269,5 +306,6 @@ void Listener::loadResource() } else { Akonadi2::Console::main()->log(QString("Failed to load resource %1").arg(m_resourceName)); } + //TODO: on failure ... what? } diff --git a/synchronizer/listener.h b/synchronizer/listener.h index 053fac3..b294277 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h @@ -25,6 +25,8 @@ #include +#include "common/pipeline.h" + namespace Akonadi2 { class Resource; @@ -78,6 +80,7 @@ private Q_SLOTS: private: bool processClientBuffer(Client &client); void sendCurrentRevision(Client &client); + void sendCommandCompleted(Client &client, uint messageId); void updateClientsWithRevision(); void loadResource(); @@ -87,5 +90,7 @@ private: flatbuffers::FlatBufferBuilder m_fbb; const QString m_resourceName; Akonadi2::Resource *m_resource; + Akonadi2::Pipeline *m_pipeline; QTimer *m_clientBufferProcessesTimer; + int m_messageId; }; -- cgit v1.2.3