summaryrefslogtreecommitdiffstats
path: root/common/resourceaccess.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/resourceaccess.cpp')
-rw-r--r--common/resourceaccess.cpp79
1 files changed, 64 insertions, 15 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 7be1259..6592699 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -30,19 +30,33 @@
30#include "common/modifyentity_generated.h" 30#include "common/modifyentity_generated.h"
31#include "common/deleteentity_generated.h" 31#include "common/deleteentity_generated.h"
32#include "common/revisionreplayed_generated.h" 32#include "common/revisionreplayed_generated.h"
33#include "common/inspection_generated.h"
33#include "common/entitybuffer.h" 34#include "common/entitybuffer.h"
35#include "common/bufferutils.h"
34#include "log.h" 36#include "log.h"
35 37
36#include <QCoreApplication> 38#include <QCoreApplication>
37#include <QDebug> 39#include <QDebug>
38#include <QDir> 40#include <QDir>
39#include <QProcess> 41#include <QProcess>
42#include <QDataStream>
43#include <QBuffer>
40 44
41#undef Trace 45#undef Trace
42#define Trace() Akonadi2::Log::debugStream(Akonadi2::Log::DebugLevel::Trace, __LINE__, __FILE__, Q_FUNC_INFO, "ResourceAccess") 46#define Trace() Akonadi2::Log::debugStream(Akonadi2::Log::DebugLevel::Trace, __LINE__, __FILE__, Q_FUNC_INFO, "ResourceAccess")
43#undef Log 47#undef Log
44#define Log(IDENTIFIER) Akonadi2::Log::debugStream(Akonadi2::Log::DebugLevel::Log, __LINE__, __FILE__, Q_FUNC_INFO, "ResourceAccess("+IDENTIFIER+")") 48#define Log(IDENTIFIER) Akonadi2::Log::debugStream(Akonadi2::Log::DebugLevel::Log, __LINE__, __FILE__, Q_FUNC_INFO, "ResourceAccess("+IDENTIFIER+")")
45 49
50static void queuedInvoke(const std::function<void()> &f, QObject *context = 0)
51{
52 auto timer = QSharedPointer<QTimer>::create();
53 timer->setSingleShot(true);
54 QObject::connect(timer.data(), &QTimer::timeout, context, [f, timer]() {
55 f();
56 });
57 timer->start(0);
58}
59
46namespace Akonadi2 60namespace Akonadi2
47{ 61{
48 62
@@ -284,8 +298,8 @@ KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool loca
284{ 298{
285 Trace() << "Sending synchronize command: " << sourceSync << localSync; 299 Trace() << "Sending synchronize command: " << sourceSync << localSync;
286 flatbuffers::FlatBufferBuilder fbb; 300 flatbuffers::FlatBufferBuilder fbb;
287 auto command = Akonadi2::CreateSynchronize(fbb, sourceSync, localSync); 301 auto command = Akonadi2::Commands::CreateSynchronize(fbb, sourceSync, localSync);
288 Akonadi2::FinishSynchronizeBuffer(fbb, command); 302 Akonadi2::Commands::FinishSynchronizeBuffer(fbb, command);
289 open(); 303 open();
290 return sendCommand(Commands::SynchronizeCommand, fbb); 304 return sendCommand(Commands::SynchronizeCommand, fbb);
291} 305}
@@ -338,6 +352,25 @@ KAsync::Job<void> ResourceAccess::sendRevisionReplayedCommand(qint64 revision)
338 return sendCommand(Akonadi2::Commands::RevisionReplayedCommand, fbb); 352 return sendCommand(Akonadi2::Commands::RevisionReplayedCommand, fbb);
339} 353}
340 354
355KAsync::Job<void> ResourceAccess::sendInspectionCommand(const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue)
356{
357 flatbuffers::FlatBufferBuilder fbb;
358 auto id = fbb.CreateString(inspectionId.toStdString());
359 auto domain = fbb.CreateString(domainType.toStdString());
360 auto entity = fbb.CreateString(entityId.toStdString());
361 auto prop = fbb.CreateString(property.toStdString());
362
363 QByteArray array;
364 QDataStream s(&array, QIODevice::WriteOnly);
365 s << expectedValue;
366
367 auto expected = fbb.CreateString(array.toStdString());
368 auto location = Akonadi2::Commands::CreateInspection (fbb, id, 0, entity, domain, prop, expected);
369 Akonadi2::Commands::FinishInspectionBuffer(fbb, location);
370 open();
371 return sendCommand(Akonadi2::Commands::InspectionCommand, fbb);
372}
373
341void ResourceAccess::open() 374void ResourceAccess::open()
342{ 375{
343 if (d->socket && d->socket->isValid()) { 376 if (d->socket && d->socket->isValid()) {
@@ -424,8 +457,8 @@ void ResourceAccess::connected()
424 { 457 {
425 flatbuffers::FlatBufferBuilder fbb; 458 flatbuffers::FlatBufferBuilder fbb;
426 auto name = fbb.CreateString(QString("PID: %1 ResourceAccess: %2").arg(QCoreApplication::applicationPid()).arg(reinterpret_cast<qlonglong>(this)).toLatin1()); 459 auto name = fbb.CreateString(QString("PID: %1 ResourceAccess: %2").arg(QCoreApplication::applicationPid()).arg(reinterpret_cast<qlonglong>(this)).toLatin1());
427 auto command = Akonadi2::CreateHandshake(fbb, name); 460 auto command = Akonadi2::Commands::CreateHandshake(fbb, name);
428 Akonadi2::FinishHandshakeBuffer(fbb, command); 461 Akonadi2::Commands::FinishHandshakeBuffer(fbb, command);
429 Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, fbb); 462 Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, fbb);
430 } 463 }
431 464
@@ -490,28 +523,49 @@ bool ResourceAccess::processMessageBuffer()
490 523
491 switch (commandId) { 524 switch (commandId) {
492 case Commands::RevisionUpdateCommand: { 525 case Commands::RevisionUpdateCommand: {
493 auto buffer = GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); 526 auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize);
494 log(QString("Revision updated to: %1").arg(buffer->revision())); 527 log(QString("Revision updated to: %1").arg(buffer->revision()));
495 emit revisionChanged(buffer->revision()); 528 emit revisionChanged(buffer->revision());
496 529
497 break; 530 break;
498 } 531 }
499 case Commands::CommandCompletion: { 532 case Commands::CommandCompletionCommand: {
500 auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); 533 auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize);
501 log(QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); 534 log(QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"));
502 535
503 d->completeCommands << buffer->id(); 536 d->completeCommands << buffer->id();
504 //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first 537 //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
505 QMetaObject::invokeMethod(this, "callCallbacks", Qt::QueuedConnection); 538 queuedInvoke([=]() {
539 d->callCallbacks();
540 }, this);
506 break; 541 break;
507 } 542 }
508 case Commands::NotificationCommand: { 543 case Commands::NotificationCommand: {
509 auto buffer = GetNotification(d->partialMessageBuffer.constData() + headerSize); 544 auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize);
510 switch (buffer->type()) { 545 switch (buffer->type()) {
511 case Akonadi2::NotificationType::NotificationType_Shutdown: 546 case Akonadi2::Commands::NotificationType::NotificationType_Shutdown:
512 Log(d->resourceInstanceIdentifier) << "Received shutdown notification."; 547 Log(d->resourceInstanceIdentifier) << "Received shutdown notification.";
513 close(); 548 close();
514 break; 549 break;
550 case Akonadi2::Commands::NotificationType::NotificationType_Inspection: {
551 Log(d->resourceInstanceIdentifier) << "Received inspection notification.";
552 Notification n;
553 if (buffer->identifier()) {
554 //Don't use fromRawData, the buffer is gone once we invoke emit notification
555 n.id = BufferUtils::extractBufferCopy(buffer->identifier());
556 }
557 if (buffer->message()) {
558 //Don't use fromRawData, the buffer is gone once we invoke emit notification
559 n.message = BufferUtils::extractBufferCopy(buffer->message());
560 }
561 n.type = buffer->type();
562 n.code = buffer->code();
563 //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
564 queuedInvoke([=]() {
565 emit notification(n);
566 }, this);
567 }
568 break;
515 default: 569 default:
516 Warning() << "Received unknown notification: " << buffer->type(); 570 Warning() << "Received unknown notification: " << buffer->type();
517 break; 571 break;
@@ -526,11 +580,6 @@ bool ResourceAccess::processMessageBuffer()
526 return d->partialMessageBuffer.size() >= headerSize; 580 return d->partialMessageBuffer.size() >= headerSize;
527} 581}
528 582
529void ResourceAccess::callCallbacks()
530{
531 d->callCallbacks();
532}
533
534void ResourceAccess::log(const QString &message) 583void ResourceAccess::log(const QString &message)
535{ 584{
536 Log(d->resourceInstanceIdentifier) << this << message; 585 Log(d->resourceInstanceIdentifier) << this << message;