From 77944384d24b5005d6b8648572a31a3ae84dd946 Mon Sep 17 00:00:00 2001 From: Aaron Seigo Date: Tue, 16 Dec 2014 22:40:44 +0100 Subject: add pipelines (as a sketch only), message ids and message responses --- common/CMakeLists.txt | 2 + common/commands.cpp | 15 +++++-- common/commands.h | 9 +++-- common/commands/commandcompletion.fbs | 6 +-- common/pipeline.cpp | 75 +++++++++++++++++++++++++++++++++++ common/pipeline.h | 47 ++++++++++++++++++++++ common/resource.cpp | 4 ++ common/resource.h | 8 +++- common/resourceaccess.cpp | 35 ++++++++++------ 9 files changed, 176 insertions(+), 25 deletions(-) create mode 100644 common/pipeline.cpp create mode 100644 common/pipeline.h (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 7de4aa9..001dab5 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -1,5 +1,6 @@ project(akonadi2common) generate_flatbuffers( + commands/commandcompletion commands/createentity commands/deleteentity commands/fetchentity @@ -20,6 +21,7 @@ set(command_SRCS clientapi.cpp commands.cpp console.cpp + pipeline.cpp resource.cpp resourceaccess.cpp storage_common.cpp diff --git a/common/commands.cpp b/common/commands.cpp index ecbbfdb..1dfeabe 100644 --- a/common/commands.cpp +++ b/common/commands.cpp @@ -28,17 +28,23 @@ namespace Akonadi2 namespace Commands { -void write(QIODevice *device, int commandId) +int headerSize() { - write(device, commandId, 0, 0); + return sizeof(int) + (sizeof(uint) * 2); } -void write(QIODevice *device, int commandId, const char *buffer, uint size) +void write(QIODevice *device, int messageId, int commandId) +{ + write(device, messageId, commandId, 0, 0); +} + +void write(QIODevice *device, int messageId, int commandId, const char *buffer, uint size) { if (size > 0 && !buffer) { size = 0; } + device->write((const char*)&messageId, sizeof(int)); device->write((const char*)&commandId, sizeof(int)); device->write((const char*)&size, sizeof(uint)); if (buffer) { @@ -46,9 +52,10 @@ void write(QIODevice *device, int commandId, const char *buffer, uint size) } } -void write(QIODevice *device, int commandId, flatbuffers::FlatBufferBuilder &fbb) +void write(QIODevice *device, int messageId, int commandId, flatbuffers::FlatBufferBuilder &fbb) { const int dataSize = fbb.GetSize(); + device->write((const char*)&messageId, sizeof(int)); device->write((const char*)&commandId, sizeof(int)); device->write((const char*)&dataSize, sizeof(int)); device->write((const char*)fbb.GetBufferPointer(), dataSize); diff --git a/common/commands.h b/common/commands.h index 874db73..c63bb47 100644 --- a/common/commands.h +++ b/common/commands.h @@ -33,8 +33,8 @@ namespace Commands enum CommandIds { UnknownCommand = 0, - HandshakeCommand, CommandCompletion, + HandshakeCommand, RevisionUpdateCommand, SynchronizeCommand, FetchEntityCommand, @@ -45,9 +45,10 @@ enum CommandIds { CustomCommand = 0xffff }; -void AKONADI2COMMON_EXPORT write(QIODevice *device, int commandId); -void AKONADI2COMMON_EXPORT write(QIODevice *device, int commandId, const char *buffer, uint size); -void AKONADI2COMMON_EXPORT write(QIODevice *device, int commandId, flatbuffers::FlatBufferBuilder &fbb); +int AKONADI2COMMON_EXPORT headerSize(); +void AKONADI2COMMON_EXPORT write(QIODevice *device, int messageId, int commandId); +void AKONADI2COMMON_EXPORT write(QIODevice *device, int messageId, int commandId, const char *buffer, uint size); +void AKONADI2COMMON_EXPORT write(QIODevice *device, int messageId, int commandId, flatbuffers::FlatBufferBuilder &fbb); } diff --git a/common/commands/commandcompletion.fbs b/common/commands/commandcompletion.fbs index 9583108..5330b4f 100644 --- a/common/commands/commandcompletion.fbs +++ b/common/commands/commandcompletion.fbs @@ -1,9 +1,9 @@ namespace Akonadi2; table CommandCompletion { - id: ulong - success: bool - log: ulong = 0 + id: ulong; + success: bool = true; + log: ulong = 0; } root_type CommandCompletion; diff --git a/common/pipeline.cpp b/common/pipeline.cpp new file mode 100644 index 0000000..41def7c --- /dev/null +++ b/common/pipeline.cpp @@ -0,0 +1,75 @@ +/* + * Copyright (C) 2014 Aaron Seigo + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ + +#include "pipeline.h" + +#include + +namespace Akonadi2 +{ + +class Pipeline::Private +{ +public: + Private(const QString &storageName) + : storage(QStandardPaths::writableLocation(QStandardPaths::QStandardPaths::GenericDataLocation) + "/akonadi2", storageName, Akonadi2::Storage::ReadWrite) + { + + } + + Akonadi2::Storage storage; +}; + +Pipeline::Pipeline(const QString &storageName) + : d(new Private(storageName)) +{ +} + +Pipeline::~Pipeline() +{ +} + +Storage &Pipeline::storage() +{ + return d->storage; +} + +void Pipeline::null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) +{ + d->storage.write(key, keySize, buffer, bufferSize); +} + +void Pipeline::newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) +{ + d->storage.write(key, keySize, buffer, bufferSize); +} + +void Pipeline::modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) +{ + d->storage.write(key, keySize, buffer, bufferSize); +} + +void Pipeline::deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) +{ + d->storage.write(key, keySize, buffer, bufferSize); +} + +} // namespace Akonadi2 + diff --git a/common/pipeline.h b/common/pipeline.h new file mode 100644 index 0000000..635e630 --- /dev/null +++ b/common/pipeline.h @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2014 Aaron Seigo + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ + +#pragma once + +#include +#include + +namespace Akonadi2 +{ + +class AKONADI2COMMON_EXPORT Pipeline +{ +public: + Pipeline(const QString &storagePath); + ~Pipeline(); + + Storage &storage(); + void null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); + void newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); + void modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); + void deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); + +private: + class Private; + Private * const d; +}; + +} // namespace Akonadi2 + diff --git a/common/resource.cpp b/common/resource.cpp index 7120d1c..11a03ca 100644 --- a/common/resource.cpp +++ b/common/resource.cpp @@ -39,6 +39,10 @@ Resource::~Resource() //delete d; } +void Resource::processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline) +{ +} + void Resource::synchronizeWithSource() { } diff --git a/common/resource.h b/common/resource.h index 2ecff03..53c0bc1 100644 --- a/common/resource.h +++ b/common/resource.h @@ -18,18 +18,22 @@ * License along with this library. If not, see . */ -#include "clientapi.h" +#include +#include +#include namespace Akonadi2 { -class Resource +class AKONADI2COMMON_EXPORT Resource { public: //TODO: configuration Resource(); virtual ~Resource(); + //TODO: this will need to be async + virtual void processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline); virtual void synchronizeWithSource(); private: diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index a4f3c94..2f7d207 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -22,6 +22,7 @@ #include "common/console.h" #include "common/commands.h" +#include "common/commandcompletion_generated.h" #include "common/handshake_generated.h" #include "common/revisionupdate_generated.h" @@ -54,10 +55,10 @@ public: delete[] m_buffer; } - void write(QIODevice *device) + void write(QIODevice *device, uint messageId) { Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); - Commands::write(device, m_commandId, m_buffer, m_bufferSize); + Commands::write(device, messageId, m_commandId, m_buffer, m_bufferSize); } private: @@ -80,13 +81,15 @@ public: QByteArray partialMessageBuffer; flatbuffers::FlatBufferBuilder fbb; QVector commandQueue; + uint messageId; }; ResourceAccess::Private::Private(const QString &name, ResourceAccess *q) : resourceName(name), socket(new QLocalSocket(q)), tryOpenTimer(new QTimer(q)), - startingProcess(false) + startingProcess(false), + messageId(0) { } @@ -129,7 +132,7 @@ void ResourceAccess::sendCommand(int commandId) { if (isReady()) { log(QString("Sending command %1").arg(commandId)); - Commands::write(d->socket, commandId); + Commands::write(d->socket, ++d->messageId, commandId); } else { d->commandQueue << new QueuedCommand(commandId); } @@ -139,7 +142,7 @@ void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder & { if (isReady()) { log(QString("Sending command %1").arg(commandId)); - Commands::write(d->socket, commandId, fbb); + Commands::write(d->socket, ++d->messageId, commandId, fbb); } else { d->commandQueue << new QueuedCommand(commandId, fbb); } @@ -180,7 +183,7 @@ void ResourceAccess::connected() auto name = d->fbb.CreateString(QString::number((long long)this).toLatin1()); auto command = Akonadi2::CreateHandshake(d->fbb, name); Akonadi2::FinishHandshakeBuffer(d->fbb, command); - Commands::write(d->socket, Commands::HandshakeCommand, d->fbb); + Commands::write(d->socket, ++d->messageId, Commands::HandshakeCommand, d->fbb); d->fbb.Clear(); } @@ -188,7 +191,7 @@ void ResourceAccess::connected() //TODO: serialize instead of blast them all through the socket? log(QString("We have %1 queued commands").arg(d->commandQueue.size())); for (QueuedCommand *command: d->commandQueue) { - command->write(d->socket); + command->write(d->socket, ++d->messageId); delete command; } d->commandQueue.clear(); @@ -239,25 +242,33 @@ void ResourceAccess::readResourceMessage() bool ResourceAccess::processMessageBuffer() { - static const int headerSize = (sizeof(int) * 2); + static const int headerSize = Commands::headerSize(); if (d->partialMessageBuffer.size() < headerSize) { return false; } - const int commandId = *(int*)d->partialMessageBuffer.constData(); - const int size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int)); + //messageId is unused, so commented out + //const uint messageId = *(int*)(d->partialMessageBuffer.constData()); + const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint)); + const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); - if (size > d->partialMessageBuffer.size() - headerSize) { + if (size > (uint)(d->partialMessageBuffer.size() - headerSize)) { return false; } switch (commandId) { case Commands::RevisionUpdateCommand: { - auto buffer = Akonadi2::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); + auto buffer = GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); log(QString("Revision updated to: %1").arg(buffer->revision())); emit revisionChanged(buffer->revision()); break; } + case Commands::CommandCompletion: { + auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); + log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); + //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc + break; + } default: break; } -- cgit v1.2.3