diff options
Diffstat (limited to 'common/resourceaccess.cpp')
-rw-r--r-- | common/resourceaccess.cpp | 144 |
1 files changed, 80 insertions, 64 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index d3bd85f..c878143 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -44,12 +44,6 @@ | |||
44 | #include <QBuffer> | 44 | #include <QBuffer> |
45 | #include <QTime> | 45 | #include <QTime> |
46 | 46 | ||
47 | #undef Trace | ||
48 | #define TracePrivate() Trace_area("client.communication." + resourceInstanceIdentifier) | ||
49 | #define Trace() Trace_area("client.communication." + d->resourceInstanceIdentifier) | ||
50 | #undef Log | ||
51 | #define Log() Log_area("client.communication." + d->resourceInstanceIdentifier) | ||
52 | |||
53 | static void queuedInvoke(const std::function<void()> &f, QObject *context = 0) | 47 | static void queuedInvoke(const std::function<void()> &f, QObject *context = 0) |
54 | { | 48 | { |
55 | auto timer = QSharedPointer<QTimer>::create(); | 49 | auto timer = QSharedPointer<QTimer>::create(); |
@@ -100,8 +94,10 @@ public: | |||
100 | QHash<uint, bool> completeCommands; | 94 | QHash<uint, bool> completeCommands; |
101 | uint messageId; | 95 | uint messageId; |
102 | bool openingSocket; | 96 | bool openingSocket; |
97 | SINK_DEBUG_COMPONENT(resourceInstanceIdentifier) | ||
103 | }; | 98 | }; |
104 | 99 | ||
100 | |||
105 | ResourceAccess::Private::Private(const QByteArray &name, const QByteArray &instanceIdentifier, ResourceAccess *q) | 101 | ResourceAccess::Private::Private(const QByteArray &name, const QByteArray &instanceIdentifier, ResourceAccess *q) |
106 | : resourceName(name), resourceInstanceIdentifier(instanceIdentifier), messageId(0), openingSocket(false) | 102 | : resourceName(name), resourceInstanceIdentifier(instanceIdentifier), messageId(0), openingSocket(false) |
107 | { | 103 | { |
@@ -111,7 +107,7 @@ void ResourceAccess::Private::abortPendingOperations() | |||
111 | { | 107 | { |
112 | callCallbacks(); | 108 | callCallbacks(); |
113 | if (!resultHandler.isEmpty()) { | 109 | if (!resultHandler.isEmpty()) { |
114 | Warning() << "Aborting pending operations " << resultHandler.keys(); | 110 | SinkWarning() << "Aborting pending operations " << resultHandler.keys(); |
115 | } | 111 | } |
116 | auto handlers = resultHandler.values(); | 112 | auto handlers = resultHandler.values(); |
117 | resultHandler.clear(); | 113 | resultHandler.clear(); |
@@ -165,7 +161,7 @@ KAsync::Job<void> ResourceAccess::Private::tryToConnect() | |||
165 | auto counter = QSharedPointer<int>::create(0); | 161 | auto counter = QSharedPointer<int>::create(0); |
166 | return KAsync::dowhile([this]() -> bool { return !socket; }, | 162 | return KAsync::dowhile([this]() -> bool { return !socket; }, |
167 | [this, counter](KAsync::Future<void> &future) { | 163 | [this, counter](KAsync::Future<void> &future) { |
168 | TracePrivate() << "Loop"; | 164 | SinkTrace() << "Loop"; |
169 | connectToServer(resourceInstanceIdentifier) | 165 | connectToServer(resourceInstanceIdentifier) |
170 | .then<void, QSharedPointer<QLocalSocket>>( | 166 | .then<void, QSharedPointer<QLocalSocket>>( |
171 | [this, &future](const QSharedPointer<QLocalSocket> &s) { | 167 | [this, &future](const QSharedPointer<QLocalSocket> &s) { |
@@ -178,7 +174,7 @@ KAsync::Job<void> ResourceAccess::Private::tryToConnect() | |||
178 | static int timeout = 500; | 174 | static int timeout = 500; |
179 | static int maxRetries = timeout / waitTime; | 175 | static int maxRetries = timeout / waitTime; |
180 | if (*counter > maxRetries) { | 176 | if (*counter > maxRetries) { |
181 | TracePrivate() << "Giving up"; | 177 | SinkTrace() << "Giving up"; |
182 | future.setError(-1, "Failed to connect to socket"); | 178 | future.setError(-1, "Failed to connect to socket"); |
183 | } else { | 179 | } else { |
184 | KAsync::wait(waitTime).then<void>([&future]() { future.setFinished(); }).exec(); | 180 | KAsync::wait(waitTime).then<void>([&future]() { future.setFinished(); }).exec(); |
@@ -192,17 +188,17 @@ KAsync::Job<void> ResourceAccess::Private::tryToConnect() | |||
192 | KAsync::Job<void> ResourceAccess::Private::initializeSocket() | 188 | KAsync::Job<void> ResourceAccess::Private::initializeSocket() |
193 | { | 189 | { |
194 | return KAsync::start<void>([this](KAsync::Future<void> &future) { | 190 | return KAsync::start<void>([this](KAsync::Future<void> &future) { |
195 | TracePrivate() << "Trying to connect"; | 191 | SinkTrace() << "Trying to connect"; |
196 | connectToServer(resourceInstanceIdentifier) | 192 | connectToServer(resourceInstanceIdentifier) |
197 | .then<void, QSharedPointer<QLocalSocket>>( | 193 | .then<void, QSharedPointer<QLocalSocket>>( |
198 | [this, &future](const QSharedPointer<QLocalSocket> &s) { | 194 | [this, &future](const QSharedPointer<QLocalSocket> &s) { |
199 | TracePrivate() << "Connected to resource, without having to start it."; | 195 | SinkTrace() << "Connected to resource, without having to start it."; |
200 | Q_ASSERT(s); | 196 | Q_ASSERT(s); |
201 | socket = s; | 197 | socket = s; |
202 | future.setFinished(); | 198 | future.setFinished(); |
203 | }, | 199 | }, |
204 | [this, &future](int errorCode, const QString &errorString) { | 200 | [this, &future](int errorCode, const QString &errorString) { |
205 | TracePrivate() << "Failed to connect, starting resource"; | 201 | SinkTrace() << "Failed to connect, starting resource"; |
206 | // We failed to connect, so let's start the resource | 202 | // We failed to connect, so let's start the resource |
207 | QStringList args; | 203 | QStringList args; |
208 | if (Sink::Test::testModeEnabled()) { | 204 | if (Sink::Test::testModeEnabled()) { |
@@ -211,16 +207,16 @@ KAsync::Job<void> ResourceAccess::Private::initializeSocket() | |||
211 | args << resourceInstanceIdentifier << resourceName; | 207 | args << resourceInstanceIdentifier << resourceName; |
212 | qint64 pid = 0; | 208 | qint64 pid = 0; |
213 | if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { | 209 | if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { |
214 | TracePrivate() << "Started resource " << pid; | 210 | SinkTrace() << "Started resource " << pid; |
215 | tryToConnect() | 211 | tryToConnect() |
216 | .then<void>([&future]() { future.setFinished(); }, | 212 | .then<void>([&future]() { future.setFinished(); }, |
217 | [this, &future](int errorCode, const QString &errorString) { | 213 | [this, &future](int errorCode, const QString &errorString) { |
218 | Warning() << "Failed to connect to started resource"; | 214 | SinkWarning() << "Failed to connect to started resource"; |
219 | future.setError(errorCode, errorString); | 215 | future.setError(errorCode, errorString); |
220 | }) | 216 | }) |
221 | .exec(); | 217 | .exec(); |
222 | } else { | 218 | } else { |
223 | Warning() << "Failed to start resource"; | 219 | SinkWarning() << "Failed to start resource"; |
224 | } | 220 | } |
225 | }) | 221 | }) |
226 | .exec(); | 222 | .exec(); |
@@ -230,14 +226,15 @@ KAsync::Job<void> ResourceAccess::Private::initializeSocket() | |||
230 | ResourceAccess::ResourceAccess(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType) | 226 | ResourceAccess::ResourceAccess(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType) |
231 | : ResourceAccessInterface(), d(new Private(resourceType, resourceInstanceIdentifier, this)) | 227 | : ResourceAccessInterface(), d(new Private(resourceType, resourceInstanceIdentifier, this)) |
232 | { | 228 | { |
233 | Trace() << "Starting access"; | 229 | mResourceStatus = Sink::ApplicationDomain::OfflineStatus; |
230 | SinkTrace() << "Starting access"; | ||
234 | } | 231 | } |
235 | 232 | ||
236 | ResourceAccess::~ResourceAccess() | 233 | ResourceAccess::~ResourceAccess() |
237 | { | 234 | { |
238 | Log() << "Closing access"; | 235 | SinkLog() << "Closing access"; |
239 | if (!d->resultHandler.isEmpty()) { | 236 | if (!d->resultHandler.isEmpty()) { |
240 | Warning() << "Left jobs running while shutting down ResourceAccess: " << d->resultHandler.keys(); | 237 | SinkWarning() << "Left jobs running while shutting down ResourceAccess: " << d->resultHandler.keys(); |
241 | } | 238 | } |
242 | } | 239 | } |
243 | 240 | ||
@@ -294,7 +291,7 @@ KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu | |||
294 | 291 | ||
295 | KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync) | 292 | KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync) |
296 | { | 293 | { |
297 | Trace() << "Sending synchronize command: " << sourceSync << localSync; | 294 | SinkTrace() << "Sending synchronize command: " << sourceSync << localSync; |
298 | flatbuffers::FlatBufferBuilder fbb; | 295 | flatbuffers::FlatBufferBuilder fbb; |
299 | auto command = Sink::Commands::CreateSynchronize(fbb, sourceSync, localSync); | 296 | auto command = Sink::Commands::CreateSynchronize(fbb, sourceSync, localSync); |
300 | Sink::Commands::FinishSynchronizeBuffer(fbb, command); | 297 | Sink::Commands::FinishSynchronizeBuffer(fbb, command); |
@@ -375,7 +372,7 @@ ResourceAccess::sendInspectionCommand(int inspectionType, const QByteArray &insp | |||
375 | void ResourceAccess::open() | 372 | void ResourceAccess::open() |
376 | { | 373 | { |
377 | if (d->socket && d->socket->isValid()) { | 374 | if (d->socket && d->socket->isValid()) { |
378 | // Trace() << "Socket valid, so not opening again"; | 375 | // SinkTrace() << "Socket valid, so not opening again"; |
379 | return; | 376 | return; |
380 | } | 377 | } |
381 | if (d->openingSocket) { | 378 | if (d->openingSocket) { |
@@ -387,7 +384,7 @@ void ResourceAccess::open() | |||
387 | d->initializeSocket() | 384 | d->initializeSocket() |
388 | .then<void>( | 385 | .then<void>( |
389 | [this, time]() { | 386 | [this, time]() { |
390 | Trace() << "Socket is initialized." << Log::TraceTime(time->elapsed()); | 387 | SinkTrace() << "Socket is initialized." << Log::TraceTime(time->elapsed()); |
391 | d->openingSocket = false; | 388 | d->openingSocket = false; |
392 | QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected); | 389 | QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected); |
393 | QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError))); | 390 | QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError))); |
@@ -396,16 +393,16 @@ void ResourceAccess::open() | |||
396 | }, | 393 | }, |
397 | [this](int error, const QString &errorString) { | 394 | [this](int error, const QString &errorString) { |
398 | d->openingSocket = false; | 395 | d->openingSocket = false; |
399 | Warning() << "Failed to initialize socket " << errorString; | 396 | SinkWarning() << "Failed to initialize socket " << errorString; |
400 | }) | 397 | }) |
401 | .exec(); | 398 | .exec(); |
402 | } | 399 | } |
403 | 400 | ||
404 | void ResourceAccess::close() | 401 | void ResourceAccess::close() |
405 | { | 402 | { |
406 | Log() << QString("Closing %1").arg(d->socket->fullServerName()); | 403 | SinkLog() << QString("Closing %1").arg(d->socket->fullServerName()); |
407 | Trace() << "Pending commands: " << d->pendingCommands.size(); | 404 | SinkTrace() << "Pending commands: " << d->pendingCommands.size(); |
408 | Trace() << "Queued commands: " << d->commandQueue.size(); | 405 | SinkTrace() << "Queued commands: " << d->commandQueue.size(); |
409 | d->socket->close(); | 406 | d->socket->close(); |
410 | } | 407 | } |
411 | 408 | ||
@@ -415,10 +412,10 @@ void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command) | |||
415 | // TODO: we should have a timeout for commands | 412 | // TODO: we should have a timeout for commands |
416 | d->messageId++; | 413 | d->messageId++; |
417 | const auto messageId = d->messageId; | 414 | const auto messageId = d->messageId; |
418 | Trace() << QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId); | 415 | SinkTrace() << QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId); |
419 | Q_ASSERT(command->callback); | 416 | Q_ASSERT(command->callback); |
420 | registerCallback(d->messageId, [this, messageId, command](int errorCode, QString errorMessage) { | 417 | registerCallback(d->messageId, [this, messageId, command](int errorCode, QString errorMessage) { |
421 | Trace() << "Command complete " << messageId; | 418 | SinkTrace() << "Command complete " << messageId; |
422 | d->pendingCommands.remove(messageId); | 419 | d->pendingCommands.remove(messageId); |
423 | command->callback(errorCode, errorMessage); | 420 | command->callback(errorCode, errorMessage); |
424 | }); | 421 | }); |
@@ -430,8 +427,8 @@ void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command) | |||
430 | void ResourceAccess::processCommandQueue() | 427 | void ResourceAccess::processCommandQueue() |
431 | { | 428 | { |
432 | // TODO: serialize instead of blast them all through the socket? | 429 | // TODO: serialize instead of blast them all through the socket? |
433 | Trace() << "We have " << d->commandQueue.size() << " queued commands"; | 430 | SinkTrace() << "We have " << d->commandQueue.size() << " queued commands"; |
434 | Trace() << "Pending commands: " << d->pendingCommands.size(); | 431 | SinkTrace() << "Pending commands: " << d->pendingCommands.size(); |
435 | for (auto command : d->commandQueue) { | 432 | for (auto command : d->commandQueue) { |
436 | sendCommand(command); | 433 | sendCommand(command); |
437 | } | 434 | } |
@@ -440,9 +437,9 @@ void ResourceAccess::processCommandQueue() | |||
440 | 437 | ||
441 | void ResourceAccess::processPendingCommandQueue() | 438 | void ResourceAccess::processPendingCommandQueue() |
442 | { | 439 | { |
443 | Trace() << "We have " << d->pendingCommands.size() << " pending commands"; | 440 | SinkTrace() << "We have " << d->pendingCommands.size() << " pending commands"; |
444 | for (auto command : d->pendingCommands) { | 441 | for (auto command : d->pendingCommands) { |
445 | Trace() << "Reenquing command " << command->commandId; | 442 | SinkTrace() << "Reenquing command " << command->commandId; |
446 | d->commandQueue << command; | 443 | d->commandQueue << command; |
447 | } | 444 | } |
448 | d->pendingCommands.clear(); | 445 | d->pendingCommands.clear(); |
@@ -452,11 +449,11 @@ void ResourceAccess::processPendingCommandQueue() | |||
452 | void ResourceAccess::connected() | 449 | void ResourceAccess::connected() |
453 | { | 450 | { |
454 | if (!isReady()) { | 451 | if (!isReady()) { |
455 | Trace() << "Connected but not ready?"; | 452 | SinkTrace() << "Connected but not ready?"; |
456 | return; | 453 | return; |
457 | } | 454 | } |
458 | 455 | ||
459 | Trace() << QString("Connected: %1").arg(d->socket->fullServerName()); | 456 | SinkTrace() << QString("Connected: %1").arg(d->socket->fullServerName()); |
460 | 457 | ||
461 | { | 458 | { |
462 | flatbuffers::FlatBufferBuilder fbb; | 459 | flatbuffers::FlatBufferBuilder fbb; |
@@ -476,7 +473,7 @@ void ResourceAccess::connected() | |||
476 | 473 | ||
477 | void ResourceAccess::disconnected() | 474 | void ResourceAccess::disconnected() |
478 | { | 475 | { |
479 | Log() << QString("Disconnected from %1").arg(d->socket->fullServerName()); | 476 | SinkLog() << QString("Disconnected from %1").arg(d->socket->fullServerName()); |
480 | d->socket->close(); | 477 | d->socket->close(); |
481 | emit ready(false); | 478 | emit ready(false); |
482 | } | 479 | } |
@@ -485,15 +482,15 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) | |||
485 | { | 482 | { |
486 | const bool resourceCrashed = d->partialMessageBuffer.contains("PANIC"); | 483 | const bool resourceCrashed = d->partialMessageBuffer.contains("PANIC"); |
487 | if (resourceCrashed) { | 484 | if (resourceCrashed) { |
488 | ErrorMsg() << "The resource crashed!"; | 485 | SinkError() << "The resource crashed!"; |
489 | d->abortPendingOperations(); | 486 | d->abortPendingOperations(); |
490 | } else if (error == QLocalSocket::PeerClosedError) { | 487 | } else if (error == QLocalSocket::PeerClosedError) { |
491 | Log() << "The resource closed the connection."; | 488 | SinkLog() << "The resource closed the connection."; |
492 | d->abortPendingOperations(); | 489 | d->abortPendingOperations(); |
493 | } else { | 490 | } else { |
494 | Warning() << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString()); | 491 | SinkWarning() << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString()); |
495 | if (d->pendingCommands.size()) { | 492 | if (d->pendingCommands.size()) { |
496 | Trace() << "Reconnecting due to pending operations: " << d->pendingCommands.size(); | 493 | SinkTrace() << "Reconnecting due to pending operations: " << d->pendingCommands.size(); |
497 | open(); | 494 | open(); |
498 | } | 495 | } |
499 | } | 496 | } |
@@ -502,7 +499,7 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) | |||
502 | void ResourceAccess::readResourceMessage() | 499 | void ResourceAccess::readResourceMessage() |
503 | { | 500 | { |
504 | if (!d->socket || !d->socket->isValid()) { | 501 | if (!d->socket || !d->socket->isValid()) { |
505 | Warning() << "No socket available"; | 502 | SinkWarning() << "No socket available"; |
506 | return; | 503 | return; |
507 | } | 504 | } |
508 | 505 | ||
@@ -513,11 +510,27 @@ void ResourceAccess::readResourceMessage() | |||
513 | } | 510 | } |
514 | } | 511 | } |
515 | 512 | ||
513 | static Sink::Notification getNotification(const Sink::Commands::Notification *buffer) | ||
514 | { | ||
515 | Sink::Notification n; | ||
516 | if (buffer->identifier()) { | ||
517 | // Don't use fromRawData, the buffer is gone once we invoke emit notification | ||
518 | n.id = BufferUtils::extractBufferCopy(buffer->identifier()); | ||
519 | } | ||
520 | if (buffer->message()) { | ||
521 | // Don't use fromRawData, the buffer is gone once we invoke emit notification | ||
522 | n.message = BufferUtils::extractBufferCopy(buffer->message()); | ||
523 | } | ||
524 | n.type = buffer->type(); | ||
525 | n.code = buffer->code(); | ||
526 | return n; | ||
527 | } | ||
528 | |||
516 | bool ResourceAccess::processMessageBuffer() | 529 | bool ResourceAccess::processMessageBuffer() |
517 | { | 530 | { |
518 | static const int headerSize = Commands::headerSize(); | 531 | static const int headerSize = Commands::headerSize(); |
519 | if (d->partialMessageBuffer.size() < headerSize) { | 532 | if (d->partialMessageBuffer.size() < headerSize) { |
520 | Warning() << "command too small"; | 533 | SinkWarning() << "command too small"; |
521 | return false; | 534 | return false; |
522 | } | 535 | } |
523 | 536 | ||
@@ -526,16 +539,16 @@ bool ResourceAccess::processMessageBuffer() | |||
526 | const uint size = *(int *)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); | 539 | const uint size = *(int *)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); |
527 | 540 | ||
528 | if (size > (uint)(d->partialMessageBuffer.size() - headerSize)) { | 541 | if (size > (uint)(d->partialMessageBuffer.size() - headerSize)) { |
529 | Warning() << "command too small"; | 542 | SinkWarning() << "command too small"; |
530 | return false; | 543 | return false; |
531 | } | 544 | } |
532 | 545 | ||
533 | switch (commandId) { | 546 | switch (commandId) { |
534 | case Commands::RevisionUpdateCommand: { | 547 | case Commands::RevisionUpdateCommand: { |
535 | auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); | 548 | auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); |
536 | Trace() << QString("Revision updated to: %1").arg(buffer->revision()); | 549 | SinkTrace() << QString("Revision updated to: %1").arg(buffer->revision()); |
537 | Notification n; | 550 | Notification n; |
538 | n.type = Sink::Commands::NotificationType::NotificationType_RevisionUpdate; | 551 | n.type = Sink::Notification::RevisionUpdate; |
539 | emit notification(n); | 552 | emit notification(n); |
540 | emit revisionChanged(buffer->revision()); | 553 | emit revisionChanged(buffer->revision()); |
541 | 554 | ||
@@ -543,7 +556,7 @@ bool ResourceAccess::processMessageBuffer() | |||
543 | } | 556 | } |
544 | case Commands::CommandCompletionCommand: { | 557 | case Commands::CommandCompletionCommand: { |
545 | auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); | 558 | auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); |
546 | Trace() << QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"); | 559 | SinkTrace() << QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"); |
547 | 560 | ||
548 | d->completeCommands.insert(buffer->id(), buffer->success()); | 561 | d->completeCommands.insert(buffer->id(), buffer->success()); |
549 | // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first | 562 | // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first |
@@ -553,32 +566,34 @@ bool ResourceAccess::processMessageBuffer() | |||
553 | case Commands::NotificationCommand: { | 566 | case Commands::NotificationCommand: { |
554 | auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize); | 567 | auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize); |
555 | switch (buffer->type()) { | 568 | switch (buffer->type()) { |
556 | case Sink::Commands::NotificationType::NotificationType_Shutdown: | 569 | case Sink::Notification::Shutdown: |
557 | Log() << "Received shutdown notification."; | 570 | SinkLog() << "Received shutdown notification."; |
558 | close(); | 571 | close(); |
559 | break; | 572 | break; |
560 | case Sink::Commands::NotificationType::NotificationType_Inspection: { | 573 | case Sink::Notification::Inspection: { |
561 | Trace() << "Received inspection notification."; | 574 | SinkTrace() << "Received inspection notification."; |
562 | Notification n; | 575 | auto n = getNotification(buffer); |
563 | if (buffer->identifier()) { | ||
564 | // Don't use fromRawData, the buffer is gone once we invoke emit notification | ||
565 | n.id = BufferUtils::extractBufferCopy(buffer->identifier()); | ||
566 | } | ||
567 | if (buffer->message()) { | ||
568 | // Don't use fromRawData, the buffer is gone once we invoke emit notification | ||
569 | n.message = BufferUtils::extractBufferCopy(buffer->message()); | ||
570 | } | ||
571 | n.type = buffer->type(); | ||
572 | n.code = buffer->code(); | ||
573 | // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first | 576 | // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first |
574 | queuedInvoke([=]() { emit notification(n); }, this); | 577 | queuedInvoke([=]() { emit notification(n); }, this); |
575 | } break; | 578 | } break; |
576 | case Sink::Commands::NotificationType::NotificationType_Status: | 579 | case Sink::Notification::Status: |
577 | case Sink::Commands::NotificationType::NotificationType_Warning: | 580 | if (mResourceStatus == buffer->code()) { |
578 | case Sink::Commands::NotificationType::NotificationType_Progress: | 581 | SinkTrace() << "Got an unnecessary status notification"; |
579 | case Sink::Commands::NotificationType::NotificationType_RevisionUpdate: | 582 | break; |
583 | } | ||
584 | mResourceStatus = buffer->code(); | ||
585 | SinkTrace() << "Updated status: " << mResourceStatus; | ||
586 | [[clang::fallthrough]]; | ||
587 | case Sink::Notification::Warning: | ||
588 | [[clang::fallthrough]]; | ||
589 | case Sink::Notification::Progress: { | ||
590 | auto n = getNotification(buffer); | ||
591 | SinkTrace() << "Received notification: " << n.type; | ||
592 | emit notification(n); | ||
593 | } break; | ||
594 | case Sink::Notification::RevisionUpdate: | ||
580 | default: | 595 | default: |
581 | Warning() << "Received unknown notification: " << buffer->type(); | 596 | SinkWarning() << "Received unknown notification: " << buffer->type(); |
582 | break; | 597 | break; |
583 | } | 598 | } |
584 | break; | 599 | break; |
@@ -624,6 +639,7 @@ Sink::ResourceAccess::Ptr ResourceAccessFactory::getAccess(const QByteArray &ins | |||
624 | } | 639 | } |
625 | if (!mTimer.contains(instanceIdentifier)) { | 640 | if (!mTimer.contains(instanceIdentifier)) { |
626 | auto timer = new QTimer; | 641 | auto timer = new QTimer; |
642 | timer->setSingleShot(true); | ||
627 | // Drop connection after 3 seconds (which is a random value) | 643 | // Drop connection after 3 seconds (which is a random value) |
628 | QObject::connect(timer, &QTimer::timeout, timer, [this, instanceIdentifier]() { mCache.remove(instanceIdentifier); }); | 644 | QObject::connect(timer, &QTimer::timeout, timer, [this, instanceIdentifier]() { mCache.remove(instanceIdentifier); }); |
629 | timer->setInterval(3000); | 645 | timer->setInterval(3000); |