summaryrefslogtreecommitdiffstats
path: root/common/resourceaccess.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/resourceaccess.cpp')
-rw-r--r--common/resourceaccess.cpp33
1 files changed, 17 insertions, 16 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 22d4cdb..29d5a1c 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -115,6 +115,10 @@ void ResourceAccess::Private::abortPendingOperations()
115 for (auto handler : handlers) { 115 for (auto handler : handlers) {
116 handler(1, "The resource closed unexpectedly"); 116 handler(1, "The resource closed unexpectedly");
117 } 117 }
118 for (auto queuedCommand : commandQueue) {
119 queuedCommand->callback(1, "The resource closed unexpectedly");
120 }
121 commandQueue.clear();
118} 122}
119 123
120void ResourceAccess::Private::callCallbacks() 124void ResourceAccess::Private::callCallbacks()
@@ -214,6 +218,7 @@ KAsync::Job<void> ResourceAccess::Private::initializeSocket()
214 }); 218 });
215 } else { 219 } else {
216 SinkWarning() << "Failed to start resource"; 220 SinkWarning() << "Failed to start resource";
221 return KAsync::error(-1, "Failed to start resource");
217 } 222 }
218 return KAsync::null(); 223 return KAsync::null();
219 } else { 224 } else {
@@ -256,6 +261,16 @@ void ResourceAccess::registerCallback(uint messageId, const std::function<void(i
256 d->resultHandler.insert(messageId, callback); 261 d->resultHandler.insert(messageId, callback);
257} 262}
258 263
264void ResourceAccess::enqueueCommand(const QSharedPointer<QueuedCommand> &command)
265{
266 d->commandQueue << command;
267 if (isReady()) {
268 processCommandQueue();
269 } else {
270 open();
271 }
272}
273
259KAsync::Job<void> ResourceAccess::sendCommand(int commandId) 274KAsync::Job<void> ResourceAccess::sendCommand(int commandId)
260{ 275{
261 return KAsync::start<void>([this, commandId](KAsync::Future<void> &f) { 276 return KAsync::start<void>([this, commandId](KAsync::Future<void> &f) {
@@ -265,10 +280,7 @@ KAsync::Job<void> ResourceAccess::sendCommand(int commandId)
265 } 280 }
266 f.setFinished(); 281 f.setFinished();
267 }; 282 };
268 d->commandQueue << QSharedPointer<QueuedCommand>::create(commandId, continuation); 283 enqueueCommand(QSharedPointer<QueuedCommand>::create(commandId, continuation));
269 if (isReady()) {
270 processCommandQueue();
271 }
272 }); 284 });
273} 285}
274 286
@@ -284,11 +296,7 @@ KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu
284 f.setFinished(); 296 f.setFinished();
285 } 297 }
286 }; 298 };
287 299 enqueueCommand(QSharedPointer<QueuedCommand>::create(commandId, buffer, callback));
288 d->commandQueue << QSharedPointer<QueuedCommand>::create(commandId, buffer, callback);
289 if (isReady()) {
290 processCommandQueue();
291 }
292 }); 300 });
293} 301}
294 302
@@ -305,7 +313,6 @@ KAsync::Job<void> ResourceAccess::synchronizeResource(const Sink::QueryBase &que
305 builder.add_query(q); 313 builder.add_query(q);
306 Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish()); 314 Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish());
307 315
308 open();
309 return sendCommand(Commands::SynchronizeCommand, fbb); 316 return sendCommand(Commands::SynchronizeCommand, fbb);
310} 317}
311 318
@@ -318,7 +325,6 @@ KAsync::Job<void> ResourceAccess::sendCreateCommand(const QByteArray &uid, const
318 auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size()); 325 auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size());
319 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta); 326 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta);
320 Sink::Commands::FinishCreateEntityBuffer(fbb, location); 327 Sink::Commands::FinishCreateEntityBuffer(fbb, location);
321 open();
322 return sendCommand(Sink::Commands::CreateEntityCommand, fbb); 328 return sendCommand(Sink::Commands::CreateEntityCommand, fbb);
323} 329}
324 330
@@ -334,7 +340,6 @@ ResourceAccess::sendModifyCommand(const QByteArray &uid, qint64 revision, const
334 auto resource = newResource.isEmpty() ? 0 : fbb.CreateString(newResource.constData()); 340 auto resource = newResource.isEmpty() ? 0 : fbb.CreateString(newResource.constData());
335 auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, true, modifiedProperties, resource, remove); 341 auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, true, modifiedProperties, resource, remove);
336 Sink::Commands::FinishModifyEntityBuffer(fbb, location); 342 Sink::Commands::FinishModifyEntityBuffer(fbb, location);
337 open();
338 return sendCommand(Sink::Commands::ModifyEntityCommand, fbb); 343 return sendCommand(Sink::Commands::ModifyEntityCommand, fbb);
339} 344}
340 345
@@ -345,7 +350,6 @@ KAsync::Job<void> ResourceAccess::sendDeleteCommand(const QByteArray &uid, qint6
345 auto type = fbb.CreateString(resourceBufferType.constData()); 350 auto type = fbb.CreateString(resourceBufferType.constData());
346 auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type); 351 auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type);
347 Sink::Commands::FinishDeleteEntityBuffer(fbb, location); 352 Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
348 open();
349 return sendCommand(Sink::Commands::DeleteEntityCommand, fbb); 353 return sendCommand(Sink::Commands::DeleteEntityCommand, fbb);
350} 354}
351 355
@@ -354,7 +358,6 @@ KAsync::Job<void> ResourceAccess::sendRevisionReplayedCommand(qint64 revision)
354 flatbuffers::FlatBufferBuilder fbb; 358 flatbuffers::FlatBufferBuilder fbb;
355 auto location = Sink::Commands::CreateRevisionReplayed(fbb, revision); 359 auto location = Sink::Commands::CreateRevisionReplayed(fbb, revision);
356 Sink::Commands::FinishRevisionReplayedBuffer(fbb, location); 360 Sink::Commands::FinishRevisionReplayedBuffer(fbb, location);
357 open();
358 return sendCommand(Sink::Commands::RevisionReplayedCommand, fbb); 361 return sendCommand(Sink::Commands::RevisionReplayedCommand, fbb);
359} 362}
360 363
@@ -374,7 +377,6 @@ ResourceAccess::sendInspectionCommand(int inspectionType, const QByteArray &insp
374 auto expected = fbb.CreateString(array.toStdString()); 377 auto expected = fbb.CreateString(array.toStdString());
375 auto location = Sink::Commands::CreateInspection(fbb, id, inspectionType, entity, domain, prop, expected); 378 auto location = Sink::Commands::CreateInspection(fbb, id, inspectionType, entity, domain, prop, expected);
376 Sink::Commands::FinishInspectionBuffer(fbb, location); 379 Sink::Commands::FinishInspectionBuffer(fbb, location);
377 open();
378 return sendCommand(Sink::Commands::InspectionCommand, fbb); 380 return sendCommand(Sink::Commands::InspectionCommand, fbb);
379} 381}
380 382
@@ -384,7 +386,6 @@ KAsync::Job<void> ResourceAccess::sendFlushCommand(int flushType, const QByteArr
384 auto id = fbb.CreateString(flushId.toStdString()); 386 auto id = fbb.CreateString(flushId.toStdString());
385 auto location = Sink::Commands::CreateFlush(fbb, id, flushType); 387 auto location = Sink::Commands::CreateFlush(fbb, id, flushType);
386 Sink::Commands::FinishFlushBuffer(fbb, location); 388 Sink::Commands::FinishFlushBuffer(fbb, location);
387 open();
388 return sendCommand(Sink::Commands::FlushCommand, fbb); 389 return sendCommand(Sink::Commands::FlushCommand, fbb);
389} 390}
390 391