diff options
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 63 |
1 files changed, 60 insertions, 3 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 29acce4..c7f323a 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -6,13 +6,17 @@ | |||
6 | #include "createentity_generated.h" | 6 | #include "createentity_generated.h" |
7 | #include "modifyentity_generated.h" | 7 | #include "modifyentity_generated.h" |
8 | #include "deleteentity_generated.h" | 8 | #include "deleteentity_generated.h" |
9 | #include "inspection_generated.h" | ||
10 | #include "notification_generated.h" | ||
9 | #include "domainadaptor.h" | 11 | #include "domainadaptor.h" |
10 | #include "commands.h" | 12 | #include "commands.h" |
11 | #include "index.h" | 13 | #include "index.h" |
12 | #include "log.h" | 14 | #include "log.h" |
13 | #include "definitions.h" | 15 | #include "definitions.h" |
16 | #include "bufferutils.h" | ||
14 | 17 | ||
15 | #include <QUuid> | 18 | #include <QUuid> |
19 | #include <QDataStream> | ||
16 | 20 | ||
17 | static int sBatchSize = 100; | 21 | static int sBatchSize = 100; |
18 | 22 | ||
@@ -112,6 +116,7 @@ private: | |||
112 | class CommandProcessor : public QObject | 116 | class CommandProcessor : public QObject |
113 | { | 117 | { |
114 | Q_OBJECT | 118 | Q_OBJECT |
119 | typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; | ||
115 | public: | 120 | public: |
116 | CommandProcessor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues) | 121 | CommandProcessor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues) |
117 | : QObject(), | 122 | : QObject(), |
@@ -135,6 +140,11 @@ public: | |||
135 | mLowerBoundRevision = revision; | 140 | mLowerBoundRevision = revision; |
136 | } | 141 | } |
137 | 142 | ||
143 | void setInspectionCommand(const InspectionFunction &f) | ||
144 | { | ||
145 | mInspect = f; | ||
146 | } | ||
147 | |||
138 | 148 | ||
139 | signals: | 149 | signals: |
140 | void error(int errorCode, const QString &errorMessage); | 150 | void error(int errorCode, const QString &errorMessage); |
@@ -176,6 +186,14 @@ private slots: | |||
176 | return mPipeline->modifiedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); | 186 | return mPipeline->modifiedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); |
177 | case Akonadi2::Commands::CreateEntityCommand: | 187 | case Akonadi2::Commands::CreateEntityCommand: |
178 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); | 188 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); |
189 | case Akonadi2::Commands::InspectionCommand: | ||
190 | if (mInspect) { | ||
191 | return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<qint64>([]() { | ||
192 | return -1; | ||
193 | }); | ||
194 | } else { | ||
195 | return KAsync::error<qint64>(-1, "Missing inspection command."); | ||
196 | } | ||
179 | default: | 197 | default: |
180 | return KAsync::error<qint64>(-1, "Unhandled command"); | 198 | return KAsync::error<qint64>(-1, "Unhandled command"); |
181 | } | 199 | } |
@@ -266,6 +284,7 @@ private: | |||
266 | bool mProcessingLock; | 284 | bool mProcessingLock; |
267 | //The lowest revision we no longer need | 285 | //The lowest revision we no longer need |
268 | qint64 mLowerBoundRevision; | 286 | qint64 mLowerBoundRevision; |
287 | InspectionFunction mInspect; | ||
269 | }; | 288 | }; |
270 | 289 | ||
271 | 290 | ||
@@ -279,6 +298,38 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
279 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | 298 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) |
280 | { | 299 | { |
281 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); | 300 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); |
301 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { | ||
302 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | ||
303 | if (Akonadi2::Commands::VerifyInspectionBuffer(verifier)) { | ||
304 | auto buffer = Akonadi2::Commands::GetInspection(command); | ||
305 | int inspectionType = buffer->type(); | ||
306 | |||
307 | QByteArray inspectionId = BufferUtils::extractBuffer(buffer->id()); | ||
308 | QByteArray entityId = BufferUtils::extractBuffer(buffer->entityId()); | ||
309 | QByteArray domainType = BufferUtils::extractBuffer(buffer->domainType()); | ||
310 | QByteArray property = BufferUtils::extractBuffer(buffer->property()); | ||
311 | QByteArray expectedValueString = BufferUtils::extractBuffer(buffer->expectedValue()); | ||
312 | QDataStream s(expectedValueString); | ||
313 | QVariant expectedValue; | ||
314 | s >> expectedValue; | ||
315 | inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue).then<void>([=]() { | ||
316 | Akonadi2::Notification n; | ||
317 | n.type = Akonadi2::Commands::NotificationType_Inspection; | ||
318 | n.id = inspectionId; | ||
319 | n.code = Akonadi2::Commands::NotificationCode_Success; | ||
320 | emit notify(n); | ||
321 | }, [=](int code, const QString &message) { | ||
322 | Akonadi2::Notification n; | ||
323 | n.type = Akonadi2::Commands::NotificationType_Inspection; | ||
324 | n.message = message; | ||
325 | n.id = inspectionId; | ||
326 | n.code = Akonadi2::Commands::NotificationCode_Failure; | ||
327 | emit notify(n); | ||
328 | }).exec(); | ||
329 | return KAsync::null<void>(); | ||
330 | } | ||
331 | return KAsync::error<void>(-1, "Invalid inspection command."); | ||
332 | }); | ||
282 | QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 333 | QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
283 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 334 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
284 | mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { | 335 | mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { |
@@ -301,6 +352,12 @@ GenericResource::~GenericResource() | |||
301 | delete mSourceChangeReplay; | 352 | delete mSourceChangeReplay; |
302 | } | 353 | } |
303 | 354 | ||
355 | KAsync::Job<void> GenericResource::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) | ||
356 | { | ||
357 | Warning() << "Inspection not implemented"; | ||
358 | return KAsync::null<void>(); | ||
359 | } | ||
360 | |||
304 | void GenericResource::enableChangeReplay(bool enable) | 361 | void GenericResource::enableChangeReplay(bool enable) |
305 | { | 362 | { |
306 | if (enable) { | 363 | if (enable) { |
@@ -464,7 +521,7 @@ void GenericResource::createEntity(const QByteArray &akonadiId, const QByteArray | |||
464 | auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); | 521 | auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); |
465 | auto location = Akonadi2::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); | 522 | auto location = Akonadi2::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); |
466 | Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); | 523 | Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); |
467 | callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | 524 | callback(BufferUtils::extractBuffer(fbb)); |
468 | } | 525 | } |
469 | 526 | ||
470 | void GenericResource::modifyEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | 527 | void GenericResource::modifyEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) |
@@ -481,7 +538,7 @@ void GenericResource::modifyEntity(const QByteArray &akonadiId, qint64 revision, | |||
481 | //TODO removals | 538 | //TODO removals |
482 | auto location = Akonadi2::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); | 539 | auto location = Akonadi2::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); |
483 | Akonadi2::Commands::FinishModifyEntityBuffer(fbb, location); | 540 | Akonadi2::Commands::FinishModifyEntityBuffer(fbb, location); |
484 | callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | 541 | callback(BufferUtils::extractBuffer(fbb)); |
485 | } | 542 | } |
486 | 543 | ||
487 | void GenericResource::deleteEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) | 544 | void GenericResource::deleteEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) |
@@ -494,7 +551,7 @@ void GenericResource::deleteEntity(const QByteArray &akonadiId, qint64 revision, | |||
494 | auto type = fbb.CreateString(bufferType.toStdString()); | 551 | auto type = fbb.CreateString(bufferType.toStdString()); |
495 | auto location = Akonadi2::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); | 552 | auto location = Akonadi2::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); |
496 | Akonadi2::Commands::FinishDeleteEntityBuffer(fbb, location); | 553 | Akonadi2::Commands::FinishDeleteEntityBuffer(fbb, location); |
497 | callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | 554 | callback(BufferUtils::extractBuffer(fbb)); |
498 | } | 555 | } |
499 | 556 | ||
500 | void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) | 557 | void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) |