From abdbd0e8d9a761b7906da2999d4fd58f2771c202 Mon Sep 17 00:00:00 2001 From: Aaron Seigo Date: Tue, 16 Dec 2014 12:13:06 +0100 Subject: use a dptr, API for sending commands, queue commands until connected --- common/resourceaccess.cpp | 191 ++++++++++++++++++++++++++++++++++------------ common/resourceaccess.h | 13 ++-- 2 files changed, 149 insertions(+), 55 deletions(-) (limited to 'common') diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 2b58545..a4f3c94 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -26,33 +26,88 @@ #include "common/revisionupdate_generated.h" #include +#include #include namespace Akonadi2 { +class QueuedCommand +{ +public: + QueuedCommand(int commandId) + : m_commandId(commandId), + m_bufferSize(0), + m_buffer(0) + {} + + QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) + : m_commandId(commandId), + m_bufferSize(fbb.GetSize()), + m_buffer(new char[m_bufferSize]) + { + memcpy(m_buffer, fbb.GetBufferPointer(), m_bufferSize); + } + + ~QueuedCommand() + { + delete[] m_buffer; + } + + void write(QIODevice *device) + { + Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); + Commands::write(device, m_commandId, m_buffer, m_bufferSize); + } + +private: + QueuedCommand(const QueuedCommand &other); + QueuedCommand &operator=(const QueuedCommand &rhs); + + const int m_commandId; + const uint m_bufferSize; + char *m_buffer; +}; + +class ResourceAccess::Private +{ +public: + Private(const QString &name, ResourceAccess *ra); + QString resourceName; + QLocalSocket *socket; + QTimer *tryOpenTimer; + bool startingProcess; + QByteArray partialMessageBuffer; + flatbuffers::FlatBufferBuilder fbb; + QVector commandQueue; +}; + +ResourceAccess::Private::Private(const QString &name, ResourceAccess *q) + : resourceName(name), + socket(new QLocalSocket(q)), + tryOpenTimer(new QTimer(q)), + startingProcess(false) +{ +} + ResourceAccess::ResourceAccess(const QString &resourceName, QObject *parent) : QObject(parent), - m_resourceName(resourceName), - m_socket(new QLocalSocket(this)), - m_tryOpenTimer(new QTimer(this)), - m_startingProcess(false) + d(new Private(resourceName, this)) { - m_tryOpenTimer->setInterval(50); - m_tryOpenTimer->setSingleShot(true); - connect(m_tryOpenTimer, &QTimer::timeout, - this, &ResourceAccess::open); + d->tryOpenTimer->setInterval(50); + d->tryOpenTimer->setSingleShot(true); + connect(d->tryOpenTimer, &QTimer::timeout, + this, &ResourceAccess::open); log("Starting access"); - connect(m_socket, &QLocalSocket::connected, + connect(d->socket, &QLocalSocket::connected, this, &ResourceAccess::connected); - connect(m_socket, &QLocalSocket::disconnected, + connect(d->socket, &QLocalSocket::disconnected, this, &ResourceAccess::disconnected); - connect(m_socket, SIGNAL(error(QLocalSocket::LocalSocketError)), + connect(d->socket, SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError))); - connect(m_socket, &QIODevice::readyRead, + connect(d->socket, &QIODevice::readyRead, this, &ResourceAccess::readResourceMessage); - } ResourceAccess::~ResourceAccess() @@ -62,53 +117,89 @@ ResourceAccess::~ResourceAccess() QString ResourceAccess::resourceName() const { - return m_resourceName; + return d->resourceName; } bool ResourceAccess::isReady() const { - return m_socket->isValid(); + return d->socket->isValid(); +} + +void ResourceAccess::sendCommand(int commandId) +{ + if (isReady()) { + log(QString("Sending command %1").arg(commandId)); + Commands::write(d->socket, commandId); + } else { + d->commandQueue << new QueuedCommand(commandId); + } +} + +void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) +{ + if (isReady()) { + log(QString("Sending command %1").arg(commandId)); + Commands::write(d->socket, commandId, fbb); + } else { + d->commandQueue << new QueuedCommand(commandId, fbb); + } } void ResourceAccess::open() { - if (m_socket->isValid()) { - log("Socket valid, so aborting the open"); + if (d->socket->isValid()) { + log("Socket valid, so not opening again"); return; } - m_socket->setServerName(m_resourceName); - log(QString("Opening %1").arg(m_socket->serverName())); + //TODO: if we try and try and the process does not pick up + // we should probably try to start the process again + d->socket->setServerName(d->resourceName); + log(QString("Opening %1").arg(d->socket->serverName())); //FIXME: race between starting the exec and opening the socket? - m_socket->open(); + d->socket->open(); } void ResourceAccess::close() { - log(QString("Closing %1").arg(m_socket->fullServerName())); - m_socket->close(); + log(QString("Closing %1").arg(d->socket->fullServerName())); + d->socket->close(); } void ResourceAccess::connected() { - m_startingProcess = false; - log(QString("Connected: ").arg(m_socket->fullServerName())); + d->startingProcess = false; + + if (!isReady()) { + return; + } + + log(QString("Connected: %1").arg(d->socket->fullServerName())); { - auto name = m_fbb.CreateString(QString::number((long long)this).toLatin1()); - auto command = Akonadi2::CreateHandshake(m_fbb, name); - Akonadi2::FinishHandshakeBuffer(m_fbb, command); - Commands::write(m_socket, Commands::HandshakeCommand, m_fbb); - m_fbb.Clear(); + auto name = d->fbb.CreateString(QString::number((long long)this).toLatin1()); + auto command = Akonadi2::CreateHandshake(d->fbb, name); + Akonadi2::FinishHandshakeBuffer(d->fbb, command); + Commands::write(d->socket, Commands::HandshakeCommand, d->fbb); + d->fbb.Clear(); + } + + //TODO: should confirm the commands made it with a response? + //TODO: serialize instead of blast them all through the socket? + log(QString("We have %1 queued commands").arg(d->commandQueue.size())); + for (QueuedCommand *command: d->commandQueue) { + command->write(d->socket); + delete command; } + d->commandQueue.clear(); emit ready(true); } void ResourceAccess::disconnected() { - m_socket->close(); - log(QString("Disconnected from %1").arg(m_socket->fullServerName())); + d->socket->close(); + log(QString("Disconnected from %1").arg(d->socket->fullServerName())); emit ready(false); open(); } @@ -116,29 +207,31 @@ void ResourceAccess::disconnected() void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) { log(QString("Connection error: %2").arg(error)); - if (m_startingProcess) { - if (!m_tryOpenTimer->isActive()) { - m_tryOpenTimer->start(); + if (d->startingProcess) { + if (!d->tryOpenTimer->isActive()) { + d->tryOpenTimer->start(); } return; } - m_startingProcess = true; - log(QString("Attempting to start resource ") + m_resourceName); + d->startingProcess = true; + log(QString("Attempting to start resource ") + d->resourceName); QStringList args; - args << m_resourceName; - if (QProcess::startDetached("akonadi2_synchronizer", args)) { - m_socket->open(); + args << d->resourceName; + if (QProcess::startDetached("akonadi2_synchronizer", args, QDir::homePath())) { + if (!d->tryOpenTimer->isActive()) { + d->tryOpenTimer->start(); + } } } void ResourceAccess::readResourceMessage() { - if (!m_socket->isValid()) { + if (!d->socket->isValid()) { return; } - m_partialMessageBuffer += m_socket->readAll(); + d->partialMessageBuffer += d->socket->readAll(); // should be scheduled rather than processed all at once while (processMessageBuffer()) {} @@ -147,20 +240,20 @@ void ResourceAccess::readResourceMessage() bool ResourceAccess::processMessageBuffer() { static const int headerSize = (sizeof(int) * 2); - if (m_partialMessageBuffer.size() < headerSize) { + if (d->partialMessageBuffer.size() < headerSize) { return false; } - const int commandId = *(int*)m_partialMessageBuffer.constData(); - const int size = *(int*)(m_partialMessageBuffer.constData() + sizeof(int)); + const int commandId = *(int*)d->partialMessageBuffer.constData(); + const int size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int)); - if (size > m_partialMessageBuffer.size() - headerSize) { + if (size > d->partialMessageBuffer.size() - headerSize) { return false; } switch (commandId) { case Commands::RevisionUpdateCommand: { - auto buffer = Akonadi2::GetRevisionUpdate(m_partialMessageBuffer.constData() + headerSize); + auto buffer = Akonadi2::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); log(QString("Revision updated to: %1").arg(buffer->revision())); emit revisionChanged(buffer->revision()); break; @@ -169,13 +262,13 @@ bool ResourceAccess::processMessageBuffer() break; } - m_partialMessageBuffer.remove(0, headerSize + size); - return m_partialMessageBuffer.size() >= headerSize; + d->partialMessageBuffer.remove(0, headerSize + size); + return d->partialMessageBuffer.size() >= headerSize; } void ResourceAccess::log(const QString &message) { - Console::main()->log(m_resourceName + ": " + message); + Console::main()->log(d->resourceName + ": " + message); } } diff --git a/common/resourceaccess.h b/common/resourceaccess.h index f381af1..3a35af6 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h @@ -40,6 +40,9 @@ public: QString resourceName() const; bool isReady() const; + void sendCommand(int commandId); + void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); + public Q_SLOTS: void open(); void close(); @@ -49,6 +52,7 @@ Q_SIGNALS: void revisionChanged(unsigned long long revision); private Q_SLOTS: + //TODO: move these to the Private class void connected(); void disconnected(); void connectionError(QLocalSocket::LocalSocketError error); @@ -57,12 +61,9 @@ private Q_SLOTS: private: void log(const QString &message); - QString m_resourceName; - QLocalSocket *m_socket; - QTimer *m_tryOpenTimer; - bool m_startingProcess; - QByteArray m_partialMessageBuffer; - flatbuffers::FlatBufferBuilder m_fbb; + + class Private; + Private * const d; }; } -- cgit v1.2.3