diff options
author | Aaron Seigo <aseigo@kde.org> | 2014-12-16 12:40:11 +0100 |
---|---|---|
committer | Aaron Seigo <aseigo@kde.org> | 2014-12-16 12:40:11 +0100 |
commit | 5d246d2bea8f2330d86cdb0b14c76f6b6ceb7ec1 (patch) | |
tree | df25d13ede11733358fd6b04e5d36b7da3bff31e /synchronizer | |
parent | 8ae840ac13d9e2161b411fbceae281a725fa8b1f (diff) | |
download | sink-5d246d2bea8f2330d86cdb0b14c76f6b6ceb7ec1.tar.gz sink-5d246d2bea8f2330d86cdb0b14c76f6b6ceb7ec1.zip |
iterate async over commands
Diffstat (limited to 'synchronizer')
-rw-r--r-- | synchronizer/listener.cpp | 42 | ||||
-rw-r--r-- | synchronizer/listener.h | 5 |
2 files changed, 39 insertions, 8 deletions
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index ef9738e..368dae5 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp | |||
@@ -34,7 +34,8 @@ Listener::Listener(const QString &resourceName, QObject *parent) | |||
34 | m_server(new QLocalServer(this)), | 34 | m_server(new QLocalServer(this)), |
35 | m_revision(0), | 35 | m_revision(0), |
36 | m_resourceName(resourceName), | 36 | m_resourceName(resourceName), |
37 | m_resource(0) | 37 | m_resource(0), |
38 | m_clientBufferProcessesTimer(new QTimer(this)) | ||
38 | { | 39 | { |
39 | connect(m_server, &QLocalServer::newConnection, | 40 | connect(m_server, &QLocalServer::newConnection, |
40 | this, &Listener::acceptConnection); | 41 | this, &Listener::acceptConnection); |
@@ -52,6 +53,12 @@ Listener::Listener(const QString &resourceName, QObject *parent) | |||
52 | Akonadi2::Console::main()->log(QString("Listening on %1").arg(m_server->serverName())); | 53 | Akonadi2::Console::main()->log(QString("Listening on %1").arg(m_server->serverName())); |
53 | } | 54 | } |
54 | 55 | ||
56 | //TODO: experiment with different timeouts | ||
57 | // or even just drop down to invoking the method queued? | ||
58 | m_clientBufferProcessesTimer->setInterval(10); | ||
59 | m_clientBufferProcessesTimer->setSingleShot(true); | ||
60 | connect(m_clientBufferProcessesTimer, &QTimer::timeout, | ||
61 | this, &Listener::processClientBuffers); | ||
55 | QTimer::singleShot(2000, this, SLOT(checkConnections())); | 62 | QTimer::singleShot(2000, this, SLOT(checkConnections())); |
56 | } | 63 | } |
57 | 64 | ||
@@ -81,6 +88,8 @@ void Listener::closeAllConnections() | |||
81 | client.socket = 0; | 88 | client.socket = 0; |
82 | } | 89 | } |
83 | } | 90 | } |
91 | |||
92 | m_connections.clear(); | ||
84 | } | 93 | } |
85 | 94 | ||
86 | void Listener::acceptConnection() | 95 | void Listener::acceptConnection() |
@@ -140,19 +149,38 @@ void Listener::readFromSocket() | |||
140 | 149 | ||
141 | Akonadi2::Console::main()->log("Reading from socket..."); | 150 | Akonadi2::Console::main()->log("Reading from socket..."); |
142 | for (Client &client: m_connections) { | 151 | for (Client &client: m_connections) { |
143 | Akonadi2::Console::main()->log(QString("Checking %1 %2").arg((qlonglong)client.socket).arg((qlonglong)socket)); | ||
144 | if (client.socket == socket) { | 152 | if (client.socket == socket) { |
145 | Akonadi2::Console::main()->log(QString(" Client: %1").arg(client.name)); | ||
146 | client.commandBuffer += socket->readAll(); | 153 | client.commandBuffer += socket->readAll(); |
147 | // FIXME: schedule these rather than process them all at once | 154 | if (processClientBuffer(client) && !m_clientBufferProcessesTimer->isActive()) { |
148 | // right now this can lead to starvation of clients due to | 155 | // we have more client buffers to handle |
149 | // one overly active client | 156 | m_clientBufferProcessesTimer->start(); |
150 | while (processClientBuffer(client)) {} | 157 | } |
151 | break; | 158 | break; |
152 | } | 159 | } |
153 | } | 160 | } |
154 | } | 161 | } |
155 | 162 | ||
163 | void Listener::processClientBuffers() | ||
164 | { | ||
165 | //TODO: we should not process all clients, but iterate async over them and process | ||
166 | // one command from each in turn to ensure all clients get fair handling of | ||
167 | // commands? | ||
168 | bool again = false; | ||
169 | for (Client &client: m_connections) { | ||
170 | if (!client.socket || !client.socket->isValid() || client.commandBuffer.isEmpty()) { | ||
171 | continue; | ||
172 | } | ||
173 | |||
174 | if (processClientBuffer(client)) { | ||
175 | again = true; | ||
176 | } | ||
177 | } | ||
178 | |||
179 | if (again) { | ||
180 | m_clientBufferProcessesTimer->start(); | ||
181 | } | ||
182 | } | ||
183 | |||
156 | bool Listener::processClientBuffer(Client &client) | 184 | bool Listener::processClientBuffer(Client &client) |
157 | { | 185 | { |
158 | static const int headerSize = (sizeof(int) + sizeof(uint)); | 186 | static const int headerSize = (sizeof(int) + sizeof(uint)); |
diff --git a/synchronizer/listener.h b/synchronizer/listener.h index 5c725df..053fac3 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h | |||
@@ -21,7 +21,6 @@ | |||
21 | 21 | ||
22 | #include <QLocalServer> | 22 | #include <QLocalServer> |
23 | #include <QLocalSocket> | 23 | #include <QLocalSocket> |
24 | #include <QList> | ||
25 | #include <QObject> | 24 | #include <QObject> |
26 | 25 | ||
27 | #include <flatbuffers/flatbuffers.h> | 26 | #include <flatbuffers/flatbuffers.h> |
@@ -31,6 +30,8 @@ namespace Akonadi2 | |||
31 | class Resource; | 30 | class Resource; |
32 | } | 31 | } |
33 | 32 | ||
33 | class QTimer; | ||
34 | |||
34 | class Client | 35 | class Client |
35 | { | 36 | { |
36 | public: | 37 | public: |
@@ -72,6 +73,7 @@ private Q_SLOTS: | |||
72 | void clientDropped(); | 73 | void clientDropped(); |
73 | void checkConnections(); | 74 | void checkConnections(); |
74 | void readFromSocket(); | 75 | void readFromSocket(); |
76 | void processClientBuffers(); | ||
75 | 77 | ||
76 | private: | 78 | private: |
77 | bool processClientBuffer(Client &client); | 79 | bool processClientBuffer(Client &client); |
@@ -85,4 +87,5 @@ private: | |||
85 | flatbuffers::FlatBufferBuilder m_fbb; | 87 | flatbuffers::FlatBufferBuilder m_fbb; |
86 | const QString m_resourceName; | 88 | const QString m_resourceName; |
87 | Akonadi2::Resource *m_resource; | 89 | Akonadi2::Resource *m_resource; |
90 | QTimer *m_clientBufferProcessesTimer; | ||
88 | }; | 91 | }; |