From b4df9eb5f1f4a0ac2b1272fc34d4b8aad473008b Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 5 Jul 2016 15:22:10 +0200 Subject: Prepare for making the resource status available --- common/listener.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'common/listener.cpp') diff --git a/common/listener.cpp b/common/listener.cpp index 84afe16..d2fc510 100644 --- a/common/listener.cpp +++ b/common/listener.cpp @@ -330,7 +330,7 @@ qint64 Listener::lowerBoundRevision() void Listener::quit() { // Broadcast shutdown notifications to open clients, so they don't try to restart the resource - auto command = Sink::Commands::CreateNotification(m_fbb, Sink::Commands::NotificationType::NotificationType_Shutdown); + auto command = Sink::Commands::CreateNotification(m_fbb, Sink::Notification::Shutdown); Sink::Commands::FinishNotificationBuffer(m_fbb, command); for (Client &client : m_connections) { if (client.socket && client.socket->isOpen()) { @@ -418,7 +418,7 @@ void Listener::notify(const Sink::Notification ¬ification) auto messageString = m_fbb.CreateString(notification.message.toUtf8().constData(), notification.message.toUtf8().size()); auto idString = m_fbb.CreateString(notification.id.constData(), notification.id.size()); Sink::Commands::NotificationBuilder builder(m_fbb); - builder.add_type(static_cast(notification.type)); + builder.add_type(notification.type); builder.add_code(notification.code); builder.add_identifier(idString); builder.add_message(messageString); -- cgit v1.2.3 From 1803924a9474af03bf24bc00303c6373fdd05487 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 6 Jul 2016 17:52:33 +0200 Subject: Fixed a bunch of memory leaks. Found with valgrind --- common/listener.cpp | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'common/listener.cpp') diff --git a/common/listener.cpp b/common/listener.cpp index d2fc510..32c57ac 100644 --- a/common/listener.cpp +++ b/common/listener.cpp @@ -85,6 +85,11 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArra Listener::~Listener() { + closeAllConnections(); + delete m_resource; + delete m_checkConnectionsTimer; + delete m_clientBufferProcessesTimer; + delete m_server; } void Listener::emergencyAbortAllConnections() -- cgit v1.2.3 From 5cba3372881994b5afa96449237aab80cc424e6d Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 7 Jul 2016 09:15:20 +0200 Subject: Less memory leaking with unique_ptr --- common/listener.cpp | 48 +++++++++++++++++++++--------------------------- 1 file changed, 21 insertions(+), 27 deletions(-) (limited to 'common/listener.cpp') diff --git a/common/listener.cpp b/common/listener.cpp index 32c57ac..af8eaa2 100644 --- a/common/listener.cpp +++ b/common/listener.cpp @@ -47,11 +47,10 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArra m_server(new QLocalServer(this)), m_resourceName(resourceType), m_resourceInstanceIdentifier(resourceInstanceIdentifier), - m_resource(0), m_clientBufferProcessesTimer(new QTimer(this)), m_messageId(0) { - connect(m_server, &QLocalServer::newConnection, this, &Listener::acceptConnection); + connect(m_server.get(), &QLocalServer::newConnection, this, &Listener::acceptConnection); Trace() << "Trying to open " << m_resourceInstanceIdentifier; if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { @@ -66,10 +65,10 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArra Log() << QString("Listening on %1").arg(m_server->serverName()); } - m_checkConnectionsTimer = new QTimer; + m_checkConnectionsTimer = std::unique_ptr(new QTimer); m_checkConnectionsTimer->setSingleShot(true); m_checkConnectionsTimer->setInterval(1000); - connect(m_checkConnectionsTimer, &QTimer::timeout, [this]() { + connect(m_checkConnectionsTimer.get(), &QTimer::timeout, [this]() { if (m_connections.isEmpty()) { Log() << QString("No connections, shutting down."); quit(); @@ -80,16 +79,12 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArra // or even just drop down to invoking the method queued? => invoke queued unless we need throttling m_clientBufferProcessesTimer->setInterval(0); m_clientBufferProcessesTimer->setSingleShot(true); - connect(m_clientBufferProcessesTimer, &QTimer::timeout, this, &Listener::processClientBuffers); + connect(m_clientBufferProcessesTimer.get(), &QTimer::timeout, this, &Listener::processClientBuffers); } Listener::~Listener() { closeAllConnections(); - delete m_resource; - delete m_checkConnectionsTimer; - delete m_clientBufferProcessesTimer; - delete m_server; } void Listener::emergencyAbortAllConnections() @@ -140,7 +135,7 @@ void Listener::acceptConnection() // If this is the first client, set the lower limit for revision cleanup if (m_connections.size() == 1) { - loadResource()->setLowerBoundRevision(0); + loadResource().setLowerBoundRevision(0); } if (socket->bytesAvailable()) { @@ -177,7 +172,7 @@ void Listener::checkConnections() { // If this was the last client, disengage the lower limit for revision cleanup if (m_connections.isEmpty()) { - loadResource()->setLowerBoundRevision(std::numeric_limits::max()); + loadResource().setLowerBoundRevision(std::numeric_limits::max()); } m_checkConnectionsTimer->start(); } @@ -249,10 +244,10 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c timer->start(); auto job = KAsync::null(); if (buffer->sourceSync()) { - job = loadResource()->synchronizeWithSource(); + job = loadResource().synchronizeWithSource(); } if (buffer->localSync()) { - job = job.then(loadResource()->processAllMessages()); + job = job.then(loadResource().processAllMessages()); } job.then([callback, timer]() { Trace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); @@ -274,7 +269,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c case Sink::Commands::ModifyEntityCommand: case Sink::Commands::CreateEntityCommand: Trace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; - loadResource()->processCommand(commandId, commandBuffer); + loadResource().processCommand(commandId, commandBuffer); break; case Sink::Commands::ShutdownCommand: Log() << QString("Received shutdown command from %1").arg(client.name); @@ -294,20 +289,19 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c } else { Warning() << "received invalid command"; } - loadResource()->setLowerBoundRevision(lowerBoundRevision()); + loadResource().setLowerBoundRevision(lowerBoundRevision()); } break; case Sink::Commands::RemoveFromDiskCommand: { Log() << QString("Received a remove from disk command from %1").arg(client.name); - delete m_resource; - m_resource = nullptr; - loadResource()->removeDataFromDisk(); + m_resource.reset(nullptr); + loadResource().removeDataFromDisk(); m_server->close(); QTimer::singleShot(0, this, &Listener::quit); } break; default: if (commandId > Sink::Commands::CustomCommand) { Log() << QString("Received custom command from %1: ").arg(client.name) << commandId; - loadResource()->processCommand(commandId, commandBuffer); + loadResource().processCommand(commandId, commandBuffer); } else { success = false; ErrorMsg() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; @@ -437,25 +431,25 @@ void Listener::notify(const Sink::Notification ¬ification) m_fbb.Clear(); } -Sink::Resource *Listener::loadResource() +Sink::Resource &Listener::loadResource() { if (!m_resource) { if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) { - m_resource = resourceFactory->createResource(m_resourceInstanceIdentifier); + m_resource = std::unique_ptr(resourceFactory->createResource(m_resourceInstanceIdentifier)); if (!m_resource) { ErrorMsg() << "Failed to instantiate the resource " << m_resourceName; - m_resource = new Sink::Resource; + m_resource = std::unique_ptr(new Sink::Resource); } Trace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); - Trace() << QString("\tResource: %1").arg((qlonglong)m_resource); - connect(m_resource, &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision); - connect(m_resource, &Sink::Resource::notify, this, &Listener::notify); + Trace() << QString("\tResource: %1").arg((qlonglong)m_resource.get()); + connect(m_resource.get(), &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision); + connect(m_resource.get(), &Sink::Resource::notify, this, &Listener::notify); } else { ErrorMsg() << "Failed to load resource " << m_resourceName; - m_resource = new Sink::Resource; + m_resource = std::unique_ptr(new Sink::Resource); } } - return m_resource; + return *m_resource; } #pragma clang diagnostic push -- cgit v1.2.3 From da2b049e248c1ad7efeb53685158a205335e4e36 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 7 Jul 2016 22:23:49 +0200 Subject: A new debug system. Instead of a single #define as debug area the new system allows for an identifier for each debug message with the structure component.area. The component is a dot separated identifier of the runtime component, such as the process or the plugin. The area is the code component, and can be as such defined at compiletime. The idea of this system is that it becomes possible to i.e. look at the output of all messages in the query subsystem of a specific resource (something that happens in the client process, but in the resource-specific subcomponent). The new macros are supposed to be less likely to clash with other names, hence the new names. --- common/listener.cpp | 65 +++++++++++++++++++++++++---------------------------- 1 file changed, 31 insertions(+), 34 deletions(-) (limited to 'common/listener.cpp') diff --git a/common/listener.cpp b/common/listener.cpp index af8eaa2..2c5c1df 100644 --- a/common/listener.cpp +++ b/common/listener.cpp @@ -39,9 +39,6 @@ #include #include -#undef DEBUG_AREA -#define DEBUG_AREA "resource.communication" - Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType, QObject *parent) : QObject(parent), m_server(new QLocalServer(this)), @@ -51,18 +48,18 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArra m_messageId(0) { connect(m_server.get(), &QLocalServer::newConnection, this, &Listener::acceptConnection); - Trace() << "Trying to open " << m_resourceInstanceIdentifier; + SinkTrace() << "Trying to open " << m_resourceInstanceIdentifier; if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { m_server->removeServer(m_resourceInstanceIdentifier); if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { - Warning() << "Utter failure to start server"; + SinkWarning() << "Utter failure to start server"; exit(-1); } } if (m_server->isListening()) { - Log() << QString("Listening on %1").arg(m_server->serverName()); + SinkLog() << QString("Listening on %1").arg(m_server->serverName()); } m_checkConnectionsTimer = std::unique_ptr(new QTimer); @@ -70,7 +67,7 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArra m_checkConnectionsTimer->setInterval(1000); connect(m_checkConnectionsTimer.get(), &QTimer::timeout, [this]() { if (m_connections.isEmpty()) { - Log() << QString("No connections, shutting down."); + SinkLog() << QString("No connections, shutting down."); quit(); } }); @@ -91,7 +88,7 @@ void Listener::emergencyAbortAllConnections() { for (Client &client : m_connections) { if (client.socket) { - Warning() << "Sending panic"; + SinkWarning() << "Sending panic"; client.socket->write("PANIC"); client.socket->waitForBytesWritten(); disconnect(client.socket, 0, this, 0); @@ -120,11 +117,11 @@ void Listener::closeAllConnections() void Listener::acceptConnection() { - Trace() << "Accepting connection"; + SinkTrace() << "Accepting connection"; QLocalSocket *socket = m_server->nextPendingConnection(); if (!socket) { - Warning() << "Accepted connection but didn't get a socket for it"; + SinkWarning() << "Accepted connection but didn't get a socket for it"; return; } @@ -156,13 +153,13 @@ void Listener::clientDropped() const Client &client = it.next(); if (client.socket == socket) { dropped = true; - Log() << QString("Dropped connection: %1").arg(client.name) << socket; + SinkLog() << QString("Dropped connection: %1").arg(client.name) << socket; it.remove(); break; } } if (!dropped) { - Warning() << "Failed to find connection for disconnected socket: " << socket; + SinkWarning() << "Failed to find connection for disconnected socket: " << socket; } checkConnections(); @@ -188,7 +185,7 @@ void Listener::onDataAvailable() void Listener::readFromSocket(QLocalSocket *socket) { - Trace() << "Reading from socket..."; + SinkTrace() << "Reading from socket..."; for (Client &client : m_connections) { if (client.socket == socket) { client.commandBuffer += socket->readAll(); @@ -231,7 +228,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c auto buffer = Sink::Commands::GetHandshake(commandBuffer.constData()); client.name = buffer->name()->c_str(); } else { - Warning() << "received invalid command"; + SinkWarning() << "received invalid command"; } break; } @@ -239,7 +236,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); if (Sink::Commands::VerifySynchronizeBuffer(verifier)) { auto buffer = Sink::Commands::GetSynchronize(commandBuffer.constData()); - Trace() << QString("Synchronize request (id %1) from %2").arg(messageId).arg(client.name); + SinkTrace() << QString("Synchronize request (id %1) from %2").arg(messageId).arg(client.name); auto timer = QSharedPointer::create(); timer->start(); auto job = KAsync::null(); @@ -250,16 +247,16 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c job = job.then(loadResource().processAllMessages()); } job.then([callback, timer]() { - Trace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); + SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); callback(true); }, [callback](int errorCode, const QString &msg) { - Warning() << "Sync failed: " << msg; + SinkWarning() << "Sync failed: " << msg; callback(false); }) .exec(); return; } else { - Warning() << "received invalid command"; + SinkWarning() << "received invalid command"; } break; } @@ -268,31 +265,31 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c case Sink::Commands::DeleteEntityCommand: case Sink::Commands::ModifyEntityCommand: case Sink::Commands::CreateEntityCommand: - Trace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; + SinkTrace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; loadResource().processCommand(commandId, commandBuffer); break; case Sink::Commands::ShutdownCommand: - Log() << QString("Received shutdown command from %1").arg(client.name); + SinkLog() << QString("Received shutdown command from %1").arg(client.name); // Immediately reject new connections m_server->close(); QTimer::singleShot(0, this, &Listener::quit); break; case Sink::Commands::PingCommand: - Trace() << QString("Received ping command from %1").arg(client.name); + SinkTrace() << QString("Received ping command from %1").arg(client.name); break; case Sink::Commands::RevisionReplayedCommand: { - Trace() << QString("Received revision replayed command from %1").arg(client.name); + SinkTrace() << QString("Received revision replayed command from %1").arg(client.name); flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) { auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); client.currentRevision = buffer->revision(); } else { - Warning() << "received invalid command"; + SinkWarning() << "received invalid command"; } loadResource().setLowerBoundRevision(lowerBoundRevision()); } break; case Sink::Commands::RemoveFromDiskCommand: { - Log() << QString("Received a remove from disk command from %1").arg(client.name); + SinkLog() << QString("Received a remove from disk command from %1").arg(client.name); m_resource.reset(nullptr); loadResource().removeDataFromDisk(); m_server->close(); @@ -300,11 +297,11 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c } break; default: if (commandId > Sink::Commands::CustomCommand) { - Log() << QString("Received custom command from %1: ").arg(client.name) << commandId; + SinkLog() << QString("Received custom command from %1: ").arg(client.name) << commandId; loadResource().processCommand(commandId, commandBuffer); } else { success = false; - ErrorMsg() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; + SinkError() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; } break; } @@ -352,7 +349,7 @@ bool Listener::processClientBuffer(Client &client) const uint messageId = *(uint *)client.commandBuffer.constData(); const int commandId = *(int *)(client.commandBuffer.constData() + sizeof(uint)); const uint size = *(uint *)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); - Trace() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size; + SinkTrace() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size; // TODO: reject messages above a certain size? @@ -365,11 +362,11 @@ bool Listener::processClientBuffer(Client &client) const QByteArray commandBuffer = client.commandBuffer.left(size); client.commandBuffer.remove(0, size); processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName](bool success) { - Trace() << QString("Completed command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Sink::Commands::name(commandId))).arg(clientName); + SinkTrace() << QString("Completed command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Sink::Commands::name(commandId))).arg(clientName); if (socket) { sendCommandCompleted(socket.data(), messageId, success); } else { - Log() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName); + SinkLog() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName); } }); @@ -406,7 +403,7 @@ void Listener::updateClientsWithRevision(qint64 revision) continue; } - Trace() << "Sending revision update for " << client.name << revision; + SinkTrace() << "Sending revision update for " << client.name << revision; Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::RevisionUpdateCommand, m_fbb); } m_fbb.Clear(); @@ -437,15 +434,15 @@ Sink::Resource &Listener::loadResource() if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) { m_resource = std::unique_ptr(resourceFactory->createResource(m_resourceInstanceIdentifier)); if (!m_resource) { - ErrorMsg() << "Failed to instantiate the resource " << m_resourceName; + SinkError() << "Failed to instantiate the resource " << m_resourceName; m_resource = std::unique_ptr(new Sink::Resource); } - Trace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); - Trace() << QString("\tResource: %1").arg((qlonglong)m_resource.get()); + SinkTrace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); + SinkTrace() << QString("\tResource: %1").arg((qlonglong)m_resource.get()); connect(m_resource.get(), &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision); connect(m_resource.get(), &Sink::Resource::notify, this, &Listener::notify); } else { - ErrorMsg() << "Failed to load resource " << m_resourceName; + SinkError() << "Failed to load resource " << m_resourceName; m_resource = std::unique_ptr(new Sink::Resource); } } -- cgit v1.2.3