summaryrefslogtreecommitdiffstats
path: root/common/listener.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/listener.cpp')
-rw-r--r--common/listener.cpp65
1 files changed, 31 insertions, 34 deletions
diff --git a/common/listener.cpp b/common/listener.cpp
index af8eaa2..2c5c1df 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -39,9 +39,6 @@
39#include <QTime> 39#include <QTime>
40#include <QDataStream> 40#include <QDataStream>
41 41
42#undef DEBUG_AREA
43#define DEBUG_AREA "resource.communication"
44
45Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType, QObject *parent) 42Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType, QObject *parent)
46 : QObject(parent), 43 : QObject(parent),
47 m_server(new QLocalServer(this)), 44 m_server(new QLocalServer(this)),
@@ -51,18 +48,18 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArra
51 m_messageId(0) 48 m_messageId(0)
52{ 49{
53 connect(m_server.get(), &QLocalServer::newConnection, this, &Listener::acceptConnection); 50 connect(m_server.get(), &QLocalServer::newConnection, this, &Listener::acceptConnection);
54 Trace() << "Trying to open " << m_resourceInstanceIdentifier; 51 SinkTrace() << "Trying to open " << m_resourceInstanceIdentifier;
55 52
56 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { 53 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) {
57 m_server->removeServer(m_resourceInstanceIdentifier); 54 m_server->removeServer(m_resourceInstanceIdentifier);
58 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { 55 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) {
59 Warning() << "Utter failure to start server"; 56 SinkWarning() << "Utter failure to start server";
60 exit(-1); 57 exit(-1);
61 } 58 }
62 } 59 }
63 60
64 if (m_server->isListening()) { 61 if (m_server->isListening()) {
65 Log() << QString("Listening on %1").arg(m_server->serverName()); 62 SinkLog() << QString("Listening on %1").arg(m_server->serverName());
66 } 63 }
67 64
68 m_checkConnectionsTimer = std::unique_ptr<QTimer>(new QTimer); 65 m_checkConnectionsTimer = std::unique_ptr<QTimer>(new QTimer);
@@ -70,7 +67,7 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArra
70 m_checkConnectionsTimer->setInterval(1000); 67 m_checkConnectionsTimer->setInterval(1000);
71 connect(m_checkConnectionsTimer.get(), &QTimer::timeout, [this]() { 68 connect(m_checkConnectionsTimer.get(), &QTimer::timeout, [this]() {
72 if (m_connections.isEmpty()) { 69 if (m_connections.isEmpty()) {
73 Log() << QString("No connections, shutting down."); 70 SinkLog() << QString("No connections, shutting down.");
74 quit(); 71 quit();
75 } 72 }
76 }); 73 });
@@ -91,7 +88,7 @@ void Listener::emergencyAbortAllConnections()
91{ 88{
92 for (Client &client : m_connections) { 89 for (Client &client : m_connections) {
93 if (client.socket) { 90 if (client.socket) {
94 Warning() << "Sending panic"; 91 SinkWarning() << "Sending panic";
95 client.socket->write("PANIC"); 92 client.socket->write("PANIC");
96 client.socket->waitForBytesWritten(); 93 client.socket->waitForBytesWritten();
97 disconnect(client.socket, 0, this, 0); 94 disconnect(client.socket, 0, this, 0);
@@ -120,11 +117,11 @@ void Listener::closeAllConnections()
120 117
121void Listener::acceptConnection() 118void Listener::acceptConnection()
122{ 119{
123 Trace() << "Accepting connection"; 120 SinkTrace() << "Accepting connection";
124 QLocalSocket *socket = m_server->nextPendingConnection(); 121 QLocalSocket *socket = m_server->nextPendingConnection();
125 122
126 if (!socket) { 123 if (!socket) {
127 Warning() << "Accepted connection but didn't get a socket for it"; 124 SinkWarning() << "Accepted connection but didn't get a socket for it";
128 return; 125 return;
129 } 126 }
130 127
@@ -156,13 +153,13 @@ void Listener::clientDropped()
156 const Client &client = it.next(); 153 const Client &client = it.next();
157 if (client.socket == socket) { 154 if (client.socket == socket) {
158 dropped = true; 155 dropped = true;
159 Log() << QString("Dropped connection: %1").arg(client.name) << socket; 156 SinkLog() << QString("Dropped connection: %1").arg(client.name) << socket;
160 it.remove(); 157 it.remove();
161 break; 158 break;
162 } 159 }
163 } 160 }
164 if (!dropped) { 161 if (!dropped) {
165 Warning() << "Failed to find connection for disconnected socket: " << socket; 162 SinkWarning() << "Failed to find connection for disconnected socket: " << socket;
166 } 163 }
167 164
168 checkConnections(); 165 checkConnections();
@@ -188,7 +185,7 @@ void Listener::onDataAvailable()
188 185
189void Listener::readFromSocket(QLocalSocket *socket) 186void Listener::readFromSocket(QLocalSocket *socket)
190{ 187{
191 Trace() << "Reading from socket..."; 188 SinkTrace() << "Reading from socket...";
192 for (Client &client : m_connections) { 189 for (Client &client : m_connections) {
193 if (client.socket == socket) { 190 if (client.socket == socket) {
194 client.commandBuffer += socket->readAll(); 191 client.commandBuffer += socket->readAll();
@@ -231,7 +228,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
231 auto buffer = Sink::Commands::GetHandshake(commandBuffer.constData()); 228 auto buffer = Sink::Commands::GetHandshake(commandBuffer.constData());
232 client.name = buffer->name()->c_str(); 229 client.name = buffer->name()->c_str();
233 } else { 230 } else {
234 Warning() << "received invalid command"; 231 SinkWarning() << "received invalid command";
235 } 232 }
236 break; 233 break;
237 } 234 }
@@ -239,7 +236,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
239 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); 236 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
240 if (Sink::Commands::VerifySynchronizeBuffer(verifier)) { 237 if (Sink::Commands::VerifySynchronizeBuffer(verifier)) {
241 auto buffer = Sink::Commands::GetSynchronize(commandBuffer.constData()); 238 auto buffer = Sink::Commands::GetSynchronize(commandBuffer.constData());
242 Trace() << QString("Synchronize request (id %1) from %2").arg(messageId).arg(client.name); 239 SinkTrace() << QString("Synchronize request (id %1) from %2").arg(messageId).arg(client.name);
243 auto timer = QSharedPointer<QTime>::create(); 240 auto timer = QSharedPointer<QTime>::create();
244 timer->start(); 241 timer->start();
245 auto job = KAsync::null<void>(); 242 auto job = KAsync::null<void>();
@@ -250,16 +247,16 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
250 job = job.then<void>(loadResource().processAllMessages()); 247 job = job.then<void>(loadResource().processAllMessages());
251 } 248 }
252 job.then<void>([callback, timer]() { 249 job.then<void>([callback, timer]() {
253 Trace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); 250 SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed());
254 callback(true); 251 callback(true);
255 }, [callback](int errorCode, const QString &msg) { 252 }, [callback](int errorCode, const QString &msg) {
256 Warning() << "Sync failed: " << msg; 253 SinkWarning() << "Sync failed: " << msg;
257 callback(false); 254 callback(false);
258 }) 255 })
259 .exec(); 256 .exec();
260 return; 257 return;
261 } else { 258 } else {
262 Warning() << "received invalid command"; 259 SinkWarning() << "received invalid command";
263 } 260 }
264 break; 261 break;
265 } 262 }
@@ -268,31 +265,31 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
268 case Sink::Commands::DeleteEntityCommand: 265 case Sink::Commands::DeleteEntityCommand:
269 case Sink::Commands::ModifyEntityCommand: 266 case Sink::Commands::ModifyEntityCommand:
270 case Sink::Commands::CreateEntityCommand: 267 case Sink::Commands::CreateEntityCommand:
271 Trace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; 268 SinkTrace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name;
272 loadResource().processCommand(commandId, commandBuffer); 269 loadResource().processCommand(commandId, commandBuffer);
273 break; 270 break;
274 case Sink::Commands::ShutdownCommand: 271 case Sink::Commands::ShutdownCommand:
275 Log() << QString("Received shutdown command from %1").arg(client.name); 272 SinkLog() << QString("Received shutdown command from %1").arg(client.name);
276 // Immediately reject new connections 273 // Immediately reject new connections
277 m_server->close(); 274 m_server->close();
278 QTimer::singleShot(0, this, &Listener::quit); 275 QTimer::singleShot(0, this, &Listener::quit);
279 break; 276 break;
280 case Sink::Commands::PingCommand: 277 case Sink::Commands::PingCommand:
281 Trace() << QString("Received ping command from %1").arg(client.name); 278 SinkTrace() << QString("Received ping command from %1").arg(client.name);
282 break; 279 break;
283 case Sink::Commands::RevisionReplayedCommand: { 280 case Sink::Commands::RevisionReplayedCommand: {
284 Trace() << QString("Received revision replayed command from %1").arg(client.name); 281 SinkTrace() << QString("Received revision replayed command from %1").arg(client.name);
285 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); 282 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
286 if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) { 283 if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) {
287 auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); 284 auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData());
288 client.currentRevision = buffer->revision(); 285 client.currentRevision = buffer->revision();
289 } else { 286 } else {
290 Warning() << "received invalid command"; 287 SinkWarning() << "received invalid command";
291 } 288 }
292 loadResource().setLowerBoundRevision(lowerBoundRevision()); 289 loadResource().setLowerBoundRevision(lowerBoundRevision());
293 } break; 290 } break;
294 case Sink::Commands::RemoveFromDiskCommand: { 291 case Sink::Commands::RemoveFromDiskCommand: {
295 Log() << QString("Received a remove from disk command from %1").arg(client.name); 292 SinkLog() << QString("Received a remove from disk command from %1").arg(client.name);
296 m_resource.reset(nullptr); 293 m_resource.reset(nullptr);
297 loadResource().removeDataFromDisk(); 294 loadResource().removeDataFromDisk();
298 m_server->close(); 295 m_server->close();
@@ -300,11 +297,11 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
300 } break; 297 } break;
301 default: 298 default:
302 if (commandId > Sink::Commands::CustomCommand) { 299 if (commandId > Sink::Commands::CustomCommand) {
303 Log() << QString("Received custom command from %1: ").arg(client.name) << commandId; 300 SinkLog() << QString("Received custom command from %1: ").arg(client.name) << commandId;
304 loadResource().processCommand(commandId, commandBuffer); 301 loadResource().processCommand(commandId, commandBuffer);
305 } else { 302 } else {
306 success = false; 303 success = false;
307 ErrorMsg() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; 304 SinkError() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId;
308 } 305 }
309 break; 306 break;
310 } 307 }
@@ -352,7 +349,7 @@ bool Listener::processClientBuffer(Client &client)
352 const uint messageId = *(uint *)client.commandBuffer.constData(); 349 const uint messageId = *(uint *)client.commandBuffer.constData();
353 const int commandId = *(int *)(client.commandBuffer.constData() + sizeof(uint)); 350 const int commandId = *(int *)(client.commandBuffer.constData() + sizeof(uint));
354 const uint size = *(uint *)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); 351 const uint size = *(uint *)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint));
355 Trace() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size; 352 SinkTrace() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size;
356 353
357 // TODO: reject messages above a certain size? 354 // TODO: reject messages above a certain size?
358 355
@@ -365,11 +362,11 @@ bool Listener::processClientBuffer(Client &client)
365 const QByteArray commandBuffer = client.commandBuffer.left(size); 362 const QByteArray commandBuffer = client.commandBuffer.left(size);
366 client.commandBuffer.remove(0, size); 363 client.commandBuffer.remove(0, size);
367 processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName](bool success) { 364 processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName](bool success) {
368 Trace() << QString("Completed command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Sink::Commands::name(commandId))).arg(clientName); 365 SinkTrace() << QString("Completed command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Sink::Commands::name(commandId))).arg(clientName);
369 if (socket) { 366 if (socket) {
370 sendCommandCompleted(socket.data(), messageId, success); 367 sendCommandCompleted(socket.data(), messageId, success);
371 } else { 368 } else {
372 Log() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName); 369 SinkLog() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName);
373 } 370 }
374 }); 371 });
375 372
@@ -406,7 +403,7 @@ void Listener::updateClientsWithRevision(qint64 revision)
406 continue; 403 continue;
407 } 404 }
408 405
409 Trace() << "Sending revision update for " << client.name << revision; 406 SinkTrace() << "Sending revision update for " << client.name << revision;
410 Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::RevisionUpdateCommand, m_fbb); 407 Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::RevisionUpdateCommand, m_fbb);
411 } 408 }
412 m_fbb.Clear(); 409 m_fbb.Clear();
@@ -437,15 +434,15 @@ Sink::Resource &Listener::loadResource()
437 if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) { 434 if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) {
438 m_resource = std::unique_ptr<Sink::Resource>(resourceFactory->createResource(m_resourceInstanceIdentifier)); 435 m_resource = std::unique_ptr<Sink::Resource>(resourceFactory->createResource(m_resourceInstanceIdentifier));
439 if (!m_resource) { 436 if (!m_resource) {
440 ErrorMsg() << "Failed to instantiate the resource " << m_resourceName; 437 SinkError() << "Failed to instantiate the resource " << m_resourceName;
441 m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource); 438 m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource);
442 } 439 }
443 Trace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); 440 SinkTrace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory);
444 Trace() << QString("\tResource: %1").arg((qlonglong)m_resource.get()); 441 SinkTrace() << QString("\tResource: %1").arg((qlonglong)m_resource.get());
445 connect(m_resource.get(), &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision); 442 connect(m_resource.get(), &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision);
446 connect(m_resource.get(), &Sink::Resource::notify, this, &Listener::notify); 443 connect(m_resource.get(), &Sink::Resource::notify, this, &Listener::notify);
447 } else { 444 } else {
448 ErrorMsg() << "Failed to load resource " << m_resourceName; 445 SinkError() << "Failed to load resource " << m_resourceName;
449 m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource); 446 m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource);
450 } 447 }
451 } 448 }