From 4652a39fc6869fc5af46367c35027b2b53478268 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 12 Apr 2015 22:47:50 +0200 Subject: Always queue commands in resourceaccess. We want to keep the command until we know it arrived in the resource, so we can resend it otherwise. --- common/resourceaccess.cpp | 62 ++++++++++++++++++++++++++--------------------- common/resourceaccess.h | 5 ++++ 2 files changed, 40 insertions(+), 27 deletions(-) (limited to 'common') diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 7a343f9..9ecdab1 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -72,7 +72,8 @@ public: bool openingConnection; QByteArray partialMessageBuffer; flatbuffers::FlatBufferBuilder fbb; - QVector commandQueue; + QVector> commandQueue; + QMap> pendingCommands; QMultiMap > resultHandler; uint messageId; }; @@ -136,13 +137,9 @@ Async::Job ResourceAccess::sendCommand(int commandId) } f.setFinished(); }; + d->commandQueue << QSharedPointer::create(commandId, continuation); if (isReady()) { - log(QString("Sending command %1").arg(commandId)); - d->messageId++; - registerCallback(d->messageId, continuation); - Commands::write(d->socket, d->messageId, commandId); - } else { - d->commandQueue << new QueuedCommand(commandId, continuation); + processCommandQueue(); } }); } @@ -160,14 +157,9 @@ Async::Job ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu } }; + d->commandQueue << QSharedPointer::create(commandId, buffer, callback); if (isReady()) { - //TODO: We probably always want to queue the command, so we can resend it in case something goes wrong - d->messageId++; - log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId)); - registerCallback(d->messageId, callback); - Commands::write(d->socket, d->messageId, commandId, buffer.constData(), buffer.size()); - } else { - d->commandQueue << new QueuedCommand(commandId, buffer, callback); + processCommandQueue(); } }); } @@ -202,6 +194,34 @@ void ResourceAccess::close() d->socket->close(); } +void ResourceAccess::sendCommand(const QSharedPointer &command) +{ + Q_ASSERT(isReady()); + //TODO: we should have a timeout for commands + d->messageId++; + const auto messageId = d->messageId; + log(QString("Sending command %1 with messageId %2").arg(command->commandId).arg(d->messageId)); + if (command->callback) { + registerCallback(d->messageId, [this, messageId, command](int number, QString foo) { + d->pendingCommands.remove(messageId); + command->callback(number, foo); + }); + } + //Keep track of the command until we're sure it arrived + d->pendingCommands.insert(d->messageId, command); + Commands::write(d->socket, d->messageId, command->commandId, command->buffer.constData(), command->buffer.size()); +} + +void ResourceAccess::processCommandQueue() +{ + //TODO: serialize instead of blast them all through the socket? + log(QString("We have %1 queued commands").arg(d->commandQueue.size())); + for (auto command: d->commandQueue) { + sendCommand(command); + } + d->commandQueue.clear(); +} + void ResourceAccess::connected() { d->startingProcess = false; @@ -221,19 +241,7 @@ void ResourceAccess::connected() d->fbb.Clear(); } - //TODO: should confirm the commands made it with a response? - //TODO: serialize instead of blast them all through the socket? - log(QString("We have %1 queued commands").arg(d->commandQueue.size())); - for (QueuedCommand *command: d->commandQueue) { - d->messageId++; - log(QString("Sending command %1 with messageId %2").arg(command->commandId).arg(d->messageId)); - if (command->callback) { - registerCallback(d->messageId, command->callback); - } - Commands::write(d->socket, d->messageId, command->commandId, command->buffer.constData(), command->buffer.size()); - delete command; - } - d->commandQueue.clear(); + processCommandQueue(); emit ready(true); } diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 088bf36..4c9d9d2 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h @@ -30,6 +30,8 @@ namespace Akonadi2 { +class QueuedCommand; + class ResourceAccess : public QObject { Q_OBJECT @@ -68,6 +70,9 @@ private: void registerCallback(uint messageId, const std::function &callback); void startResourceAndConnect(); + void sendCommand(const QSharedPointer &command); + void processCommandQueue(); + class Private; Private * const d; }; -- cgit v1.2.3