summaryrefslogtreecommitdiffstats
path: root/common/resourceaccess.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/resourceaccess.cpp')
-rw-r--r--common/resourceaccess.cpp78
1 files changed, 46 insertions, 32 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 0c435c9..80d60e8 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -41,11 +41,13 @@
41#include <QProcess> 41#include <QProcess>
42#include <QDataStream> 42#include <QDataStream>
43#include <QBuffer> 43#include <QBuffer>
44#include <QTime>
44 45
45#undef Trace 46#undef Trace
46#define Trace() Sink::Log::debugStream(Sink::Log::DebugLevel::Trace, __LINE__, __FILE__, Q_FUNC_INFO, "ResourceAccess") 47#define TracePrivate() Trace_area("client.communication." + resourceInstanceIdentifier)
48#define Trace() Trace_area("client.communication." + d->resourceInstanceIdentifier)
47#undef Log 49#undef Log
48#define Log(IDENTIFIER) Sink::Log::debugStream(Sink::Log::DebugLevel::Log, __LINE__, __FILE__, Q_FUNC_INFO, "ResourceAccess("+IDENTIFIER+")") 50#define Log() Log_area("client.communication." + d->resourceInstanceIdentifier)
49 51
50static void queuedInvoke(const std::function<void()> &f, QObject *context = 0) 52static void queuedInvoke(const std::function<void()> &f, QObject *context = 0)
51{ 53{
@@ -168,45 +170,48 @@ KAsync::Job<void> ResourceAccess::Private::tryToConnect()
168 return !socket; 170 return !socket;
169 }, 171 },
170 [this, counter](KAsync::Future<void> &future) { 172 [this, counter](KAsync::Future<void> &future) {
171 Trace() << "Loop"; 173 TracePrivate() << "Loop";
172 KAsync::wait(50) 174 connectToServer(resourceInstanceIdentifier)
173 .then(connectToServer(resourceInstanceIdentifier))
174 .then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { 175 .then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) {
175 Q_ASSERT(s); 176 Q_ASSERT(s);
176 socket = s; 177 socket = s;
177 future.setFinished(); 178 future.setFinished();
178 }, 179 }, [&future, counter, this](int errorCode, const QString &errorString) {
179 [&future, counter](int errorCode, const QString &errorString) { 180 static int waitTime = 10;
180 const int maxRetries = 10; 181 static int timeout = 500;
182 static int maxRetries = timeout / waitTime;
181 if (*counter > maxRetries) { 183 if (*counter > maxRetries) {
182 Trace() << "Giving up"; 184 TracePrivate() << "Giving up";
183 future.setError(-1, "Failed to connect to socket"); 185 future.setError(-1, "Failed to connect to socket");
184 } else { 186 } else {
185 future.setFinished(); 187 KAsync::wait(waitTime).then<void>([&future]() {
188 future.setFinished();
189 }).exec();
186 } 190 }
187 *counter = *counter + 1; 191 *counter = *counter + 1;
188 }).exec(); 192 })
193 .exec();
189 }); 194 });
190} 195}
191 196
192KAsync::Job<void> ResourceAccess::Private::initializeSocket() 197KAsync::Job<void> ResourceAccess::Private::initializeSocket()
193{ 198{
194 return KAsync::start<void>([this](KAsync::Future<void> &future) { 199 return KAsync::start<void>([this](KAsync::Future<void> &future) {
195 Trace() << "Trying to connect"; 200 TracePrivate() << "Trying to connect";
196 connectToServer(resourceInstanceIdentifier).then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { 201 connectToServer(resourceInstanceIdentifier).then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) {
197 Trace() << "Connected to resource, without having to start it."; 202 TracePrivate() << "Connected to resource, without having to start it.";
198 Q_ASSERT(s); 203 Q_ASSERT(s);
199 socket = s; 204 socket = s;
200 future.setFinished(); 205 future.setFinished();
201 }, 206 },
202 [this, &future](int errorCode, const QString &errorString) { 207 [this, &future](int errorCode, const QString &errorString) {
203 Trace() << "Failed to connect, starting resource"; 208 TracePrivate() << "Failed to connect, starting resource";
204 //We failed to connect, so let's start the resource 209 //We failed to connect, so let's start the resource
205 QStringList args; 210 QStringList args;
206 args << resourceInstanceIdentifier; 211 args << resourceInstanceIdentifier;
207 qint64 pid = 0; 212 qint64 pid = 0;
208 if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { 213 if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) {
209 Trace() << "Started resource " << pid; 214 TracePrivate() << "Started resource " << pid;
210 tryToConnect() 215 tryToConnect()
211 .then<void>([&future]() { 216 .then<void>([&future]() {
212 future.setFinished(); 217 future.setFinished();
@@ -232,12 +237,12 @@ ResourceAccess::ResourceAccess(const QByteArray &resourceInstanceIdentifier)
232 : ResourceAccessInterface(), 237 : ResourceAccessInterface(),
233 d(new Private(getResourceName(resourceInstanceIdentifier), resourceInstanceIdentifier, this)) 238 d(new Private(getResourceName(resourceInstanceIdentifier), resourceInstanceIdentifier, this))
234{ 239{
235 log("Starting access"); 240 Log() << "Starting access";
236} 241}
237 242
238ResourceAccess::~ResourceAccess() 243ResourceAccess::~ResourceAccess()
239{ 244{
240 log("Closing access"); 245 Log() << "Closing access";
241 if (!d->resultHandler.isEmpty()) { 246 if (!d->resultHandler.isEmpty()) {
242 Warning() << "Left jobs running while shutting down ResourceAccess: " << d->resultHandler.keys(); 247 Warning() << "Left jobs running while shutting down ResourceAccess: " << d->resultHandler.keys();
243 } 248 }
@@ -380,9 +385,11 @@ void ResourceAccess::open()
380 if (d->openingSocket) { 385 if (d->openingSocket) {
381 return; 386 return;
382 } 387 }
388 auto time = QSharedPointer<QTime>::create();
389 time->start();
383 d->openingSocket = true; 390 d->openingSocket = true;
384 d->initializeSocket().then<void>([this]() { 391 d->initializeSocket().then<void>([this, time]() {
385 Trace() << "Socket is initialized"; 392 Trace() << "Socket is initialized." << Log::TraceTime(time->elapsed());
386 d->openingSocket = false; 393 d->openingSocket = false;
387 QObject::connect(d->socket.data(), &QLocalSocket::disconnected, 394 QObject::connect(d->socket.data(), &QLocalSocket::disconnected,
388 this, &ResourceAccess::disconnected); 395 this, &ResourceAccess::disconnected);
@@ -400,7 +407,7 @@ void ResourceAccess::open()
400 407
401void ResourceAccess::close() 408void ResourceAccess::close()
402{ 409{
403 log(QString("Closing %1").arg(d->socket->fullServerName())); 410 Log() << QString("Closing %1").arg(d->socket->fullServerName());
404 Trace() << "Pending commands: " << d->pendingCommands.size(); 411 Trace() << "Pending commands: " << d->pendingCommands.size();
405 Trace() << "Queued commands: " << d->commandQueue.size(); 412 Trace() << "Queued commands: " << d->commandQueue.size();
406 d->socket->close(); 413 d->socket->close();
@@ -412,7 +419,7 @@ void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command)
412 //TODO: we should have a timeout for commands 419 //TODO: we should have a timeout for commands
413 d->messageId++; 420 d->messageId++;
414 const auto messageId = d->messageId; 421 const auto messageId = d->messageId;
415 log(QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId)); 422 Log() << QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId);
416 Q_ASSERT(command->callback); 423 Q_ASSERT(command->callback);
417 registerCallback(d->messageId, [this, messageId, command](int errorCode, QString errorMessage) { 424 registerCallback(d->messageId, [this, messageId, command](int errorCode, QString errorMessage) {
418 Trace() << "Command complete " << messageId; 425 Trace() << "Command complete " << messageId;
@@ -452,7 +459,7 @@ void ResourceAccess::connected()
452 return; 459 return;
453 } 460 }
454 461
455 log(QString("Connected: %1").arg(d->socket->fullServerName())); 462 Log() << QString("Connected: %1").arg(d->socket->fullServerName());
456 463
457 { 464 {
458 flatbuffers::FlatBufferBuilder fbb; 465 flatbuffers::FlatBufferBuilder fbb;
@@ -472,7 +479,7 @@ void ResourceAccess::connected()
472 479
473void ResourceAccess::disconnected() 480void ResourceAccess::disconnected()
474{ 481{
475 log(QString("Disconnected from %1").arg(d->socket->fullServerName())); 482 Log() << QString("Disconnected from %1").arg(d->socket->fullServerName());
476 d->socket->close(); 483 d->socket->close();
477 emit ready(false); 484 emit ready(false);
478} 485}
@@ -480,7 +487,7 @@ void ResourceAccess::disconnected()
480void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) 487void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error)
481{ 488{
482 if (error == QLocalSocket::PeerClosedError) { 489 if (error == QLocalSocket::PeerClosedError) {
483 Log(d->resourceInstanceIdentifier) << "The resource closed the connection."; 490 Log() << "The resource closed the connection.";
484 d->abortPendingOperations(); 491 d->abortPendingOperations();
485 } else { 492 } else {
486 Warning() << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString()); 493 Warning() << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString());
@@ -524,14 +531,17 @@ bool ResourceAccess::processMessageBuffer()
524 switch (commandId) { 531 switch (commandId) {
525 case Commands::RevisionUpdateCommand: { 532 case Commands::RevisionUpdateCommand: {
526 auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); 533 auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize);
527 log(QString("Revision updated to: %1").arg(buffer->revision())); 534 Log() << QString("Revision updated to: %1").arg(buffer->revision());
535 Notification n;
536 n.type = Sink::Commands::NotificationType::NotificationType_RevisionUpdate;
537 emit notification(n);
528 emit revisionChanged(buffer->revision()); 538 emit revisionChanged(buffer->revision());
529 539
530 break; 540 break;
531 } 541 }
532 case Commands::CommandCompletionCommand: { 542 case Commands::CommandCompletionCommand: {
533 auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); 543 auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize);
534 log(QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); 544 Log() << QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully");
535 545
536 d->completeCommands << buffer->id(); 546 d->completeCommands << buffer->id();
537 //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first 547 //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
@@ -544,11 +554,11 @@ bool ResourceAccess::processMessageBuffer()
544 auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize); 554 auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize);
545 switch (buffer->type()) { 555 switch (buffer->type()) {
546 case Sink::Commands::NotificationType::NotificationType_Shutdown: 556 case Sink::Commands::NotificationType::NotificationType_Shutdown:
547 Log(d->resourceInstanceIdentifier) << "Received shutdown notification."; 557 Log() << "Received shutdown notification.";
548 close(); 558 close();
549 break; 559 break;
550 case Sink::Commands::NotificationType::NotificationType_Inspection: { 560 case Sink::Commands::NotificationType::NotificationType_Inspection: {
551 Log(d->resourceInstanceIdentifier) << "Received inspection notification."; 561 Log() << "Received inspection notification.";
552 Notification n; 562 Notification n;
553 if (buffer->identifier()) { 563 if (buffer->identifier()) {
554 //Don't use fromRawData, the buffer is gone once we invoke emit notification 564 //Don't use fromRawData, the buffer is gone once we invoke emit notification
@@ -566,6 +576,10 @@ bool ResourceAccess::processMessageBuffer()
566 }, this); 576 }, this);
567 } 577 }
568 break; 578 break;
579 case Sink::Commands::NotificationType::NotificationType_Status:
580 case Sink::Commands::NotificationType::NotificationType_Warning:
581 case Sink::Commands::NotificationType::NotificationType_Progress:
582 case Sink::Commands::NotificationType::NotificationType_RevisionUpdate:
569 default: 583 default:
570 Warning() << "Received unknown notification: " << buffer->type(); 584 Warning() << "Received unknown notification: " << buffer->type();
571 break; 585 break;
@@ -580,9 +594,9 @@ bool ResourceAccess::processMessageBuffer()
580 return d->partialMessageBuffer.size() >= headerSize; 594 return d->partialMessageBuffer.size() >= headerSize;
581} 595}
582 596
583void ResourceAccess::log(const QString &message)
584{
585 Log(d->resourceInstanceIdentifier) << this << message;
586} 597}
587 598
588} 599#pragma clang diagnostic push
600#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"
601#include "moc_resourceaccess.cpp"
602#pragma clang diagnostic pop