summaryrefslogtreecommitdiffstats
path: root/synchronizer/listener.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'synchronizer/listener.cpp')
-rw-r--r--synchronizer/listener.cpp42
1 files changed, 35 insertions, 7 deletions
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
index ef9738e..368dae5 100644
--- a/synchronizer/listener.cpp
+++ b/synchronizer/listener.cpp
@@ -34,7 +34,8 @@ Listener::Listener(const QString &resourceName, QObject *parent)
34 m_server(new QLocalServer(this)), 34 m_server(new QLocalServer(this)),
35 m_revision(0), 35 m_revision(0),
36 m_resourceName(resourceName), 36 m_resourceName(resourceName),
37 m_resource(0) 37 m_resource(0),
38 m_clientBufferProcessesTimer(new QTimer(this))
38{ 39{
39 connect(m_server, &QLocalServer::newConnection, 40 connect(m_server, &QLocalServer::newConnection,
40 this, &Listener::acceptConnection); 41 this, &Listener::acceptConnection);
@@ -52,6 +53,12 @@ Listener::Listener(const QString &resourceName, QObject *parent)
52 Akonadi2::Console::main()->log(QString("Listening on %1").arg(m_server->serverName())); 53 Akonadi2::Console::main()->log(QString("Listening on %1").arg(m_server->serverName()));
53 } 54 }
54 55
56 //TODO: experiment with different timeouts
57 // or even just drop down to invoking the method queued?
58 m_clientBufferProcessesTimer->setInterval(10);
59 m_clientBufferProcessesTimer->setSingleShot(true);
60 connect(m_clientBufferProcessesTimer, &QTimer::timeout,
61 this, &Listener::processClientBuffers);
55 QTimer::singleShot(2000, this, SLOT(checkConnections())); 62 QTimer::singleShot(2000, this, SLOT(checkConnections()));
56} 63}
57 64
@@ -81,6 +88,8 @@ void Listener::closeAllConnections()
81 client.socket = 0; 88 client.socket = 0;
82 } 89 }
83 } 90 }
91
92 m_connections.clear();
84} 93}
85 94
86void Listener::acceptConnection() 95void Listener::acceptConnection()
@@ -140,19 +149,38 @@ void Listener::readFromSocket()
140 149
141 Akonadi2::Console::main()->log("Reading from socket..."); 150 Akonadi2::Console::main()->log("Reading from socket...");
142 for (Client &client: m_connections) { 151 for (Client &client: m_connections) {
143 Akonadi2::Console::main()->log(QString("Checking %1 %2").arg((qlonglong)client.socket).arg((qlonglong)socket));
144 if (client.socket == socket) { 152 if (client.socket == socket) {
145 Akonadi2::Console::main()->log(QString(" Client: %1").arg(client.name));
146 client.commandBuffer += socket->readAll(); 153 client.commandBuffer += socket->readAll();
147 // FIXME: schedule these rather than process them all at once 154 if (processClientBuffer(client) && !m_clientBufferProcessesTimer->isActive()) {
148 // right now this can lead to starvation of clients due to 155 // we have more client buffers to handle
149 // one overly active client 156 m_clientBufferProcessesTimer->start();
150 while (processClientBuffer(client)) {} 157 }
151 break; 158 break;
152 } 159 }
153 } 160 }
154} 161}
155 162
163void Listener::processClientBuffers()
164{
165 //TODO: we should not process all clients, but iterate async over them and process
166 // one command from each in turn to ensure all clients get fair handling of
167 // commands?
168 bool again = false;
169 for (Client &client: m_connections) {
170 if (!client.socket || !client.socket->isValid() || client.commandBuffer.isEmpty()) {
171 continue;
172 }
173
174 if (processClientBuffer(client)) {
175 again = true;
176 }
177 }
178
179 if (again) {
180 m_clientBufferProcessesTimer->start();
181 }
182}
183
156bool Listener::processClientBuffer(Client &client) 184bool Listener::processClientBuffer(Client &client)
157{ 185{
158 static const int headerSize = (sizeof(int) + sizeof(uint)); 186 static const int headerSize = (sizeof(int) + sizeof(uint));