From 77944384d24b5005d6b8648572a31a3ae84dd946 Mon Sep 17 00:00:00 2001 From: Aaron Seigo Date: Tue, 16 Dec 2014 22:40:44 +0100 Subject: add pipelines (as a sketch only), message ids and message responses --- synchronizer/listener.cpp | 70 ++++++++++++++++++++++++++++++++++++----------- synchronizer/listener.h | 5 ++++ 2 files changed, 59 insertions(+), 16 deletions(-) (limited to 'synchronizer') diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 368dae5..18442e7 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp @@ -22,8 +22,11 @@ #include "common/clientapi.h" #include "common/console.h" #include "common/commands.h" -#include "common/handshake_generated.h" #include "common/resource.h" + +// commands +#include "common/commandcompletion_generated.h" +#include "common/handshake_generated.h" #include "common/revisionupdate_generated.h" #include @@ -35,7 +38,9 @@ Listener::Listener(const QString &resourceName, QObject *parent) m_revision(0), m_resourceName(resourceName), m_resource(0), - m_clientBufferProcessesTimer(new QTimer(this)) + m_pipeline(new Akonadi2::Pipeline(resourceName)), + m_clientBufferProcessesTimer(new QTimer(this)), + m_messageId(0) { connect(m_server, &QLocalServer::newConnection, this, &Listener::acceptConnection); @@ -64,6 +69,7 @@ Listener::Listener(const QString &resourceName, QObject *parent) Listener::~Listener() { + delete m_pipeline; } void Listener::setRevision(unsigned long long revision) @@ -183,43 +189,62 @@ void Listener::processClientBuffers() bool Listener::processClientBuffer(Client &client) { - static const int headerSize = (sizeof(int) + sizeof(uint)); + static const int headerSize = Akonadi2::Commands::headerSize(); if (client.commandBuffer.size() < headerSize) { return false; } int commandId; - uint size; - commandId = *(int*)client.commandBuffer.constData(); - size = *(uint*)(client.commandBuffer.constData() + sizeof(uint)); + uint messageId, size; + messageId = *(uint*)client.commandBuffer.constData(); + commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint)); + size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); //TODO: reject messages above a certain size? if (size <= uint(client.commandBuffer.size() - headerSize)) { - QByteArray data = client.commandBuffer.mid(headerSize, size); - client.commandBuffer.remove(0, headerSize + size); + client.commandBuffer.remove(0, headerSize); switch (commandId) { case Akonadi2::Commands::HandshakeCommand: { - auto buffer = Akonadi2::GetHandshake(data.constData()); - Akonadi2::Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str())); - client.name = buffer->name()->c_str(); - sendCurrentRevision(client); + flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); + if (Akonadi2::VerifyHandshakeBuffer(verifier)) { + auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData()); + client.name = buffer->name()->c_str(); + sendCurrentRevision(client); + } break; } case Akonadi2::Commands::SynchronizeCommand: { - Akonadi2::Console::main()->log(QString(" Synchronize request from %1").arg(client.name)); + Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); loadResource(); - //TODO: on failure ... what? if (m_resource) { m_resource->synchronizeWithSource(); } break; } + case Akonadi2::Commands::FetchEntityCommand: + case Akonadi2::Commands::DeleteEntityCommand: + case Akonadi2::Commands::ModifyEntityCommand: + case Akonadi2::Commands::CreateEntityCommand: + Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %1").arg(messageId).arg(commandId).arg(client.name)); + m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); + break; default: + if (commandId > Akonadi2::Commands::CustomCommand) { + loadResource(); + if (m_resource) { + m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); + } + } else { + //TODO: handle error: we don't know wtf this command is + } break; } + //TODO: async commands == async sendCommandCompleted + sendCommandCompleted(client, messageId); + client.commandBuffer.remove(0, size); return client.commandBuffer.size() >= headerSize; } @@ -234,7 +259,19 @@ void Listener::sendCurrentRevision(Client &client) auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); - Akonadi2::Commands::write(client.socket, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); + Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); + m_fbb.Clear(); +} + +void Listener::sendCommandCompleted(Client &client, uint messageId) +{ + if (!client.socket || !client.socket->isValid()) { + return; + } + + auto command = Akonadi2::CreateCommandCompletion(m_fbb, messageId); + Akonadi2::FinishCommandCompletionBuffer(m_fbb, command); + Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::CommandCompletion, m_fbb); m_fbb.Clear(); } @@ -248,7 +285,7 @@ void Listener::updateClientsWithRevision() continue; } - Akonadi2::Commands::write(client.socket, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); + Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); } m_fbb.Clear(); } @@ -269,5 +306,6 @@ void Listener::loadResource() } else { Akonadi2::Console::main()->log(QString("Failed to load resource %1").arg(m_resourceName)); } + //TODO: on failure ... what? } diff --git a/synchronizer/listener.h b/synchronizer/listener.h index 053fac3..b294277 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h @@ -25,6 +25,8 @@ #include +#include "common/pipeline.h" + namespace Akonadi2 { class Resource; @@ -78,6 +80,7 @@ private Q_SLOTS: private: bool processClientBuffer(Client &client); void sendCurrentRevision(Client &client); + void sendCommandCompleted(Client &client, uint messageId); void updateClientsWithRevision(); void loadResource(); @@ -87,5 +90,7 @@ private: flatbuffers::FlatBufferBuilder m_fbb; const QString m_resourceName; Akonadi2::Resource *m_resource; + Akonadi2::Pipeline *m_pipeline; QTimer *m_clientBufferProcessesTimer; + int m_messageId; }; -- cgit v1.2.3