From 8ae840ac13d9e2161b411fbceae281a725fa8b1f Mon Sep 17 00:00:00 2001 From: Aaron Seigo Date: Tue, 16 Dec 2014 12:14:38 +0100 Subject: load the resource on first command .. it LIIIIVES! --- synchronizer/listener.cpp | 66 +++++++++++++++++++++++++++++++++++------------ synchronizer/listener.h | 16 ++++++++---- 2 files changed, 61 insertions(+), 21 deletions(-) diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 1d20c4d..ef9738e 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp @@ -19,26 +19,30 @@ #include "listener.h" +#include "common/clientapi.h" #include "common/console.h" #include "common/commands.h" #include "common/handshake_generated.h" +#include "common/resource.h" #include "common/revisionupdate_generated.h" #include #include -Listener::Listener(const QString &resource, QObject *parent) +Listener::Listener(const QString &resourceName, QObject *parent) : QObject(parent), m_server(new QLocalServer(this)), - m_revision(0) + m_revision(0), + m_resourceName(resourceName), + m_resource(0) { connect(m_server, &QLocalServer::newConnection, this, &Listener::acceptConnection); - Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resource)); - if (!m_server->listen(resource)) { + Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName)); + if (!m_server->listen(resourceName)) { // FIXME: multiple starts need to be handled here - m_server->removeServer(resource); - if (!m_server->listen(resource)) { + m_server->removeServer(resourceName); + if (!m_server->listen(resourceName)) { Akonadi2::Console::main()->log("Utter failure to start server"); exit(-1); } @@ -70,7 +74,6 @@ unsigned long long Listener::revision() const void Listener::closeAllConnections() { - //TODO: close all client connectionsin m_connections for (Client &client: m_connections) { if (client.socket) { client.socket->close(); @@ -90,7 +93,7 @@ void Listener::acceptConnection() } Akonadi2::Console::main()->log("Got a connection"); - Client client("Unknown Client" /*fixme: actual names!*/, socket); + Client client("Unknown Client", socket); connect(socket, &QIODevice::readyRead, this, &Listener::readFromSocket); m_connections << client; @@ -137,6 +140,7 @@ void Listener::readFromSocket() Akonadi2::Console::main()->log("Reading from socket..."); for (Client &client: m_connections) { + Akonadi2::Console::main()->log(QString("Checking %1 %2").arg((qlonglong)client.socket).arg((qlonglong)socket)); if (client.socket == socket) { Akonadi2::Console::main()->log(QString(" Client: %1").arg(client.name)); client.commandBuffer += socket->readAll(); @@ -151,17 +155,19 @@ void Listener::readFromSocket() bool Listener::processClientBuffer(Client &client) { - static const int headerSize = (sizeof(int) * 2); - Akonadi2::Console::main()->log(QString("processing %1").arg(client.commandBuffer.size())); + static const int headerSize = (sizeof(int) + sizeof(uint)); if (client.commandBuffer.size() < headerSize) { return false; } - int commandId, size; + int commandId; + uint size; commandId = *(int*)client.commandBuffer.constData(); - size = *(int*)(client.commandBuffer.constData() + sizeof(int)); + size = *(uint*)(client.commandBuffer.constData() + sizeof(uint)); - if (size <= client.commandBuffer.size() - headerSize) { + //TODO: reject messages above a certain size? + + if (size <= uint(client.commandBuffer.size() - headerSize)) { QByteArray data = client.commandBuffer.mid(headerSize, size); client.commandBuffer.remove(0, headerSize + size); @@ -169,18 +175,27 @@ bool Listener::processClientBuffer(Client &client) case Akonadi2::Commands::HandshakeCommand: { auto buffer = Akonadi2::GetHandshake(data.constData()); Akonadi2::Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str())); + client.name = buffer->name()->c_str(); sendCurrentRevision(client); break; } + case Akonadi2::Commands::SynchronizeCommand: { + Akonadi2::Console::main()->log(QString(" Synchronize request from %1").arg(client.name)); + loadResource(); + //TODO: on failure ... what? + if (m_resource) { + m_resource->synchronizeWithSource(); + } + break; + } default: - // client.hasSentCommand = true; break; } return client.commandBuffer.size() >= headerSize; - } else { - return false; } + + return false; } void Listener::sendCurrentRevision(Client &client) @@ -209,3 +224,22 @@ void Listener::updateClientsWithRevision() } m_fbb.Clear(); } + +void Listener::loadResource() +{ + if (m_resource) { + return; + } + + Akonadi2::ResourceFactory *resourceFactory = Akonadi2::ResourceFactory::load(m_resourceName); + if (resourceFactory) { + m_resource = resourceFactory->createResource(); + Akonadi2::Console::main()->log(QString("Resource factory: %1").arg((qlonglong)resourceFactory)); + Akonadi2::Console::main()->log(QString("\tResource: %1").arg((qlonglong)m_resource)); + //TODO: this doesn't really list all the facades .. fix + Akonadi2::Console::main()->log(QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade(m_resourceName)->type())); + } else { + Akonadi2::Console::main()->log(QString("Failed to load resource %1").arg(m_resourceName)); + } +} + diff --git a/synchronizer/listener.h b/synchronizer/listener.h index b20da7f..5c725df 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h @@ -26,26 +26,28 @@ #include +namespace Akonadi2 +{ + class Resource; +} + class Client { public: Client() - : socket(nullptr), - hasSentCommand(false) + : socket(nullptr) { } Client(const QString &n, QLocalSocket *s) : name(n), - socket(s), - hasSentCommand(false) + socket(s) { } QString name; QLocalSocket *socket; QByteArray commandBuffer; - bool hasSentCommand; }; class Listener : public QObject @@ -75,8 +77,12 @@ private: bool processClientBuffer(Client &client); void sendCurrentRevision(Client &client); void updateClientsWithRevision(); + void loadResource(); + QLocalServer *m_server; QVector m_connections; unsigned long long m_revision; flatbuffers::FlatBufferBuilder m_fbb; + const QString m_resourceName; + Akonadi2::Resource *m_resource; }; -- cgit v1.2.3