summaryrefslogtreecommitdiffstats
path: root/common/resourceaccess.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-11 11:55:29 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-11 11:55:29 +0200
commit3a3118e768e1447dc7524328e84b8d7faef81fe1 (patch)
treeaf5582170ed6164fffc9365f34b17bf449c0db40 /common/resourceaccess.cpp
parentf9379318d801df204cc50385c5eca1f28e91755e (diff)
parentce2fd2666f084eebe443598f6f3740a02913091e (diff)
downloadsink-3a3118e768e1447dc7524328e84b8d7faef81fe1.tar.gz
sink-3a3118e768e1447dc7524328e84b8d7faef81fe1.zip
Merge branch 'feature/notifications' into develop
Diffstat (limited to 'common/resourceaccess.cpp')
-rw-r--r--common/resourceaccess.cpp144
1 files changed, 80 insertions, 64 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index d3bd85f..c878143 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -44,12 +44,6 @@
44#include <QBuffer> 44#include <QBuffer>
45#include <QTime> 45#include <QTime>
46 46
47#undef Trace
48#define TracePrivate() Trace_area("client.communication." + resourceInstanceIdentifier)
49#define Trace() Trace_area("client.communication." + d->resourceInstanceIdentifier)
50#undef Log
51#define Log() Log_area("client.communication." + d->resourceInstanceIdentifier)
52
53static void queuedInvoke(const std::function<void()> &f, QObject *context = 0) 47static void queuedInvoke(const std::function<void()> &f, QObject *context = 0)
54{ 48{
55 auto timer = QSharedPointer<QTimer>::create(); 49 auto timer = QSharedPointer<QTimer>::create();
@@ -100,8 +94,10 @@ public:
100 QHash<uint, bool> completeCommands; 94 QHash<uint, bool> completeCommands;
101 uint messageId; 95 uint messageId;
102 bool openingSocket; 96 bool openingSocket;
97 SINK_DEBUG_COMPONENT(resourceInstanceIdentifier)
103}; 98};
104 99
100
105ResourceAccess::Private::Private(const QByteArray &name, const QByteArray &instanceIdentifier, ResourceAccess *q) 101ResourceAccess::Private::Private(const QByteArray &name, const QByteArray &instanceIdentifier, ResourceAccess *q)
106 : resourceName(name), resourceInstanceIdentifier(instanceIdentifier), messageId(0), openingSocket(false) 102 : resourceName(name), resourceInstanceIdentifier(instanceIdentifier), messageId(0), openingSocket(false)
107{ 103{
@@ -111,7 +107,7 @@ void ResourceAccess::Private::abortPendingOperations()
111{ 107{
112 callCallbacks(); 108 callCallbacks();
113 if (!resultHandler.isEmpty()) { 109 if (!resultHandler.isEmpty()) {
114 Warning() << "Aborting pending operations " << resultHandler.keys(); 110 SinkWarning() << "Aborting pending operations " << resultHandler.keys();
115 } 111 }
116 auto handlers = resultHandler.values(); 112 auto handlers = resultHandler.values();
117 resultHandler.clear(); 113 resultHandler.clear();
@@ -165,7 +161,7 @@ KAsync::Job<void> ResourceAccess::Private::tryToConnect()
165 auto counter = QSharedPointer<int>::create(0); 161 auto counter = QSharedPointer<int>::create(0);
166 return KAsync::dowhile([this]() -> bool { return !socket; }, 162 return KAsync::dowhile([this]() -> bool { return !socket; },
167 [this, counter](KAsync::Future<void> &future) { 163 [this, counter](KAsync::Future<void> &future) {
168 TracePrivate() << "Loop"; 164 SinkTrace() << "Loop";
169 connectToServer(resourceInstanceIdentifier) 165 connectToServer(resourceInstanceIdentifier)
170 .then<void, QSharedPointer<QLocalSocket>>( 166 .then<void, QSharedPointer<QLocalSocket>>(
171 [this, &future](const QSharedPointer<QLocalSocket> &s) { 167 [this, &future](const QSharedPointer<QLocalSocket> &s) {
@@ -178,7 +174,7 @@ KAsync::Job<void> ResourceAccess::Private::tryToConnect()
178 static int timeout = 500; 174 static int timeout = 500;
179 static int maxRetries = timeout / waitTime; 175 static int maxRetries = timeout / waitTime;
180 if (*counter > maxRetries) { 176 if (*counter > maxRetries) {
181 TracePrivate() << "Giving up"; 177 SinkTrace() << "Giving up";
182 future.setError(-1, "Failed to connect to socket"); 178 future.setError(-1, "Failed to connect to socket");
183 } else { 179 } else {
184 KAsync::wait(waitTime).then<void>([&future]() { future.setFinished(); }).exec(); 180 KAsync::wait(waitTime).then<void>([&future]() { future.setFinished(); }).exec();
@@ -192,17 +188,17 @@ KAsync::Job<void> ResourceAccess::Private::tryToConnect()
192KAsync::Job<void> ResourceAccess::Private::initializeSocket() 188KAsync::Job<void> ResourceAccess::Private::initializeSocket()
193{ 189{
194 return KAsync::start<void>([this](KAsync::Future<void> &future) { 190 return KAsync::start<void>([this](KAsync::Future<void> &future) {
195 TracePrivate() << "Trying to connect"; 191 SinkTrace() << "Trying to connect";
196 connectToServer(resourceInstanceIdentifier) 192 connectToServer(resourceInstanceIdentifier)
197 .then<void, QSharedPointer<QLocalSocket>>( 193 .then<void, QSharedPointer<QLocalSocket>>(
198 [this, &future](const QSharedPointer<QLocalSocket> &s) { 194 [this, &future](const QSharedPointer<QLocalSocket> &s) {
199 TracePrivate() << "Connected to resource, without having to start it."; 195 SinkTrace() << "Connected to resource, without having to start it.";
200 Q_ASSERT(s); 196 Q_ASSERT(s);
201 socket = s; 197 socket = s;
202 future.setFinished(); 198 future.setFinished();
203 }, 199 },
204 [this, &future](int errorCode, const QString &errorString) { 200 [this, &future](int errorCode, const QString &errorString) {
205 TracePrivate() << "Failed to connect, starting resource"; 201 SinkTrace() << "Failed to connect, starting resource";
206 // We failed to connect, so let's start the resource 202 // We failed to connect, so let's start the resource
207 QStringList args; 203 QStringList args;
208 if (Sink::Test::testModeEnabled()) { 204 if (Sink::Test::testModeEnabled()) {
@@ -211,16 +207,16 @@ KAsync::Job<void> ResourceAccess::Private::initializeSocket()
211 args << resourceInstanceIdentifier << resourceName; 207 args << resourceInstanceIdentifier << resourceName;
212 qint64 pid = 0; 208 qint64 pid = 0;
213 if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { 209 if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) {
214 TracePrivate() << "Started resource " << pid; 210 SinkTrace() << "Started resource " << pid;
215 tryToConnect() 211 tryToConnect()
216 .then<void>([&future]() { future.setFinished(); }, 212 .then<void>([&future]() { future.setFinished(); },
217 [this, &future](int errorCode, const QString &errorString) { 213 [this, &future](int errorCode, const QString &errorString) {
218 Warning() << "Failed to connect to started resource"; 214 SinkWarning() << "Failed to connect to started resource";
219 future.setError(errorCode, errorString); 215 future.setError(errorCode, errorString);
220 }) 216 })
221 .exec(); 217 .exec();
222 } else { 218 } else {
223 Warning() << "Failed to start resource"; 219 SinkWarning() << "Failed to start resource";
224 } 220 }
225 }) 221 })
226 .exec(); 222 .exec();
@@ -230,14 +226,15 @@ KAsync::Job<void> ResourceAccess::Private::initializeSocket()
230ResourceAccess::ResourceAccess(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType) 226ResourceAccess::ResourceAccess(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType)
231 : ResourceAccessInterface(), d(new Private(resourceType, resourceInstanceIdentifier, this)) 227 : ResourceAccessInterface(), d(new Private(resourceType, resourceInstanceIdentifier, this))
232{ 228{
233 Trace() << "Starting access"; 229 mResourceStatus = Sink::ApplicationDomain::OfflineStatus;
230 SinkTrace() << "Starting access";
234} 231}
235 232
236ResourceAccess::~ResourceAccess() 233ResourceAccess::~ResourceAccess()
237{ 234{
238 Log() << "Closing access"; 235 SinkLog() << "Closing access";
239 if (!d->resultHandler.isEmpty()) { 236 if (!d->resultHandler.isEmpty()) {
240 Warning() << "Left jobs running while shutting down ResourceAccess: " << d->resultHandler.keys(); 237 SinkWarning() << "Left jobs running while shutting down ResourceAccess: " << d->resultHandler.keys();
241 } 238 }
242} 239}
243 240
@@ -294,7 +291,7 @@ KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu
294 291
295KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync) 292KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync)
296{ 293{
297 Trace() << "Sending synchronize command: " << sourceSync << localSync; 294 SinkTrace() << "Sending synchronize command: " << sourceSync << localSync;
298 flatbuffers::FlatBufferBuilder fbb; 295 flatbuffers::FlatBufferBuilder fbb;
299 auto command = Sink::Commands::CreateSynchronize(fbb, sourceSync, localSync); 296 auto command = Sink::Commands::CreateSynchronize(fbb, sourceSync, localSync);
300 Sink::Commands::FinishSynchronizeBuffer(fbb, command); 297 Sink::Commands::FinishSynchronizeBuffer(fbb, command);
@@ -375,7 +372,7 @@ ResourceAccess::sendInspectionCommand(int inspectionType, const QByteArray &insp
375void ResourceAccess::open() 372void ResourceAccess::open()
376{ 373{
377 if (d->socket && d->socket->isValid()) { 374 if (d->socket && d->socket->isValid()) {
378 // Trace() << "Socket valid, so not opening again"; 375 // SinkTrace() << "Socket valid, so not opening again";
379 return; 376 return;
380 } 377 }
381 if (d->openingSocket) { 378 if (d->openingSocket) {
@@ -387,7 +384,7 @@ void ResourceAccess::open()
387 d->initializeSocket() 384 d->initializeSocket()
388 .then<void>( 385 .then<void>(
389 [this, time]() { 386 [this, time]() {
390 Trace() << "Socket is initialized." << Log::TraceTime(time->elapsed()); 387 SinkTrace() << "Socket is initialized." << Log::TraceTime(time->elapsed());
391 d->openingSocket = false; 388 d->openingSocket = false;
392 QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected); 389 QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected);
393 QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError))); 390 QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError)));
@@ -396,16 +393,16 @@ void ResourceAccess::open()
396 }, 393 },
397 [this](int error, const QString &errorString) { 394 [this](int error, const QString &errorString) {
398 d->openingSocket = false; 395 d->openingSocket = false;
399 Warning() << "Failed to initialize socket " << errorString; 396 SinkWarning() << "Failed to initialize socket " << errorString;
400 }) 397 })
401 .exec(); 398 .exec();
402} 399}
403 400
404void ResourceAccess::close() 401void ResourceAccess::close()
405{ 402{
406 Log() << QString("Closing %1").arg(d->socket->fullServerName()); 403 SinkLog() << QString("Closing %1").arg(d->socket->fullServerName());
407 Trace() << "Pending commands: " << d->pendingCommands.size(); 404 SinkTrace() << "Pending commands: " << d->pendingCommands.size();
408 Trace() << "Queued commands: " << d->commandQueue.size(); 405 SinkTrace() << "Queued commands: " << d->commandQueue.size();
409 d->socket->close(); 406 d->socket->close();
410} 407}
411 408
@@ -415,10 +412,10 @@ void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command)
415 // TODO: we should have a timeout for commands 412 // TODO: we should have a timeout for commands
416 d->messageId++; 413 d->messageId++;
417 const auto messageId = d->messageId; 414 const auto messageId = d->messageId;
418 Trace() << QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId); 415 SinkTrace() << QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId);
419 Q_ASSERT(command->callback); 416 Q_ASSERT(command->callback);
420 registerCallback(d->messageId, [this, messageId, command](int errorCode, QString errorMessage) { 417 registerCallback(d->messageId, [this, messageId, command](int errorCode, QString errorMessage) {
421 Trace() << "Command complete " << messageId; 418 SinkTrace() << "Command complete " << messageId;
422 d->pendingCommands.remove(messageId); 419 d->pendingCommands.remove(messageId);
423 command->callback(errorCode, errorMessage); 420 command->callback(errorCode, errorMessage);
424 }); 421 });
@@ -430,8 +427,8 @@ void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command)
430void ResourceAccess::processCommandQueue() 427void ResourceAccess::processCommandQueue()
431{ 428{
432 // TODO: serialize instead of blast them all through the socket? 429 // TODO: serialize instead of blast them all through the socket?
433 Trace() << "We have " << d->commandQueue.size() << " queued commands"; 430 SinkTrace() << "We have " << d->commandQueue.size() << " queued commands";
434 Trace() << "Pending commands: " << d->pendingCommands.size(); 431 SinkTrace() << "Pending commands: " << d->pendingCommands.size();
435 for (auto command : d->commandQueue) { 432 for (auto command : d->commandQueue) {
436 sendCommand(command); 433 sendCommand(command);
437 } 434 }
@@ -440,9 +437,9 @@ void ResourceAccess::processCommandQueue()
440 437
441void ResourceAccess::processPendingCommandQueue() 438void ResourceAccess::processPendingCommandQueue()
442{ 439{
443 Trace() << "We have " << d->pendingCommands.size() << " pending commands"; 440 SinkTrace() << "We have " << d->pendingCommands.size() << " pending commands";
444 for (auto command : d->pendingCommands) { 441 for (auto command : d->pendingCommands) {
445 Trace() << "Reenquing command " << command->commandId; 442 SinkTrace() << "Reenquing command " << command->commandId;
446 d->commandQueue << command; 443 d->commandQueue << command;
447 } 444 }
448 d->pendingCommands.clear(); 445 d->pendingCommands.clear();
@@ -452,11 +449,11 @@ void ResourceAccess::processPendingCommandQueue()
452void ResourceAccess::connected() 449void ResourceAccess::connected()
453{ 450{
454 if (!isReady()) { 451 if (!isReady()) {
455 Trace() << "Connected but not ready?"; 452 SinkTrace() << "Connected but not ready?";
456 return; 453 return;
457 } 454 }
458 455
459 Trace() << QString("Connected: %1").arg(d->socket->fullServerName()); 456 SinkTrace() << QString("Connected: %1").arg(d->socket->fullServerName());
460 457
461 { 458 {
462 flatbuffers::FlatBufferBuilder fbb; 459 flatbuffers::FlatBufferBuilder fbb;
@@ -476,7 +473,7 @@ void ResourceAccess::connected()
476 473
477void ResourceAccess::disconnected() 474void ResourceAccess::disconnected()
478{ 475{
479 Log() << QString("Disconnected from %1").arg(d->socket->fullServerName()); 476 SinkLog() << QString("Disconnected from %1").arg(d->socket->fullServerName());
480 d->socket->close(); 477 d->socket->close();
481 emit ready(false); 478 emit ready(false);
482} 479}
@@ -485,15 +482,15 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error)
485{ 482{
486 const bool resourceCrashed = d->partialMessageBuffer.contains("PANIC"); 483 const bool resourceCrashed = d->partialMessageBuffer.contains("PANIC");
487 if (resourceCrashed) { 484 if (resourceCrashed) {
488 ErrorMsg() << "The resource crashed!"; 485 SinkError() << "The resource crashed!";
489 d->abortPendingOperations(); 486 d->abortPendingOperations();
490 } else if (error == QLocalSocket::PeerClosedError) { 487 } else if (error == QLocalSocket::PeerClosedError) {
491 Log() << "The resource closed the connection."; 488 SinkLog() << "The resource closed the connection.";
492 d->abortPendingOperations(); 489 d->abortPendingOperations();
493 } else { 490 } else {
494 Warning() << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString()); 491 SinkWarning() << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString());
495 if (d->pendingCommands.size()) { 492 if (d->pendingCommands.size()) {
496 Trace() << "Reconnecting due to pending operations: " << d->pendingCommands.size(); 493 SinkTrace() << "Reconnecting due to pending operations: " << d->pendingCommands.size();
497 open(); 494 open();
498 } 495 }
499 } 496 }
@@ -502,7 +499,7 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error)
502void ResourceAccess::readResourceMessage() 499void ResourceAccess::readResourceMessage()
503{ 500{
504 if (!d->socket || !d->socket->isValid()) { 501 if (!d->socket || !d->socket->isValid()) {
505 Warning() << "No socket available"; 502 SinkWarning() << "No socket available";
506 return; 503 return;
507 } 504 }
508 505
@@ -513,11 +510,27 @@ void ResourceAccess::readResourceMessage()
513 } 510 }
514} 511}
515 512
513static Sink::Notification getNotification(const Sink::Commands::Notification *buffer)
514{
515 Sink::Notification n;
516 if (buffer->identifier()) {
517 // Don't use fromRawData, the buffer is gone once we invoke emit notification
518 n.id = BufferUtils::extractBufferCopy(buffer->identifier());
519 }
520 if (buffer->message()) {
521 // Don't use fromRawData, the buffer is gone once we invoke emit notification
522 n.message = BufferUtils::extractBufferCopy(buffer->message());
523 }
524 n.type = buffer->type();
525 n.code = buffer->code();
526 return n;
527}
528
516bool ResourceAccess::processMessageBuffer() 529bool ResourceAccess::processMessageBuffer()
517{ 530{
518 static const int headerSize = Commands::headerSize(); 531 static const int headerSize = Commands::headerSize();
519 if (d->partialMessageBuffer.size() < headerSize) { 532 if (d->partialMessageBuffer.size() < headerSize) {
520 Warning() << "command too small"; 533 SinkWarning() << "command too small";
521 return false; 534 return false;
522 } 535 }
523 536
@@ -526,16 +539,16 @@ bool ResourceAccess::processMessageBuffer()
526 const uint size = *(int *)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); 539 const uint size = *(int *)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint));
527 540
528 if (size > (uint)(d->partialMessageBuffer.size() - headerSize)) { 541 if (size > (uint)(d->partialMessageBuffer.size() - headerSize)) {
529 Warning() << "command too small"; 542 SinkWarning() << "command too small";
530 return false; 543 return false;
531 } 544 }
532 545
533 switch (commandId) { 546 switch (commandId) {
534 case Commands::RevisionUpdateCommand: { 547 case Commands::RevisionUpdateCommand: {
535 auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); 548 auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize);
536 Trace() << QString("Revision updated to: %1").arg(buffer->revision()); 549 SinkTrace() << QString("Revision updated to: %1").arg(buffer->revision());
537 Notification n; 550 Notification n;
538 n.type = Sink::Commands::NotificationType::NotificationType_RevisionUpdate; 551 n.type = Sink::Notification::RevisionUpdate;
539 emit notification(n); 552 emit notification(n);
540 emit revisionChanged(buffer->revision()); 553 emit revisionChanged(buffer->revision());
541 554
@@ -543,7 +556,7 @@ bool ResourceAccess::processMessageBuffer()
543 } 556 }
544 case Commands::CommandCompletionCommand: { 557 case Commands::CommandCompletionCommand: {
545 auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); 558 auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize);
546 Trace() << QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"); 559 SinkTrace() << QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully");
547 560
548 d->completeCommands.insert(buffer->id(), buffer->success()); 561 d->completeCommands.insert(buffer->id(), buffer->success());
549 // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first 562 // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
@@ -553,32 +566,34 @@ bool ResourceAccess::processMessageBuffer()
553 case Commands::NotificationCommand: { 566 case Commands::NotificationCommand: {
554 auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize); 567 auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize);
555 switch (buffer->type()) { 568 switch (buffer->type()) {
556 case Sink::Commands::NotificationType::NotificationType_Shutdown: 569 case Sink::Notification::Shutdown:
557 Log() << "Received shutdown notification."; 570 SinkLog() << "Received shutdown notification.";
558 close(); 571 close();
559 break; 572 break;
560 case Sink::Commands::NotificationType::NotificationType_Inspection: { 573 case Sink::Notification::Inspection: {
561 Trace() << "Received inspection notification."; 574 SinkTrace() << "Received inspection notification.";
562 Notification n; 575 auto n = getNotification(buffer);
563 if (buffer->identifier()) {
564 // Don't use fromRawData, the buffer is gone once we invoke emit notification
565 n.id = BufferUtils::extractBufferCopy(buffer->identifier());
566 }
567 if (buffer->message()) {
568 // Don't use fromRawData, the buffer is gone once we invoke emit notification
569 n.message = BufferUtils::extractBufferCopy(buffer->message());
570 }
571 n.type = buffer->type();
572 n.code = buffer->code();
573 // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first 576 // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
574 queuedInvoke([=]() { emit notification(n); }, this); 577 queuedInvoke([=]() { emit notification(n); }, this);
575 } break; 578 } break;
576 case Sink::Commands::NotificationType::NotificationType_Status: 579 case Sink::Notification::Status:
577 case Sink::Commands::NotificationType::NotificationType_Warning: 580 if (mResourceStatus == buffer->code()) {
578 case Sink::Commands::NotificationType::NotificationType_Progress: 581 SinkTrace() << "Got an unnecessary status notification";
579 case Sink::Commands::NotificationType::NotificationType_RevisionUpdate: 582 break;
583 }
584 mResourceStatus = buffer->code();
585 SinkTrace() << "Updated status: " << mResourceStatus;
586 [[clang::fallthrough]];
587 case Sink::Notification::Warning:
588 [[clang::fallthrough]];
589 case Sink::Notification::Progress: {
590 auto n = getNotification(buffer);
591 SinkTrace() << "Received notification: " << n.type;
592 emit notification(n);
593 } break;
594 case Sink::Notification::RevisionUpdate:
580 default: 595 default:
581 Warning() << "Received unknown notification: " << buffer->type(); 596 SinkWarning() << "Received unknown notification: " << buffer->type();
582 break; 597 break;
583 } 598 }
584 break; 599 break;
@@ -624,6 +639,7 @@ Sink::ResourceAccess::Ptr ResourceAccessFactory::getAccess(const QByteArray &ins
624 } 639 }
625 if (!mTimer.contains(instanceIdentifier)) { 640 if (!mTimer.contains(instanceIdentifier)) {
626 auto timer = new QTimer; 641 auto timer = new QTimer;
642 timer->setSingleShot(true);
627 // Drop connection after 3 seconds (which is a random value) 643 // Drop connection after 3 seconds (which is a random value)
628 QObject::connect(timer, &QTimer::timeout, timer, [this, instanceIdentifier]() { mCache.remove(instanceIdentifier); }); 644 QObject::connect(timer, &QTimer::timeout, timer, [this, instanceIdentifier]() { mCache.remove(instanceIdentifier); });
629 timer->setInterval(3000); 645 timer->setInterval(3000);