From 5d246d2bea8f2330d86cdb0b14c76f6b6ceb7ec1 Mon Sep 17 00:00:00 2001 From: Aaron Seigo Date: Tue, 16 Dec 2014 12:40:11 +0100 Subject: iterate async over commands --- synchronizer/listener.cpp | 42 +++++++++++++++++++++++++++++++++++------- synchronizer/listener.h | 5 ++++- 2 files changed, 39 insertions(+), 8 deletions(-) (limited to 'synchronizer') diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index ef9738e..368dae5 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp @@ -34,7 +34,8 @@ Listener::Listener(const QString &resourceName, QObject *parent) m_server(new QLocalServer(this)), m_revision(0), m_resourceName(resourceName), - m_resource(0) + m_resource(0), + m_clientBufferProcessesTimer(new QTimer(this)) { connect(m_server, &QLocalServer::newConnection, this, &Listener::acceptConnection); @@ -52,6 +53,12 @@ Listener::Listener(const QString &resourceName, QObject *parent) Akonadi2::Console::main()->log(QString("Listening on %1").arg(m_server->serverName())); } + //TODO: experiment with different timeouts + // or even just drop down to invoking the method queued? + m_clientBufferProcessesTimer->setInterval(10); + m_clientBufferProcessesTimer->setSingleShot(true); + connect(m_clientBufferProcessesTimer, &QTimer::timeout, + this, &Listener::processClientBuffers); QTimer::singleShot(2000, this, SLOT(checkConnections())); } @@ -81,6 +88,8 @@ void Listener::closeAllConnections() client.socket = 0; } } + + m_connections.clear(); } void Listener::acceptConnection() @@ -140,19 +149,38 @@ void Listener::readFromSocket() Akonadi2::Console::main()->log("Reading from socket..."); for (Client &client: m_connections) { - Akonadi2::Console::main()->log(QString("Checking %1 %2").arg((qlonglong)client.socket).arg((qlonglong)socket)); if (client.socket == socket) { - Akonadi2::Console::main()->log(QString(" Client: %1").arg(client.name)); client.commandBuffer += socket->readAll(); - // FIXME: schedule these rather than process them all at once - // right now this can lead to starvation of clients due to - // one overly active client - while (processClientBuffer(client)) {} + if (processClientBuffer(client) && !m_clientBufferProcessesTimer->isActive()) { + // we have more client buffers to handle + m_clientBufferProcessesTimer->start(); + } break; } } } +void Listener::processClientBuffers() +{ + //TODO: we should not process all clients, but iterate async over them and process + // one command from each in turn to ensure all clients get fair handling of + // commands? + bool again = false; + for (Client &client: m_connections) { + if (!client.socket || !client.socket->isValid() || client.commandBuffer.isEmpty()) { + continue; + } + + if (processClientBuffer(client)) { + again = true; + } + } + + if (again) { + m_clientBufferProcessesTimer->start(); + } +} + bool Listener::processClientBuffer(Client &client) { static const int headerSize = (sizeof(int) + sizeof(uint)); diff --git a/synchronizer/listener.h b/synchronizer/listener.h index 5c725df..053fac3 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h @@ -21,7 +21,6 @@ #include #include -#include #include #include @@ -31,6 +30,8 @@ namespace Akonadi2 class Resource; } +class QTimer; + class Client { public: @@ -72,6 +73,7 @@ private Q_SLOTS: void clientDropped(); void checkConnections(); void readFromSocket(); + void processClientBuffers(); private: bool processClientBuffer(Client &client); @@ -85,4 +87,5 @@ private: flatbuffers::FlatBufferBuilder m_fbb; const QString m_resourceName; Akonadi2::Resource *m_resource; + QTimer *m_clientBufferProcessesTimer; }; -- cgit v1.2.3