diff options
Diffstat (limited to 'common/resourceaccess.cpp')
-rw-r--r-- | common/resourceaccess.cpp | 78 |
1 files changed, 46 insertions, 32 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 0c435c9..80d60e8 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -41,11 +41,13 @@ | |||
41 | #include <QProcess> | 41 | #include <QProcess> |
42 | #include <QDataStream> | 42 | #include <QDataStream> |
43 | #include <QBuffer> | 43 | #include <QBuffer> |
44 | #include <QTime> | ||
44 | 45 | ||
45 | #undef Trace | 46 | #undef Trace |
46 | #define Trace() Sink::Log::debugStream(Sink::Log::DebugLevel::Trace, __LINE__, __FILE__, Q_FUNC_INFO, "ResourceAccess") | 47 | #define TracePrivate() Trace_area("client.communication." + resourceInstanceIdentifier) |
48 | #define Trace() Trace_area("client.communication." + d->resourceInstanceIdentifier) | ||
47 | #undef Log | 49 | #undef Log |
48 | #define Log(IDENTIFIER) Sink::Log::debugStream(Sink::Log::DebugLevel::Log, __LINE__, __FILE__, Q_FUNC_INFO, "ResourceAccess("+IDENTIFIER+")") | 50 | #define Log() Log_area("client.communication." + d->resourceInstanceIdentifier) |
49 | 51 | ||
50 | static void queuedInvoke(const std::function<void()> &f, QObject *context = 0) | 52 | static void queuedInvoke(const std::function<void()> &f, QObject *context = 0) |
51 | { | 53 | { |
@@ -168,45 +170,48 @@ KAsync::Job<void> ResourceAccess::Private::tryToConnect() | |||
168 | return !socket; | 170 | return !socket; |
169 | }, | 171 | }, |
170 | [this, counter](KAsync::Future<void> &future) { | 172 | [this, counter](KAsync::Future<void> &future) { |
171 | Trace() << "Loop"; | 173 | TracePrivate() << "Loop"; |
172 | KAsync::wait(50) | 174 | connectToServer(resourceInstanceIdentifier) |
173 | .then(connectToServer(resourceInstanceIdentifier)) | ||
174 | .then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { | 175 | .then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { |
175 | Q_ASSERT(s); | 176 | Q_ASSERT(s); |
176 | socket = s; | 177 | socket = s; |
177 | future.setFinished(); | 178 | future.setFinished(); |
178 | }, | 179 | }, [&future, counter, this](int errorCode, const QString &errorString) { |
179 | [&future, counter](int errorCode, const QString &errorString) { | 180 | static int waitTime = 10; |
180 | const int maxRetries = 10; | 181 | static int timeout = 500; |
182 | static int maxRetries = timeout / waitTime; | ||
181 | if (*counter > maxRetries) { | 183 | if (*counter > maxRetries) { |
182 | Trace() << "Giving up"; | 184 | TracePrivate() << "Giving up"; |
183 | future.setError(-1, "Failed to connect to socket"); | 185 | future.setError(-1, "Failed to connect to socket"); |
184 | } else { | 186 | } else { |
185 | future.setFinished(); | 187 | KAsync::wait(waitTime).then<void>([&future]() { |
188 | future.setFinished(); | ||
189 | }).exec(); | ||
186 | } | 190 | } |
187 | *counter = *counter + 1; | 191 | *counter = *counter + 1; |
188 | }).exec(); | 192 | }) |
193 | .exec(); | ||
189 | }); | 194 | }); |
190 | } | 195 | } |
191 | 196 | ||
192 | KAsync::Job<void> ResourceAccess::Private::initializeSocket() | 197 | KAsync::Job<void> ResourceAccess::Private::initializeSocket() |
193 | { | 198 | { |
194 | return KAsync::start<void>([this](KAsync::Future<void> &future) { | 199 | return KAsync::start<void>([this](KAsync::Future<void> &future) { |
195 | Trace() << "Trying to connect"; | 200 | TracePrivate() << "Trying to connect"; |
196 | connectToServer(resourceInstanceIdentifier).then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { | 201 | connectToServer(resourceInstanceIdentifier).then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { |
197 | Trace() << "Connected to resource, without having to start it."; | 202 | TracePrivate() << "Connected to resource, without having to start it."; |
198 | Q_ASSERT(s); | 203 | Q_ASSERT(s); |
199 | socket = s; | 204 | socket = s; |
200 | future.setFinished(); | 205 | future.setFinished(); |
201 | }, | 206 | }, |
202 | [this, &future](int errorCode, const QString &errorString) { | 207 | [this, &future](int errorCode, const QString &errorString) { |
203 | Trace() << "Failed to connect, starting resource"; | 208 | TracePrivate() << "Failed to connect, starting resource"; |
204 | //We failed to connect, so let's start the resource | 209 | //We failed to connect, so let's start the resource |
205 | QStringList args; | 210 | QStringList args; |
206 | args << resourceInstanceIdentifier; | 211 | args << resourceInstanceIdentifier; |
207 | qint64 pid = 0; | 212 | qint64 pid = 0; |
208 | if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { | 213 | if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { |
209 | Trace() << "Started resource " << pid; | 214 | TracePrivate() << "Started resource " << pid; |
210 | tryToConnect() | 215 | tryToConnect() |
211 | .then<void>([&future]() { | 216 | .then<void>([&future]() { |
212 | future.setFinished(); | 217 | future.setFinished(); |
@@ -232,12 +237,12 @@ ResourceAccess::ResourceAccess(const QByteArray &resourceInstanceIdentifier) | |||
232 | : ResourceAccessInterface(), | 237 | : ResourceAccessInterface(), |
233 | d(new Private(getResourceName(resourceInstanceIdentifier), resourceInstanceIdentifier, this)) | 238 | d(new Private(getResourceName(resourceInstanceIdentifier), resourceInstanceIdentifier, this)) |
234 | { | 239 | { |
235 | log("Starting access"); | 240 | Log() << "Starting access"; |
236 | } | 241 | } |
237 | 242 | ||
238 | ResourceAccess::~ResourceAccess() | 243 | ResourceAccess::~ResourceAccess() |
239 | { | 244 | { |
240 | log("Closing access"); | 245 | Log() << "Closing access"; |
241 | if (!d->resultHandler.isEmpty()) { | 246 | if (!d->resultHandler.isEmpty()) { |
242 | Warning() << "Left jobs running while shutting down ResourceAccess: " << d->resultHandler.keys(); | 247 | Warning() << "Left jobs running while shutting down ResourceAccess: " << d->resultHandler.keys(); |
243 | } | 248 | } |
@@ -380,9 +385,11 @@ void ResourceAccess::open() | |||
380 | if (d->openingSocket) { | 385 | if (d->openingSocket) { |
381 | return; | 386 | return; |
382 | } | 387 | } |
388 | auto time = QSharedPointer<QTime>::create(); | ||
389 | time->start(); | ||
383 | d->openingSocket = true; | 390 | d->openingSocket = true; |
384 | d->initializeSocket().then<void>([this]() { | 391 | d->initializeSocket().then<void>([this, time]() { |
385 | Trace() << "Socket is initialized"; | 392 | Trace() << "Socket is initialized." << Log::TraceTime(time->elapsed()); |
386 | d->openingSocket = false; | 393 | d->openingSocket = false; |
387 | QObject::connect(d->socket.data(), &QLocalSocket::disconnected, | 394 | QObject::connect(d->socket.data(), &QLocalSocket::disconnected, |
388 | this, &ResourceAccess::disconnected); | 395 | this, &ResourceAccess::disconnected); |
@@ -400,7 +407,7 @@ void ResourceAccess::open() | |||
400 | 407 | ||
401 | void ResourceAccess::close() | 408 | void ResourceAccess::close() |
402 | { | 409 | { |
403 | log(QString("Closing %1").arg(d->socket->fullServerName())); | 410 | Log() << QString("Closing %1").arg(d->socket->fullServerName()); |
404 | Trace() << "Pending commands: " << d->pendingCommands.size(); | 411 | Trace() << "Pending commands: " << d->pendingCommands.size(); |
405 | Trace() << "Queued commands: " << d->commandQueue.size(); | 412 | Trace() << "Queued commands: " << d->commandQueue.size(); |
406 | d->socket->close(); | 413 | d->socket->close(); |
@@ -412,7 +419,7 @@ void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command) | |||
412 | //TODO: we should have a timeout for commands | 419 | //TODO: we should have a timeout for commands |
413 | d->messageId++; | 420 | d->messageId++; |
414 | const auto messageId = d->messageId; | 421 | const auto messageId = d->messageId; |
415 | log(QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId)); | 422 | Log() << QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId); |
416 | Q_ASSERT(command->callback); | 423 | Q_ASSERT(command->callback); |
417 | registerCallback(d->messageId, [this, messageId, command](int errorCode, QString errorMessage) { | 424 | registerCallback(d->messageId, [this, messageId, command](int errorCode, QString errorMessage) { |
418 | Trace() << "Command complete " << messageId; | 425 | Trace() << "Command complete " << messageId; |
@@ -452,7 +459,7 @@ void ResourceAccess::connected() | |||
452 | return; | 459 | return; |
453 | } | 460 | } |
454 | 461 | ||
455 | log(QString("Connected: %1").arg(d->socket->fullServerName())); | 462 | Log() << QString("Connected: %1").arg(d->socket->fullServerName()); |
456 | 463 | ||
457 | { | 464 | { |
458 | flatbuffers::FlatBufferBuilder fbb; | 465 | flatbuffers::FlatBufferBuilder fbb; |
@@ -472,7 +479,7 @@ void ResourceAccess::connected() | |||
472 | 479 | ||
473 | void ResourceAccess::disconnected() | 480 | void ResourceAccess::disconnected() |
474 | { | 481 | { |
475 | log(QString("Disconnected from %1").arg(d->socket->fullServerName())); | 482 | Log() << QString("Disconnected from %1").arg(d->socket->fullServerName()); |
476 | d->socket->close(); | 483 | d->socket->close(); |
477 | emit ready(false); | 484 | emit ready(false); |
478 | } | 485 | } |
@@ -480,7 +487,7 @@ void ResourceAccess::disconnected() | |||
480 | void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) | 487 | void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) |
481 | { | 488 | { |
482 | if (error == QLocalSocket::PeerClosedError) { | 489 | if (error == QLocalSocket::PeerClosedError) { |
483 | Log(d->resourceInstanceIdentifier) << "The resource closed the connection."; | 490 | Log() << "The resource closed the connection."; |
484 | d->abortPendingOperations(); | 491 | d->abortPendingOperations(); |
485 | } else { | 492 | } else { |
486 | Warning() << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString()); | 493 | Warning() << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString()); |
@@ -524,14 +531,17 @@ bool ResourceAccess::processMessageBuffer() | |||
524 | switch (commandId) { | 531 | switch (commandId) { |
525 | case Commands::RevisionUpdateCommand: { | 532 | case Commands::RevisionUpdateCommand: { |
526 | auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); | 533 | auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); |
527 | log(QString("Revision updated to: %1").arg(buffer->revision())); | 534 | Log() << QString("Revision updated to: %1").arg(buffer->revision()); |
535 | Notification n; | ||
536 | n.type = Sink::Commands::NotificationType::NotificationType_RevisionUpdate; | ||
537 | emit notification(n); | ||
528 | emit revisionChanged(buffer->revision()); | 538 | emit revisionChanged(buffer->revision()); |
529 | 539 | ||
530 | break; | 540 | break; |
531 | } | 541 | } |
532 | case Commands::CommandCompletionCommand: { | 542 | case Commands::CommandCompletionCommand: { |
533 | auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); | 543 | auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); |
534 | log(QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); | 544 | Log() << QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"); |
535 | 545 | ||
536 | d->completeCommands << buffer->id(); | 546 | d->completeCommands << buffer->id(); |
537 | //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first | 547 | //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first |
@@ -544,11 +554,11 @@ bool ResourceAccess::processMessageBuffer() | |||
544 | auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize); | 554 | auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize); |
545 | switch (buffer->type()) { | 555 | switch (buffer->type()) { |
546 | case Sink::Commands::NotificationType::NotificationType_Shutdown: | 556 | case Sink::Commands::NotificationType::NotificationType_Shutdown: |
547 | Log(d->resourceInstanceIdentifier) << "Received shutdown notification."; | 557 | Log() << "Received shutdown notification."; |
548 | close(); | 558 | close(); |
549 | break; | 559 | break; |
550 | case Sink::Commands::NotificationType::NotificationType_Inspection: { | 560 | case Sink::Commands::NotificationType::NotificationType_Inspection: { |
551 | Log(d->resourceInstanceIdentifier) << "Received inspection notification."; | 561 | Log() << "Received inspection notification."; |
552 | Notification n; | 562 | Notification n; |
553 | if (buffer->identifier()) { | 563 | if (buffer->identifier()) { |
554 | //Don't use fromRawData, the buffer is gone once we invoke emit notification | 564 | //Don't use fromRawData, the buffer is gone once we invoke emit notification |
@@ -566,6 +576,10 @@ bool ResourceAccess::processMessageBuffer() | |||
566 | }, this); | 576 | }, this); |
567 | } | 577 | } |
568 | break; | 578 | break; |
579 | case Sink::Commands::NotificationType::NotificationType_Status: | ||
580 | case Sink::Commands::NotificationType::NotificationType_Warning: | ||
581 | case Sink::Commands::NotificationType::NotificationType_Progress: | ||
582 | case Sink::Commands::NotificationType::NotificationType_RevisionUpdate: | ||
569 | default: | 583 | default: |
570 | Warning() << "Received unknown notification: " << buffer->type(); | 584 | Warning() << "Received unknown notification: " << buffer->type(); |
571 | break; | 585 | break; |
@@ -580,9 +594,9 @@ bool ResourceAccess::processMessageBuffer() | |||
580 | return d->partialMessageBuffer.size() >= headerSize; | 594 | return d->partialMessageBuffer.size() >= headerSize; |
581 | } | 595 | } |
582 | 596 | ||
583 | void ResourceAccess::log(const QString &message) | ||
584 | { | ||
585 | Log(d->resourceInstanceIdentifier) << this << message; | ||
586 | } | 597 | } |
587 | 598 | ||
588 | } | 599 | #pragma clang diagnostic push |
600 | #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" | ||
601 | #include "moc_resourceaccess.cpp" | ||
602 | #pragma clang diagnostic pop | ||