summaryrefslogtreecommitdiffstats
path: root/common/resourceaccess.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/resourceaccess.cpp')
-rw-r--r--common/resourceaccess.cpp94
1 files changed, 45 insertions, 49 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 93f97e8..c878143 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -44,12 +44,6 @@
44#include <QBuffer> 44#include <QBuffer>
45#include <QTime> 45#include <QTime>
46 46
47#undef Trace
48#define TracePrivate() Trace_area("client.communication." + resourceInstanceIdentifier)
49#define Trace() Trace_area("client.communication." + d->resourceInstanceIdentifier)
50#undef Log
51#define Log() Log_area("client.communication." + d->resourceInstanceIdentifier)
52
53static void queuedInvoke(const std::function<void()> &f, QObject *context = 0) 47static void queuedInvoke(const std::function<void()> &f, QObject *context = 0)
54{ 48{
55 auto timer = QSharedPointer<QTimer>::create(); 49 auto timer = QSharedPointer<QTimer>::create();
@@ -100,8 +94,10 @@ public:
100 QHash<uint, bool> completeCommands; 94 QHash<uint, bool> completeCommands;
101 uint messageId; 95 uint messageId;
102 bool openingSocket; 96 bool openingSocket;
97 SINK_DEBUG_COMPONENT(resourceInstanceIdentifier)
103}; 98};
104 99
100
105ResourceAccess::Private::Private(const QByteArray &name, const QByteArray &instanceIdentifier, ResourceAccess *q) 101ResourceAccess::Private::Private(const QByteArray &name, const QByteArray &instanceIdentifier, ResourceAccess *q)
106 : resourceName(name), resourceInstanceIdentifier(instanceIdentifier), messageId(0), openingSocket(false) 102 : resourceName(name), resourceInstanceIdentifier(instanceIdentifier), messageId(0), openingSocket(false)
107{ 103{
@@ -111,7 +107,7 @@ void ResourceAccess::Private::abortPendingOperations()
111{ 107{
112 callCallbacks(); 108 callCallbacks();
113 if (!resultHandler.isEmpty()) { 109 if (!resultHandler.isEmpty()) {
114 Warning() << "Aborting pending operations " << resultHandler.keys(); 110 SinkWarning() << "Aborting pending operations " << resultHandler.keys();
115 } 111 }
116 auto handlers = resultHandler.values(); 112 auto handlers = resultHandler.values();
117 resultHandler.clear(); 113 resultHandler.clear();
@@ -165,7 +161,7 @@ KAsync::Job<void> ResourceAccess::Private::tryToConnect()
165 auto counter = QSharedPointer<int>::create(0); 161 auto counter = QSharedPointer<int>::create(0);
166 return KAsync::dowhile([this]() -> bool { return !socket; }, 162 return KAsync::dowhile([this]() -> bool { return !socket; },
167 [this, counter](KAsync::Future<void> &future) { 163 [this, counter](KAsync::Future<void> &future) {
168 TracePrivate() << "Loop"; 164 SinkTrace() << "Loop";
169 connectToServer(resourceInstanceIdentifier) 165 connectToServer(resourceInstanceIdentifier)
170 .then<void, QSharedPointer<QLocalSocket>>( 166 .then<void, QSharedPointer<QLocalSocket>>(
171 [this, &future](const QSharedPointer<QLocalSocket> &s) { 167 [this, &future](const QSharedPointer<QLocalSocket> &s) {
@@ -178,7 +174,7 @@ KAsync::Job<void> ResourceAccess::Private::tryToConnect()
178 static int timeout = 500; 174 static int timeout = 500;
179 static int maxRetries = timeout / waitTime; 175 static int maxRetries = timeout / waitTime;
180 if (*counter > maxRetries) { 176 if (*counter > maxRetries) {
181 TracePrivate() << "Giving up"; 177 SinkTrace() << "Giving up";
182 future.setError(-1, "Failed to connect to socket"); 178 future.setError(-1, "Failed to connect to socket");
183 } else { 179 } else {
184 KAsync::wait(waitTime).then<void>([&future]() { future.setFinished(); }).exec(); 180 KAsync::wait(waitTime).then<void>([&future]() { future.setFinished(); }).exec();
@@ -192,17 +188,17 @@ KAsync::Job<void> ResourceAccess::Private::tryToConnect()
192KAsync::Job<void> ResourceAccess::Private::initializeSocket() 188KAsync::Job<void> ResourceAccess::Private::initializeSocket()
193{ 189{
194 return KAsync::start<void>([this](KAsync::Future<void> &future) { 190 return KAsync::start<void>([this](KAsync::Future<void> &future) {
195 TracePrivate() << "Trying to connect"; 191 SinkTrace() << "Trying to connect";
196 connectToServer(resourceInstanceIdentifier) 192 connectToServer(resourceInstanceIdentifier)
197 .then<void, QSharedPointer<QLocalSocket>>( 193 .then<void, QSharedPointer<QLocalSocket>>(
198 [this, &future](const QSharedPointer<QLocalSocket> &s) { 194 [this, &future](const QSharedPointer<QLocalSocket> &s) {
199 TracePrivate() << "Connected to resource, without having to start it."; 195 SinkTrace() << "Connected to resource, without having to start it.";
200 Q_ASSERT(s); 196 Q_ASSERT(s);
201 socket = s; 197 socket = s;
202 future.setFinished(); 198 future.setFinished();
203 }, 199 },
204 [this, &future](int errorCode, const QString &errorString) { 200 [this, &future](int errorCode, const QString &errorString) {
205 TracePrivate() << "Failed to connect, starting resource"; 201 SinkTrace() << "Failed to connect, starting resource";
206 // We failed to connect, so let's start the resource 202 // We failed to connect, so let's start the resource
207 QStringList args; 203 QStringList args;
208 if (Sink::Test::testModeEnabled()) { 204 if (Sink::Test::testModeEnabled()) {
@@ -211,16 +207,16 @@ KAsync::Job<void> ResourceAccess::Private::initializeSocket()
211 args << resourceInstanceIdentifier << resourceName; 207 args << resourceInstanceIdentifier << resourceName;
212 qint64 pid = 0; 208 qint64 pid = 0;
213 if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { 209 if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) {
214 TracePrivate() << "Started resource " << pid; 210 SinkTrace() << "Started resource " << pid;
215 tryToConnect() 211 tryToConnect()
216 .then<void>([&future]() { future.setFinished(); }, 212 .then<void>([&future]() { future.setFinished(); },
217 [this, &future](int errorCode, const QString &errorString) { 213 [this, &future](int errorCode, const QString &errorString) {
218 Warning() << "Failed to connect to started resource"; 214 SinkWarning() << "Failed to connect to started resource";
219 future.setError(errorCode, errorString); 215 future.setError(errorCode, errorString);
220 }) 216 })
221 .exec(); 217 .exec();
222 } else { 218 } else {
223 Warning() << "Failed to start resource"; 219 SinkWarning() << "Failed to start resource";
224 } 220 }
225 }) 221 })
226 .exec(); 222 .exec();
@@ -231,14 +227,14 @@ ResourceAccess::ResourceAccess(const QByteArray &resourceInstanceIdentifier, con
231 : ResourceAccessInterface(), d(new Private(resourceType, resourceInstanceIdentifier, this)) 227 : ResourceAccessInterface(), d(new Private(resourceType, resourceInstanceIdentifier, this))
232{ 228{
233 mResourceStatus = Sink::ApplicationDomain::OfflineStatus; 229 mResourceStatus = Sink::ApplicationDomain::OfflineStatus;
234 Trace() << "Starting access"; 230 SinkTrace() << "Starting access";
235} 231}
236 232
237ResourceAccess::~ResourceAccess() 233ResourceAccess::~ResourceAccess()
238{ 234{
239 Log() << "Closing access"; 235 SinkLog() << "Closing access";
240 if (!d->resultHandler.isEmpty()) { 236 if (!d->resultHandler.isEmpty()) {
241 Warning() << "Left jobs running while shutting down ResourceAccess: " << d->resultHandler.keys(); 237 SinkWarning() << "Left jobs running while shutting down ResourceAccess: " << d->resultHandler.keys();
242 } 238 }
243} 239}
244 240
@@ -295,7 +291,7 @@ KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu
295 291
296KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync) 292KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync)
297{ 293{
298 Trace() << "Sending synchronize command: " << sourceSync << localSync; 294 SinkTrace() << "Sending synchronize command: " << sourceSync << localSync;
299 flatbuffers::FlatBufferBuilder fbb; 295 flatbuffers::FlatBufferBuilder fbb;
300 auto command = Sink::Commands::CreateSynchronize(fbb, sourceSync, localSync); 296 auto command = Sink::Commands::CreateSynchronize(fbb, sourceSync, localSync);
301 Sink::Commands::FinishSynchronizeBuffer(fbb, command); 297 Sink::Commands::FinishSynchronizeBuffer(fbb, command);
@@ -376,7 +372,7 @@ ResourceAccess::sendInspectionCommand(int inspectionType, const QByteArray &insp
376void ResourceAccess::open() 372void ResourceAccess::open()
377{ 373{
378 if (d->socket && d->socket->isValid()) { 374 if (d->socket && d->socket->isValid()) {
379 // Trace() << "Socket valid, so not opening again"; 375 // SinkTrace() << "Socket valid, so not opening again";
380 return; 376 return;
381 } 377 }
382 if (d->openingSocket) { 378 if (d->openingSocket) {
@@ -388,7 +384,7 @@ void ResourceAccess::open()
388 d->initializeSocket() 384 d->initializeSocket()
389 .then<void>( 385 .then<void>(
390 [this, time]() { 386 [this, time]() {
391 Trace() << "Socket is initialized." << Log::TraceTime(time->elapsed()); 387 SinkTrace() << "Socket is initialized." << Log::TraceTime(time->elapsed());
392 d->openingSocket = false; 388 d->openingSocket = false;
393 QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected); 389 QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected);
394 QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError))); 390 QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError)));
@@ -397,16 +393,16 @@ void ResourceAccess::open()
397 }, 393 },
398 [this](int error, const QString &errorString) { 394 [this](int error, const QString &errorString) {
399 d->openingSocket = false; 395 d->openingSocket = false;
400 Warning() << "Failed to initialize socket " << errorString; 396 SinkWarning() << "Failed to initialize socket " << errorString;
401 }) 397 })
402 .exec(); 398 .exec();
403} 399}
404 400
405void ResourceAccess::close() 401void ResourceAccess::close()
406{ 402{
407 Log() << QString("Closing %1").arg(d->socket->fullServerName()); 403 SinkLog() << QString("Closing %1").arg(d->socket->fullServerName());
408 Trace() << "Pending commands: " << d->pendingCommands.size(); 404 SinkTrace() << "Pending commands: " << d->pendingCommands.size();
409 Trace() << "Queued commands: " << d->commandQueue.size(); 405 SinkTrace() << "Queued commands: " << d->commandQueue.size();
410 d->socket->close(); 406 d->socket->close();
411} 407}
412 408
@@ -416,10 +412,10 @@ void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command)
416 // TODO: we should have a timeout for commands 412 // TODO: we should have a timeout for commands
417 d->messageId++; 413 d->messageId++;
418 const auto messageId = d->messageId; 414 const auto messageId = d->messageId;
419 Trace() << QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId); 415 SinkTrace() << QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId);
420 Q_ASSERT(command->callback); 416 Q_ASSERT(command->callback);
421 registerCallback(d->messageId, [this, messageId, command](int errorCode, QString errorMessage) { 417 registerCallback(d->messageId, [this, messageId, command](int errorCode, QString errorMessage) {
422 Trace() << "Command complete " << messageId; 418 SinkTrace() << "Command complete " << messageId;
423 d->pendingCommands.remove(messageId); 419 d->pendingCommands.remove(messageId);
424 command->callback(errorCode, errorMessage); 420 command->callback(errorCode, errorMessage);
425 }); 421 });
@@ -431,8 +427,8 @@ void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command)
431void ResourceAccess::processCommandQueue() 427void ResourceAccess::processCommandQueue()
432{ 428{
433 // TODO: serialize instead of blast them all through the socket? 429 // TODO: serialize instead of blast them all through the socket?
434 Trace() << "We have " << d->commandQueue.size() << " queued commands"; 430 SinkTrace() << "We have " << d->commandQueue.size() << " queued commands";
435 Trace() << "Pending commands: " << d->pendingCommands.size(); 431 SinkTrace() << "Pending commands: " << d->pendingCommands.size();
436 for (auto command : d->commandQueue) { 432 for (auto command : d->commandQueue) {
437 sendCommand(command); 433 sendCommand(command);
438 } 434 }
@@ -441,9 +437,9 @@ void ResourceAccess::processCommandQueue()
441 437
442void ResourceAccess::processPendingCommandQueue() 438void ResourceAccess::processPendingCommandQueue()
443{ 439{
444 Trace() << "We have " << d->pendingCommands.size() << " pending commands"; 440 SinkTrace() << "We have " << d->pendingCommands.size() << " pending commands";
445 for (auto command : d->pendingCommands) { 441 for (auto command : d->pendingCommands) {
446 Trace() << "Reenquing command " << command->commandId; 442 SinkTrace() << "Reenquing command " << command->commandId;
447 d->commandQueue << command; 443 d->commandQueue << command;
448 } 444 }
449 d->pendingCommands.clear(); 445 d->pendingCommands.clear();
@@ -453,11 +449,11 @@ void ResourceAccess::processPendingCommandQueue()
453void ResourceAccess::connected() 449void ResourceAccess::connected()
454{ 450{
455 if (!isReady()) { 451 if (!isReady()) {
456 Trace() << "Connected but not ready?"; 452 SinkTrace() << "Connected but not ready?";
457 return; 453 return;
458 } 454 }
459 455
460 Trace() << QString("Connected: %1").arg(d->socket->fullServerName()); 456 SinkTrace() << QString("Connected: %1").arg(d->socket->fullServerName());
461 457
462 { 458 {
463 flatbuffers::FlatBufferBuilder fbb; 459 flatbuffers::FlatBufferBuilder fbb;
@@ -477,7 +473,7 @@ void ResourceAccess::connected()
477 473
478void ResourceAccess::disconnected() 474void ResourceAccess::disconnected()
479{ 475{
480 Log() << QString("Disconnected from %1").arg(d->socket->fullServerName()); 476 SinkLog() << QString("Disconnected from %1").arg(d->socket->fullServerName());
481 d->socket->close(); 477 d->socket->close();
482 emit ready(false); 478 emit ready(false);
483} 479}
@@ -486,15 +482,15 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error)
486{ 482{
487 const bool resourceCrashed = d->partialMessageBuffer.contains("PANIC"); 483 const bool resourceCrashed = d->partialMessageBuffer.contains("PANIC");
488 if (resourceCrashed) { 484 if (resourceCrashed) {
489 ErrorMsg() << "The resource crashed!"; 485 SinkError() << "The resource crashed!";
490 d->abortPendingOperations(); 486 d->abortPendingOperations();
491 } else if (error == QLocalSocket::PeerClosedError) { 487 } else if (error == QLocalSocket::PeerClosedError) {
492 Log() << "The resource closed the connection."; 488 SinkLog() << "The resource closed the connection.";
493 d->abortPendingOperations(); 489 d->abortPendingOperations();
494 } else { 490 } else {
495 Warning() << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString()); 491 SinkWarning() << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString());
496 if (d->pendingCommands.size()) { 492 if (d->pendingCommands.size()) {
497 Trace() << "Reconnecting due to pending operations: " << d->pendingCommands.size(); 493 SinkTrace() << "Reconnecting due to pending operations: " << d->pendingCommands.size();
498 open(); 494 open();
499 } 495 }
500 } 496 }
@@ -503,7 +499,7 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error)
503void ResourceAccess::readResourceMessage() 499void ResourceAccess::readResourceMessage()
504{ 500{
505 if (!d->socket || !d->socket->isValid()) { 501 if (!d->socket || !d->socket->isValid()) {
506 Warning() << "No socket available"; 502 SinkWarning() << "No socket available";
507 return; 503 return;
508 } 504 }
509 505
@@ -534,7 +530,7 @@ bool ResourceAccess::processMessageBuffer()
534{ 530{
535 static const int headerSize = Commands::headerSize(); 531 static const int headerSize = Commands::headerSize();
536 if (d->partialMessageBuffer.size() < headerSize) { 532 if (d->partialMessageBuffer.size() < headerSize) {
537 Warning() << "command too small"; 533 SinkWarning() << "command too small";
538 return false; 534 return false;
539 } 535 }
540 536
@@ -543,14 +539,14 @@ bool ResourceAccess::processMessageBuffer()
543 const uint size = *(int *)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); 539 const uint size = *(int *)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint));
544 540
545 if (size > (uint)(d->partialMessageBuffer.size() - headerSize)) { 541 if (size > (uint)(d->partialMessageBuffer.size() - headerSize)) {
546 Warning() << "command too small"; 542 SinkWarning() << "command too small";
547 return false; 543 return false;
548 } 544 }
549 545
550 switch (commandId) { 546 switch (commandId) {
551 case Commands::RevisionUpdateCommand: { 547 case Commands::RevisionUpdateCommand: {
552 auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); 548 auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize);
553 Trace() << QString("Revision updated to: %1").arg(buffer->revision()); 549 SinkTrace() << QString("Revision updated to: %1").arg(buffer->revision());
554 Notification n; 550 Notification n;
555 n.type = Sink::Notification::RevisionUpdate; 551 n.type = Sink::Notification::RevisionUpdate;
556 emit notification(n); 552 emit notification(n);
@@ -560,7 +556,7 @@ bool ResourceAccess::processMessageBuffer()
560 } 556 }
561 case Commands::CommandCompletionCommand: { 557 case Commands::CommandCompletionCommand: {
562 auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); 558 auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize);
563 Trace() << QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"); 559 SinkTrace() << QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully");
564 560
565 d->completeCommands.insert(buffer->id(), buffer->success()); 561 d->completeCommands.insert(buffer->id(), buffer->success());
566 // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first 562 // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
@@ -571,33 +567,33 @@ bool ResourceAccess::processMessageBuffer()
571 auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize); 567 auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize);
572 switch (buffer->type()) { 568 switch (buffer->type()) {
573 case Sink::Notification::Shutdown: 569 case Sink::Notification::Shutdown:
574 Log() << "Received shutdown notification."; 570 SinkLog() << "Received shutdown notification.";
575 close(); 571 close();
576 break; 572 break;
577 case Sink::Notification::Inspection: { 573 case Sink::Notification::Inspection: {
578 Trace() << "Received inspection notification."; 574 SinkTrace() << "Received inspection notification.";
579 auto n = getNotification(buffer); 575 auto n = getNotification(buffer);
580 // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first 576 // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
581 queuedInvoke([=]() { emit notification(n); }, this); 577 queuedInvoke([=]() { emit notification(n); }, this);
582 } break; 578 } break;
583 case Sink::Notification::Status: 579 case Sink::Notification::Status:
584 if (mResourceStatus == buffer->code()) { 580 if (mResourceStatus == buffer->code()) {
585 Trace() << "Got an unnecessary status notification"; 581 SinkTrace() << "Got an unnecessary status notification";
586 break; 582 break;
587 } 583 }
588 mResourceStatus = buffer->code(); 584 mResourceStatus = buffer->code();
589 Trace() << "Updated status: " << mResourceStatus; 585 SinkTrace() << "Updated status: " << mResourceStatus;
590 [[clang::fallthrough]]; 586 [[clang::fallthrough]];
591 case Sink::Notification::Warning: 587 case Sink::Notification::Warning:
592 [[clang::fallthrough]]; 588 [[clang::fallthrough]];
593 case Sink::Notification::Progress: { 589 case Sink::Notification::Progress: {
594 auto n = getNotification(buffer); 590 auto n = getNotification(buffer);
595 Trace() << "Received notification: " << n.type; 591 SinkTrace() << "Received notification: " << n.type;
596 emit notification(n); 592 emit notification(n);
597 } break; 593 } break;
598 case Sink::Notification::RevisionUpdate: 594 case Sink::Notification::RevisionUpdate:
599 default: 595 default:
600 Warning() << "Received unknown notification: " << buffer->type(); 596 SinkWarning() << "Received unknown notification: " << buffer->type();
601 break; 597 break;
602 } 598 }
603 break; 599 break;