From 3c0e79b1e9088e3b55f65fe5cc440ffb08dfde0a Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 28 Apr 2015 10:03:44 +0200 Subject: ResourceAccess: rewrite connection code using jobs Another nice showcase implementation how things get easier, testable and composable using async. --- common/resourceaccess.cpp | 154 ++++++++++++++++++++++++++-------------------- common/resourceaccess.h | 3 +- 2 files changed, 88 insertions(+), 69 deletions(-) (limited to 'common') diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 52cd61a..c6d701d 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -65,11 +65,10 @@ class ResourceAccess::Private { public: Private(const QByteArray &name, ResourceAccess *ra); + Async::Job tryToConnect(); + Async::Job initializeSocket(); QByteArray resourceName; - QLocalSocket *socket; - QTimer *tryOpenTimer; - bool startingProcess; - bool openingConnection; + QSharedPointer socket; QByteArray partialMessageBuffer; flatbuffers::FlatBufferBuilder fbb; QVector> commandQueue; @@ -80,32 +79,84 @@ public: ResourceAccess::Private::Private(const QByteArray &name, ResourceAccess *q) : resourceName(name), - socket(new QLocalSocket(q)), - tryOpenTimer(new QTimer(q)), - startingProcess(false), - openingConnection(false), messageId(0) { } +//Connects to server and returns connected socket on success +static Async::Job > connectToServer(const QByteArray &identifier) +{ + auto s = QSharedPointer::create(); + return Async::start >([identifier, s](Async::Future > &future) { + s->setServerName(identifier); + auto context = new QObject; + QObject::connect(s.data(), &QLocalSocket::connected, context, [&future, &s, context]() { + Q_ASSERT(s); + delete context; + future.setValue(s); + future.setFinished(); + }); + QObject::connect(s.data(), static_cast(&QLocalSocket::error), context, [&future, context](QLocalSocket::LocalSocketError) { + delete context; + future.setError(-1, "Failed to connect to server."); + }); + s->open(); + }); +} + +Async::Job ResourceAccess::Private::tryToConnect() +{ + return Async::dowhile([this]() -> bool { + //TODO abort after N retries? + return !socket; + }, + [this](Async::Future &future) { + Trace() << "Loop"; + Async::wait(50) + .then(connectToServer(resourceName)) + .then >([this, &future](const QSharedPointer &s) { + Q_ASSERT(s); + socket = s; + future.setFinished(); + }, + [&future](int errorCode, const QString &errorString) { + future.setFinished(); + }).exec(); + }); +} + +Async::Job ResourceAccess::Private::initializeSocket() +{ + return Async::start([this](Async::Future &future) { + Trace() << "Trying to connect"; + connectToServer(resourceName).then >([this, &future](const QSharedPointer &s) { + Trace() << "Connected to resource, without having to start it."; + Q_ASSERT(s); + socket = s; + future.setFinished(); + }, + [this, &future](int errorCode, const QString &errorString) { + Trace() << "Failed to connect, starting resource"; + //We failed to connect, so let's start the resource + QStringList args; + args << resourceName; + if (QProcess::startDetached("akonadi2_synchronizer", args, QDir::homePath())) { + tryToConnect() + .then([&future]() { + future.setFinished(); + }).exec(); + } else { + Warning() << "Failed to start resource"; + } + }).exec(); + }); +} + ResourceAccess::ResourceAccess(const QByteArray &resourceName, QObject *parent) : QObject(parent), d(new Private(resourceName, this)) { - d->tryOpenTimer->setInterval(50); - d->tryOpenTimer->setSingleShot(true); - connect(d->tryOpenTimer, &QTimer::timeout, - this, &ResourceAccess::open); - log("Starting access"); - connect(d->socket, &QLocalSocket::connected, - this, &ResourceAccess::connected); - connect(d->socket, &QLocalSocket::disconnected, - this, &ResourceAccess::disconnected); - connect(d->socket, SIGNAL(error(QLocalSocket::LocalSocketError)), - this, SLOT(connectionError(QLocalSocket::LocalSocketError))); - connect(d->socket, &QIODevice::readyRead, - this, &ResourceAccess::readResourceMessage); } ResourceAccess::~ResourceAccess() @@ -120,7 +171,7 @@ QByteArray ResourceAccess::resourceName() const bool ResourceAccess::isReady() const { - return d->socket->isValid(); + return (d->socket && d->socket->isValid()); } void ResourceAccess::registerCallback(uint messageId, const std::function &callback) @@ -174,18 +225,20 @@ Async::Job ResourceAccess::synchronizeResource(bool sourceSync, bool local void ResourceAccess::open() { - if (d->socket->isValid()) { + if (d->socket && d->socket->isValid()) { log("Socket valid, so not opening again"); return; } - d->openingConnection = true; - - //TODO: if we try and try and the process does not pick up - // we should probably try to start the process again - d->socket->setServerName(d->resourceName); - log(QString("Opening %1").arg(d->socket->serverName())); - //FIXME: race between starting the exec and opening the socket? - d->socket->open(); + d->initializeSocket().then([this]() { + Trace() << "Socket is initialized"; + QObject::connect(d->socket.data(), &QLocalSocket::disconnected, + this, &ResourceAccess::disconnected); + QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), + this, SLOT(connectionError(QLocalSocket::LocalSocketError))); + QObject::connect(d->socket.data(), &QIODevice::readyRead, + this, &ResourceAccess::readResourceMessage); + connected(); + }).exec(); } void ResourceAccess::close() @@ -209,7 +262,7 @@ void ResourceAccess::sendCommand(const QSharedPointer &command) } //Keep track of the command until we're sure it arrived d->pendingCommands.insert(d->messageId, command); - Commands::write(d->socket, d->messageId, command->commandId, command->buffer.constData(), command->buffer.size()); + Commands::write(d->socket.data(), d->messageId, command->commandId, command->buffer.constData(), command->buffer.size()); } void ResourceAccess::processCommandQueue() @@ -224,9 +277,6 @@ void ResourceAccess::processCommandQueue() void ResourceAccess::connected() { - d->startingProcess = false; - d->openingConnection = false; - if (!isReady()) { return; } @@ -237,7 +287,7 @@ void ResourceAccess::connected() auto name = d->fbb.CreateString(QString::number(QCoreApplication::applicationPid()).toLatin1()); auto command = Akonadi2::CreateHandshake(d->fbb, name); Akonadi2::FinishHandshakeBuffer(d->fbb, command); - Commands::write(d->socket, ++d->messageId, Commands::HandshakeCommand, d->fbb); + Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, d->fbb); d->fbb.Clear(); } @@ -255,21 +305,6 @@ void ResourceAccess::disconnected() void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) { - //We tried to connect to the server, but the socket is not yet available. - //We're trying to connect but failed, start the resource and retry. - //Don't automatically restart on later disconnects. - if (d->openingConnection && error == QLocalSocket::LocalSocketError::ServerNotFoundError) { - startResourceAndConnect(); - return; - } - //Retry to connect to the server while starting the process - if (d->startingProcess) { - if (!d->tryOpenTimer->isActive()) { - d->tryOpenTimer->start(); - } - return; - } - if (error == QLocalSocket::PeerClosedError) { Log() << "The resource closed the connection."; } else { @@ -283,24 +318,9 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) d->resultHandler.clear(); } -void ResourceAccess::startResourceAndConnect() -{ - d->startingProcess = true; - Log() << "Attempting to start resource " + d->resourceName; - QStringList args; - args << d->resourceName; - if (QProcess::startDetached("akonadi2_synchronizer", args, QDir::homePath())) { - if (!d->tryOpenTimer->isActive()) { - d->tryOpenTimer->start(); - } - } else { - qWarning() << "Failed to start resource"; - } -} - void ResourceAccess::readResourceMessage() { - if (!d->socket->isValid()) { + if (!d->socket || !d->socket->isValid()) { return; } diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 4c9d9d2..dc7640d 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h @@ -58,7 +58,6 @@ Q_SIGNALS: private Q_SLOTS: //TODO: move these to the Private class - void connected(); void disconnected(); void connectionError(QLocalSocket::LocalSocketError error); void readResourceMessage(); @@ -66,9 +65,9 @@ private Q_SLOTS: void callCallbacks(int id); private: + void connected(); void log(const QString &message); void registerCallback(uint messageId, const std::function &callback); - void startResourceAndConnect(); void sendCommand(const QSharedPointer &command); void processCommandQueue(); -- cgit v1.2.3