summaryrefslogtreecommitdiffstats
path: root/common/listener.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/listener.cpp')
-rw-r--r--common/listener.cpp112
1 files changed, 54 insertions, 58 deletions
diff --git a/common/listener.cpp b/common/listener.cpp
index 84afe16..2c5c1df 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -39,39 +39,35 @@
39#include <QTime> 39#include <QTime>
40#include <QDataStream> 40#include <QDataStream>
41 41
42#undef DEBUG_AREA
43#define DEBUG_AREA "resource.communication"
44
45Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType, QObject *parent) 42Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType, QObject *parent)
46 : QObject(parent), 43 : QObject(parent),
47 m_server(new QLocalServer(this)), 44 m_server(new QLocalServer(this)),
48 m_resourceName(resourceType), 45 m_resourceName(resourceType),
49 m_resourceInstanceIdentifier(resourceInstanceIdentifier), 46 m_resourceInstanceIdentifier(resourceInstanceIdentifier),
50 m_resource(0),
51 m_clientBufferProcessesTimer(new QTimer(this)), 47 m_clientBufferProcessesTimer(new QTimer(this)),
52 m_messageId(0) 48 m_messageId(0)
53{ 49{
54 connect(m_server, &QLocalServer::newConnection, this, &Listener::acceptConnection); 50 connect(m_server.get(), &QLocalServer::newConnection, this, &Listener::acceptConnection);
55 Trace() << "Trying to open " << m_resourceInstanceIdentifier; 51 SinkTrace() << "Trying to open " << m_resourceInstanceIdentifier;
56 52
57 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { 53 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) {
58 m_server->removeServer(m_resourceInstanceIdentifier); 54 m_server->removeServer(m_resourceInstanceIdentifier);
59 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { 55 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) {
60 Warning() << "Utter failure to start server"; 56 SinkWarning() << "Utter failure to start server";
61 exit(-1); 57 exit(-1);
62 } 58 }
63 } 59 }
64 60
65 if (m_server->isListening()) { 61 if (m_server->isListening()) {
66 Log() << QString("Listening on %1").arg(m_server->serverName()); 62 SinkLog() << QString("Listening on %1").arg(m_server->serverName());
67 } 63 }
68 64
69 m_checkConnectionsTimer = new QTimer; 65 m_checkConnectionsTimer = std::unique_ptr<QTimer>(new QTimer);
70 m_checkConnectionsTimer->setSingleShot(true); 66 m_checkConnectionsTimer->setSingleShot(true);
71 m_checkConnectionsTimer->setInterval(1000); 67 m_checkConnectionsTimer->setInterval(1000);
72 connect(m_checkConnectionsTimer, &QTimer::timeout, [this]() { 68 connect(m_checkConnectionsTimer.get(), &QTimer::timeout, [this]() {
73 if (m_connections.isEmpty()) { 69 if (m_connections.isEmpty()) {
74 Log() << QString("No connections, shutting down."); 70 SinkLog() << QString("No connections, shutting down.");
75 quit(); 71 quit();
76 } 72 }
77 }); 73 });
@@ -80,18 +76,19 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArra
80 // or even just drop down to invoking the method queued? => invoke queued unless we need throttling 76 // or even just drop down to invoking the method queued? => invoke queued unless we need throttling
81 m_clientBufferProcessesTimer->setInterval(0); 77 m_clientBufferProcessesTimer->setInterval(0);
82 m_clientBufferProcessesTimer->setSingleShot(true); 78 m_clientBufferProcessesTimer->setSingleShot(true);
83 connect(m_clientBufferProcessesTimer, &QTimer::timeout, this, &Listener::processClientBuffers); 79 connect(m_clientBufferProcessesTimer.get(), &QTimer::timeout, this, &Listener::processClientBuffers);
84} 80}
85 81
86Listener::~Listener() 82Listener::~Listener()
87{ 83{
84 closeAllConnections();
88} 85}
89 86
90void Listener::emergencyAbortAllConnections() 87void Listener::emergencyAbortAllConnections()
91{ 88{
92 for (Client &client : m_connections) { 89 for (Client &client : m_connections) {
93 if (client.socket) { 90 if (client.socket) {
94 Warning() << "Sending panic"; 91 SinkWarning() << "Sending panic";
95 client.socket->write("PANIC"); 92 client.socket->write("PANIC");
96 client.socket->waitForBytesWritten(); 93 client.socket->waitForBytesWritten();
97 disconnect(client.socket, 0, this, 0); 94 disconnect(client.socket, 0, this, 0);
@@ -120,11 +117,11 @@ void Listener::closeAllConnections()
120 117
121void Listener::acceptConnection() 118void Listener::acceptConnection()
122{ 119{
123 Trace() << "Accepting connection"; 120 SinkTrace() << "Accepting connection";
124 QLocalSocket *socket = m_server->nextPendingConnection(); 121 QLocalSocket *socket = m_server->nextPendingConnection();
125 122
126 if (!socket) { 123 if (!socket) {
127 Warning() << "Accepted connection but didn't get a socket for it"; 124 SinkWarning() << "Accepted connection but didn't get a socket for it";
128 return; 125 return;
129 } 126 }
130 127
@@ -135,7 +132,7 @@ void Listener::acceptConnection()
135 132
136 // If this is the first client, set the lower limit for revision cleanup 133 // If this is the first client, set the lower limit for revision cleanup
137 if (m_connections.size() == 1) { 134 if (m_connections.size() == 1) {
138 loadResource()->setLowerBoundRevision(0); 135 loadResource().setLowerBoundRevision(0);
139 } 136 }
140 137
141 if (socket->bytesAvailable()) { 138 if (socket->bytesAvailable()) {
@@ -156,13 +153,13 @@ void Listener::clientDropped()
156 const Client &client = it.next(); 153 const Client &client = it.next();
157 if (client.socket == socket) { 154 if (client.socket == socket) {
158 dropped = true; 155 dropped = true;
159 Log() << QString("Dropped connection: %1").arg(client.name) << socket; 156 SinkLog() << QString("Dropped connection: %1").arg(client.name) << socket;
160 it.remove(); 157 it.remove();
161 break; 158 break;
162 } 159 }
163 } 160 }
164 if (!dropped) { 161 if (!dropped) {
165 Warning() << "Failed to find connection for disconnected socket: " << socket; 162 SinkWarning() << "Failed to find connection for disconnected socket: " << socket;
166 } 163 }
167 164
168 checkConnections(); 165 checkConnections();
@@ -172,7 +169,7 @@ void Listener::checkConnections()
172{ 169{
173 // If this was the last client, disengage the lower limit for revision cleanup 170 // If this was the last client, disengage the lower limit for revision cleanup
174 if (m_connections.isEmpty()) { 171 if (m_connections.isEmpty()) {
175 loadResource()->setLowerBoundRevision(std::numeric_limits<qint64>::max()); 172 loadResource().setLowerBoundRevision(std::numeric_limits<qint64>::max());
176 } 173 }
177 m_checkConnectionsTimer->start(); 174 m_checkConnectionsTimer->start();
178} 175}
@@ -188,7 +185,7 @@ void Listener::onDataAvailable()
188 185
189void Listener::readFromSocket(QLocalSocket *socket) 186void Listener::readFromSocket(QLocalSocket *socket)
190{ 187{
191 Trace() << "Reading from socket..."; 188 SinkTrace() << "Reading from socket...";
192 for (Client &client : m_connections) { 189 for (Client &client : m_connections) {
193 if (client.socket == socket) { 190 if (client.socket == socket) {
194 client.commandBuffer += socket->readAll(); 191 client.commandBuffer += socket->readAll();
@@ -231,7 +228,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
231 auto buffer = Sink::Commands::GetHandshake(commandBuffer.constData()); 228 auto buffer = Sink::Commands::GetHandshake(commandBuffer.constData());
232 client.name = buffer->name()->c_str(); 229 client.name = buffer->name()->c_str();
233 } else { 230 } else {
234 Warning() << "received invalid command"; 231 SinkWarning() << "received invalid command";
235 } 232 }
236 break; 233 break;
237 } 234 }
@@ -239,27 +236,27 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
239 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); 236 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
240 if (Sink::Commands::VerifySynchronizeBuffer(verifier)) { 237 if (Sink::Commands::VerifySynchronizeBuffer(verifier)) {
241 auto buffer = Sink::Commands::GetSynchronize(commandBuffer.constData()); 238 auto buffer = Sink::Commands::GetSynchronize(commandBuffer.constData());
242 Trace() << QString("Synchronize request (id %1) from %2").arg(messageId).arg(client.name); 239 SinkTrace() << QString("Synchronize request (id %1) from %2").arg(messageId).arg(client.name);
243 auto timer = QSharedPointer<QTime>::create(); 240 auto timer = QSharedPointer<QTime>::create();
244 timer->start(); 241 timer->start();
245 auto job = KAsync::null<void>(); 242 auto job = KAsync::null<void>();
246 if (buffer->sourceSync()) { 243 if (buffer->sourceSync()) {
247 job = loadResource()->synchronizeWithSource(); 244 job = loadResource().synchronizeWithSource();
248 } 245 }
249 if (buffer->localSync()) { 246 if (buffer->localSync()) {
250 job = job.then<void>(loadResource()->processAllMessages()); 247 job = job.then<void>(loadResource().processAllMessages());
251 } 248 }
252 job.then<void>([callback, timer]() { 249 job.then<void>([callback, timer]() {
253 Trace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); 250 SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed());
254 callback(true); 251 callback(true);
255 }, [callback](int errorCode, const QString &msg) { 252 }, [callback](int errorCode, const QString &msg) {
256 Warning() << "Sync failed: " << msg; 253 SinkWarning() << "Sync failed: " << msg;
257 callback(false); 254 callback(false);
258 }) 255 })
259 .exec(); 256 .exec();
260 return; 257 return;
261 } else { 258 } else {
262 Warning() << "received invalid command"; 259 SinkWarning() << "received invalid command";
263 } 260 }
264 break; 261 break;
265 } 262 }
@@ -268,44 +265,43 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
268 case Sink::Commands::DeleteEntityCommand: 265 case Sink::Commands::DeleteEntityCommand:
269 case Sink::Commands::ModifyEntityCommand: 266 case Sink::Commands::ModifyEntityCommand:
270 case Sink::Commands::CreateEntityCommand: 267 case Sink::Commands::CreateEntityCommand:
271 Trace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; 268 SinkTrace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name;
272 loadResource()->processCommand(commandId, commandBuffer); 269 loadResource().processCommand(commandId, commandBuffer);
273 break; 270 break;
274 case Sink::Commands::ShutdownCommand: 271 case Sink::Commands::ShutdownCommand:
275 Log() << QString("Received shutdown command from %1").arg(client.name); 272 SinkLog() << QString("Received shutdown command from %1").arg(client.name);
276 // Immediately reject new connections 273 // Immediately reject new connections
277 m_server->close(); 274 m_server->close();
278 QTimer::singleShot(0, this, &Listener::quit); 275 QTimer::singleShot(0, this, &Listener::quit);
279 break; 276 break;
280 case Sink::Commands::PingCommand: 277 case Sink::Commands::PingCommand:
281 Trace() << QString("Received ping command from %1").arg(client.name); 278 SinkTrace() << QString("Received ping command from %1").arg(client.name);
282 break; 279 break;
283 case Sink::Commands::RevisionReplayedCommand: { 280 case Sink::Commands::RevisionReplayedCommand: {
284 Trace() << QString("Received revision replayed command from %1").arg(client.name); 281 SinkTrace() << QString("Received revision replayed command from %1").arg(client.name);
285 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); 282 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
286 if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) { 283 if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) {
287 auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); 284 auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData());
288 client.currentRevision = buffer->revision(); 285 client.currentRevision = buffer->revision();
289 } else { 286 } else {
290 Warning() << "received invalid command"; 287 SinkWarning() << "received invalid command";
291 } 288 }
292 loadResource()->setLowerBoundRevision(lowerBoundRevision()); 289 loadResource().setLowerBoundRevision(lowerBoundRevision());
293 } break; 290 } break;
294 case Sink::Commands::RemoveFromDiskCommand: { 291 case Sink::Commands::RemoveFromDiskCommand: {
295 Log() << QString("Received a remove from disk command from %1").arg(client.name); 292 SinkLog() << QString("Received a remove from disk command from %1").arg(client.name);
296 delete m_resource; 293 m_resource.reset(nullptr);
297 m_resource = nullptr; 294 loadResource().removeDataFromDisk();
298 loadResource()->removeDataFromDisk();
299 m_server->close(); 295 m_server->close();
300 QTimer::singleShot(0, this, &Listener::quit); 296 QTimer::singleShot(0, this, &Listener::quit);
301 } break; 297 } break;
302 default: 298 default:
303 if (commandId > Sink::Commands::CustomCommand) { 299 if (commandId > Sink::Commands::CustomCommand) {
304 Log() << QString("Received custom command from %1: ").arg(client.name) << commandId; 300 SinkLog() << QString("Received custom command from %1: ").arg(client.name) << commandId;
305 loadResource()->processCommand(commandId, commandBuffer); 301 loadResource().processCommand(commandId, commandBuffer);
306 } else { 302 } else {
307 success = false; 303 success = false;
308 ErrorMsg() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; 304 SinkError() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId;
309 } 305 }
310 break; 306 break;
311 } 307 }
@@ -330,7 +326,7 @@ qint64 Listener::lowerBoundRevision()
330void Listener::quit() 326void Listener::quit()
331{ 327{
332 // Broadcast shutdown notifications to open clients, so they don't try to restart the resource 328 // Broadcast shutdown notifications to open clients, so they don't try to restart the resource
333 auto command = Sink::Commands::CreateNotification(m_fbb, Sink::Commands::NotificationType::NotificationType_Shutdown); 329 auto command = Sink::Commands::CreateNotification(m_fbb, Sink::Notification::Shutdown);
334 Sink::Commands::FinishNotificationBuffer(m_fbb, command); 330 Sink::Commands::FinishNotificationBuffer(m_fbb, command);
335 for (Client &client : m_connections) { 331 for (Client &client : m_connections) {
336 if (client.socket && client.socket->isOpen()) { 332 if (client.socket && client.socket->isOpen()) {
@@ -353,7 +349,7 @@ bool Listener::processClientBuffer(Client &client)
353 const uint messageId = *(uint *)client.commandBuffer.constData(); 349 const uint messageId = *(uint *)client.commandBuffer.constData();
354 const int commandId = *(int *)(client.commandBuffer.constData() + sizeof(uint)); 350 const int commandId = *(int *)(client.commandBuffer.constData() + sizeof(uint));
355 const uint size = *(uint *)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); 351 const uint size = *(uint *)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint));
356 Trace() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size; 352 SinkTrace() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size;
357 353
358 // TODO: reject messages above a certain size? 354 // TODO: reject messages above a certain size?
359 355
@@ -366,11 +362,11 @@ bool Listener::processClientBuffer(Client &client)
366 const QByteArray commandBuffer = client.commandBuffer.left(size); 362 const QByteArray commandBuffer = client.commandBuffer.left(size);
367 client.commandBuffer.remove(0, size); 363 client.commandBuffer.remove(0, size);
368 processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName](bool success) { 364 processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName](bool success) {
369 Trace() << QString("Completed command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Sink::Commands::name(commandId))).arg(clientName); 365 SinkTrace() << QString("Completed command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Sink::Commands::name(commandId))).arg(clientName);
370 if (socket) { 366 if (socket) {
371 sendCommandCompleted(socket.data(), messageId, success); 367 sendCommandCompleted(socket.data(), messageId, success);
372 } else { 368 } else {
373 Log() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName); 369 SinkLog() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName);
374 } 370 }
375 }); 371 });
376 372
@@ -407,7 +403,7 @@ void Listener::updateClientsWithRevision(qint64 revision)
407 continue; 403 continue;
408 } 404 }
409 405
410 Trace() << "Sending revision update for " << client.name << revision; 406 SinkTrace() << "Sending revision update for " << client.name << revision;
411 Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::RevisionUpdateCommand, m_fbb); 407 Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::RevisionUpdateCommand, m_fbb);
412 } 408 }
413 m_fbb.Clear(); 409 m_fbb.Clear();
@@ -418,7 +414,7 @@ void Listener::notify(const Sink::Notification &notification)
418 auto messageString = m_fbb.CreateString(notification.message.toUtf8().constData(), notification.message.toUtf8().size()); 414 auto messageString = m_fbb.CreateString(notification.message.toUtf8().constData(), notification.message.toUtf8().size());
419 auto idString = m_fbb.CreateString(notification.id.constData(), notification.id.size()); 415 auto idString = m_fbb.CreateString(notification.id.constData(), notification.id.size());
420 Sink::Commands::NotificationBuilder builder(m_fbb); 416 Sink::Commands::NotificationBuilder builder(m_fbb);
421 builder.add_type(static_cast<Sink::Commands::NotificationType>(notification.type)); 417 builder.add_type(notification.type);
422 builder.add_code(notification.code); 418 builder.add_code(notification.code);
423 builder.add_identifier(idString); 419 builder.add_identifier(idString);
424 builder.add_message(messageString); 420 builder.add_message(messageString);
@@ -432,25 +428,25 @@ void Listener::notify(const Sink::Notification &notification)
432 m_fbb.Clear(); 428 m_fbb.Clear();
433} 429}
434 430
435Sink::Resource *Listener::loadResource() 431Sink::Resource &Listener::loadResource()
436{ 432{
437 if (!m_resource) { 433 if (!m_resource) {
438 if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) { 434 if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) {
439 m_resource = resourceFactory->createResource(m_resourceInstanceIdentifier); 435 m_resource = std::unique_ptr<Sink::Resource>(resourceFactory->createResource(m_resourceInstanceIdentifier));
440 if (!m_resource) { 436 if (!m_resource) {
441 ErrorMsg() << "Failed to instantiate the resource " << m_resourceName; 437 SinkError() << "Failed to instantiate the resource " << m_resourceName;
442 m_resource = new Sink::Resource; 438 m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource);
443 } 439 }
444 Trace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); 440 SinkTrace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory);
445 Trace() << QString("\tResource: %1").arg((qlonglong)m_resource); 441 SinkTrace() << QString("\tResource: %1").arg((qlonglong)m_resource.get());
446 connect(m_resource, &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision); 442 connect(m_resource.get(), &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision);
447 connect(m_resource, &Sink::Resource::notify, this, &Listener::notify); 443 connect(m_resource.get(), &Sink::Resource::notify, this, &Listener::notify);
448 } else { 444 } else {
449 ErrorMsg() << "Failed to load resource " << m_resourceName; 445 SinkError() << "Failed to load resource " << m_resourceName;
450 m_resource = new Sink::Resource; 446 m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource);
451 } 447 }
452 } 448 }
453 return m_resource; 449 return *m_resource;
454} 450}
455 451
456#pragma clang diagnostic push 452#pragma clang diagnostic push