diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-03-03 09:01:05 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-03-03 09:01:05 +0100 |
commit | 4d9746c828558c9f872e0aed52442863affb25d5 (patch) | |
tree | 507d7c2ba67f47d3cbbcf01a722236ff1b48426b /common/listener.cpp | |
parent | 9cea920b7dd51867a0be0fed2f461b6be73c103e (diff) | |
download | sink-4d9746c828558c9f872e0aed52442863affb25d5.tar.gz sink-4d9746c828558c9f872e0aed52442863affb25d5.zip |
Fromatted the whole codebase with clang-format.
clang-format -i */**{.cpp,.h}
Diffstat (limited to 'common/listener.cpp')
-rw-r--r-- | common/listener.cpp | 61 |
1 files changed, 27 insertions, 34 deletions
diff --git a/common/listener.cpp b/common/listener.cpp index ed6f305..145267a 100644 --- a/common/listener.cpp +++ b/common/listener.cpp | |||
@@ -51,8 +51,7 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, QObject *parent | |||
51 | m_clientBufferProcessesTimer(new QTimer(this)), | 51 | m_clientBufferProcessesTimer(new QTimer(this)), |
52 | m_messageId(0) | 52 | m_messageId(0) |
53 | { | 53 | { |
54 | connect(m_server, &QLocalServer::newConnection, | 54 | connect(m_server, &QLocalServer::newConnection, this, &Listener::acceptConnection); |
55 | this, &Listener::acceptConnection); | ||
56 | Trace() << "Trying to open " << m_resourceInstanceIdentifier; | 55 | Trace() << "Trying to open " << m_resourceInstanceIdentifier; |
57 | 56 | ||
58 | if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { | 57 | if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { |
@@ -77,12 +76,11 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, QObject *parent | |||
77 | } | 76 | } |
78 | }); | 77 | }); |
79 | 78 | ||
80 | //TODO: experiment with different timeouts | 79 | // TODO: experiment with different timeouts |
81 | // or even just drop down to invoking the method queued? => invoke queued unless we need throttling | 80 | // or even just drop down to invoking the method queued? => invoke queued unless we need throttling |
82 | m_clientBufferProcessesTimer->setInterval(0); | 81 | m_clientBufferProcessesTimer->setInterval(0); |
83 | m_clientBufferProcessesTimer->setSingleShot(true); | 82 | m_clientBufferProcessesTimer->setSingleShot(true); |
84 | connect(m_clientBufferProcessesTimer, &QTimer::timeout, | 83 | connect(m_clientBufferProcessesTimer, &QTimer::timeout, this, &Listener::processClientBuffers); |
85 | this, &Listener::processClientBuffers); | ||
86 | } | 84 | } |
87 | 85 | ||
88 | Listener::~Listener() | 86 | Listener::~Listener() |
@@ -91,7 +89,7 @@ Listener::~Listener() | |||
91 | 89 | ||
92 | void Listener::closeAllConnections() | 90 | void Listener::closeAllConnections() |
93 | { | 91 | { |
94 | for (Client &client: m_connections) { | 92 | for (Client &client : m_connections) { |
95 | if (client.socket) { | 93 | if (client.socket) { |
96 | disconnect(client.socket, 0, this, 0); | 94 | disconnect(client.socket, 0, this, 0); |
97 | client.socket->close(); | 95 | client.socket->close(); |
@@ -114,13 +112,11 @@ void Listener::acceptConnection() | |||
114 | } | 112 | } |
115 | 113 | ||
116 | m_connections << Client("Unknown Client", socket); | 114 | m_connections << Client("Unknown Client", socket); |
117 | connect(socket, &QIODevice::readyRead, | 115 | connect(socket, &QIODevice::readyRead, this, &Listener::onDataAvailable); |
118 | this, &Listener::onDataAvailable); | 116 | connect(socket, &QLocalSocket::disconnected, this, &Listener::clientDropped); |
119 | connect(socket, &QLocalSocket::disconnected, | ||
120 | this, &Listener::clientDropped); | ||
121 | m_checkConnectionsTimer->stop(); | 117 | m_checkConnectionsTimer->stop(); |
122 | 118 | ||
123 | //If this is the first client, set the lower limit for revision cleanup | 119 | // If this is the first client, set the lower limit for revision cleanup |
124 | if (m_connections.size() == 1) { | 120 | if (m_connections.size() == 1) { |
125 | loadResource()->setLowerBoundRevision(0); | 121 | loadResource()->setLowerBoundRevision(0); |
126 | } | 122 | } |
@@ -157,7 +153,7 @@ void Listener::clientDropped() | |||
157 | 153 | ||
158 | void Listener::checkConnections() | 154 | void Listener::checkConnections() |
159 | { | 155 | { |
160 | //If this was the last client, disengage the lower limit for revision cleanup | 156 | // If this was the last client, disengage the lower limit for revision cleanup |
161 | if (m_connections.isEmpty()) { | 157 | if (m_connections.isEmpty()) { |
162 | loadResource()->setLowerBoundRevision(std::numeric_limits<qint64>::max()); | 158 | loadResource()->setLowerBoundRevision(std::numeric_limits<qint64>::max()); |
163 | } | 159 | } |
@@ -176,7 +172,7 @@ void Listener::onDataAvailable() | |||
176 | void Listener::readFromSocket(QLocalSocket *socket) | 172 | void Listener::readFromSocket(QLocalSocket *socket) |
177 | { | 173 | { |
178 | Trace() << "Reading from socket..."; | 174 | Trace() << "Reading from socket..."; |
179 | for (Client &client: m_connections) { | 175 | for (Client &client : m_connections) { |
180 | if (client.socket == socket) { | 176 | if (client.socket == socket) { |
181 | client.commandBuffer += socket->readAll(); | 177 | client.commandBuffer += socket->readAll(); |
182 | if (!m_clientBufferProcessesTimer->isActive()) { | 178 | if (!m_clientBufferProcessesTimer->isActive()) { |
@@ -189,11 +185,11 @@ void Listener::readFromSocket(QLocalSocket *socket) | |||
189 | 185 | ||
190 | void Listener::processClientBuffers() | 186 | void Listener::processClientBuffers() |
191 | { | 187 | { |
192 | //TODO: we should not process all clients, but iterate async over them and process | 188 | // TODO: we should not process all clients, but iterate async over them and process |
193 | // one command from each in turn to ensure all clients get fair handling of | 189 | // one command from each in turn to ensure all clients get fair handling of |
194 | // commands? | 190 | // commands? |
195 | bool again = false; | 191 | bool again = false; |
196 | for (Client &client: m_connections) { | 192 | for (Client &client : m_connections) { |
197 | if (!client.socket || !client.socket->isValid() || client.commandBuffer.isEmpty()) { | 193 | if (!client.socket || !client.socket->isValid() || client.commandBuffer.isEmpty()) { |
198 | continue; | 194 | continue; |
199 | } | 195 | } |
@@ -237,9 +233,10 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
237 | job = job.then<void>(loadResource()->processAllMessages()); | 233 | job = job.then<void>(loadResource()->processAllMessages()); |
238 | } | 234 | } |
239 | job.then<void>([callback, timer]() { | 235 | job.then<void>([callback, timer]() { |
240 | Trace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); | 236 | Trace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); |
241 | callback(true); | 237 | callback(true); |
242 | }).exec(); | 238 | }) |
239 | .exec(); | ||
243 | return; | 240 | return; |
244 | } else { | 241 | } else { |
245 | Warning() << "received invalid command"; | 242 | Warning() << "received invalid command"; |
@@ -256,7 +253,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
256 | break; | 253 | break; |
257 | case Sink::Commands::ShutdownCommand: | 254 | case Sink::Commands::ShutdownCommand: |
258 | Log() << QString("\tReceived shutdown command from %1").arg(client.name); | 255 | Log() << QString("\tReceived shutdown command from %1").arg(client.name); |
259 | //Immediately reject new connections | 256 | // Immediately reject new connections |
260 | m_server->close(); | 257 | m_server->close(); |
261 | QTimer::singleShot(0, this, &Listener::quit); | 258 | QTimer::singleShot(0, this, &Listener::quit); |
262 | break; | 259 | break; |
@@ -273,16 +270,14 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
273 | Warning() << "received invalid command"; | 270 | Warning() << "received invalid command"; |
274 | } | 271 | } |
275 | loadResource()->setLowerBoundRevision(lowerBoundRevision()); | 272 | loadResource()->setLowerBoundRevision(lowerBoundRevision()); |
276 | } | 273 | } break; |
277 | break; | ||
278 | case Sink::Commands::RemoveFromDiskCommand: { | 274 | case Sink::Commands::RemoveFromDiskCommand: { |
279 | Log() << QString("\tReceived a remove from disk command from %1").arg(client.name); | 275 | Log() << QString("\tReceived a remove from disk command from %1").arg(client.name); |
280 | m_resource->removeDataFromDisk(); | 276 | m_resource->removeDataFromDisk(); |
281 | delete m_resource; | 277 | delete m_resource; |
282 | m_resource = nullptr; | 278 | m_resource = nullptr; |
283 | loadResource()->setLowerBoundRevision(0); | 279 | loadResource()->setLowerBoundRevision(0); |
284 | } | 280 | } break; |
285 | break; | ||
286 | default: | 281 | default: |
287 | if (commandId > Sink::Commands::CustomCommand) { | 282 | if (commandId > Sink::Commands::CustomCommand) { |
288 | Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; | 283 | Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; |
@@ -313,7 +308,7 @@ qint64 Listener::lowerBoundRevision() | |||
313 | 308 | ||
314 | void Listener::quit() | 309 | void Listener::quit() |
315 | { | 310 | { |
316 | //Broadcast shutdown notifications to open clients, so they don't try to restart the resource | 311 | // Broadcast shutdown notifications to open clients, so they don't try to restart the resource |
317 | auto command = Sink::Commands::CreateNotification(m_fbb, Sink::Commands::NotificationType::NotificationType_Shutdown); | 312 | auto command = Sink::Commands::CreateNotification(m_fbb, Sink::Commands::NotificationType::NotificationType_Shutdown); |
318 | Sink::Commands::FinishNotificationBuffer(m_fbb, command); | 313 | Sink::Commands::FinishNotificationBuffer(m_fbb, command); |
319 | for (Client &client : m_connections) { | 314 | for (Client &client : m_connections) { |
@@ -323,7 +318,7 @@ void Listener::quit() | |||
323 | } | 318 | } |
324 | m_fbb.Clear(); | 319 | m_fbb.Clear(); |
325 | 320 | ||
326 | //Connections will be cleaned up later | 321 | // Connections will be cleaned up later |
327 | emit noClients(); | 322 | emit noClients(); |
328 | } | 323 | } |
329 | 324 | ||
@@ -334,12 +329,12 @@ bool Listener::processClientBuffer(Client &client) | |||
334 | return false; | 329 | return false; |
335 | } | 330 | } |
336 | 331 | ||
337 | const uint messageId = *(uint*)client.commandBuffer.constData(); | 332 | const uint messageId = *(uint *)client.commandBuffer.constData(); |
338 | const int commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint)); | 333 | const int commandId = *(int *)(client.commandBuffer.constData() + sizeof(uint)); |
339 | const uint size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); | 334 | const uint size = *(uint *)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); |
340 | Trace() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size; | 335 | Trace() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size; |
341 | 336 | ||
342 | //TODO: reject messages above a certain size? | 337 | // TODO: reject messages above a certain size? |
343 | 338 | ||
344 | const bool commandComplete = size <= uint(client.commandBuffer.size() - headerSize); | 339 | const bool commandComplete = size <= uint(client.commandBuffer.size() - headerSize); |
345 | if (commandComplete) { | 340 | if (commandComplete) { |
@@ -386,7 +381,7 @@ void Listener::updateClientsWithRevision(qint64 revision) | |||
386 | auto command = Sink::Commands::CreateRevisionUpdate(m_fbb, revision); | 381 | auto command = Sink::Commands::CreateRevisionUpdate(m_fbb, revision); |
387 | Sink::Commands::FinishRevisionUpdateBuffer(m_fbb, command); | 382 | Sink::Commands::FinishRevisionUpdateBuffer(m_fbb, command); |
388 | 383 | ||
389 | for (const Client &client: m_connections) { | 384 | for (const Client &client : m_connections) { |
390 | if (!client.socket || !client.socket->isValid()) { | 385 | if (!client.socket || !client.socket->isValid()) { |
391 | continue; | 386 | continue; |
392 | } | 387 | } |
@@ -423,10 +418,8 @@ Sink::Resource *Listener::loadResource() | |||
423 | m_resource = resourceFactory->createResource(m_resourceInstanceIdentifier); | 418 | m_resource = resourceFactory->createResource(m_resourceInstanceIdentifier); |
424 | Trace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); | 419 | Trace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); |
425 | Trace() << QString("\tResource: %1").arg((qlonglong)m_resource); | 420 | Trace() << QString("\tResource: %1").arg((qlonglong)m_resource); |
426 | connect(m_resource, &Sink::Resource::revisionUpdated, | 421 | connect(m_resource, &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision); |
427 | this, &Listener::refreshRevision); | 422 | connect(m_resource, &Sink::Resource::notify, this, &Listener::notify); |
428 | connect(m_resource, &Sink::Resource::notify, | ||
429 | this, &Listener::notify); | ||
430 | } else { | 423 | } else { |
431 | ErrorMsg() << "Failed to load resource " << m_resourceName; | 424 | ErrorMsg() << "Failed to load resource " << m_resourceName; |
432 | m_resource = new Sink::Resource; | 425 | m_resource = new Sink::Resource; |