From 5f40ace47be289c74ad95948c75ed86676158639 Mon Sep 17 00:00:00 2001 From: Aaron Seigo Date: Sat, 6 Dec 2014 02:33:51 +0100 Subject: resource -> synchronizer the resource will be the plugin that interacts with the source and store --- CMakeLists.txt | 4 +- client/resourceaccess.cpp | 2 +- resource/CMakeLists.txt | 13 --- resource/listener.cpp | 192 -------------------------------------------- resource/listener.h | 63 --------------- resource/main.cpp | 25 ------ synchronizer/CMakeLists.txt | 13 +++ synchronizer/listener.cpp | 192 ++++++++++++++++++++++++++++++++++++++++++++ synchronizer/listener.h | 63 +++++++++++++++ synchronizer/main.cpp | 25 ++++++ 10 files changed, 296 insertions(+), 296 deletions(-) delete mode 100644 resource/CMakeLists.txt delete mode 100644 resource/listener.cpp delete mode 100644 resource/listener.h delete mode 100644 resource/main.cpp create mode 100644 synchronizer/CMakeLists.txt create mode 100644 synchronizer/listener.cpp create mode 100644 synchronizer/listener.h create mode 100644 synchronizer/main.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index e8b6f23..0ead220 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -44,8 +44,8 @@ add_subdirectory(common) # the client add_subdirectory(client) -# the resource -add_subdirectory(resource) +# the synchronizer +add_subdirectory(synchronizer) # a simple dummy resource implementation add_subdirectory(dummyresource) diff --git a/client/resourceaccess.cpp b/client/resourceaccess.cpp index 6f1e114..fce8ca6 100644 --- a/client/resourceaccess.cpp +++ b/client/resourceaccess.cpp @@ -104,7 +104,7 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) log(QString("Attempting to start resource ") + m_resourceName); QStringList args; args << m_resourceName; - if (QProcess::startDetached("akonadinext_resource", args)) { + if (QProcess::startDetached("akonadinext_synchronizer", args)) { m_socket->open(); } } diff --git a/resource/CMakeLists.txt b/resource/CMakeLists.txt deleted file mode 100644 index 1c21b1a..0000000 --- a/resource/CMakeLists.txt +++ /dev/null @@ -1,13 +0,0 @@ -project(akonadinext_resource) - -include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) - -set(akonadinextresource_SRCS - main.cpp - listener.cpp -) - -add_executable(${PROJECT_NAME} ${akonadinextresource_SRCS}) -target_link_libraries(${PROJECT_NAME} akonadinextcommon) -qt5_use_modules(${PROJECT_NAME} Widgets Network) -install(TARGETS ${PROJECT_NAME} DESTINATION bin) diff --git a/resource/listener.cpp b/resource/listener.cpp deleted file mode 100644 index 23a5a70..0000000 --- a/resource/listener.cpp +++ /dev/null @@ -1,192 +0,0 @@ -#include "listener.h" - -#include "common/console.h" -#include "common/commands.h" -#include "common/handshake_generated.h" -#include "common/revisionupdate_generated.h" - -#include -#include - -Listener::Listener(const QString &resource, QObject *parent) - : QObject(parent), - m_server(new QLocalServer(this)), - m_revision(0) -{ - connect(m_server, &QLocalServer::newConnection, - this, &Listener::acceptConnection); - Console::main()->log(QString("Trying to open %1").arg(resource)); - if (!m_server->listen(resource)) { - // FIXME: multiple starts need to be handled here - m_server->removeServer(resource); - if (!m_server->listen(resource)) { - Console::main()->log("Utter failure to start server"); - exit(-1); - } - } - - if (m_server->isListening()) { - Console::main()->log(QString("Listening on %1").arg(m_server->serverName())); - } - - QTimer::singleShot(2000, this, SLOT(checkConnections())); -} - -Listener::~Listener() -{ -} - -void Listener::setRevision(unsigned long long revision) -{ - if (m_revision != revision) { - m_revision = revision; - updateClientsWithRevision(); - } -} - -unsigned long long Listener::revision() const -{ - return m_revision; -} - -void Listener::closeAllConnections() -{ - //TODO: close all client connectionsin m_connections - for (Client &client: m_connections) { - if (client.socket) { - client.socket->close(); - delete client.socket; - client.socket = 0; - } - } -} - -void Listener::acceptConnection() -{ - Console::main()->log(QString("Accepting connection")); - QLocalSocket *socket = m_server->nextPendingConnection(); - - if (!socket) { - return; - } - - Console::main()->log("Got a connection"); - Client client("Unknown Client" /*fixme: actual names!*/, socket); - connect(socket, &QIODevice::readyRead, - this, &Listener::readFromSocket); - m_connections << client; - connect(socket, &QLocalSocket::disconnected, - this, &Listener::clientDropped); - -} - -void Listener::clientDropped() -{ - QLocalSocket *socket = qobject_cast(sender()); - if (!socket) { - return; - } - - Console::main()->log("Dropping connection..."); - QMutableVectorIterator it(m_connections); - while (it.hasNext()) { - const Client &client = it.next(); - if (client.socket == socket) { - Console::main()->log(QString(" dropped... %1").arg(client.name)); - it.remove(); - break; - } - } - - checkConnections(); -} - -void Listener::checkConnections() -{ - if (m_connections.isEmpty()) { - m_server->close(); - emit noClients(); - } -} - -void Listener::readFromSocket() -{ - QLocalSocket *socket = qobject_cast(sender()); - if (!socket) { - return; - } - - Console::main()->log("Reading from socket..."); - for (Client &client: m_connections) { - if (client.socket == socket) { - Console::main()->log(QString(" Client: %1").arg(client.name)); - client.commandBuffer += socket->readAll(); - // FIXME: schedule these rather than process them all at once - // right now this can lead to starvation of clients due to - // one overly active client - while (processClientBuffer(client)) {} - break; - } - } -} - -bool Listener::processClientBuffer(Client &client) -{ - static const int headerSize = (sizeof(int) * 2); - Console::main()->log(QString("processing %1").arg(client.commandBuffer.size())); - if (client.commandBuffer.size() < headerSize) { - return false; - } - - int commandId, size; - commandId = *(int*)client.commandBuffer.constData(); - size = *(int*)(client.commandBuffer.constData() + sizeof(int)); - - if (size <= client.commandBuffer.size() - headerSize) { - QByteArray data = client.commandBuffer.mid(headerSize, size); - client.commandBuffer.remove(0, headerSize + size); - - switch (commandId) { - case Commands::HandshakeCommand: { - auto buffer = Akonadi::GetHandshake(data.constData()); - Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str())); - sendCurrentRevision(client); - break; - } - default: - // client.hasSentCommand = true; - break; - } - - return client.commandBuffer.size() >= headerSize; - } else { - return false; - } -} - -void Listener::sendCurrentRevision(Client &client) -{ - if (!client.socket || !client.socket->isValid()) { - return; - } - - auto command = Akonadi::CreateRevisionUpdate(m_fbb, m_revision); - Akonadi::FinishRevisionUpdateBuffer(m_fbb, command); - Commands::write(client.socket, Commands::RevisionUpdateCommand, m_fbb); - m_fbb.Clear(); -} - -void Listener::updateClientsWithRevision() -{ - auto command = Akonadi::CreateRevisionUpdate(m_fbb, m_revision); - Akonadi::FinishRevisionUpdateBuffer(m_fbb, command); - - for (const Client &client: m_connections) { - if (!client.socket || !client.socket->isValid()) { - continue; - } - - Commands::write(client.socket, Commands::RevisionUpdateCommand, m_fbb); - } - m_fbb.Clear(); -} diff --git a/resource/listener.h b/resource/listener.h deleted file mode 100644 index dcc3818..0000000 --- a/resource/listener.h +++ /dev/null @@ -1,63 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -#include - -class Client -{ -public: - Client() - : socket(nullptr), - hasSentCommand(false) - { - } - - Client(const QString &n, QLocalSocket *s) - : name(n), - socket(s), - hasSentCommand(false) - { - } - - QString name; - QLocalSocket *socket; - QByteArray commandBuffer; - bool hasSentCommand; -}; - -class Listener : public QObject -{ - Q_OBJECT - -public: - Listener(const QString &resourceName, QObject *parent = 0); - ~Listener(); - - void setRevision(unsigned long long revision); - unsigned long long revision() const; - -Q_SIGNALS: - void noClients(); - -public Q_SLOTS: - void closeAllConnections(); - -private Q_SLOTS: - void acceptConnection(); - void clientDropped(); - void checkConnections(); - void readFromSocket(); - -private: - bool processClientBuffer(Client &client); - void sendCurrentRevision(Client &client); - void updateClientsWithRevision(); - QLocalServer *m_server; - QVector m_connections; - unsigned long long m_revision; - flatbuffers::FlatBufferBuilder m_fbb; -}; diff --git a/resource/main.cpp b/resource/main.cpp deleted file mode 100644 index 91c0a9a..0000000 --- a/resource/main.cpp +++ /dev/null @@ -1,25 +0,0 @@ - -#include - -#include "common/console.h" -#include "listener.h" - -int main(int argc, char *argv[]) -{ - QApplication app(argc, argv); - - new Console(QString("Resource: %1").arg(argv[1])); - if (argc < 2) { - Console::main()->log("Not enough args"); - return app.exec(); - } - - Listener *listener = new Listener(argv[1]); - - QObject::connect(&app, &QCoreApplication::aboutToQuit, - listener, &Listener::closeAllConnections); - QObject::connect(listener, &Listener::noClients, - &app, &QCoreApplication::quit); - - return app.exec(); -} \ No newline at end of file diff --git a/synchronizer/CMakeLists.txt b/synchronizer/CMakeLists.txt new file mode 100644 index 0000000..92cb465 --- /dev/null +++ b/synchronizer/CMakeLists.txt @@ -0,0 +1,13 @@ +project(akonadinext_synchronizer) + +include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) + +set(akonadinextsynchronizer_SRCS + main.cpp + listener.cpp +) + +add_executable(${PROJECT_NAME} ${akonadinextsynchronizer_SRCS}) +target_link_libraries(${PROJECT_NAME} akonadinextcommon) +qt5_use_modules(${PROJECT_NAME} Widgets Network) +install(TARGETS ${PROJECT_NAME} DESTINATION bin) diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp new file mode 100644 index 0000000..23a5a70 --- /dev/null +++ b/synchronizer/listener.cpp @@ -0,0 +1,192 @@ +#include "listener.h" + +#include "common/console.h" +#include "common/commands.h" +#include "common/handshake_generated.h" +#include "common/revisionupdate_generated.h" + +#include +#include + +Listener::Listener(const QString &resource, QObject *parent) + : QObject(parent), + m_server(new QLocalServer(this)), + m_revision(0) +{ + connect(m_server, &QLocalServer::newConnection, + this, &Listener::acceptConnection); + Console::main()->log(QString("Trying to open %1").arg(resource)); + if (!m_server->listen(resource)) { + // FIXME: multiple starts need to be handled here + m_server->removeServer(resource); + if (!m_server->listen(resource)) { + Console::main()->log("Utter failure to start server"); + exit(-1); + } + } + + if (m_server->isListening()) { + Console::main()->log(QString("Listening on %1").arg(m_server->serverName())); + } + + QTimer::singleShot(2000, this, SLOT(checkConnections())); +} + +Listener::~Listener() +{ +} + +void Listener::setRevision(unsigned long long revision) +{ + if (m_revision != revision) { + m_revision = revision; + updateClientsWithRevision(); + } +} + +unsigned long long Listener::revision() const +{ + return m_revision; +} + +void Listener::closeAllConnections() +{ + //TODO: close all client connectionsin m_connections + for (Client &client: m_connections) { + if (client.socket) { + client.socket->close(); + delete client.socket; + client.socket = 0; + } + } +} + +void Listener::acceptConnection() +{ + Console::main()->log(QString("Accepting connection")); + QLocalSocket *socket = m_server->nextPendingConnection(); + + if (!socket) { + return; + } + + Console::main()->log("Got a connection"); + Client client("Unknown Client" /*fixme: actual names!*/, socket); + connect(socket, &QIODevice::readyRead, + this, &Listener::readFromSocket); + m_connections << client; + connect(socket, &QLocalSocket::disconnected, + this, &Listener::clientDropped); + +} + +void Listener::clientDropped() +{ + QLocalSocket *socket = qobject_cast(sender()); + if (!socket) { + return; + } + + Console::main()->log("Dropping connection..."); + QMutableVectorIterator it(m_connections); + while (it.hasNext()) { + const Client &client = it.next(); + if (client.socket == socket) { + Console::main()->log(QString(" dropped... %1").arg(client.name)); + it.remove(); + break; + } + } + + checkConnections(); +} + +void Listener::checkConnections() +{ + if (m_connections.isEmpty()) { + m_server->close(); + emit noClients(); + } +} + +void Listener::readFromSocket() +{ + QLocalSocket *socket = qobject_cast(sender()); + if (!socket) { + return; + } + + Console::main()->log("Reading from socket..."); + for (Client &client: m_connections) { + if (client.socket == socket) { + Console::main()->log(QString(" Client: %1").arg(client.name)); + client.commandBuffer += socket->readAll(); + // FIXME: schedule these rather than process them all at once + // right now this can lead to starvation of clients due to + // one overly active client + while (processClientBuffer(client)) {} + break; + } + } +} + +bool Listener::processClientBuffer(Client &client) +{ + static const int headerSize = (sizeof(int) * 2); + Console::main()->log(QString("processing %1").arg(client.commandBuffer.size())); + if (client.commandBuffer.size() < headerSize) { + return false; + } + + int commandId, size; + commandId = *(int*)client.commandBuffer.constData(); + size = *(int*)(client.commandBuffer.constData() + sizeof(int)); + + if (size <= client.commandBuffer.size() - headerSize) { + QByteArray data = client.commandBuffer.mid(headerSize, size); + client.commandBuffer.remove(0, headerSize + size); + + switch (commandId) { + case Commands::HandshakeCommand: { + auto buffer = Akonadi::GetHandshake(data.constData()); + Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str())); + sendCurrentRevision(client); + break; + } + default: + // client.hasSentCommand = true; + break; + } + + return client.commandBuffer.size() >= headerSize; + } else { + return false; + } +} + +void Listener::sendCurrentRevision(Client &client) +{ + if (!client.socket || !client.socket->isValid()) { + return; + } + + auto command = Akonadi::CreateRevisionUpdate(m_fbb, m_revision); + Akonadi::FinishRevisionUpdateBuffer(m_fbb, command); + Commands::write(client.socket, Commands::RevisionUpdateCommand, m_fbb); + m_fbb.Clear(); +} + +void Listener::updateClientsWithRevision() +{ + auto command = Akonadi::CreateRevisionUpdate(m_fbb, m_revision); + Akonadi::FinishRevisionUpdateBuffer(m_fbb, command); + + for (const Client &client: m_connections) { + if (!client.socket || !client.socket->isValid()) { + continue; + } + + Commands::write(client.socket, Commands::RevisionUpdateCommand, m_fbb); + } + m_fbb.Clear(); +} diff --git a/synchronizer/listener.h b/synchronizer/listener.h new file mode 100644 index 0000000..dcc3818 --- /dev/null +++ b/synchronizer/listener.h @@ -0,0 +1,63 @@ +#pragma once + +#include +#include +#include +#include + +#include + +class Client +{ +public: + Client() + : socket(nullptr), + hasSentCommand(false) + { + } + + Client(const QString &n, QLocalSocket *s) + : name(n), + socket(s), + hasSentCommand(false) + { + } + + QString name; + QLocalSocket *socket; + QByteArray commandBuffer; + bool hasSentCommand; +}; + +class Listener : public QObject +{ + Q_OBJECT + +public: + Listener(const QString &resourceName, QObject *parent = 0); + ~Listener(); + + void setRevision(unsigned long long revision); + unsigned long long revision() const; + +Q_SIGNALS: + void noClients(); + +public Q_SLOTS: + void closeAllConnections(); + +private Q_SLOTS: + void acceptConnection(); + void clientDropped(); + void checkConnections(); + void readFromSocket(); + +private: + bool processClientBuffer(Client &client); + void sendCurrentRevision(Client &client); + void updateClientsWithRevision(); + QLocalServer *m_server; + QVector m_connections; + unsigned long long m_revision; + flatbuffers::FlatBufferBuilder m_fbb; +}; diff --git a/synchronizer/main.cpp b/synchronizer/main.cpp new file mode 100644 index 0000000..91c0a9a --- /dev/null +++ b/synchronizer/main.cpp @@ -0,0 +1,25 @@ + +#include + +#include "common/console.h" +#include "listener.h" + +int main(int argc, char *argv[]) +{ + QApplication app(argc, argv); + + new Console(QString("Resource: %1").arg(argv[1])); + if (argc < 2) { + Console::main()->log("Not enough args"); + return app.exec(); + } + + Listener *listener = new Listener(argv[1]); + + QObject::connect(&app, &QCoreApplication::aboutToQuit, + listener, &Listener::closeAllConnections); + QObject::connect(listener, &Listener::noClients, + &app, &QCoreApplication::quit); + + return app.exec(); +} \ No newline at end of file -- cgit v1.2.3