summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp63
1 files changed, 60 insertions, 3 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 29acce4..c7f323a 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -6,13 +6,17 @@
6#include "createentity_generated.h" 6#include "createentity_generated.h"
7#include "modifyentity_generated.h" 7#include "modifyentity_generated.h"
8#include "deleteentity_generated.h" 8#include "deleteentity_generated.h"
9#include "inspection_generated.h"
10#include "notification_generated.h"
9#include "domainadaptor.h" 11#include "domainadaptor.h"
10#include "commands.h" 12#include "commands.h"
11#include "index.h" 13#include "index.h"
12#include "log.h" 14#include "log.h"
13#include "definitions.h" 15#include "definitions.h"
16#include "bufferutils.h"
14 17
15#include <QUuid> 18#include <QUuid>
19#include <QDataStream>
16 20
17static int sBatchSize = 100; 21static int sBatchSize = 100;
18 22
@@ -112,6 +116,7 @@ private:
112class CommandProcessor : public QObject 116class CommandProcessor : public QObject
113{ 117{
114 Q_OBJECT 118 Q_OBJECT
119 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction;
115public: 120public:
116 CommandProcessor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues) 121 CommandProcessor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues)
117 : QObject(), 122 : QObject(),
@@ -135,6 +140,11 @@ public:
135 mLowerBoundRevision = revision; 140 mLowerBoundRevision = revision;
136 } 141 }
137 142
143 void setInspectionCommand(const InspectionFunction &f)
144 {
145 mInspect = f;
146 }
147
138 148
139signals: 149signals:
140 void error(int errorCode, const QString &errorMessage); 150 void error(int errorCode, const QString &errorMessage);
@@ -176,6 +186,14 @@ private slots:
176 return mPipeline->modifiedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); 186 return mPipeline->modifiedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
177 case Akonadi2::Commands::CreateEntityCommand: 187 case Akonadi2::Commands::CreateEntityCommand:
178 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); 188 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
189 case Akonadi2::Commands::InspectionCommand:
190 if (mInspect) {
191 return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<qint64>([]() {
192 return -1;
193 });
194 } else {
195 return KAsync::error<qint64>(-1, "Missing inspection command.");
196 }
179 default: 197 default:
180 return KAsync::error<qint64>(-1, "Unhandled command"); 198 return KAsync::error<qint64>(-1, "Unhandled command");
181 } 199 }
@@ -266,6 +284,7 @@ private:
266 bool mProcessingLock; 284 bool mProcessingLock;
267 //The lowest revision we no longer need 285 //The lowest revision we no longer need
268 qint64 mLowerBoundRevision; 286 qint64 mLowerBoundRevision;
287 InspectionFunction mInspect;
269}; 288};
270 289
271 290
@@ -279,6 +298,38 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
279 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 298 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
280{ 299{
281 mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); 300 mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue);
301 mProcessor->setInspectionCommand([this](void const *command, size_t size) {
302 flatbuffers::Verifier verifier((const uint8_t *)command, size);
303 if (Akonadi2::Commands::VerifyInspectionBuffer(verifier)) {
304 auto buffer = Akonadi2::Commands::GetInspection(command);
305 int inspectionType = buffer->type();
306
307 QByteArray inspectionId = BufferUtils::extractBuffer(buffer->id());
308 QByteArray entityId = BufferUtils::extractBuffer(buffer->entityId());
309 QByteArray domainType = BufferUtils::extractBuffer(buffer->domainType());
310 QByteArray property = BufferUtils::extractBuffer(buffer->property());
311 QByteArray expectedValueString = BufferUtils::extractBuffer(buffer->expectedValue());
312 QDataStream s(expectedValueString);
313 QVariant expectedValue;
314 s >> expectedValue;
315 inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue).then<void>([=]() {
316 Akonadi2::Notification n;
317 n.type = Akonadi2::Commands::NotificationType_Inspection;
318 n.id = inspectionId;
319 n.code = Akonadi2::Commands::NotificationCode_Success;
320 emit notify(n);
321 }, [=](int code, const QString &message) {
322 Akonadi2::Notification n;
323 n.type = Akonadi2::Commands::NotificationType_Inspection;
324 n.message = message;
325 n.id = inspectionId;
326 n.code = Akonadi2::Commands::NotificationCode_Failure;
327 emit notify(n);
328 }).exec();
329 return KAsync::null<void>();
330 }
331 return KAsync::error<void>(-1, "Invalid inspection command.");
332 });
282 QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 333 QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
283 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 334 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
284 mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { 335 mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) {
@@ -301,6 +352,12 @@ GenericResource::~GenericResource()
301 delete mSourceChangeReplay; 352 delete mSourceChangeReplay;
302} 353}
303 354
355KAsync::Job<void> GenericResource::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue)
356{
357 Warning() << "Inspection not implemented";
358 return KAsync::null<void>();
359}
360
304void GenericResource::enableChangeReplay(bool enable) 361void GenericResource::enableChangeReplay(bool enable)
305{ 362{
306 if (enable) { 363 if (enable) {
@@ -464,7 +521,7 @@ void GenericResource::createEntity(const QByteArray &akonadiId, const QByteArray
464 auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); 521 auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
465 auto location = Akonadi2::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); 522 auto location = Akonadi2::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource);
466 Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); 523 Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location);
467 callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 524 callback(BufferUtils::extractBuffer(fbb));
468} 525}
469 526
470void GenericResource::modifyEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) 527void GenericResource::modifyEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
@@ -481,7 +538,7 @@ void GenericResource::modifyEntity(const QByteArray &akonadiId, qint64 revision,
481 //TODO removals 538 //TODO removals
482 auto location = Akonadi2::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); 539 auto location = Akonadi2::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource);
483 Akonadi2::Commands::FinishModifyEntityBuffer(fbb, location); 540 Akonadi2::Commands::FinishModifyEntityBuffer(fbb, location);
484 callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 541 callback(BufferUtils::extractBuffer(fbb));
485} 542}
486 543
487void GenericResource::deleteEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) 544void GenericResource::deleteEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback)
@@ -494,7 +551,7 @@ void GenericResource::deleteEntity(const QByteArray &akonadiId, qint64 revision,
494 auto type = fbb.CreateString(bufferType.toStdString()); 551 auto type = fbb.CreateString(bufferType.toStdString());
495 auto location = Akonadi2::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); 552 auto location = Akonadi2::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource);
496 Akonadi2::Commands::FinishDeleteEntityBuffer(fbb, location); 553 Akonadi2::Commands::FinishDeleteEntityBuffer(fbb, location);
497 callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 554 callback(BufferUtils::extractBuffer(fbb));
498} 555}
499 556
500void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) 557void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction)