diff options
Diffstat (limited to 'common/listener.cpp')
-rw-r--r-- | common/listener.cpp | 65 |
1 files changed, 31 insertions, 34 deletions
diff --git a/common/listener.cpp b/common/listener.cpp index af8eaa2..2c5c1df 100644 --- a/common/listener.cpp +++ b/common/listener.cpp | |||
@@ -39,9 +39,6 @@ | |||
39 | #include <QTime> | 39 | #include <QTime> |
40 | #include <QDataStream> | 40 | #include <QDataStream> |
41 | 41 | ||
42 | #undef DEBUG_AREA | ||
43 | #define DEBUG_AREA "resource.communication" | ||
44 | |||
45 | Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType, QObject *parent) | 42 | Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType, QObject *parent) |
46 | : QObject(parent), | 43 | : QObject(parent), |
47 | m_server(new QLocalServer(this)), | 44 | m_server(new QLocalServer(this)), |
@@ -51,18 +48,18 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArra | |||
51 | m_messageId(0) | 48 | m_messageId(0) |
52 | { | 49 | { |
53 | connect(m_server.get(), &QLocalServer::newConnection, this, &Listener::acceptConnection); | 50 | connect(m_server.get(), &QLocalServer::newConnection, this, &Listener::acceptConnection); |
54 | Trace() << "Trying to open " << m_resourceInstanceIdentifier; | 51 | SinkTrace() << "Trying to open " << m_resourceInstanceIdentifier; |
55 | 52 | ||
56 | if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { | 53 | if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { |
57 | m_server->removeServer(m_resourceInstanceIdentifier); | 54 | m_server->removeServer(m_resourceInstanceIdentifier); |
58 | if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { | 55 | if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { |
59 | Warning() << "Utter failure to start server"; | 56 | SinkWarning() << "Utter failure to start server"; |
60 | exit(-1); | 57 | exit(-1); |
61 | } | 58 | } |
62 | } | 59 | } |
63 | 60 | ||
64 | if (m_server->isListening()) { | 61 | if (m_server->isListening()) { |
65 | Log() << QString("Listening on %1").arg(m_server->serverName()); | 62 | SinkLog() << QString("Listening on %1").arg(m_server->serverName()); |
66 | } | 63 | } |
67 | 64 | ||
68 | m_checkConnectionsTimer = std::unique_ptr<QTimer>(new QTimer); | 65 | m_checkConnectionsTimer = std::unique_ptr<QTimer>(new QTimer); |
@@ -70,7 +67,7 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArra | |||
70 | m_checkConnectionsTimer->setInterval(1000); | 67 | m_checkConnectionsTimer->setInterval(1000); |
71 | connect(m_checkConnectionsTimer.get(), &QTimer::timeout, [this]() { | 68 | connect(m_checkConnectionsTimer.get(), &QTimer::timeout, [this]() { |
72 | if (m_connections.isEmpty()) { | 69 | if (m_connections.isEmpty()) { |
73 | Log() << QString("No connections, shutting down."); | 70 | SinkLog() << QString("No connections, shutting down."); |
74 | quit(); | 71 | quit(); |
75 | } | 72 | } |
76 | }); | 73 | }); |
@@ -91,7 +88,7 @@ void Listener::emergencyAbortAllConnections() | |||
91 | { | 88 | { |
92 | for (Client &client : m_connections) { | 89 | for (Client &client : m_connections) { |
93 | if (client.socket) { | 90 | if (client.socket) { |
94 | Warning() << "Sending panic"; | 91 | SinkWarning() << "Sending panic"; |
95 | client.socket->write("PANIC"); | 92 | client.socket->write("PANIC"); |
96 | client.socket->waitForBytesWritten(); | 93 | client.socket->waitForBytesWritten(); |
97 | disconnect(client.socket, 0, this, 0); | 94 | disconnect(client.socket, 0, this, 0); |
@@ -120,11 +117,11 @@ void Listener::closeAllConnections() | |||
120 | 117 | ||
121 | void Listener::acceptConnection() | 118 | void Listener::acceptConnection() |
122 | { | 119 | { |
123 | Trace() << "Accepting connection"; | 120 | SinkTrace() << "Accepting connection"; |
124 | QLocalSocket *socket = m_server->nextPendingConnection(); | 121 | QLocalSocket *socket = m_server->nextPendingConnection(); |
125 | 122 | ||
126 | if (!socket) { | 123 | if (!socket) { |
127 | Warning() << "Accepted connection but didn't get a socket for it"; | 124 | SinkWarning() << "Accepted connection but didn't get a socket for it"; |
128 | return; | 125 | return; |
129 | } | 126 | } |
130 | 127 | ||
@@ -156,13 +153,13 @@ void Listener::clientDropped() | |||
156 | const Client &client = it.next(); | 153 | const Client &client = it.next(); |
157 | if (client.socket == socket) { | 154 | if (client.socket == socket) { |
158 | dropped = true; | 155 | dropped = true; |
159 | Log() << QString("Dropped connection: %1").arg(client.name) << socket; | 156 | SinkLog() << QString("Dropped connection: %1").arg(client.name) << socket; |
160 | it.remove(); | 157 | it.remove(); |
161 | break; | 158 | break; |
162 | } | 159 | } |
163 | } | 160 | } |
164 | if (!dropped) { | 161 | if (!dropped) { |
165 | Warning() << "Failed to find connection for disconnected socket: " << socket; | 162 | SinkWarning() << "Failed to find connection for disconnected socket: " << socket; |
166 | } | 163 | } |
167 | 164 | ||
168 | checkConnections(); | 165 | checkConnections(); |
@@ -188,7 +185,7 @@ void Listener::onDataAvailable() | |||
188 | 185 | ||
189 | void Listener::readFromSocket(QLocalSocket *socket) | 186 | void Listener::readFromSocket(QLocalSocket *socket) |
190 | { | 187 | { |
191 | Trace() << "Reading from socket..."; | 188 | SinkTrace() << "Reading from socket..."; |
192 | for (Client &client : m_connections) { | 189 | for (Client &client : m_connections) { |
193 | if (client.socket == socket) { | 190 | if (client.socket == socket) { |
194 | client.commandBuffer += socket->readAll(); | 191 | client.commandBuffer += socket->readAll(); |
@@ -231,7 +228,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
231 | auto buffer = Sink::Commands::GetHandshake(commandBuffer.constData()); | 228 | auto buffer = Sink::Commands::GetHandshake(commandBuffer.constData()); |
232 | client.name = buffer->name()->c_str(); | 229 | client.name = buffer->name()->c_str(); |
233 | } else { | 230 | } else { |
234 | Warning() << "received invalid command"; | 231 | SinkWarning() << "received invalid command"; |
235 | } | 232 | } |
236 | break; | 233 | break; |
237 | } | 234 | } |
@@ -239,7 +236,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
239 | flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); | 236 | flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); |
240 | if (Sink::Commands::VerifySynchronizeBuffer(verifier)) { | 237 | if (Sink::Commands::VerifySynchronizeBuffer(verifier)) { |
241 | auto buffer = Sink::Commands::GetSynchronize(commandBuffer.constData()); | 238 | auto buffer = Sink::Commands::GetSynchronize(commandBuffer.constData()); |
242 | Trace() << QString("Synchronize request (id %1) from %2").arg(messageId).arg(client.name); | 239 | SinkTrace() << QString("Synchronize request (id %1) from %2").arg(messageId).arg(client.name); |
243 | auto timer = QSharedPointer<QTime>::create(); | 240 | auto timer = QSharedPointer<QTime>::create(); |
244 | timer->start(); | 241 | timer->start(); |
245 | auto job = KAsync::null<void>(); | 242 | auto job = KAsync::null<void>(); |
@@ -250,16 +247,16 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
250 | job = job.then<void>(loadResource().processAllMessages()); | 247 | job = job.then<void>(loadResource().processAllMessages()); |
251 | } | 248 | } |
252 | job.then<void>([callback, timer]() { | 249 | job.then<void>([callback, timer]() { |
253 | Trace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); | 250 | SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); |
254 | callback(true); | 251 | callback(true); |
255 | }, [callback](int errorCode, const QString &msg) { | 252 | }, [callback](int errorCode, const QString &msg) { |
256 | Warning() << "Sync failed: " << msg; | 253 | SinkWarning() << "Sync failed: " << msg; |
257 | callback(false); | 254 | callback(false); |
258 | }) | 255 | }) |
259 | .exec(); | 256 | .exec(); |
260 | return; | 257 | return; |
261 | } else { | 258 | } else { |
262 | Warning() << "received invalid command"; | 259 | SinkWarning() << "received invalid command"; |
263 | } | 260 | } |
264 | break; | 261 | break; |
265 | } | 262 | } |
@@ -268,31 +265,31 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
268 | case Sink::Commands::DeleteEntityCommand: | 265 | case Sink::Commands::DeleteEntityCommand: |
269 | case Sink::Commands::ModifyEntityCommand: | 266 | case Sink::Commands::ModifyEntityCommand: |
270 | case Sink::Commands::CreateEntityCommand: | 267 | case Sink::Commands::CreateEntityCommand: |
271 | Trace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; | 268 | SinkTrace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; |
272 | loadResource().processCommand(commandId, commandBuffer); | 269 | loadResource().processCommand(commandId, commandBuffer); |
273 | break; | 270 | break; |
274 | case Sink::Commands::ShutdownCommand: | 271 | case Sink::Commands::ShutdownCommand: |
275 | Log() << QString("Received shutdown command from %1").arg(client.name); | 272 | SinkLog() << QString("Received shutdown command from %1").arg(client.name); |
276 | // Immediately reject new connections | 273 | // Immediately reject new connections |
277 | m_server->close(); | 274 | m_server->close(); |
278 | QTimer::singleShot(0, this, &Listener::quit); | 275 | QTimer::singleShot(0, this, &Listener::quit); |
279 | break; | 276 | break; |
280 | case Sink::Commands::PingCommand: | 277 | case Sink::Commands::PingCommand: |
281 | Trace() << QString("Received ping command from %1").arg(client.name); | 278 | SinkTrace() << QString("Received ping command from %1").arg(client.name); |
282 | break; | 279 | break; |
283 | case Sink::Commands::RevisionReplayedCommand: { | 280 | case Sink::Commands::RevisionReplayedCommand: { |
284 | Trace() << QString("Received revision replayed command from %1").arg(client.name); | 281 | SinkTrace() << QString("Received revision replayed command from %1").arg(client.name); |
285 | flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); | 282 | flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); |
286 | if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) { | 283 | if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) { |
287 | auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); | 284 | auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); |
288 | client.currentRevision = buffer->revision(); | 285 | client.currentRevision = buffer->revision(); |
289 | } else { | 286 | } else { |
290 | Warning() << "received invalid command"; | 287 | SinkWarning() << "received invalid command"; |
291 | } | 288 | } |
292 | loadResource().setLowerBoundRevision(lowerBoundRevision()); | 289 | loadResource().setLowerBoundRevision(lowerBoundRevision()); |
293 | } break; | 290 | } break; |
294 | case Sink::Commands::RemoveFromDiskCommand: { | 291 | case Sink::Commands::RemoveFromDiskCommand: { |
295 | Log() << QString("Received a remove from disk command from %1").arg(client.name); | 292 | SinkLog() << QString("Received a remove from disk command from %1").arg(client.name); |
296 | m_resource.reset(nullptr); | 293 | m_resource.reset(nullptr); |
297 | loadResource().removeDataFromDisk(); | 294 | loadResource().removeDataFromDisk(); |
298 | m_server->close(); | 295 | m_server->close(); |
@@ -300,11 +297,11 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
300 | } break; | 297 | } break; |
301 | default: | 298 | default: |
302 | if (commandId > Sink::Commands::CustomCommand) { | 299 | if (commandId > Sink::Commands::CustomCommand) { |
303 | Log() << QString("Received custom command from %1: ").arg(client.name) << commandId; | 300 | SinkLog() << QString("Received custom command from %1: ").arg(client.name) << commandId; |
304 | loadResource().processCommand(commandId, commandBuffer); | 301 | loadResource().processCommand(commandId, commandBuffer); |
305 | } else { | 302 | } else { |
306 | success = false; | 303 | success = false; |
307 | ErrorMsg() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; | 304 | SinkError() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; |
308 | } | 305 | } |
309 | break; | 306 | break; |
310 | } | 307 | } |
@@ -352,7 +349,7 @@ bool Listener::processClientBuffer(Client &client) | |||
352 | const uint messageId = *(uint *)client.commandBuffer.constData(); | 349 | const uint messageId = *(uint *)client.commandBuffer.constData(); |
353 | const int commandId = *(int *)(client.commandBuffer.constData() + sizeof(uint)); | 350 | const int commandId = *(int *)(client.commandBuffer.constData() + sizeof(uint)); |
354 | const uint size = *(uint *)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); | 351 | const uint size = *(uint *)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); |
355 | Trace() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size; | 352 | SinkTrace() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size; |
356 | 353 | ||
357 | // TODO: reject messages above a certain size? | 354 | // TODO: reject messages above a certain size? |
358 | 355 | ||
@@ -365,11 +362,11 @@ bool Listener::processClientBuffer(Client &client) | |||
365 | const QByteArray commandBuffer = client.commandBuffer.left(size); | 362 | const QByteArray commandBuffer = client.commandBuffer.left(size); |
366 | client.commandBuffer.remove(0, size); | 363 | client.commandBuffer.remove(0, size); |
367 | processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName](bool success) { | 364 | processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName](bool success) { |
368 | Trace() << QString("Completed command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Sink::Commands::name(commandId))).arg(clientName); | 365 | SinkTrace() << QString("Completed command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Sink::Commands::name(commandId))).arg(clientName); |
369 | if (socket) { | 366 | if (socket) { |
370 | sendCommandCompleted(socket.data(), messageId, success); | 367 | sendCommandCompleted(socket.data(), messageId, success); |
371 | } else { | 368 | } else { |
372 | Log() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName); | 369 | SinkLog() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName); |
373 | } | 370 | } |
374 | }); | 371 | }); |
375 | 372 | ||
@@ -406,7 +403,7 @@ void Listener::updateClientsWithRevision(qint64 revision) | |||
406 | continue; | 403 | continue; |
407 | } | 404 | } |
408 | 405 | ||
409 | Trace() << "Sending revision update for " << client.name << revision; | 406 | SinkTrace() << "Sending revision update for " << client.name << revision; |
410 | Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::RevisionUpdateCommand, m_fbb); | 407 | Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::RevisionUpdateCommand, m_fbb); |
411 | } | 408 | } |
412 | m_fbb.Clear(); | 409 | m_fbb.Clear(); |
@@ -437,15 +434,15 @@ Sink::Resource &Listener::loadResource() | |||
437 | if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) { | 434 | if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) { |
438 | m_resource = std::unique_ptr<Sink::Resource>(resourceFactory->createResource(m_resourceInstanceIdentifier)); | 435 | m_resource = std::unique_ptr<Sink::Resource>(resourceFactory->createResource(m_resourceInstanceIdentifier)); |
439 | if (!m_resource) { | 436 | if (!m_resource) { |
440 | ErrorMsg() << "Failed to instantiate the resource " << m_resourceName; | 437 | SinkError() << "Failed to instantiate the resource " << m_resourceName; |
441 | m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource); | 438 | m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource); |
442 | } | 439 | } |
443 | Trace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); | 440 | SinkTrace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); |
444 | Trace() << QString("\tResource: %1").arg((qlonglong)m_resource.get()); | 441 | SinkTrace() << QString("\tResource: %1").arg((qlonglong)m_resource.get()); |
445 | connect(m_resource.get(), &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision); | 442 | connect(m_resource.get(), &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision); |
446 | connect(m_resource.get(), &Sink::Resource::notify, this, &Listener::notify); | 443 | connect(m_resource.get(), &Sink::Resource::notify, this, &Listener::notify); |
447 | } else { | 444 | } else { |
448 | ErrorMsg() << "Failed to load resource " << m_resourceName; | 445 | SinkError() << "Failed to load resource " << m_resourceName; |
449 | m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource); | 446 | m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource); |
450 | } | 447 | } |
451 | } | 448 | } |