summaryrefslogtreecommitdiffstats
path: root/common/listener.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-03-03 09:01:05 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-03-03 09:01:05 +0100
commit4d9746c828558c9f872e0aed52442863affb25d5 (patch)
tree507d7c2ba67f47d3cbbcf01a722236ff1b48426b /common/listener.cpp
parent9cea920b7dd51867a0be0fed2f461b6be73c103e (diff)
downloadsink-4d9746c828558c9f872e0aed52442863affb25d5.tar.gz
sink-4d9746c828558c9f872e0aed52442863affb25d5.zip
Fromatted the whole codebase with clang-format.
clang-format -i */**{.cpp,.h}
Diffstat (limited to 'common/listener.cpp')
-rw-r--r--common/listener.cpp61
1 files changed, 27 insertions, 34 deletions
diff --git a/common/listener.cpp b/common/listener.cpp
index ed6f305..145267a 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -51,8 +51,7 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, QObject *parent
51 m_clientBufferProcessesTimer(new QTimer(this)), 51 m_clientBufferProcessesTimer(new QTimer(this)),
52 m_messageId(0) 52 m_messageId(0)
53{ 53{
54 connect(m_server, &QLocalServer::newConnection, 54 connect(m_server, &QLocalServer::newConnection, this, &Listener::acceptConnection);
55 this, &Listener::acceptConnection);
56 Trace() << "Trying to open " << m_resourceInstanceIdentifier; 55 Trace() << "Trying to open " << m_resourceInstanceIdentifier;
57 56
58 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { 57 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) {
@@ -77,12 +76,11 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, QObject *parent
77 } 76 }
78 }); 77 });
79 78
80 //TODO: experiment with different timeouts 79 // TODO: experiment with different timeouts
81 // or even just drop down to invoking the method queued? => invoke queued unless we need throttling 80 // or even just drop down to invoking the method queued? => invoke queued unless we need throttling
82 m_clientBufferProcessesTimer->setInterval(0); 81 m_clientBufferProcessesTimer->setInterval(0);
83 m_clientBufferProcessesTimer->setSingleShot(true); 82 m_clientBufferProcessesTimer->setSingleShot(true);
84 connect(m_clientBufferProcessesTimer, &QTimer::timeout, 83 connect(m_clientBufferProcessesTimer, &QTimer::timeout, this, &Listener::processClientBuffers);
85 this, &Listener::processClientBuffers);
86} 84}
87 85
88Listener::~Listener() 86Listener::~Listener()
@@ -91,7 +89,7 @@ Listener::~Listener()
91 89
92void Listener::closeAllConnections() 90void Listener::closeAllConnections()
93{ 91{
94 for (Client &client: m_connections) { 92 for (Client &client : m_connections) {
95 if (client.socket) { 93 if (client.socket) {
96 disconnect(client.socket, 0, this, 0); 94 disconnect(client.socket, 0, this, 0);
97 client.socket->close(); 95 client.socket->close();
@@ -114,13 +112,11 @@ void Listener::acceptConnection()
114 } 112 }
115 113
116 m_connections << Client("Unknown Client", socket); 114 m_connections << Client("Unknown Client", socket);
117 connect(socket, &QIODevice::readyRead, 115 connect(socket, &QIODevice::readyRead, this, &Listener::onDataAvailable);
118 this, &Listener::onDataAvailable); 116 connect(socket, &QLocalSocket::disconnected, this, &Listener::clientDropped);
119 connect(socket, &QLocalSocket::disconnected,
120 this, &Listener::clientDropped);
121 m_checkConnectionsTimer->stop(); 117 m_checkConnectionsTimer->stop();
122 118
123 //If this is the first client, set the lower limit for revision cleanup 119 // If this is the first client, set the lower limit for revision cleanup
124 if (m_connections.size() == 1) { 120 if (m_connections.size() == 1) {
125 loadResource()->setLowerBoundRevision(0); 121 loadResource()->setLowerBoundRevision(0);
126 } 122 }
@@ -157,7 +153,7 @@ void Listener::clientDropped()
157 153
158void Listener::checkConnections() 154void Listener::checkConnections()
159{ 155{
160 //If this was the last client, disengage the lower limit for revision cleanup 156 // If this was the last client, disengage the lower limit for revision cleanup
161 if (m_connections.isEmpty()) { 157 if (m_connections.isEmpty()) {
162 loadResource()->setLowerBoundRevision(std::numeric_limits<qint64>::max()); 158 loadResource()->setLowerBoundRevision(std::numeric_limits<qint64>::max());
163 } 159 }
@@ -176,7 +172,7 @@ void Listener::onDataAvailable()
176void Listener::readFromSocket(QLocalSocket *socket) 172void Listener::readFromSocket(QLocalSocket *socket)
177{ 173{
178 Trace() << "Reading from socket..."; 174 Trace() << "Reading from socket...";
179 for (Client &client: m_connections) { 175 for (Client &client : m_connections) {
180 if (client.socket == socket) { 176 if (client.socket == socket) {
181 client.commandBuffer += socket->readAll(); 177 client.commandBuffer += socket->readAll();
182 if (!m_clientBufferProcessesTimer->isActive()) { 178 if (!m_clientBufferProcessesTimer->isActive()) {
@@ -189,11 +185,11 @@ void Listener::readFromSocket(QLocalSocket *socket)
189 185
190void Listener::processClientBuffers() 186void Listener::processClientBuffers()
191{ 187{
192 //TODO: we should not process all clients, but iterate async over them and process 188 // TODO: we should not process all clients, but iterate async over them and process
193 // one command from each in turn to ensure all clients get fair handling of 189 // one command from each in turn to ensure all clients get fair handling of
194 // commands? 190 // commands?
195 bool again = false; 191 bool again = false;
196 for (Client &client: m_connections) { 192 for (Client &client : m_connections) {
197 if (!client.socket || !client.socket->isValid() || client.commandBuffer.isEmpty()) { 193 if (!client.socket || !client.socket->isValid() || client.commandBuffer.isEmpty()) {
198 continue; 194 continue;
199 } 195 }
@@ -237,9 +233,10 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
237 job = job.then<void>(loadResource()->processAllMessages()); 233 job = job.then<void>(loadResource()->processAllMessages());
238 } 234 }
239 job.then<void>([callback, timer]() { 235 job.then<void>([callback, timer]() {
240 Trace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); 236 Trace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed());
241 callback(true); 237 callback(true);
242 }).exec(); 238 })
239 .exec();
243 return; 240 return;
244 } else { 241 } else {
245 Warning() << "received invalid command"; 242 Warning() << "received invalid command";
@@ -256,7 +253,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
256 break; 253 break;
257 case Sink::Commands::ShutdownCommand: 254 case Sink::Commands::ShutdownCommand:
258 Log() << QString("\tReceived shutdown command from %1").arg(client.name); 255 Log() << QString("\tReceived shutdown command from %1").arg(client.name);
259 //Immediately reject new connections 256 // Immediately reject new connections
260 m_server->close(); 257 m_server->close();
261 QTimer::singleShot(0, this, &Listener::quit); 258 QTimer::singleShot(0, this, &Listener::quit);
262 break; 259 break;
@@ -273,16 +270,14 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
273 Warning() << "received invalid command"; 270 Warning() << "received invalid command";
274 } 271 }
275 loadResource()->setLowerBoundRevision(lowerBoundRevision()); 272 loadResource()->setLowerBoundRevision(lowerBoundRevision());
276 } 273 } break;
277 break;
278 case Sink::Commands::RemoveFromDiskCommand: { 274 case Sink::Commands::RemoveFromDiskCommand: {
279 Log() << QString("\tReceived a remove from disk command from %1").arg(client.name); 275 Log() << QString("\tReceived a remove from disk command from %1").arg(client.name);
280 m_resource->removeDataFromDisk(); 276 m_resource->removeDataFromDisk();
281 delete m_resource; 277 delete m_resource;
282 m_resource = nullptr; 278 m_resource = nullptr;
283 loadResource()->setLowerBoundRevision(0); 279 loadResource()->setLowerBoundRevision(0);
284 } 280 } break;
285 break;
286 default: 281 default:
287 if (commandId > Sink::Commands::CustomCommand) { 282 if (commandId > Sink::Commands::CustomCommand) {
288 Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; 283 Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId;
@@ -313,7 +308,7 @@ qint64 Listener::lowerBoundRevision()
313 308
314void Listener::quit() 309void Listener::quit()
315{ 310{
316 //Broadcast shutdown notifications to open clients, so they don't try to restart the resource 311 // Broadcast shutdown notifications to open clients, so they don't try to restart the resource
317 auto command = Sink::Commands::CreateNotification(m_fbb, Sink::Commands::NotificationType::NotificationType_Shutdown); 312 auto command = Sink::Commands::CreateNotification(m_fbb, Sink::Commands::NotificationType::NotificationType_Shutdown);
318 Sink::Commands::FinishNotificationBuffer(m_fbb, command); 313 Sink::Commands::FinishNotificationBuffer(m_fbb, command);
319 for (Client &client : m_connections) { 314 for (Client &client : m_connections) {
@@ -323,7 +318,7 @@ void Listener::quit()
323 } 318 }
324 m_fbb.Clear(); 319 m_fbb.Clear();
325 320
326 //Connections will be cleaned up later 321 // Connections will be cleaned up later
327 emit noClients(); 322 emit noClients();
328} 323}
329 324
@@ -334,12 +329,12 @@ bool Listener::processClientBuffer(Client &client)
334 return false; 329 return false;
335 } 330 }
336 331
337 const uint messageId = *(uint*)client.commandBuffer.constData(); 332 const uint messageId = *(uint *)client.commandBuffer.constData();
338 const int commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint)); 333 const int commandId = *(int *)(client.commandBuffer.constData() + sizeof(uint));
339 const uint size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); 334 const uint size = *(uint *)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint));
340 Trace() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size; 335 Trace() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size;
341 336
342 //TODO: reject messages above a certain size? 337 // TODO: reject messages above a certain size?
343 338
344 const bool commandComplete = size <= uint(client.commandBuffer.size() - headerSize); 339 const bool commandComplete = size <= uint(client.commandBuffer.size() - headerSize);
345 if (commandComplete) { 340 if (commandComplete) {
@@ -386,7 +381,7 @@ void Listener::updateClientsWithRevision(qint64 revision)
386 auto command = Sink::Commands::CreateRevisionUpdate(m_fbb, revision); 381 auto command = Sink::Commands::CreateRevisionUpdate(m_fbb, revision);
387 Sink::Commands::FinishRevisionUpdateBuffer(m_fbb, command); 382 Sink::Commands::FinishRevisionUpdateBuffer(m_fbb, command);
388 383
389 for (const Client &client: m_connections) { 384 for (const Client &client : m_connections) {
390 if (!client.socket || !client.socket->isValid()) { 385 if (!client.socket || !client.socket->isValid()) {
391 continue; 386 continue;
392 } 387 }
@@ -423,10 +418,8 @@ Sink::Resource *Listener::loadResource()
423 m_resource = resourceFactory->createResource(m_resourceInstanceIdentifier); 418 m_resource = resourceFactory->createResource(m_resourceInstanceIdentifier);
424 Trace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); 419 Trace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory);
425 Trace() << QString("\tResource: %1").arg((qlonglong)m_resource); 420 Trace() << QString("\tResource: %1").arg((qlonglong)m_resource);
426 connect(m_resource, &Sink::Resource::revisionUpdated, 421 connect(m_resource, &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision);
427 this, &Listener::refreshRevision); 422 connect(m_resource, &Sink::Resource::notify, this, &Listener::notify);
428 connect(m_resource, &Sink::Resource::notify,
429 this, &Listener::notify);
430 } else { 423 } else {
431 ErrorMsg() << "Failed to load resource " << m_resourceName; 424 ErrorMsg() << "Failed to load resource " << m_resourceName;
432 m_resource = new Sink::Resource; 425 m_resource = new Sink::Resource;