diff options
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 58 |
1 files changed, 8 insertions, 50 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 3aa4fce..80e59c9 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -59,41 +59,6 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q | |||
59 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | 59 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) |
60 | { | 60 | { |
61 | mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); | 61 | mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); |
62 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { | ||
63 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | ||
64 | if (Sink::Commands::VerifyInspectionBuffer(verifier)) { | ||
65 | auto buffer = Sink::Commands::GetInspection(command); | ||
66 | int inspectionType = buffer->type(); | ||
67 | |||
68 | QByteArray inspectionId = BufferUtils::extractBuffer(buffer->id()); | ||
69 | QByteArray entityId = BufferUtils::extractBuffer(buffer->entityId()); | ||
70 | QByteArray domainType = BufferUtils::extractBuffer(buffer->domainType()); | ||
71 | QByteArray property = BufferUtils::extractBuffer(buffer->property()); | ||
72 | QByteArray expectedValueString = BufferUtils::extractBuffer(buffer->expectedValue()); | ||
73 | QDataStream s(expectedValueString); | ||
74 | QVariant expectedValue; | ||
75 | s >> expectedValue; | ||
76 | inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue) | ||
77 | .then<void>( | ||
78 | [=](const KAsync::Error &error) { | ||
79 | Sink::Notification n; | ||
80 | n.type = Sink::Notification::Inspection; | ||
81 | n.id = inspectionId; | ||
82 | if (error) { | ||
83 | Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << error.errorMessage; | ||
84 | n.code = Sink::Notification::Failure; | ||
85 | } else { | ||
86 | Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; | ||
87 | n.code = Sink::Notification::Success; | ||
88 | } | ||
89 | emit notify(n); | ||
90 | return KAsync::null(); | ||
91 | }) | ||
92 | .exec(); | ||
93 | return KAsync::null<void>(); | ||
94 | } | ||
95 | return KAsync::error<void>(-1, "Invalid inspection command."); | ||
96 | }); | ||
97 | mProcessor->setFlushCommand([this](void const *command, size_t size) { | 62 | mProcessor->setFlushCommand([this](void const *command, size_t size) { |
98 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | 63 | flatbuffers::Verifier verifier((const uint8_t *)command, size); |
99 | if (Sink::Commands::VerifyFlushBuffer(verifier)) { | 64 | if (Sink::Commands::VerifyFlushBuffer(verifier)) { |
@@ -114,14 +79,9 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q | |||
114 | } | 79 | } |
115 | return KAsync::error<void>(-1, "Invalid flush command."); | 80 | return KAsync::error<void>(-1, "Invalid flush command."); |
116 | }); | 81 | }); |
117 | { | 82 | QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
118 | auto ret = QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 83 | QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify); |
119 | Q_ASSERT(ret); | 84 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
120 | } | ||
121 | { | ||
122 | auto ret = QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | ||
123 | Q_ASSERT(ret); | ||
124 | } | ||
125 | 85 | ||
126 | mCommitQueueTimer.setInterval(sCommitInterval); | 86 | mCommitQueueTimer.setInterval(sCommitInterval); |
127 | mCommitQueueTimer.setSingleShot(true); | 87 | mCommitQueueTimer.setSingleShot(true); |
@@ -132,13 +92,6 @@ GenericResource::~GenericResource() | |||
132 | { | 92 | { |
133 | } | 93 | } |
134 | 94 | ||
135 | KAsync::Job<void> GenericResource::inspect( | ||
136 | int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) | ||
137 | { | ||
138 | SinkWarning() << "Inspection not implemented"; | ||
139 | return KAsync::null<void>(); | ||
140 | } | ||
141 | |||
142 | void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors) | 95 | void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors) |
143 | { | 96 | { |
144 | mPipeline->setPreprocessors(type, preprocessors); | 97 | mPipeline->setPreprocessors(type, preprocessors); |
@@ -179,6 +132,11 @@ void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &sync | |||
179 | QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); | 132 | QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); |
180 | } | 133 | } |
181 | 134 | ||
135 | void GenericResource::setupInspector(const QSharedPointer<Inspector> &inspector) | ||
136 | { | ||
137 | mProcessor->setInspector(inspector); | ||
138 | } | ||
139 | |||
182 | void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) | 140 | void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) |
183 | { | 141 | { |
184 | Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadWrite).removeFromDisk(); | 142 | Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadWrite).removeFromDisk(); |