diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-12-12 17:39:48 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-12-12 17:39:48 +0100 |
commit | 5c66308d570be67aea5195426e304d2715f8734c (patch) | |
tree | 6de8cc0df125cdcd4199462917054f5f0c8d91de /common/resourceaccess.cpp | |
parent | eb6e9229c8cecd4b573039fafd644a16912e31f6 (diff) | |
download | sink-5c66308d570be67aea5195426e304d2715f8734c.tar.gz sink-5c66308d570be67aea5195426e304d2715f8734c.zip |
Kill all commands on failing to connect to a resource.
We have to kill pending commands as well, and we have to make sure that
we call open only once the commands are actually enqueued, so we can
kill them in case of failure.
Diffstat (limited to 'common/resourceaccess.cpp')
-rw-r--r-- | common/resourceaccess.cpp | 33 |
1 files changed, 17 insertions, 16 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 22d4cdb..29d5a1c 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -115,6 +115,10 @@ void ResourceAccess::Private::abortPendingOperations() | |||
115 | for (auto handler : handlers) { | 115 | for (auto handler : handlers) { |
116 | handler(1, "The resource closed unexpectedly"); | 116 | handler(1, "The resource closed unexpectedly"); |
117 | } | 117 | } |
118 | for (auto queuedCommand : commandQueue) { | ||
119 | queuedCommand->callback(1, "The resource closed unexpectedly"); | ||
120 | } | ||
121 | commandQueue.clear(); | ||
118 | } | 122 | } |
119 | 123 | ||
120 | void ResourceAccess::Private::callCallbacks() | 124 | void ResourceAccess::Private::callCallbacks() |
@@ -214,6 +218,7 @@ KAsync::Job<void> ResourceAccess::Private::initializeSocket() | |||
214 | }); | 218 | }); |
215 | } else { | 219 | } else { |
216 | SinkWarning() << "Failed to start resource"; | 220 | SinkWarning() << "Failed to start resource"; |
221 | return KAsync::error(-1, "Failed to start resource"); | ||
217 | } | 222 | } |
218 | return KAsync::null(); | 223 | return KAsync::null(); |
219 | } else { | 224 | } else { |
@@ -256,6 +261,16 @@ void ResourceAccess::registerCallback(uint messageId, const std::function<void(i | |||
256 | d->resultHandler.insert(messageId, callback); | 261 | d->resultHandler.insert(messageId, callback); |
257 | } | 262 | } |
258 | 263 | ||
264 | void ResourceAccess::enqueueCommand(const QSharedPointer<QueuedCommand> &command) | ||
265 | { | ||
266 | d->commandQueue << command; | ||
267 | if (isReady()) { | ||
268 | processCommandQueue(); | ||
269 | } else { | ||
270 | open(); | ||
271 | } | ||
272 | } | ||
273 | |||
259 | KAsync::Job<void> ResourceAccess::sendCommand(int commandId) | 274 | KAsync::Job<void> ResourceAccess::sendCommand(int commandId) |
260 | { | 275 | { |
261 | return KAsync::start<void>([this, commandId](KAsync::Future<void> &f) { | 276 | return KAsync::start<void>([this, commandId](KAsync::Future<void> &f) { |
@@ -265,10 +280,7 @@ KAsync::Job<void> ResourceAccess::sendCommand(int commandId) | |||
265 | } | 280 | } |
266 | f.setFinished(); | 281 | f.setFinished(); |
267 | }; | 282 | }; |
268 | d->commandQueue << QSharedPointer<QueuedCommand>::create(commandId, continuation); | 283 | enqueueCommand(QSharedPointer<QueuedCommand>::create(commandId, continuation)); |
269 | if (isReady()) { | ||
270 | processCommandQueue(); | ||
271 | } | ||
272 | }); | 284 | }); |
273 | } | 285 | } |
274 | 286 | ||
@@ -284,11 +296,7 @@ KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu | |||
284 | f.setFinished(); | 296 | f.setFinished(); |
285 | } | 297 | } |
286 | }; | 298 | }; |
287 | 299 | enqueueCommand(QSharedPointer<QueuedCommand>::create(commandId, buffer, callback)); | |
288 | d->commandQueue << QSharedPointer<QueuedCommand>::create(commandId, buffer, callback); | ||
289 | if (isReady()) { | ||
290 | processCommandQueue(); | ||
291 | } | ||
292 | }); | 300 | }); |
293 | } | 301 | } |
294 | 302 | ||
@@ -305,7 +313,6 @@ KAsync::Job<void> ResourceAccess::synchronizeResource(const Sink::QueryBase &que | |||
305 | builder.add_query(q); | 313 | builder.add_query(q); |
306 | Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish()); | 314 | Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish()); |
307 | 315 | ||
308 | open(); | ||
309 | return sendCommand(Commands::SynchronizeCommand, fbb); | 316 | return sendCommand(Commands::SynchronizeCommand, fbb); |
310 | } | 317 | } |
311 | 318 | ||
@@ -318,7 +325,6 @@ KAsync::Job<void> ResourceAccess::sendCreateCommand(const QByteArray &uid, const | |||
318 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size()); | 325 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size()); |
319 | auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta); | 326 | auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta); |
320 | Sink::Commands::FinishCreateEntityBuffer(fbb, location); | 327 | Sink::Commands::FinishCreateEntityBuffer(fbb, location); |
321 | open(); | ||
322 | return sendCommand(Sink::Commands::CreateEntityCommand, fbb); | 328 | return sendCommand(Sink::Commands::CreateEntityCommand, fbb); |
323 | } | 329 | } |
324 | 330 | ||
@@ -334,7 +340,6 @@ ResourceAccess::sendModifyCommand(const QByteArray &uid, qint64 revision, const | |||
334 | auto resource = newResource.isEmpty() ? 0 : fbb.CreateString(newResource.constData()); | 340 | auto resource = newResource.isEmpty() ? 0 : fbb.CreateString(newResource.constData()); |
335 | auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, true, modifiedProperties, resource, remove); | 341 | auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, true, modifiedProperties, resource, remove); |
336 | Sink::Commands::FinishModifyEntityBuffer(fbb, location); | 342 | Sink::Commands::FinishModifyEntityBuffer(fbb, location); |
337 | open(); | ||
338 | return sendCommand(Sink::Commands::ModifyEntityCommand, fbb); | 343 | return sendCommand(Sink::Commands::ModifyEntityCommand, fbb); |
339 | } | 344 | } |
340 | 345 | ||
@@ -345,7 +350,6 @@ KAsync::Job<void> ResourceAccess::sendDeleteCommand(const QByteArray &uid, qint6 | |||
345 | auto type = fbb.CreateString(resourceBufferType.constData()); | 350 | auto type = fbb.CreateString(resourceBufferType.constData()); |
346 | auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type); | 351 | auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type); |
347 | Sink::Commands::FinishDeleteEntityBuffer(fbb, location); | 352 | Sink::Commands::FinishDeleteEntityBuffer(fbb, location); |
348 | open(); | ||
349 | return sendCommand(Sink::Commands::DeleteEntityCommand, fbb); | 353 | return sendCommand(Sink::Commands::DeleteEntityCommand, fbb); |
350 | } | 354 | } |
351 | 355 | ||
@@ -354,7 +358,6 @@ KAsync::Job<void> ResourceAccess::sendRevisionReplayedCommand(qint64 revision) | |||
354 | flatbuffers::FlatBufferBuilder fbb; | 358 | flatbuffers::FlatBufferBuilder fbb; |
355 | auto location = Sink::Commands::CreateRevisionReplayed(fbb, revision); | 359 | auto location = Sink::Commands::CreateRevisionReplayed(fbb, revision); |
356 | Sink::Commands::FinishRevisionReplayedBuffer(fbb, location); | 360 | Sink::Commands::FinishRevisionReplayedBuffer(fbb, location); |
357 | open(); | ||
358 | return sendCommand(Sink::Commands::RevisionReplayedCommand, fbb); | 361 | return sendCommand(Sink::Commands::RevisionReplayedCommand, fbb); |
359 | } | 362 | } |
360 | 363 | ||
@@ -374,7 +377,6 @@ ResourceAccess::sendInspectionCommand(int inspectionType, const QByteArray &insp | |||
374 | auto expected = fbb.CreateString(array.toStdString()); | 377 | auto expected = fbb.CreateString(array.toStdString()); |
375 | auto location = Sink::Commands::CreateInspection(fbb, id, inspectionType, entity, domain, prop, expected); | 378 | auto location = Sink::Commands::CreateInspection(fbb, id, inspectionType, entity, domain, prop, expected); |
376 | Sink::Commands::FinishInspectionBuffer(fbb, location); | 379 | Sink::Commands::FinishInspectionBuffer(fbb, location); |
377 | open(); | ||
378 | return sendCommand(Sink::Commands::InspectionCommand, fbb); | 380 | return sendCommand(Sink::Commands::InspectionCommand, fbb); |
379 | } | 381 | } |
380 | 382 | ||
@@ -384,7 +386,6 @@ KAsync::Job<void> ResourceAccess::sendFlushCommand(int flushType, const QByteArr | |||
384 | auto id = fbb.CreateString(flushId.toStdString()); | 386 | auto id = fbb.CreateString(flushId.toStdString()); |
385 | auto location = Sink::Commands::CreateFlush(fbb, id, flushType); | 387 | auto location = Sink::Commands::CreateFlush(fbb, id, flushType); |
386 | Sink::Commands::FinishFlushBuffer(fbb, location); | 388 | Sink::Commands::FinishFlushBuffer(fbb, location); |
387 | open(); | ||
388 | return sendCommand(Sink::Commands::FlushCommand, fbb); | 389 | return sendCommand(Sink::Commands::FlushCommand, fbb); |
389 | } | 390 | } |
390 | 391 | ||