summaryrefslogtreecommitdiffstats
path: root/common/listener.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-07 22:23:49 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-07 22:23:49 +0200
commitda2b049e248c1ad7efeb53685158a205335e4e36 (patch)
tree1e7e5e940e9b760b2108081b1d2f3879cebdb0ff /common/listener.cpp
parent9bcb822963fc96c94dbe7dcc4134dcd2dac454ff (diff)
downloadsink-da2b049e248c1ad7efeb53685158a205335e4e36.tar.gz
sink-da2b049e248c1ad7efeb53685158a205335e4e36.zip
A new debug system.
Instead of a single #define as debug area the new system allows for an identifier for each debug message with the structure component.area. The component is a dot separated identifier of the runtime component, such as the process or the plugin. The area is the code component, and can be as such defined at compiletime. The idea of this system is that it becomes possible to i.e. look at the output of all messages in the query subsystem of a specific resource (something that happens in the client process, but in the resource-specific subcomponent). The new macros are supposed to be less likely to clash with other names, hence the new names.
Diffstat (limited to 'common/listener.cpp')
-rw-r--r--common/listener.cpp65
1 files changed, 31 insertions, 34 deletions
diff --git a/common/listener.cpp b/common/listener.cpp
index af8eaa2..2c5c1df 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -39,9 +39,6 @@
39#include <QTime> 39#include <QTime>
40#include <QDataStream> 40#include <QDataStream>
41 41
42#undef DEBUG_AREA
43#define DEBUG_AREA "resource.communication"
44
45Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType, QObject *parent) 42Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType, QObject *parent)
46 : QObject(parent), 43 : QObject(parent),
47 m_server(new QLocalServer(this)), 44 m_server(new QLocalServer(this)),
@@ -51,18 +48,18 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArra
51 m_messageId(0) 48 m_messageId(0)
52{ 49{
53 connect(m_server.get(), &QLocalServer::newConnection, this, &Listener::acceptConnection); 50 connect(m_server.get(), &QLocalServer::newConnection, this, &Listener::acceptConnection);
54 Trace() << "Trying to open " << m_resourceInstanceIdentifier; 51 SinkTrace() << "Trying to open " << m_resourceInstanceIdentifier;
55 52
56 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { 53 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) {
57 m_server->removeServer(m_resourceInstanceIdentifier); 54 m_server->removeServer(m_resourceInstanceIdentifier);
58 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { 55 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) {
59 Warning() << "Utter failure to start server"; 56 SinkWarning() << "Utter failure to start server";
60 exit(-1); 57 exit(-1);
61 } 58 }
62 } 59 }
63 60
64 if (m_server->isListening()) { 61 if (m_server->isListening()) {
65 Log() << QString("Listening on %1").arg(m_server->serverName()); 62 SinkLog() << QString("Listening on %1").arg(m_server->serverName());
66 } 63 }
67 64
68 m_checkConnectionsTimer = std::unique_ptr<QTimer>(new QTimer); 65 m_checkConnectionsTimer = std::unique_ptr<QTimer>(new QTimer);
@@ -70,7 +67,7 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArra
70 m_checkConnectionsTimer->setInterval(1000); 67 m_checkConnectionsTimer->setInterval(1000);
71 connect(m_checkConnectionsTimer.get(), &QTimer::timeout, [this]() { 68 connect(m_checkConnectionsTimer.get(), &QTimer::timeout, [this]() {
72 if (m_connections.isEmpty()) { 69 if (m_connections.isEmpty()) {
73 Log() << QString("No connections, shutting down."); 70 SinkLog() << QString("No connections, shutting down.");
74 quit(); 71 quit();
75 } 72 }
76 }); 73 });
@@ -91,7 +88,7 @@ void Listener::emergencyAbortAllConnections()
91{ 88{
92 for (Client &client : m_connections) { 89 for (Client &client : m_connections) {
93 if (client.socket) { 90 if (client.socket) {
94 Warning() << "Sending panic"; 91 SinkWarning() << "Sending panic";
95 client.socket->write("PANIC"); 92 client.socket->write("PANIC");
96 client.socket->waitForBytesWritten(); 93 client.socket->waitForBytesWritten();
97 disconnect(client.socket, 0, this, 0); 94 disconnect(client.socket, 0, this, 0);
@@ -120,11 +117,11 @@ void Listener::closeAllConnections()
120 117
121void Listener::acceptConnection() 118void Listener::acceptConnection()
122{ 119{
123 Trace() << "Accepting connection"; 120 SinkTrace() << "Accepting connection";
124 QLocalSocket *socket = m_server->nextPendingConnection(); 121 QLocalSocket *socket = m_server->nextPendingConnection();
125 122
126 if (!socket) { 123 if (!socket) {
127 Warning() << "Accepted connection but didn't get a socket for it"; 124 SinkWarning() << "Accepted connection but didn't get a socket for it";
128 return; 125 return;
129 } 126 }
130 127
@@ -156,13 +153,13 @@ void Listener::clientDropped()
156 const Client &client = it.next(); 153 const Client &client = it.next();
157 if (client.socket == socket) { 154 if (client.socket == socket) {
158 dropped = true; 155 dropped = true;
159 Log() << QString("Dropped connection: %1").arg(client.name) << socket; 156 SinkLog() << QString("Dropped connection: %1").arg(client.name) << socket;
160 it.remove(); 157 it.remove();
161 break; 158 break;
162 } 159 }
163 } 160 }
164 if (!dropped) { 161 if (!dropped) {
165 Warning() << "Failed to find connection for disconnected socket: " << socket; 162 SinkWarning() << "Failed to find connection for disconnected socket: " << socket;
166 } 163 }
167 164
168 checkConnections(); 165 checkConnections();
@@ -188,7 +185,7 @@ void Listener::onDataAvailable()
188 185
189void Listener::readFromSocket(QLocalSocket *socket) 186void Listener::readFromSocket(QLocalSocket *socket)
190{ 187{
191 Trace() << "Reading from socket..."; 188 SinkTrace() << "Reading from socket...";
192 for (Client &client : m_connections) { 189 for (Client &client : m_connections) {
193 if (client.socket == socket) { 190 if (client.socket == socket) {
194 client.commandBuffer += socket->readAll(); 191 client.commandBuffer += socket->readAll();
@@ -231,7 +228,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
231 auto buffer = Sink::Commands::GetHandshake(commandBuffer.constData()); 228 auto buffer = Sink::Commands::GetHandshake(commandBuffer.constData());
232 client.name = buffer->name()->c_str(); 229 client.name = buffer->name()->c_str();
233 } else { 230 } else {
234 Warning() << "received invalid command"; 231 SinkWarning() << "received invalid command";
235 } 232 }
236 break; 233 break;
237 } 234 }
@@ -239,7 +236,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
239 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); 236 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
240 if (Sink::Commands::VerifySynchronizeBuffer(verifier)) { 237 if (Sink::Commands::VerifySynchronizeBuffer(verifier)) {
241 auto buffer = Sink::Commands::GetSynchronize(commandBuffer.constData()); 238 auto buffer = Sink::Commands::GetSynchronize(commandBuffer.constData());
242 Trace() << QString("Synchronize request (id %1) from %2").arg(messageId).arg(client.name); 239 SinkTrace() << QString("Synchronize request (id %1) from %2").arg(messageId).arg(client.name);
243 auto timer = QSharedPointer<QTime>::create(); 240 auto timer = QSharedPointer<QTime>::create();
244 timer->start(); 241 timer->start();
245 auto job = KAsync::null<void>(); 242 auto job = KAsync::null<void>();
@@ -250,16 +247,16 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
250 job = job.then<void>(loadResource().processAllMessages()); 247 job = job.then<void>(loadResource().processAllMessages());
251 } 248 }
252 job.then<void>([callback, timer]() { 249 job.then<void>([callback, timer]() {
253 Trace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); 250 SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed());
254 callback(true); 251 callback(true);
255 }, [callback](int errorCode, const QString &msg) { 252 }, [callback](int errorCode, const QString &msg) {
256 Warning() << "Sync failed: " << msg; 253 SinkWarning() << "Sync failed: " << msg;
257 callback(false); 254 callback(false);
258 }) 255 })
259 .exec(); 256 .exec();
260 return; 257 return;
261 } else { 258 } else {
262 Warning() << "received invalid command"; 259 SinkWarning() << "received invalid command";
263 } 260 }
264 break; 261 break;
265 } 262 }
@@ -268,31 +265,31 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
268 case Sink::Commands::DeleteEntityCommand: 265 case Sink::Commands::DeleteEntityCommand:
269 case Sink::Commands::ModifyEntityCommand: 266 case Sink::Commands::ModifyEntityCommand:
270 case Sink::Commands::CreateEntityCommand: 267 case Sink::Commands::CreateEntityCommand:
271 Trace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; 268 SinkTrace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name;
272 loadResource().processCommand(commandId, commandBuffer); 269 loadResource().processCommand(commandId, commandBuffer);
273 break; 270 break;
274 case Sink::Commands::ShutdownCommand: 271 case Sink::Commands::ShutdownCommand:
275 Log() << QString("Received shutdown command from %1").arg(client.name); 272 SinkLog() << QString("Received shutdown command from %1").arg(client.name);
276 // Immediately reject new connections 273 // Immediately reject new connections
277 m_server->close(); 274 m_server->close();
278 QTimer::singleShot(0, this, &Listener::quit); 275 QTimer::singleShot(0, this, &Listener::quit);
279 break; 276 break;
280 case Sink::Commands::PingCommand: 277 case Sink::Commands::PingCommand:
281 Trace() << QString("Received ping command from %1").arg(client.name); 278 SinkTrace() << QString("Received ping command from %1").arg(client.name);
282 break; 279 break;
283 case Sink::Commands::RevisionReplayedCommand: { 280 case Sink::Commands::RevisionReplayedCommand: {
284 Trace() << QString("Received revision replayed command from %1").arg(client.name); 281 SinkTrace() << QString("Received revision replayed command from %1").arg(client.name);
285 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); 282 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
286 if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) { 283 if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) {
287 auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); 284 auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData());
288 client.currentRevision = buffer->revision(); 285 client.currentRevision = buffer->revision();
289 } else { 286 } else {
290 Warning() << "received invalid command"; 287 SinkWarning() << "received invalid command";
291 } 288 }
292 loadResource().setLowerBoundRevision(lowerBoundRevision()); 289 loadResource().setLowerBoundRevision(lowerBoundRevision());
293 } break; 290 } break;
294 case Sink::Commands::RemoveFromDiskCommand: { 291 case Sink::Commands::RemoveFromDiskCommand: {
295 Log() << QString("Received a remove from disk command from %1").arg(client.name); 292 SinkLog() << QString("Received a remove from disk command from %1").arg(client.name);
296 m_resource.reset(nullptr); 293 m_resource.reset(nullptr);
297 loadResource().removeDataFromDisk(); 294 loadResource().removeDataFromDisk();
298 m_server->close(); 295 m_server->close();
@@ -300,11 +297,11 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
300 } break; 297 } break;
301 default: 298 default:
302 if (commandId > Sink::Commands::CustomCommand) { 299 if (commandId > Sink::Commands::CustomCommand) {
303 Log() << QString("Received custom command from %1: ").arg(client.name) << commandId; 300 SinkLog() << QString("Received custom command from %1: ").arg(client.name) << commandId;
304 loadResource().processCommand(commandId, commandBuffer); 301 loadResource().processCommand(commandId, commandBuffer);
305 } else { 302 } else {
306 success = false; 303 success = false;
307 ErrorMsg() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; 304 SinkError() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId;
308 } 305 }
309 break; 306 break;
310 } 307 }
@@ -352,7 +349,7 @@ bool Listener::processClientBuffer(Client &client)
352 const uint messageId = *(uint *)client.commandBuffer.constData(); 349 const uint messageId = *(uint *)client.commandBuffer.constData();
353 const int commandId = *(int *)(client.commandBuffer.constData() + sizeof(uint)); 350 const int commandId = *(int *)(client.commandBuffer.constData() + sizeof(uint));
354 const uint size = *(uint *)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); 351 const uint size = *(uint *)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint));
355 Trace() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size; 352 SinkTrace() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size;
356 353
357 // TODO: reject messages above a certain size? 354 // TODO: reject messages above a certain size?
358 355
@@ -365,11 +362,11 @@ bool Listener::processClientBuffer(Client &client)
365 const QByteArray commandBuffer = client.commandBuffer.left(size); 362 const QByteArray commandBuffer = client.commandBuffer.left(size);
366 client.commandBuffer.remove(0, size); 363 client.commandBuffer.remove(0, size);
367 processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName](bool success) { 364 processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName](bool success) {
368 Trace() << QString("Completed command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Sink::Commands::name(commandId))).arg(clientName); 365 SinkTrace() << QString("Completed command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Sink::Commands::name(commandId))).arg(clientName);
369 if (socket) { 366 if (socket) {
370 sendCommandCompleted(socket.data(), messageId, success); 367 sendCommandCompleted(socket.data(), messageId, success);
371 } else { 368 } else {
372 Log() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName); 369 SinkLog() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName);
373 } 370 }
374 }); 371 });
375 372
@@ -406,7 +403,7 @@ void Listener::updateClientsWithRevision(qint64 revision)
406 continue; 403 continue;
407 } 404 }
408 405
409 Trace() << "Sending revision update for " << client.name << revision; 406 SinkTrace() << "Sending revision update for " << client.name << revision;
410 Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::RevisionUpdateCommand, m_fbb); 407 Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::RevisionUpdateCommand, m_fbb);
411 } 408 }
412 m_fbb.Clear(); 409 m_fbb.Clear();
@@ -437,15 +434,15 @@ Sink::Resource &Listener::loadResource()
437 if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) { 434 if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) {
438 m_resource = std::unique_ptr<Sink::Resource>(resourceFactory->createResource(m_resourceInstanceIdentifier)); 435 m_resource = std::unique_ptr<Sink::Resource>(resourceFactory->createResource(m_resourceInstanceIdentifier));
439 if (!m_resource) { 436 if (!m_resource) {
440 ErrorMsg() << "Failed to instantiate the resource " << m_resourceName; 437 SinkError() << "Failed to instantiate the resource " << m_resourceName;
441 m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource); 438 m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource);
442 } 439 }
443 Trace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); 440 SinkTrace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory);
444 Trace() << QString("\tResource: %1").arg((qlonglong)m_resource.get()); 441 SinkTrace() << QString("\tResource: %1").arg((qlonglong)m_resource.get());
445 connect(m_resource.get(), &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision); 442 connect(m_resource.get(), &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision);
446 connect(m_resource.get(), &Sink::Resource::notify, this, &Listener::notify); 443 connect(m_resource.get(), &Sink::Resource::notify, this, &Listener::notify);
447 } else { 444 } else {
448 ErrorMsg() << "Failed to load resource " << m_resourceName; 445 SinkError() << "Failed to load resource " << m_resourceName;
449 m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource); 446 m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource);
450 } 447 }
451 } 448 }