summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp58
1 files changed, 8 insertions, 50 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 3aa4fce..80e59c9 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -59,41 +59,6 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q
59 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 59 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
60{ 60{
61 mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); 61 mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue));
62 mProcessor->setInspectionCommand([this](void const *command, size_t size) {
63 flatbuffers::Verifier verifier((const uint8_t *)command, size);
64 if (Sink::Commands::VerifyInspectionBuffer(verifier)) {
65 auto buffer = Sink::Commands::GetInspection(command);
66 int inspectionType = buffer->type();
67
68 QByteArray inspectionId = BufferUtils::extractBuffer(buffer->id());
69 QByteArray entityId = BufferUtils::extractBuffer(buffer->entityId());
70 QByteArray domainType = BufferUtils::extractBuffer(buffer->domainType());
71 QByteArray property = BufferUtils::extractBuffer(buffer->property());
72 QByteArray expectedValueString = BufferUtils::extractBuffer(buffer->expectedValue());
73 QDataStream s(expectedValueString);
74 QVariant expectedValue;
75 s >> expectedValue;
76 inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue)
77 .then<void>(
78 [=](const KAsync::Error &error) {
79 Sink::Notification n;
80 n.type = Sink::Notification::Inspection;
81 n.id = inspectionId;
82 if (error) {
83 Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << error.errorMessage;
84 n.code = Sink::Notification::Failure;
85 } else {
86 Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId;
87 n.code = Sink::Notification::Success;
88 }
89 emit notify(n);
90 return KAsync::null();
91 })
92 .exec();
93 return KAsync::null<void>();
94 }
95 return KAsync::error<void>(-1, "Invalid inspection command.");
96 });
97 mProcessor->setFlushCommand([this](void const *command, size_t size) { 62 mProcessor->setFlushCommand([this](void const *command, size_t size) {
98 flatbuffers::Verifier verifier((const uint8_t *)command, size); 63 flatbuffers::Verifier verifier((const uint8_t *)command, size);
99 if (Sink::Commands::VerifyFlushBuffer(verifier)) { 64 if (Sink::Commands::VerifyFlushBuffer(verifier)) {
@@ -114,14 +79,9 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q
114 } 79 }
115 return KAsync::error<void>(-1, "Invalid flush command."); 80 return KAsync::error<void>(-1, "Invalid flush command.");
116 }); 81 });
117 { 82 QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
118 auto ret = QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 83 QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify);
119 Q_ASSERT(ret); 84 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
120 }
121 {
122 auto ret = QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
123 Q_ASSERT(ret);
124 }
125 85
126 mCommitQueueTimer.setInterval(sCommitInterval); 86 mCommitQueueTimer.setInterval(sCommitInterval);
127 mCommitQueueTimer.setSingleShot(true); 87 mCommitQueueTimer.setSingleShot(true);
@@ -132,13 +92,6 @@ GenericResource::~GenericResource()
132{ 92{
133} 93}
134 94
135KAsync::Job<void> GenericResource::inspect(
136 int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue)
137{
138 SinkWarning() << "Inspection not implemented";
139 return KAsync::null<void>();
140}
141
142void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors) 95void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors)
143{ 96{
144 mPipeline->setPreprocessors(type, preprocessors); 97 mPipeline->setPreprocessors(type, preprocessors);
@@ -179,6 +132,11 @@ void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &sync
179 QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); 132 QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection);
180} 133}
181 134
135void GenericResource::setupInspector(const QSharedPointer<Inspector> &inspector)
136{
137 mProcessor->setInspector(inspector);
138}
139
182void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) 140void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier)
183{ 141{
184 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadWrite).removeFromDisk(); 142 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadWrite).removeFromDisk();