summaryrefslogtreecommitdiffstats
path: root/common/resourceaccess.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-12-12 17:39:48 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-12-12 17:39:48 +0100
commit5c66308d570be67aea5195426e304d2715f8734c (patch)
tree6de8cc0df125cdcd4199462917054f5f0c8d91de /common/resourceaccess.cpp
parenteb6e9229c8cecd4b573039fafd644a16912e31f6 (diff)
downloadsink-5c66308d570be67aea5195426e304d2715f8734c.tar.gz
sink-5c66308d570be67aea5195426e304d2715f8734c.zip
Kill all commands on failing to connect to a resource.
We have to kill pending commands as well, and we have to make sure that we call open only once the commands are actually enqueued, so we can kill them in case of failure.
Diffstat (limited to 'common/resourceaccess.cpp')
-rw-r--r--common/resourceaccess.cpp33
1 files changed, 17 insertions, 16 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 22d4cdb..29d5a1c 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -115,6 +115,10 @@ void ResourceAccess::Private::abortPendingOperations()
115 for (auto handler : handlers) { 115 for (auto handler : handlers) {
116 handler(1, "The resource closed unexpectedly"); 116 handler(1, "The resource closed unexpectedly");
117 } 117 }
118 for (auto queuedCommand : commandQueue) {
119 queuedCommand->callback(1, "The resource closed unexpectedly");
120 }
121 commandQueue.clear();
118} 122}
119 123
120void ResourceAccess::Private::callCallbacks() 124void ResourceAccess::Private::callCallbacks()
@@ -214,6 +218,7 @@ KAsync::Job<void> ResourceAccess::Private::initializeSocket()
214 }); 218 });
215 } else { 219 } else {
216 SinkWarning() << "Failed to start resource"; 220 SinkWarning() << "Failed to start resource";
221 return KAsync::error(-1, "Failed to start resource");
217 } 222 }
218 return KAsync::null(); 223 return KAsync::null();
219 } else { 224 } else {
@@ -256,6 +261,16 @@ void ResourceAccess::registerCallback(uint messageId, const std::function<void(i
256 d->resultHandler.insert(messageId, callback); 261 d->resultHandler.insert(messageId, callback);
257} 262}
258 263
264void ResourceAccess::enqueueCommand(const QSharedPointer<QueuedCommand> &command)
265{
266 d->commandQueue << command;
267 if (isReady()) {
268 processCommandQueue();
269 } else {
270 open();
271 }
272}
273
259KAsync::Job<void> ResourceAccess::sendCommand(int commandId) 274KAsync::Job<void> ResourceAccess::sendCommand(int commandId)
260{ 275{
261 return KAsync::start<void>([this, commandId](KAsync::Future<void> &f) { 276 return KAsync::start<void>([this, commandId](KAsync::Future<void> &f) {
@@ -265,10 +280,7 @@ KAsync::Job<void> ResourceAccess::sendCommand(int commandId)
265 } 280 }
266 f.setFinished(); 281 f.setFinished();
267 }; 282 };
268 d->commandQueue << QSharedPointer<QueuedCommand>::create(commandId, continuation); 283 enqueueCommand(QSharedPointer<QueuedCommand>::create(commandId, continuation));
269 if (isReady()) {
270 processCommandQueue();
271 }
272 }); 284 });
273} 285}
274 286
@@ -284,11 +296,7 @@ KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu
284 f.setFinished(); 296 f.setFinished();
285 } 297 }
286 }; 298 };
287 299 enqueueCommand(QSharedPointer<QueuedCommand>::create(commandId, buffer, callback));
288 d->commandQueue << QSharedPointer<QueuedCommand>::create(commandId, buffer, callback);
289 if (isReady()) {
290 processCommandQueue();
291 }
292 }); 300 });
293} 301}
294 302
@@ -305,7 +313,6 @@ KAsync::Job<void> ResourceAccess::synchronizeResource(const Sink::QueryBase &que
305 builder.add_query(q); 313 builder.add_query(q);
306 Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish()); 314 Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish());
307 315
308 open();
309 return sendCommand(Commands::SynchronizeCommand, fbb); 316 return sendCommand(Commands::SynchronizeCommand, fbb);
310} 317}
311 318
@@ -318,7 +325,6 @@ KAsync::Job<void> ResourceAccess::sendCreateCommand(const QByteArray &uid, const
318 auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size()); 325 auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size());
319 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta); 326 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta);
320 Sink::Commands::FinishCreateEntityBuffer(fbb, location); 327 Sink::Commands::FinishCreateEntityBuffer(fbb, location);
321 open();
322 return sendCommand(Sink::Commands::CreateEntityCommand, fbb); 328 return sendCommand(Sink::Commands::CreateEntityCommand, fbb);
323} 329}
324 330
@@ -334,7 +340,6 @@ ResourceAccess::sendModifyCommand(const QByteArray &uid, qint64 revision, const
334 auto resource = newResource.isEmpty() ? 0 : fbb.CreateString(newResource.constData()); 340 auto resource = newResource.isEmpty() ? 0 : fbb.CreateString(newResource.constData());
335 auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, true, modifiedProperties, resource, remove); 341 auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, true, modifiedProperties, resource, remove);
336 Sink::Commands::FinishModifyEntityBuffer(fbb, location); 342 Sink::Commands::FinishModifyEntityBuffer(fbb, location);
337 open();
338 return sendCommand(Sink::Commands::ModifyEntityCommand, fbb); 343 return sendCommand(Sink::Commands::ModifyEntityCommand, fbb);
339} 344}
340 345
@@ -345,7 +350,6 @@ KAsync::Job<void> ResourceAccess::sendDeleteCommand(const QByteArray &uid, qint6
345 auto type = fbb.CreateString(resourceBufferType.constData()); 350 auto type = fbb.CreateString(resourceBufferType.constData());
346 auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type); 351 auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type);
347 Sink::Commands::FinishDeleteEntityBuffer(fbb, location); 352 Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
348 open();
349 return sendCommand(Sink::Commands::DeleteEntityCommand, fbb); 353 return sendCommand(Sink::Commands::DeleteEntityCommand, fbb);
350} 354}
351 355
@@ -354,7 +358,6 @@ KAsync::Job<void> ResourceAccess::sendRevisionReplayedCommand(qint64 revision)
354 flatbuffers::FlatBufferBuilder fbb; 358 flatbuffers::FlatBufferBuilder fbb;
355 auto location = Sink::Commands::CreateRevisionReplayed(fbb, revision); 359 auto location = Sink::Commands::CreateRevisionReplayed(fbb, revision);
356 Sink::Commands::FinishRevisionReplayedBuffer(fbb, location); 360 Sink::Commands::FinishRevisionReplayedBuffer(fbb, location);
357 open();
358 return sendCommand(Sink::Commands::RevisionReplayedCommand, fbb); 361 return sendCommand(Sink::Commands::RevisionReplayedCommand, fbb);
359} 362}
360 363
@@ -374,7 +377,6 @@ ResourceAccess::sendInspectionCommand(int inspectionType, const QByteArray &insp
374 auto expected = fbb.CreateString(array.toStdString()); 377 auto expected = fbb.CreateString(array.toStdString());
375 auto location = Sink::Commands::CreateInspection(fbb, id, inspectionType, entity, domain, prop, expected); 378 auto location = Sink::Commands::CreateInspection(fbb, id, inspectionType, entity, domain, prop, expected);
376 Sink::Commands::FinishInspectionBuffer(fbb, location); 379 Sink::Commands::FinishInspectionBuffer(fbb, location);
377 open();
378 return sendCommand(Sink::Commands::InspectionCommand, fbb); 380 return sendCommand(Sink::Commands::InspectionCommand, fbb);
379} 381}
380 382
@@ -384,7 +386,6 @@ KAsync::Job<void> ResourceAccess::sendFlushCommand(int flushType, const QByteArr
384 auto id = fbb.CreateString(flushId.toStdString()); 386 auto id = fbb.CreateString(flushId.toStdString());
385 auto location = Sink::Commands::CreateFlush(fbb, id, flushType); 387 auto location = Sink::Commands::CreateFlush(fbb, id, flushType);
386 Sink::Commands::FinishFlushBuffer(fbb, location); 388 Sink::Commands::FinishFlushBuffer(fbb, location);
387 open();
388 return sendCommand(Sink::Commands::FlushCommand, fbb); 389 return sendCommand(Sink::Commands::FlushCommand, fbb);
389} 390}
390 391