From 5c66308d570be67aea5195426e304d2715f8734c Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 12 Dec 2016 17:39:48 +0100 Subject: Kill all commands on failing to connect to a resource. We have to kill pending commands as well, and we have to make sure that we call open only once the commands are actually enqueued, so we can kill them in case of failure. --- common/resourceaccess.cpp | 33 +++++++++++++++++---------------- common/resourceaccess.h | 1 + 2 files changed, 18 insertions(+), 16 deletions(-) (limited to 'common') diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 22d4cdb..29d5a1c 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -115,6 +115,10 @@ void ResourceAccess::Private::abortPendingOperations() for (auto handler : handlers) { handler(1, "The resource closed unexpectedly"); } + for (auto queuedCommand : commandQueue) { + queuedCommand->callback(1, "The resource closed unexpectedly"); + } + commandQueue.clear(); } void ResourceAccess::Private::callCallbacks() @@ -214,6 +218,7 @@ KAsync::Job ResourceAccess::Private::initializeSocket() }); } else { SinkWarning() << "Failed to start resource"; + return KAsync::error(-1, "Failed to start resource"); } return KAsync::null(); } else { @@ -256,6 +261,16 @@ void ResourceAccess::registerCallback(uint messageId, const std::functionresultHandler.insert(messageId, callback); } +void ResourceAccess::enqueueCommand(const QSharedPointer &command) +{ + d->commandQueue << command; + if (isReady()) { + processCommandQueue(); + } else { + open(); + } +} + KAsync::Job ResourceAccess::sendCommand(int commandId) { return KAsync::start([this, commandId](KAsync::Future &f) { @@ -265,10 +280,7 @@ KAsync::Job ResourceAccess::sendCommand(int commandId) } f.setFinished(); }; - d->commandQueue << QSharedPointer::create(commandId, continuation); - if (isReady()) { - processCommandQueue(); - } + enqueueCommand(QSharedPointer::create(commandId, continuation)); }); } @@ -284,11 +296,7 @@ KAsync::Job ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu f.setFinished(); } }; - - d->commandQueue << QSharedPointer::create(commandId, buffer, callback); - if (isReady()) { - processCommandQueue(); - } + enqueueCommand(QSharedPointer::create(commandId, buffer, callback)); }); } @@ -305,7 +313,6 @@ KAsync::Job ResourceAccess::synchronizeResource(const Sink::QueryBase &que builder.add_query(q); Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish()); - open(); return sendCommand(Commands::SynchronizeCommand, fbb); } @@ -318,7 +325,6 @@ KAsync::Job ResourceAccess::sendCreateCommand(const QByteArray &uid, const auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size()); auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta); Sink::Commands::FinishCreateEntityBuffer(fbb, location); - open(); return sendCommand(Sink::Commands::CreateEntityCommand, fbb); } @@ -334,7 +340,6 @@ ResourceAccess::sendModifyCommand(const QByteArray &uid, qint64 revision, const auto resource = newResource.isEmpty() ? 0 : fbb.CreateString(newResource.constData()); auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, true, modifiedProperties, resource, remove); Sink::Commands::FinishModifyEntityBuffer(fbb, location); - open(); return sendCommand(Sink::Commands::ModifyEntityCommand, fbb); } @@ -345,7 +350,6 @@ KAsync::Job ResourceAccess::sendDeleteCommand(const QByteArray &uid, qint6 auto type = fbb.CreateString(resourceBufferType.constData()); auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type); Sink::Commands::FinishDeleteEntityBuffer(fbb, location); - open(); return sendCommand(Sink::Commands::DeleteEntityCommand, fbb); } @@ -354,7 +358,6 @@ KAsync::Job ResourceAccess::sendRevisionReplayedCommand(qint64 revision) flatbuffers::FlatBufferBuilder fbb; auto location = Sink::Commands::CreateRevisionReplayed(fbb, revision); Sink::Commands::FinishRevisionReplayedBuffer(fbb, location); - open(); return sendCommand(Sink::Commands::RevisionReplayedCommand, fbb); } @@ -374,7 +377,6 @@ ResourceAccess::sendInspectionCommand(int inspectionType, const QByteArray &insp auto expected = fbb.CreateString(array.toStdString()); auto location = Sink::Commands::CreateInspection(fbb, id, inspectionType, entity, domain, prop, expected); Sink::Commands::FinishInspectionBuffer(fbb, location); - open(); return sendCommand(Sink::Commands::InspectionCommand, fbb); } @@ -384,7 +386,6 @@ KAsync::Job ResourceAccess::sendFlushCommand(int flushType, const QByteArr auto id = fbb.CreateString(flushId.toStdString()); auto location = Sink::Commands::CreateFlush(fbb, id, flushType); Sink::Commands::FinishFlushBuffer(fbb, location); - open(); return sendCommand(Sink::Commands::FlushCommand, fbb); } diff --git a/common/resourceaccess.h b/common/resourceaccess.h index de15125..c1b4253 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h @@ -145,6 +145,7 @@ private: void sendCommand(const QSharedPointer &command); void processCommandQueue(); void processPendingCommandQueue(); + void enqueueCommand(const QSharedPointer &command); class Private; Private *const d; -- cgit v1.2.3