summaryrefslogtreecommitdiffstats
path: root/synchronizer
diff options
context:
space:
mode:
authorAaron Seigo <aseigo@kde.org>2014-12-16 12:40:11 +0100
committerAaron Seigo <aseigo@kde.org>2014-12-16 12:40:11 +0100
commit5d246d2bea8f2330d86cdb0b14c76f6b6ceb7ec1 (patch)
treedf25d13ede11733358fd6b04e5d36b7da3bff31e /synchronizer
parent8ae840ac13d9e2161b411fbceae281a725fa8b1f (diff)
downloadsink-5d246d2bea8f2330d86cdb0b14c76f6b6ceb7ec1.tar.gz
sink-5d246d2bea8f2330d86cdb0b14c76f6b6ceb7ec1.zip
iterate async over commands
Diffstat (limited to 'synchronizer')
-rw-r--r--synchronizer/listener.cpp42
-rw-r--r--synchronizer/listener.h5
2 files changed, 39 insertions, 8 deletions
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
index ef9738e..368dae5 100644
--- a/synchronizer/listener.cpp
+++ b/synchronizer/listener.cpp
@@ -34,7 +34,8 @@ Listener::Listener(const QString &resourceName, QObject *parent)
34 m_server(new QLocalServer(this)), 34 m_server(new QLocalServer(this)),
35 m_revision(0), 35 m_revision(0),
36 m_resourceName(resourceName), 36 m_resourceName(resourceName),
37 m_resource(0) 37 m_resource(0),
38 m_clientBufferProcessesTimer(new QTimer(this))
38{ 39{
39 connect(m_server, &QLocalServer::newConnection, 40 connect(m_server, &QLocalServer::newConnection,
40 this, &Listener::acceptConnection); 41 this, &Listener::acceptConnection);
@@ -52,6 +53,12 @@ Listener::Listener(const QString &resourceName, QObject *parent)
52 Akonadi2::Console::main()->log(QString("Listening on %1").arg(m_server->serverName())); 53 Akonadi2::Console::main()->log(QString("Listening on %1").arg(m_server->serverName()));
53 } 54 }
54 55
56 //TODO: experiment with different timeouts
57 // or even just drop down to invoking the method queued?
58 m_clientBufferProcessesTimer->setInterval(10);
59 m_clientBufferProcessesTimer->setSingleShot(true);
60 connect(m_clientBufferProcessesTimer, &QTimer::timeout,
61 this, &Listener::processClientBuffers);
55 QTimer::singleShot(2000, this, SLOT(checkConnections())); 62 QTimer::singleShot(2000, this, SLOT(checkConnections()));
56} 63}
57 64
@@ -81,6 +88,8 @@ void Listener::closeAllConnections()
81 client.socket = 0; 88 client.socket = 0;
82 } 89 }
83 } 90 }
91
92 m_connections.clear();
84} 93}
85 94
86void Listener::acceptConnection() 95void Listener::acceptConnection()
@@ -140,19 +149,38 @@ void Listener::readFromSocket()
140 149
141 Akonadi2::Console::main()->log("Reading from socket..."); 150 Akonadi2::Console::main()->log("Reading from socket...");
142 for (Client &client: m_connections) { 151 for (Client &client: m_connections) {
143 Akonadi2::Console::main()->log(QString("Checking %1 %2").arg((qlonglong)client.socket).arg((qlonglong)socket));
144 if (client.socket == socket) { 152 if (client.socket == socket) {
145 Akonadi2::Console::main()->log(QString(" Client: %1").arg(client.name));
146 client.commandBuffer += socket->readAll(); 153 client.commandBuffer += socket->readAll();
147 // FIXME: schedule these rather than process them all at once 154 if (processClientBuffer(client) && !m_clientBufferProcessesTimer->isActive()) {
148 // right now this can lead to starvation of clients due to 155 // we have more client buffers to handle
149 // one overly active client 156 m_clientBufferProcessesTimer->start();
150 while (processClientBuffer(client)) {} 157 }
151 break; 158 break;
152 } 159 }
153 } 160 }
154} 161}
155 162
163void Listener::processClientBuffers()
164{
165 //TODO: we should not process all clients, but iterate async over them and process
166 // one command from each in turn to ensure all clients get fair handling of
167 // commands?
168 bool again = false;
169 for (Client &client: m_connections) {
170 if (!client.socket || !client.socket->isValid() || client.commandBuffer.isEmpty()) {
171 continue;
172 }
173
174 if (processClientBuffer(client)) {
175 again = true;
176 }
177 }
178
179 if (again) {
180 m_clientBufferProcessesTimer->start();
181 }
182}
183
156bool Listener::processClientBuffer(Client &client) 184bool Listener::processClientBuffer(Client &client)
157{ 185{
158 static const int headerSize = (sizeof(int) + sizeof(uint)); 186 static const int headerSize = (sizeof(int) + sizeof(uint));
diff --git a/synchronizer/listener.h b/synchronizer/listener.h
index 5c725df..053fac3 100644
--- a/synchronizer/listener.h
+++ b/synchronizer/listener.h
@@ -21,7 +21,6 @@
21 21
22#include <QLocalServer> 22#include <QLocalServer>
23#include <QLocalSocket> 23#include <QLocalSocket>
24#include <QList>
25#include <QObject> 24#include <QObject>
26 25
27#include <flatbuffers/flatbuffers.h> 26#include <flatbuffers/flatbuffers.h>
@@ -31,6 +30,8 @@ namespace Akonadi2
31 class Resource; 30 class Resource;
32} 31}
33 32
33class QTimer;
34
34class Client 35class Client
35{ 36{
36public: 37public:
@@ -72,6 +73,7 @@ private Q_SLOTS:
72 void clientDropped(); 73 void clientDropped();
73 void checkConnections(); 74 void checkConnections();
74 void readFromSocket(); 75 void readFromSocket();
76 void processClientBuffers();
75 77
76private: 78private:
77 bool processClientBuffer(Client &client); 79 bool processClientBuffer(Client &client);
@@ -85,4 +87,5 @@ private:
85 flatbuffers::FlatBufferBuilder m_fbb; 87 flatbuffers::FlatBufferBuilder m_fbb;
86 const QString m_resourceName; 88 const QString m_resourceName;
87 Akonadi2::Resource *m_resource; 89 Akonadi2::Resource *m_resource;
90 QTimer *m_clientBufferProcessesTimer;
88}; 91};