summaryrefslogtreecommitdiffstats
path: root/common/resourceaccess.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/resourceaccess.cpp')
-rw-r--r--common/resourceaccess.cpp253
1 files changed, 119 insertions, 134 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 0716ae2..c8c8189 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -53,27 +53,20 @@ static void queuedInvoke(const std::function<void()> &f, QObject *context = 0)
53{ 53{
54 auto timer = QSharedPointer<QTimer>::create(); 54 auto timer = QSharedPointer<QTimer>::create();
55 timer->setSingleShot(true); 55 timer->setSingleShot(true);
56 QObject::connect(timer.data(), &QTimer::timeout, context, [f, timer]() { 56 QObject::connect(timer.data(), &QTimer::timeout, context, [f, timer]() { f(); });
57 f();
58 });
59 timer->start(0); 57 timer->start(0);
60} 58}
61 59
62namespace Sink 60namespace Sink {
63{
64 61
65struct QueuedCommand 62struct QueuedCommand
66{ 63{
67public: 64public:
68 QueuedCommand(int commandId, const std::function<void(int, const QString &)> &callback) 65 QueuedCommand(int commandId, const std::function<void(int, const QString &)> &callback) : commandId(commandId), callback(callback)
69 : commandId(commandId), 66 {
70 callback(callback) 67 }
71 {} 68
72 69 QueuedCommand(int commandId, const QByteArray &b, const std::function<void(int, const QString &)> &callback) : commandId(commandId), buffer(b), callback(callback)
73 QueuedCommand(int commandId, const QByteArray &b, const std::function<void(int, const QString &)> &callback)
74 : commandId(commandId),
75 buffer(b),
76 callback(callback)
77 { 70 {
78 } 71 }
79 72
@@ -102,17 +95,14 @@ public:
102 QByteArray partialMessageBuffer; 95 QByteArray partialMessageBuffer;
103 QVector<QSharedPointer<QueuedCommand>> commandQueue; 96 QVector<QSharedPointer<QueuedCommand>> commandQueue;
104 QMap<uint, QSharedPointer<QueuedCommand>> pendingCommands; 97 QMap<uint, QSharedPointer<QueuedCommand>> pendingCommands;
105 QMultiMap<uint, std::function<void(int error, const QString &errorMessage)> > resultHandler; 98 QMultiMap<uint, std::function<void(int error, const QString &errorMessage)>> resultHandler;
106 QSet<uint> completeCommands; 99 QSet<uint> completeCommands;
107 uint messageId; 100 uint messageId;
108 bool openingSocket; 101 bool openingSocket;
109}; 102};
110 103
111ResourceAccess::Private::Private(const QByteArray &name, const QByteArray &instanceIdentifier, ResourceAccess *q) 104ResourceAccess::Private::Private(const QByteArray &name, const QByteArray &instanceIdentifier, ResourceAccess *q)
112 : resourceName(name), 105 : resourceName(name), resourceInstanceIdentifier(instanceIdentifier), messageId(0), openingSocket(false)
113 resourceInstanceIdentifier(instanceIdentifier),
114 messageId(0),
115 openingSocket(false)
116{ 106{
117} 107}
118 108
@@ -124,7 +114,7 @@ void ResourceAccess::Private::abortPendingOperations()
124 } 114 }
125 auto handlers = resultHandler.values(); 115 auto handlers = resultHandler.values();
126 resultHandler.clear(); 116 resultHandler.clear();
127 for(auto handler : handlers) { 117 for (auto handler : handlers) {
128 handler(1, "The resource closed unexpectedly"); 118 handler(1, "The resource closed unexpectedly");
129 } 119 }
130} 120}
@@ -132,20 +122,20 @@ void ResourceAccess::Private::abortPendingOperations()
132void ResourceAccess::Private::callCallbacks() 122void ResourceAccess::Private::callCallbacks()
133{ 123{
134 for (auto id : completeCommands) { 124 for (auto id : completeCommands) {
135 //We remove the callbacks first because the handler can kill resourceaccess directly 125 // We remove the callbacks first because the handler can kill resourceaccess directly
136 const auto callbacks = resultHandler.values(id); 126 const auto callbacks = resultHandler.values(id);
137 resultHandler.remove(id); 127 resultHandler.remove(id);
138 for(auto handler : callbacks) { 128 for (auto handler : callbacks) {
139 handler(0, QString()); 129 handler(0, QString());
140 } 130 }
141 } 131 }
142} 132}
143 133
144//Connects to server and returns connected socket on success 134// Connects to server and returns connected socket on success
145KAsync::Job<QSharedPointer<QLocalSocket> > ResourceAccess::connectToServer(const QByteArray &identifier) 135KAsync::Job<QSharedPointer<QLocalSocket>> ResourceAccess::connectToServer(const QByteArray &identifier)
146{ 136{
147 auto s = QSharedPointer<QLocalSocket>::create(); 137 auto s = QSharedPointer<QLocalSocket>::create();
148 return KAsync::start<QSharedPointer<QLocalSocket> >([identifier, s](KAsync::Future<QSharedPointer<QLocalSocket> > &future) { 138 return KAsync::start<QSharedPointer<QLocalSocket>>([identifier, s](KAsync::Future<QSharedPointer<QLocalSocket>> &future) {
149 s->setServerName(identifier); 139 s->setServerName(identifier);
150 auto context = new QObject; 140 auto context = new QObject;
151 QObject::connect(s.data(), &QLocalSocket::connected, context, [&future, &s, context]() { 141 QObject::connect(s.data(), &QLocalSocket::connected, context, [&future, &s, context]() {
@@ -154,7 +144,7 @@ KAsync::Job<QSharedPointer<QLocalSocket> > ResourceAccess::connectToServer(const
154 future.setValue(s); 144 future.setValue(s);
155 future.setFinished(); 145 future.setFinished();
156 }); 146 });
157 QObject::connect(s.data(), static_cast<void(QLocalSocket::*)(QLocalSocket::LocalSocketError)>(&QLocalSocket::error), context, [&future, context](QLocalSocket::LocalSocketError) { 147 QObject::connect(s.data(), static_cast<void (QLocalSocket::*)(QLocalSocket::LocalSocketError)>(&QLocalSocket::error), context, [&future, context](QLocalSocket::LocalSocketError) {
158 delete context; 148 delete context;
159 future.setError(-1, "Failed to connect to server."); 149 future.setError(-1, "Failed to connect to server.");
160 }); 150 });
@@ -164,66 +154,67 @@ KAsync::Job<QSharedPointer<QLocalSocket> > ResourceAccess::connectToServer(const
164 154
165KAsync::Job<void> ResourceAccess::Private::tryToConnect() 155KAsync::Job<void> ResourceAccess::Private::tryToConnect()
166{ 156{
167 //We may have a socket from the last connection leftover 157 // We may have a socket from the last connection leftover
168 socket.reset(); 158 socket.reset();
169 auto counter = QSharedPointer<int>::create(0); 159 auto counter = QSharedPointer<int>::create(0);
170 return KAsync::dowhile([this]() -> bool { 160 return KAsync::dowhile([this]() -> bool { return !socket; },
171 return !socket; 161 [this, counter](KAsync::Future<void> &future) {
172 }, 162 TracePrivate() << "Loop";
173 [this, counter](KAsync::Future<void> &future) { 163 connectToServer(resourceInstanceIdentifier)
174 TracePrivate() << "Loop"; 164 .then<void, QSharedPointer<QLocalSocket>>(
175 connectToServer(resourceInstanceIdentifier) 165 [this, &future](const QSharedPointer<QLocalSocket> &s) {
176 .then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { 166 Q_ASSERT(s);
177 Q_ASSERT(s); 167 socket = s;
178 socket = s; 168 future.setFinished();
179 future.setFinished(); 169 },
180 }, [&future, counter, this](int errorCode, const QString &errorString) { 170 [&future, counter, this](int errorCode, const QString &errorString) {
181 static int waitTime = 10; 171 static int waitTime = 10;
182 static int timeout = 500; 172 static int timeout = 500;
183 static int maxRetries = timeout / waitTime; 173 static int maxRetries = timeout / waitTime;
184 if (*counter > maxRetries) { 174 if (*counter > maxRetries) {
185 TracePrivate() << "Giving up"; 175 TracePrivate() << "Giving up";
186 future.setError(-1, "Failed to connect to socket"); 176 future.setError(-1, "Failed to connect to socket");
187 } else { 177 } else {
188 KAsync::wait(waitTime).then<void>([&future]() { 178 KAsync::wait(waitTime).then<void>([&future]() { future.setFinished(); }).exec();
189 future.setFinished(); 179 }
190 }).exec(); 180 *counter = *counter + 1;
191 } 181 })
192 *counter = *counter + 1; 182 .exec();
193 }) 183 });
194 .exec();
195 });
196} 184}
197 185
198KAsync::Job<void> ResourceAccess::Private::initializeSocket() 186KAsync::Job<void> ResourceAccess::Private::initializeSocket()
199{ 187{
200 return KAsync::start<void>([this](KAsync::Future<void> &future) { 188 return KAsync::start<void>([this](KAsync::Future<void> &future) {
201 TracePrivate() << "Trying to connect"; 189 TracePrivate() << "Trying to connect";
202 connectToServer(resourceInstanceIdentifier).then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { 190 connectToServer(resourceInstanceIdentifier)
203 TracePrivate() << "Connected to resource, without having to start it."; 191 .then<void, QSharedPointer<QLocalSocket>>(
204 Q_ASSERT(s); 192 [this, &future](const QSharedPointer<QLocalSocket> &s) {
205 socket = s; 193 TracePrivate() << "Connected to resource, without having to start it.";
206 future.setFinished(); 194 Q_ASSERT(s);
207 }, 195 socket = s;
208 [this, &future](int errorCode, const QString &errorString) {
209 TracePrivate() << "Failed to connect, starting resource";
210 //We failed to connect, so let's start the resource
211 QStringList args;
212 args << resourceInstanceIdentifier;
213 qint64 pid = 0;
214 if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) {
215 TracePrivate() << "Started resource " << pid;
216 tryToConnect()
217 .then<void>([&future]() {
218 future.setFinished(); 196 future.setFinished();
219 }, [this, &future](int errorCode, const QString &errorString) { 197 },
220 Warning() << "Failed to connect to started resource"; 198 [this, &future](int errorCode, const QString &errorString) {
221 future.setError(errorCode, errorString); 199 TracePrivate() << "Failed to connect, starting resource";
222 }).exec(); 200 // We failed to connect, so let's start the resource
223 } else { 201 QStringList args;
224 Warning() << "Failed to start resource"; 202 args << resourceInstanceIdentifier;
225 } 203 qint64 pid = 0;
226 }).exec(); 204 if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) {
205 TracePrivate() << "Started resource " << pid;
206 tryToConnect()
207 .then<void>([&future]() { future.setFinished(); },
208 [this, &future](int errorCode, const QString &errorString) {
209 Warning() << "Failed to connect to started resource";
210 future.setError(errorCode, errorString);
211 })
212 .exec();
213 } else {
214 Warning() << "Failed to start resource";
215 }
216 })
217 .exec();
227 }); 218 });
228} 219}
229 220
@@ -235,8 +226,7 @@ static QByteArray getResourceName(const QByteArray &instanceIdentifier)
235} 226}
236 227
237ResourceAccess::ResourceAccess(const QByteArray &resourceInstanceIdentifier) 228ResourceAccess::ResourceAccess(const QByteArray &resourceInstanceIdentifier)
238 : ResourceAccessInterface(), 229 : ResourceAccessInterface(), d(new Private(getResourceName(resourceInstanceIdentifier), resourceInstanceIdentifier, this))
239 d(new Private(getResourceName(resourceInstanceIdentifier), resourceInstanceIdentifier, this))
240{ 230{
241 Log() << "Starting access"; 231 Log() << "Starting access";
242} 232}
@@ -280,10 +270,10 @@ KAsync::Job<void> ResourceAccess::sendCommand(int commandId)
280 }); 270 });
281} 271}
282 272
283KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) 273KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb)
284{ 274{
285 //The flatbuffer is transient, but we want to store it until the job is executed 275 // The flatbuffer is transient, but we want to store it until the job is executed
286 QByteArray buffer(reinterpret_cast<const char*>(fbb.GetBufferPointer()), fbb.GetSize()); 276 QByteArray buffer(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize());
287 return KAsync::start<void>([commandId, buffer, this](KAsync::Future<void> &f) { 277 return KAsync::start<void>([commandId, buffer, this](KAsync::Future<void> &f) {
288 auto callback = [&f](int error, const QString &errorMessage) { 278 auto callback = [&f](int error, const QString &errorMessage) {
289 if (error) { 279 if (error) {
@@ -313,7 +303,7 @@ KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool loca
313KAsync::Job<void> ResourceAccess::sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) 303KAsync::Job<void> ResourceAccess::sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer)
314{ 304{
315 flatbuffers::FlatBufferBuilder fbb; 305 flatbuffers::FlatBufferBuilder fbb;
316 //This is the resource buffer type and not the domain type 306 // This is the resource buffer type and not the domain type
317 auto type = fbb.CreateString(resourceBufferType.constData()); 307 auto type = fbb.CreateString(resourceBufferType.constData());
318 auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size()); 308 auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size());
319 auto location = Sink::Commands::CreateCreateEntity(fbb, 0, type, delta); 309 auto location = Sink::Commands::CreateCreateEntity(fbb, 0, type, delta);
@@ -322,13 +312,14 @@ KAsync::Job<void> ResourceAccess::sendCreateCommand(const QByteArray &resourceBu
322 return sendCommand(Sink::Commands::CreateEntityCommand, fbb); 312 return sendCommand(Sink::Commands::CreateEntityCommand, fbb);
323} 313}
324 314
325KAsync::Job<void> ResourceAccess::sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer) 315KAsync::Job<void>
316ResourceAccess::sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer)
326{ 317{
327 flatbuffers::FlatBufferBuilder fbb; 318 flatbuffers::FlatBufferBuilder fbb;
328 auto entityId = fbb.CreateString(uid.constData()); 319 auto entityId = fbb.CreateString(uid.constData());
329 //This is the resource buffer type and not the domain type 320 // This is the resource buffer type and not the domain type
330 auto type = fbb.CreateString(resourceBufferType.constData()); 321 auto type = fbb.CreateString(resourceBufferType.constData());
331 //FIXME 322 // FIXME
332 auto deletions = 0; 323 auto deletions = 0;
333 auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size()); 324 auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size());
334 auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta); 325 auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta);
@@ -341,7 +332,7 @@ KAsync::Job<void> ResourceAccess::sendDeleteCommand(const QByteArray &uid, qint6
341{ 332{
342 flatbuffers::FlatBufferBuilder fbb; 333 flatbuffers::FlatBufferBuilder fbb;
343 auto entityId = fbb.CreateString(uid.constData()); 334 auto entityId = fbb.CreateString(uid.constData());
344 //This is the resource buffer type and not the domain type 335 // This is the resource buffer type and not the domain type
345 auto type = fbb.CreateString(resourceBufferType.constData()); 336 auto type = fbb.CreateString(resourceBufferType.constData());
346 auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type); 337 auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type);
347 Sink::Commands::FinishDeleteEntityBuffer(fbb, location); 338 Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
@@ -358,7 +349,8 @@ KAsync::Job<void> ResourceAccess::sendRevisionReplayedCommand(qint64 revision)
358 return sendCommand(Sink::Commands::RevisionReplayedCommand, fbb); 349 return sendCommand(Sink::Commands::RevisionReplayedCommand, fbb);
359} 350}
360 351
361KAsync::Job<void> ResourceAccess::sendInspectionCommand(const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) 352KAsync::Job<void>
353ResourceAccess::sendInspectionCommand(const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue)
362{ 354{
363 flatbuffers::FlatBufferBuilder fbb; 355 flatbuffers::FlatBufferBuilder fbb;
364 auto id = fbb.CreateString(inspectionId.toStdString()); 356 auto id = fbb.CreateString(inspectionId.toStdString());
@@ -371,7 +363,7 @@ KAsync::Job<void> ResourceAccess::sendInspectionCommand(const QByteArray &inspec
371 s << expectedValue; 363 s << expectedValue;
372 364
373 auto expected = fbb.CreateString(array.toStdString()); 365 auto expected = fbb.CreateString(array.toStdString());
374 auto location = Sink::Commands::CreateInspection (fbb, id, 0, entity, domain, prop, expected); 366 auto location = Sink::Commands::CreateInspection(fbb, id, 0, entity, domain, prop, expected);
375 Sink::Commands::FinishInspectionBuffer(fbb, location); 367 Sink::Commands::FinishInspectionBuffer(fbb, location);
376 open(); 368 open();
377 return sendCommand(Sink::Commands::InspectionCommand, fbb); 369 return sendCommand(Sink::Commands::InspectionCommand, fbb);
@@ -389,21 +381,21 @@ void ResourceAccess::open()
389 auto time = QSharedPointer<QTime>::create(); 381 auto time = QSharedPointer<QTime>::create();
390 time->start(); 382 time->start();
391 d->openingSocket = true; 383 d->openingSocket = true;
392 d->initializeSocket().then<void>([this, time]() { 384 d->initializeSocket()
393 Trace() << "Socket is initialized." << Log::TraceTime(time->elapsed()); 385 .then<void>(
394 d->openingSocket = false; 386 [this, time]() {
395 QObject::connect(d->socket.data(), &QLocalSocket::disconnected, 387 Trace() << "Socket is initialized." << Log::TraceTime(time->elapsed());
396 this, &ResourceAccess::disconnected); 388 d->openingSocket = false;
397 QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), 389 QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected);
398 this, SLOT(connectionError(QLocalSocket::LocalSocketError))); 390 QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError)));
399 QObject::connect(d->socket.data(), &QIODevice::readyRead, 391 QObject::connect(d->socket.data(), &QIODevice::readyRead, this, &ResourceAccess::readResourceMessage);
400 this, &ResourceAccess::readResourceMessage); 392 connected();
401 connected(); 393 },
402 }, 394 [this](int error, const QString &errorString) {
403 [this](int error, const QString &errorString) { 395 d->openingSocket = false;
404 d->openingSocket = false; 396 Warning() << "Failed to initialize socket " << errorString;
405 Warning() << "Failed to initialize socket " << errorString; 397 })
406 }).exec(); 398 .exec();
407} 399}
408 400
409void ResourceAccess::close() 401void ResourceAccess::close()
@@ -417,7 +409,7 @@ void ResourceAccess::close()
417void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command) 409void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command)
418{ 410{
419 Q_ASSERT(isReady()); 411 Q_ASSERT(isReady());
420 //TODO: we should have a timeout for commands 412 // TODO: we should have a timeout for commands
421 d->messageId++; 413 d->messageId++;
422 const auto messageId = d->messageId; 414 const auto messageId = d->messageId;
423 Log() << QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId); 415 Log() << QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId);
@@ -427,17 +419,17 @@ void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command)
427 d->pendingCommands.remove(messageId); 419 d->pendingCommands.remove(messageId);
428 command->callback(errorCode, errorMessage); 420 command->callback(errorCode, errorMessage);
429 }); 421 });
430 //Keep track of the command until we're sure it arrived 422 // Keep track of the command until we're sure it arrived
431 d->pendingCommands.insert(d->messageId, command); 423 d->pendingCommands.insert(d->messageId, command);
432 Commands::write(d->socket.data(), d->messageId, command->commandId, command->buffer.constData(), command->buffer.size()); 424 Commands::write(d->socket.data(), d->messageId, command->commandId, command->buffer.constData(), command->buffer.size());
433} 425}
434 426
435void ResourceAccess::processCommandQueue() 427void ResourceAccess::processCommandQueue()
436{ 428{
437 //TODO: serialize instead of blast them all through the socket? 429 // TODO: serialize instead of blast them all through the socket?
438 Trace() << "We have " << d->commandQueue.size() << " queued commands"; 430 Trace() << "We have " << d->commandQueue.size() << " queued commands";
439 Trace() << "Pending commands: " << d->pendingCommands.size(); 431 Trace() << "Pending commands: " << d->pendingCommands.size();
440 for (auto command: d->commandQueue) { 432 for (auto command : d->commandQueue) {
441 sendCommand(command); 433 sendCommand(command);
442 } 434 }
443 d->commandQueue.clear(); 435 d->commandQueue.clear();
@@ -446,7 +438,7 @@ void ResourceAccess::processCommandQueue()
446void ResourceAccess::processPendingCommandQueue() 438void ResourceAccess::processPendingCommandQueue()
447{ 439{
448 Trace() << "We have " << d->pendingCommands.size() << " pending commands"; 440 Trace() << "We have " << d->pendingCommands.size() << " pending commands";
449 for (auto command: d->pendingCommands) { 441 for (auto command : d->pendingCommands) {
450 Trace() << "Reenquing command " << command->commandId; 442 Trace() << "Reenquing command " << command->commandId;
451 d->commandQueue << command; 443 d->commandQueue << command;
452 } 444 }
@@ -471,9 +463,9 @@ void ResourceAccess::connected()
471 Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, fbb); 463 Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, fbb);
472 } 464 }
473 465
474 //Reenqueue pending commands, we failed to send them 466 // Reenqueue pending commands, we failed to send them
475 processPendingCommandQueue(); 467 processPendingCommandQueue();
476 //Send queued commands 468 // Send queued commands
477 processCommandQueue(); 469 processCommandQueue();
478 470
479 emit ready(true); 471 emit ready(true);
@@ -510,7 +502,8 @@ void ResourceAccess::readResourceMessage()
510 d->partialMessageBuffer += d->socket->readAll(); 502 d->partialMessageBuffer += d->socket->readAll();
511 503
512 // should be scheduled rather than processed all at once 504 // should be scheduled rather than processed all at once
513 while (processMessageBuffer()) {} 505 while (processMessageBuffer()) {
506 }
514} 507}
515 508
516bool ResourceAccess::processMessageBuffer() 509bool ResourceAccess::processMessageBuffer()
@@ -521,9 +514,9 @@ bool ResourceAccess::processMessageBuffer()
521 return false; 514 return false;
522 } 515 }
523 516
524 //const uint messageId = *(int*)(d->partialMessageBuffer.constData()); 517 // const uint messageId = *(int*)(d->partialMessageBuffer.constData());
525 const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint)); 518 const int commandId = *(int *)(d->partialMessageBuffer.constData() + sizeof(uint));
526 const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); 519 const uint size = *(int *)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint));
527 520
528 if (size > (uint)(d->partialMessageBuffer.size() - headerSize)) { 521 if (size > (uint)(d->partialMessageBuffer.size() - headerSize)) {
529 Warning() << "command too small"; 522 Warning() << "command too small";
@@ -546,10 +539,8 @@ bool ResourceAccess::processMessageBuffer()
546 Log() << QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"); 539 Log() << QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully");
547 540
548 d->completeCommands << buffer->id(); 541 d->completeCommands << buffer->id();
549 //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first 542 // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
550 queuedInvoke([=]() { 543 queuedInvoke([=]() { d->callCallbacks(); }, this);
551 d->callCallbacks();
552 }, this);
553 break; 544 break;
554 } 545 }
555 case Commands::NotificationCommand: { 546 case Commands::NotificationCommand: {
@@ -563,21 +554,18 @@ bool ResourceAccess::processMessageBuffer()
563 Log() << "Received inspection notification."; 554 Log() << "Received inspection notification.";
564 Notification n; 555 Notification n;
565 if (buffer->identifier()) { 556 if (buffer->identifier()) {
566 //Don't use fromRawData, the buffer is gone once we invoke emit notification 557 // Don't use fromRawData, the buffer is gone once we invoke emit notification
567 n.id = BufferUtils::extractBufferCopy(buffer->identifier()); 558 n.id = BufferUtils::extractBufferCopy(buffer->identifier());
568 } 559 }
569 if (buffer->message()) { 560 if (buffer->message()) {
570 //Don't use fromRawData, the buffer is gone once we invoke emit notification 561 // Don't use fromRawData, the buffer is gone once we invoke emit notification
571 n.message = BufferUtils::extractBufferCopy(buffer->message()); 562 n.message = BufferUtils::extractBufferCopy(buffer->message());
572 } 563 }
573 n.type = buffer->type(); 564 n.type = buffer->type();
574 n.code = buffer->code(); 565 n.code = buffer->code();
575 //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first 566 // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
576 queuedInvoke([=]() { 567 queuedInvoke([=]() { emit notification(n); }, this);
577 emit notification(n); 568 } break;
578 }, this);
579 }
580 break;
581 case Sink::Commands::NotificationType::NotificationType_Status: 569 case Sink::Commands::NotificationType::NotificationType_Status:
582 case Sink::Commands::NotificationType::NotificationType_Warning: 570 case Sink::Commands::NotificationType::NotificationType_Warning:
583 case Sink::Commands::NotificationType::NotificationType_Progress: 571 case Sink::Commands::NotificationType::NotificationType_Progress:
@@ -608,7 +596,7 @@ ResourceAccessFactory &ResourceAccessFactory::instance()
608Sink::ResourceAccess::Ptr ResourceAccessFactory::getAccess(const QByteArray &instanceIdentifier) 596Sink::ResourceAccess::Ptr ResourceAccessFactory::getAccess(const QByteArray &instanceIdentifier)
609{ 597{
610 if (!mCache.contains(instanceIdentifier)) { 598 if (!mCache.contains(instanceIdentifier)) {
611 //Reuse the pointer if something else kept the resourceaccess alive 599 // Reuse the pointer if something else kept the resourceaccess alive
612 if (mWeakCache.contains(instanceIdentifier)) { 600 if (mWeakCache.contains(instanceIdentifier)) {
613 auto sharedPointer = mWeakCache.value(instanceIdentifier).toStrongRef(); 601 auto sharedPointer = mWeakCache.value(instanceIdentifier).toStrongRef();
614 if (sharedPointer) { 602 if (sharedPointer) {
@@ -616,7 +604,7 @@ Sink::ResourceAccess::Ptr ResourceAccessFactory::getAccess(const QByteArray &ins
616 } 604 }
617 } 605 }
618 if (!mCache.contains(instanceIdentifier)) { 606 if (!mCache.contains(instanceIdentifier)) {
619 //Create a new instance if necessary 607 // Create a new instance if necessary
620 auto sharedPointer = Sink::ResourceAccess::Ptr::create(instanceIdentifier); 608 auto sharedPointer = Sink::ResourceAccess::Ptr::create(instanceIdentifier);
621 QObject::connect(sharedPointer.data(), &Sink::ResourceAccess::ready, sharedPointer.data(), [this, instanceIdentifier](bool ready) { 609 QObject::connect(sharedPointer.data(), &Sink::ResourceAccess::ready, sharedPointer.data(), [this, instanceIdentifier](bool ready) {
622 if (!ready) { 610 if (!ready) {
@@ -629,10 +617,8 @@ Sink::ResourceAccess::Ptr ResourceAccessFactory::getAccess(const QByteArray &ins
629 } 617 }
630 if (!mTimer.contains(instanceIdentifier)) { 618 if (!mTimer.contains(instanceIdentifier)) {
631 auto timer = new QTimer; 619 auto timer = new QTimer;
632 //Drop connection after 3 seconds (which is a random value) 620 // Drop connection after 3 seconds (which is a random value)
633 QObject::connect(timer, &QTimer::timeout, timer, [this, instanceIdentifier]() { 621 QObject::connect(timer, &QTimer::timeout, timer, [this, instanceIdentifier]() { mCache.remove(instanceIdentifier); });
634 mCache.remove(instanceIdentifier);
635 });
636 timer->setInterval(3000); 622 timer->setInterval(3000);
637 mTimer.insert(instanceIdentifier, timer); 623 mTimer.insert(instanceIdentifier, timer);
638 } 624 }
@@ -640,7 +626,6 @@ Sink::ResourceAccess::Ptr ResourceAccessFactory::getAccess(const QByteArray &ins
640 timer->start(); 626 timer->start();
641 return mCache.value(instanceIdentifier); 627 return mCache.value(instanceIdentifier);
642} 628}
643
644} 629}
645 630
646#pragma clang diagnostic push 631#pragma clang diagnostic push