From 0f75ad4b96ec5994c022109278cad28a43255793 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 3 Dec 2015 11:17:08 +0100 Subject: Improved resource access caching * Smarter caching. ResourceAccess instances close after a timeout, if not reused. * Introduced a start command to avoid race condition when sending commands to a resource that is currently shutting down. * We resend pending commands after we lost access to the resource * unexpectedly. --- common/clientapi.cpp | 14 +++++++++++++- common/clientapi.h | 9 +++++++++ common/facade.cpp | 32 +++++++++++++++++++++++++++++++- common/resourceaccess.cpp | 27 ++++++++++++++++++++++----- common/resourceaccess.h | 1 + 5 files changed, 76 insertions(+), 7 deletions(-) (limited to 'common') diff --git a/common/clientapi.cpp b/common/clientapi.cpp index b732205..3dc9370 100644 --- a/common/clientapi.cpp +++ b/common/clientapi.cpp @@ -149,23 +149,35 @@ KAsync::Job Store::remove(const DomainType &domainObject) KAsync::Job Store::shutdown(const QByteArray &identifier) { - Trace() << "shutdown"; + Trace() << "shutdown " << identifier; return ResourceAccess::connectToServer(identifier).then>([identifier](QSharedPointer socket, KAsync::Future &future) { //We can't currently reuse the socket socket->close(); auto resourceAccess = QSharedPointer::create(identifier); resourceAccess->open(); resourceAccess->sendCommand(Akonadi2::Commands::ShutdownCommand).then([&future, resourceAccess]() { + Trace() << "Shutdown complete"; future.setFinished(); }).exec(); }, [](int, const QString &) { + Trace() << "Resource is already closed."; //Resource isn't started, nothing to shutdown }) //FIXME JOBAPI this is only required because we don't care about the return value of connectToServer .template then([](){}); } +KAsync::Job Store::start(const QByteArray &identifier) +{ + Trace() << "start " << identifier; + auto resourceAccess = QSharedPointer::create(identifier); + resourceAccess->open(); + return resourceAccess->sendCommand(Akonadi2::Commands::PingCommand).then([resourceAccess]() { + Trace() << "Start complete"; + }); +} + KAsync::Job Store::synchronize(const Akonadi2::Query &query) { Trace() << "synchronize"; diff --git a/common/clientapi.h b/common/clientapi.h index 8f87562..edf42e4 100644 --- a/common/clientapi.h +++ b/common/clientapi.h @@ -79,6 +79,15 @@ public: */ static KAsync::Job shutdown(const QByteArray &resourceIdentifier); + /** + * Start resource. + * + * The resource is ready for operation once this command completes. + * This command is only necessary if a resource was shutdown previously, + * otherwise the resource process will automatically start as necessary. + */ + static KAsync::Job start(const QByteArray &resourceIdentifier); + /** * Synchronize data to local cache. */ diff --git a/common/facade.cpp b/common/facade.cpp index ab41f96..22ef84a 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -42,12 +42,42 @@ public: Akonadi2::ResourceAccess::Ptr getAccess(const QByteArray &instanceIdentifier) { if (!mCache.contains(instanceIdentifier)) { - mCache.insert(instanceIdentifier, Akonadi2::ResourceAccess::Ptr::create(instanceIdentifier)); + //Reuse the pointer if something else kept the resourceaccess alive + if (mWeakCache.contains(instanceIdentifier)) { + auto sharedPointer = mWeakCache.value(instanceIdentifier).toStrongRef(); + if (sharedPointer) { + mCache.insert(instanceIdentifier, sharedPointer); + } + } + if (!mCache.contains(instanceIdentifier)) { + //Create a new instance if necessary + auto sharedPointer = Akonadi2::ResourceAccess::Ptr::create(instanceIdentifier); + QObject::connect(sharedPointer.data(), &Akonadi2::ResourceAccess::ready, sharedPointer.data(), [this, instanceIdentifier](bool ready) { + if (!ready) { + mCache.remove(instanceIdentifier); + } + }); + mCache.insert(instanceIdentifier, sharedPointer); + mWeakCache.insert(instanceIdentifier, sharedPointer); + } } + if (!mTimer.contains(instanceIdentifier)) { + auto timer = new QTimer; + //Drop connection after 3 seconds (which is a random value) + QObject::connect(timer, &QTimer::timeout, timer, [this, instanceIdentifier]() { + mCache.remove(instanceIdentifier); + }); + timer->setInterval(3000); + mTimer.insert(instanceIdentifier, timer); + } + auto timer = mTimer.value(instanceIdentifier); + timer->start(); return mCache.value(instanceIdentifier); } + QHash > mWeakCache; QHash mCache; + QHash mTimer; }; template diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 8988032..7be1259 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -368,6 +368,8 @@ void ResourceAccess::open() void ResourceAccess::close() { log(QString("Closing %1").arg(d->socket->fullServerName())); + Trace() << "Pending commands: " << d->pendingCommands.size(); + Trace() << "Queued commands: " << d->commandQueue.size(); d->socket->close(); } @@ -393,12 +395,24 @@ void ResourceAccess::processCommandQueue() { //TODO: serialize instead of blast them all through the socket? Trace() << "We have " << d->commandQueue.size() << " queued commands"; + Trace() << "Pending commands: " << d->pendingCommands.size(); for (auto command: d->commandQueue) { sendCommand(command); } d->commandQueue.clear(); } +void ResourceAccess::processPendingCommandQueue() +{ + Trace() << "We have " << d->pendingCommands.size() << " pending commands"; + for (auto command: d->pendingCommands) { + Trace() << "Reenquing command " << command->commandId; + d->commandQueue << command; + } + d->pendingCommands.clear(); + processCommandQueue(); +} + void ResourceAccess::connected() { if (!isReady()) { @@ -415,6 +429,9 @@ void ResourceAccess::connected() Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, fbb); } + //Reenqueue pending commands, we failed to send them + processPendingCommandQueue(); + //Send queued commands processCommandQueue(); emit ready(true); @@ -424,8 +441,6 @@ void ResourceAccess::disconnected() { log(QString("Disconnected from %1").arg(d->socket->fullServerName())); d->socket->close(); - //TODO fail all existing jobs? or retry - d->abortPendingOperations(); emit ready(false); } @@ -433,12 +448,14 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) { if (error == QLocalSocket::PeerClosedError) { Log(d->resourceInstanceIdentifier) << "The resource closed the connection."; + d->abortPendingOperations(); } else { Warning() << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString()); + if (d->pendingCommands.size()) { + Trace() << "Reconnecting due to pending operations: " << d->pendingCommands.size(); + open(); + } } - - //TODO We could first try to reconnect and resend the message if necessary. - d->abortPendingOperations(); } void ResourceAccess::readResourceMessage() diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 527cfa3..7f61b30 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h @@ -102,6 +102,7 @@ private: void sendCommand(const QSharedPointer &command); void processCommandQueue(); + void processPendingCommandQueue(); class Private; Private * const d; -- cgit v1.2.3