diff options
Diffstat (limited to 'common/resourceaccess.cpp')
-rw-r--r-- | common/resourceaccess.cpp | 79 |
1 files changed, 64 insertions, 15 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 7be1259..6592699 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -30,19 +30,33 @@ | |||
30 | #include "common/modifyentity_generated.h" | 30 | #include "common/modifyentity_generated.h" |
31 | #include "common/deleteentity_generated.h" | 31 | #include "common/deleteentity_generated.h" |
32 | #include "common/revisionreplayed_generated.h" | 32 | #include "common/revisionreplayed_generated.h" |
33 | #include "common/inspection_generated.h" | ||
33 | #include "common/entitybuffer.h" | 34 | #include "common/entitybuffer.h" |
35 | #include "common/bufferutils.h" | ||
34 | #include "log.h" | 36 | #include "log.h" |
35 | 37 | ||
36 | #include <QCoreApplication> | 38 | #include <QCoreApplication> |
37 | #include <QDebug> | 39 | #include <QDebug> |
38 | #include <QDir> | 40 | #include <QDir> |
39 | #include <QProcess> | 41 | #include <QProcess> |
42 | #include <QDataStream> | ||
43 | #include <QBuffer> | ||
40 | 44 | ||
41 | #undef Trace | 45 | #undef Trace |
42 | #define Trace() Akonadi2::Log::debugStream(Akonadi2::Log::DebugLevel::Trace, __LINE__, __FILE__, Q_FUNC_INFO, "ResourceAccess") | 46 | #define Trace() Akonadi2::Log::debugStream(Akonadi2::Log::DebugLevel::Trace, __LINE__, __FILE__, Q_FUNC_INFO, "ResourceAccess") |
43 | #undef Log | 47 | #undef Log |
44 | #define Log(IDENTIFIER) Akonadi2::Log::debugStream(Akonadi2::Log::DebugLevel::Log, __LINE__, __FILE__, Q_FUNC_INFO, "ResourceAccess("+IDENTIFIER+")") | 48 | #define Log(IDENTIFIER) Akonadi2::Log::debugStream(Akonadi2::Log::DebugLevel::Log, __LINE__, __FILE__, Q_FUNC_INFO, "ResourceAccess("+IDENTIFIER+")") |
45 | 49 | ||
50 | static void queuedInvoke(const std::function<void()> &f, QObject *context = 0) | ||
51 | { | ||
52 | auto timer = QSharedPointer<QTimer>::create(); | ||
53 | timer->setSingleShot(true); | ||
54 | QObject::connect(timer.data(), &QTimer::timeout, context, [f, timer]() { | ||
55 | f(); | ||
56 | }); | ||
57 | timer->start(0); | ||
58 | } | ||
59 | |||
46 | namespace Akonadi2 | 60 | namespace Akonadi2 |
47 | { | 61 | { |
48 | 62 | ||
@@ -284,8 +298,8 @@ KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool loca | |||
284 | { | 298 | { |
285 | Trace() << "Sending synchronize command: " << sourceSync << localSync; | 299 | Trace() << "Sending synchronize command: " << sourceSync << localSync; |
286 | flatbuffers::FlatBufferBuilder fbb; | 300 | flatbuffers::FlatBufferBuilder fbb; |
287 | auto command = Akonadi2::CreateSynchronize(fbb, sourceSync, localSync); | 301 | auto command = Akonadi2::Commands::CreateSynchronize(fbb, sourceSync, localSync); |
288 | Akonadi2::FinishSynchronizeBuffer(fbb, command); | 302 | Akonadi2::Commands::FinishSynchronizeBuffer(fbb, command); |
289 | open(); | 303 | open(); |
290 | return sendCommand(Commands::SynchronizeCommand, fbb); | 304 | return sendCommand(Commands::SynchronizeCommand, fbb); |
291 | } | 305 | } |
@@ -338,6 +352,25 @@ KAsync::Job<void> ResourceAccess::sendRevisionReplayedCommand(qint64 revision) | |||
338 | return sendCommand(Akonadi2::Commands::RevisionReplayedCommand, fbb); | 352 | return sendCommand(Akonadi2::Commands::RevisionReplayedCommand, fbb); |
339 | } | 353 | } |
340 | 354 | ||
355 | KAsync::Job<void> ResourceAccess::sendInspectionCommand(const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) | ||
356 | { | ||
357 | flatbuffers::FlatBufferBuilder fbb; | ||
358 | auto id = fbb.CreateString(inspectionId.toStdString()); | ||
359 | auto domain = fbb.CreateString(domainType.toStdString()); | ||
360 | auto entity = fbb.CreateString(entityId.toStdString()); | ||
361 | auto prop = fbb.CreateString(property.toStdString()); | ||
362 | |||
363 | QByteArray array; | ||
364 | QDataStream s(&array, QIODevice::WriteOnly); | ||
365 | s << expectedValue; | ||
366 | |||
367 | auto expected = fbb.CreateString(array.toStdString()); | ||
368 | auto location = Akonadi2::Commands::CreateInspection (fbb, id, 0, entity, domain, prop, expected); | ||
369 | Akonadi2::Commands::FinishInspectionBuffer(fbb, location); | ||
370 | open(); | ||
371 | return sendCommand(Akonadi2::Commands::InspectionCommand, fbb); | ||
372 | } | ||
373 | |||
341 | void ResourceAccess::open() | 374 | void ResourceAccess::open() |
342 | { | 375 | { |
343 | if (d->socket && d->socket->isValid()) { | 376 | if (d->socket && d->socket->isValid()) { |
@@ -424,8 +457,8 @@ void ResourceAccess::connected() | |||
424 | { | 457 | { |
425 | flatbuffers::FlatBufferBuilder fbb; | 458 | flatbuffers::FlatBufferBuilder fbb; |
426 | auto name = fbb.CreateString(QString("PID: %1 ResourceAccess: %2").arg(QCoreApplication::applicationPid()).arg(reinterpret_cast<qlonglong>(this)).toLatin1()); | 459 | auto name = fbb.CreateString(QString("PID: %1 ResourceAccess: %2").arg(QCoreApplication::applicationPid()).arg(reinterpret_cast<qlonglong>(this)).toLatin1()); |
427 | auto command = Akonadi2::CreateHandshake(fbb, name); | 460 | auto command = Akonadi2::Commands::CreateHandshake(fbb, name); |
428 | Akonadi2::FinishHandshakeBuffer(fbb, command); | 461 | Akonadi2::Commands::FinishHandshakeBuffer(fbb, command); |
429 | Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, fbb); | 462 | Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, fbb); |
430 | } | 463 | } |
431 | 464 | ||
@@ -490,28 +523,49 @@ bool ResourceAccess::processMessageBuffer() | |||
490 | 523 | ||
491 | switch (commandId) { | 524 | switch (commandId) { |
492 | case Commands::RevisionUpdateCommand: { | 525 | case Commands::RevisionUpdateCommand: { |
493 | auto buffer = GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); | 526 | auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); |
494 | log(QString("Revision updated to: %1").arg(buffer->revision())); | 527 | log(QString("Revision updated to: %1").arg(buffer->revision())); |
495 | emit revisionChanged(buffer->revision()); | 528 | emit revisionChanged(buffer->revision()); |
496 | 529 | ||
497 | break; | 530 | break; |
498 | } | 531 | } |
499 | case Commands::CommandCompletion: { | 532 | case Commands::CommandCompletionCommand: { |
500 | auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); | 533 | auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); |
501 | log(QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); | 534 | log(QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); |
502 | 535 | ||
503 | d->completeCommands << buffer->id(); | 536 | d->completeCommands << buffer->id(); |
504 | //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first | 537 | //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first |
505 | QMetaObject::invokeMethod(this, "callCallbacks", Qt::QueuedConnection); | 538 | queuedInvoke([=]() { |
539 | d->callCallbacks(); | ||
540 | }, this); | ||
506 | break; | 541 | break; |
507 | } | 542 | } |
508 | case Commands::NotificationCommand: { | 543 | case Commands::NotificationCommand: { |
509 | auto buffer = GetNotification(d->partialMessageBuffer.constData() + headerSize); | 544 | auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize); |
510 | switch (buffer->type()) { | 545 | switch (buffer->type()) { |
511 | case Akonadi2::NotificationType::NotificationType_Shutdown: | 546 | case Akonadi2::Commands::NotificationType::NotificationType_Shutdown: |
512 | Log(d->resourceInstanceIdentifier) << "Received shutdown notification."; | 547 | Log(d->resourceInstanceIdentifier) << "Received shutdown notification."; |
513 | close(); | 548 | close(); |
514 | break; | 549 | break; |
550 | case Akonadi2::Commands::NotificationType::NotificationType_Inspection: { | ||
551 | Log(d->resourceInstanceIdentifier) << "Received inspection notification."; | ||
552 | Notification n; | ||
553 | if (buffer->identifier()) { | ||
554 | //Don't use fromRawData, the buffer is gone once we invoke emit notification | ||
555 | n.id = BufferUtils::extractBufferCopy(buffer->identifier()); | ||
556 | } | ||
557 | if (buffer->message()) { | ||
558 | //Don't use fromRawData, the buffer is gone once we invoke emit notification | ||
559 | n.message = BufferUtils::extractBufferCopy(buffer->message()); | ||
560 | } | ||
561 | n.type = buffer->type(); | ||
562 | n.code = buffer->code(); | ||
563 | //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first | ||
564 | queuedInvoke([=]() { | ||
565 | emit notification(n); | ||
566 | }, this); | ||
567 | } | ||
568 | break; | ||
515 | default: | 569 | default: |
516 | Warning() << "Received unknown notification: " << buffer->type(); | 570 | Warning() << "Received unknown notification: " << buffer->type(); |
517 | break; | 571 | break; |
@@ -526,11 +580,6 @@ bool ResourceAccess::processMessageBuffer() | |||
526 | return d->partialMessageBuffer.size() >= headerSize; | 580 | return d->partialMessageBuffer.size() >= headerSize; |
527 | } | 581 | } |
528 | 582 | ||
529 | void ResourceAccess::callCallbacks() | ||
530 | { | ||
531 | d->callCallbacks(); | ||
532 | } | ||
533 | |||
534 | void ResourceAccess::log(const QString &message) | 583 | void ResourceAccess::log(const QString &message) |
535 | { | 584 | { |
536 | Log(d->resourceInstanceIdentifier) << this << message; | 585 | Log(d->resourceInstanceIdentifier) << this << message; |