diff options
Diffstat (limited to 'common')
-rw-r--r-- | common/CMakeLists.txt | 1 | ||||
-rw-r--r-- | common/commandprocessor.cpp | 27 | ||||
-rw-r--r-- | common/commandprocessor.h | 14 | ||||
-rw-r--r-- | common/genericresource.cpp | 58 | ||||
-rw-r--r-- | common/genericresource.h | 4 | ||||
-rw-r--r-- | common/inspector.cpp | 85 | ||||
-rw-r--r-- | common/inspector.h | 52 |
7 files changed, 174 insertions, 67 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 018fc22..df44ce5 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -78,6 +78,7 @@ set(command_SRCS | |||
78 | mail/threadindexer.cpp | 78 | mail/threadindexer.cpp |
79 | notification.cpp | 79 | notification.cpp |
80 | commandprocessor.cpp | 80 | commandprocessor.cpp |
81 | inspector.cpp | ||
81 | ${storage_SRCS}) | 82 | ${storage_SRCS}) |
82 | 83 | ||
83 | add_library(${PROJECT_NAME} SHARED ${command_SRCS}) | 84 | add_library(${PROJECT_NAME} SHARED ${command_SRCS}) |
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index c9fca37..4ff352b 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp | |||
@@ -22,7 +22,8 @@ | |||
22 | #include "commands.h" | 22 | #include "commands.h" |
23 | #include "messagequeue.h" | 23 | #include "messagequeue.h" |
24 | #include "queuedcommand_generated.h" | 24 | #include "queuedcommand_generated.h" |
25 | 25 | #include "inspector.h" | |
26 | #include "synchronizer.h" | ||
26 | #include "pipeline.h" | 27 | #include "pipeline.h" |
27 | 28 | ||
28 | static int sBatchSize = 100; | 29 | static int sBatchSize = 100; |
@@ -42,11 +43,6 @@ void CommandProcessor::setOldestUsedRevision(qint64 revision) | |||
42 | mLowerBoundRevision = revision; | 43 | mLowerBoundRevision = revision; |
43 | } | 44 | } |
44 | 45 | ||
45 | void CommandProcessor::setInspectionCommand(const InspectionFunction &f) | ||
46 | { | ||
47 | mInspect = f; | ||
48 | } | ||
49 | |||
50 | void CommandProcessor::setFlushCommand(const FlushFunction &f) | 46 | void CommandProcessor::setFlushCommand(const FlushFunction &f) |
51 | { | 47 | { |
52 | mFlush = f; | 48 | mFlush = f; |
@@ -91,12 +87,9 @@ KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const Sink::QueuedCom | |||
91 | case Sink::Commands::CreateEntityCommand: | 87 | case Sink::Commands::CreateEntityCommand: |
92 | return mPipeline->newEntity(data, size); | 88 | return mPipeline->newEntity(data, size); |
93 | case Sink::Commands::InspectionCommand: | 89 | case Sink::Commands::InspectionCommand: |
94 | if (mInspect) { | 90 | Q_ASSERT(mInspector); |
95 | return mInspect(data, size) | 91 | return mInspector->processCommand(data, size) |
96 | .syncThen<qint64>([]() { return -1; }); | 92 | .syncThen<qint64>([]() { return -1; }); |
97 | } else { | ||
98 | return KAsync::error<qint64>(-1, "Missing inspection command."); | ||
99 | } | ||
100 | case Sink::Commands::FlushCommand: | 93 | case Sink::Commands::FlushCommand: |
101 | if (mFlush) { | 94 | if (mFlush) { |
102 | return mFlush(data, size) | 95 | return mFlush(data, size) |
@@ -191,3 +184,15 @@ KAsync::Job<void> CommandProcessor::processPipeline() | |||
191 | }); | 184 | }); |
192 | } | 185 | } |
193 | 186 | ||
187 | void CommandProcessor::setInspector(const QSharedPointer<Inspector> &inspector) | ||
188 | { | ||
189 | mInspector = inspector; | ||
190 | QObject::connect(mInspector.data(), &Inspector::notify, this, &CommandProcessor::notify); | ||
191 | } | ||
192 | |||
193 | void CommandProcessor::setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer) | ||
194 | { | ||
195 | mSynchronizer = synchronizer; | ||
196 | QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); | ||
197 | } | ||
198 | |||
diff --git a/common/commandprocessor.h b/common/commandprocessor.h index 51d845e..75ae37a 100644 --- a/common/commandprocessor.h +++ b/common/commandprocessor.h | |||
@@ -24,12 +24,16 @@ | |||
24 | #include <QObject> | 24 | #include <QObject> |
25 | #include <Async/Async> | 25 | #include <Async/Async> |
26 | #include <functional> | 26 | #include <functional> |
27 | |||
27 | #include "log.h" | 28 | #include "log.h" |
29 | #include "notification.h" | ||
28 | 30 | ||
29 | class MessageQueue; | 31 | class MessageQueue; |
30 | 32 | ||
31 | namespace Sink { | 33 | namespace Sink { |
32 | class Pipeline; | 34 | class Pipeline; |
35 | class Inspector; | ||
36 | class Synchronizer; | ||
33 | class QueuedCommand; | 37 | class QueuedCommand; |
34 | 38 | ||
35 | /** | 39 | /** |
@@ -38,7 +42,6 @@ namespace Sink { | |||
38 | class CommandProcessor : public QObject | 42 | class CommandProcessor : public QObject |
39 | { | 43 | { |
40 | Q_OBJECT | 44 | Q_OBJECT |
41 | typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; | ||
42 | typedef std::function<KAsync::Job<void>(void const *, size_t)> FlushFunction; | 45 | typedef std::function<KAsync::Job<void>(void const *, size_t)> FlushFunction; |
43 | SINK_DEBUG_AREA("commandprocessor") | 46 | SINK_DEBUG_AREA("commandprocessor") |
44 | 47 | ||
@@ -47,11 +50,13 @@ public: | |||
47 | 50 | ||
48 | void setOldestUsedRevision(qint64 revision); | 51 | void setOldestUsedRevision(qint64 revision); |
49 | 52 | ||
50 | void setInspectionCommand(const InspectionFunction &f); | ||
51 | |||
52 | void setFlushCommand(const FlushFunction &f); | 53 | void setFlushCommand(const FlushFunction &f); |
53 | 54 | ||
55 | void setInspector(const QSharedPointer<Inspector> &inspector); | ||
56 | void setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer); | ||
57 | |||
54 | signals: | 58 | signals: |
59 | void notify(Notification); | ||
55 | void error(int errorCode, const QString &errorMessage); | 60 | void error(int errorCode, const QString &errorMessage); |
56 | 61 | ||
57 | private: | 62 | private: |
@@ -72,8 +77,9 @@ private: | |||
72 | bool mProcessingLock; | 77 | bool mProcessingLock; |
73 | // The lowest revision we no longer need | 78 | // The lowest revision we no longer need |
74 | qint64 mLowerBoundRevision; | 79 | qint64 mLowerBoundRevision; |
75 | InspectionFunction mInspect; | ||
76 | FlushFunction mFlush; | 80 | FlushFunction mFlush; |
81 | QSharedPointer<Synchronizer> mSynchronizer; | ||
82 | QSharedPointer<Inspector> mInspector; | ||
77 | }; | 83 | }; |
78 | 84 | ||
79 | }; | 85 | }; |
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 3aa4fce..80e59c9 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -59,41 +59,6 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q | |||
59 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | 59 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) |
60 | { | 60 | { |
61 | mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); | 61 | mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); |
62 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { | ||
63 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | ||
64 | if (Sink::Commands::VerifyInspectionBuffer(verifier)) { | ||
65 | auto buffer = Sink::Commands::GetInspection(command); | ||
66 | int inspectionType = buffer->type(); | ||
67 | |||
68 | QByteArray inspectionId = BufferUtils::extractBuffer(buffer->id()); | ||
69 | QByteArray entityId = BufferUtils::extractBuffer(buffer->entityId()); | ||
70 | QByteArray domainType = BufferUtils::extractBuffer(buffer->domainType()); | ||
71 | QByteArray property = BufferUtils::extractBuffer(buffer->property()); | ||
72 | QByteArray expectedValueString = BufferUtils::extractBuffer(buffer->expectedValue()); | ||
73 | QDataStream s(expectedValueString); | ||
74 | QVariant expectedValue; | ||
75 | s >> expectedValue; | ||
76 | inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue) | ||
77 | .then<void>( | ||
78 | [=](const KAsync::Error &error) { | ||
79 | Sink::Notification n; | ||
80 | n.type = Sink::Notification::Inspection; | ||
81 | n.id = inspectionId; | ||
82 | if (error) { | ||
83 | Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << error.errorMessage; | ||
84 | n.code = Sink::Notification::Failure; | ||
85 | } else { | ||
86 | Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; | ||
87 | n.code = Sink::Notification::Success; | ||
88 | } | ||
89 | emit notify(n); | ||
90 | return KAsync::null(); | ||
91 | }) | ||
92 | .exec(); | ||
93 | return KAsync::null<void>(); | ||
94 | } | ||
95 | return KAsync::error<void>(-1, "Invalid inspection command."); | ||
96 | }); | ||
97 | mProcessor->setFlushCommand([this](void const *command, size_t size) { | 62 | mProcessor->setFlushCommand([this](void const *command, size_t size) { |
98 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | 63 | flatbuffers::Verifier verifier((const uint8_t *)command, size); |
99 | if (Sink::Commands::VerifyFlushBuffer(verifier)) { | 64 | if (Sink::Commands::VerifyFlushBuffer(verifier)) { |
@@ -114,14 +79,9 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q | |||
114 | } | 79 | } |
115 | return KAsync::error<void>(-1, "Invalid flush command."); | 80 | return KAsync::error<void>(-1, "Invalid flush command."); |
116 | }); | 81 | }); |
117 | { | 82 | QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
118 | auto ret = QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 83 | QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify); |
119 | Q_ASSERT(ret); | 84 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
120 | } | ||
121 | { | ||
122 | auto ret = QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | ||
123 | Q_ASSERT(ret); | ||
124 | } | ||
125 | 85 | ||
126 | mCommitQueueTimer.setInterval(sCommitInterval); | 86 | mCommitQueueTimer.setInterval(sCommitInterval); |
127 | mCommitQueueTimer.setSingleShot(true); | 87 | mCommitQueueTimer.setSingleShot(true); |
@@ -132,13 +92,6 @@ GenericResource::~GenericResource() | |||
132 | { | 92 | { |
133 | } | 93 | } |
134 | 94 | ||
135 | KAsync::Job<void> GenericResource::inspect( | ||
136 | int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) | ||
137 | { | ||
138 | SinkWarning() << "Inspection not implemented"; | ||
139 | return KAsync::null<void>(); | ||
140 | } | ||
141 | |||
142 | void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors) | 95 | void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors) |
143 | { | 96 | { |
144 | mPipeline->setPreprocessors(type, preprocessors); | 97 | mPipeline->setPreprocessors(type, preprocessors); |
@@ -179,6 +132,11 @@ void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &sync | |||
179 | QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); | 132 | QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); |
180 | } | 133 | } |
181 | 134 | ||
135 | void GenericResource::setupInspector(const QSharedPointer<Inspector> &inspector) | ||
136 | { | ||
137 | mProcessor->setInspector(inspector); | ||
138 | } | ||
139 | |||
182 | void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) | 140 | void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) |
183 | { | 141 | { |
184 | Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadWrite).removeFromDisk(); | 142 | Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadWrite).removeFromDisk(); |
diff --git a/common/genericresource.h b/common/genericresource.h index 12f15f3..0bc47da 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -32,6 +32,7 @@ namespace Sink { | |||
32 | class Pipeline; | 32 | class Pipeline; |
33 | class Preprocessor; | 33 | class Preprocessor; |
34 | class Synchronizer; | 34 | class Synchronizer; |
35 | class Inspector; | ||
35 | class CommandProcessor; | 36 | class CommandProcessor; |
36 | 37 | ||
37 | /** | 38 | /** |
@@ -50,8 +51,6 @@ public: | |||
50 | virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE; | 51 | virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE; |
51 | virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; | 52 | virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; |
52 | virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; | 53 | virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; |
53 | virtual KAsync::Job<void> | ||
54 | inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue); | ||
55 | 54 | ||
56 | int error() const; | 55 | int error() const; |
57 | 56 | ||
@@ -64,6 +63,7 @@ private slots: | |||
64 | protected: | 63 | protected: |
65 | void setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors); | 64 | void setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors); |
66 | void setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer); | 65 | void setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer); |
66 | void setupInspector(const QSharedPointer<Inspector> &inspector); | ||
67 | 67 | ||
68 | void onProcessorError(int errorCode, const QString &errorMessage); | 68 | void onProcessorError(int errorCode, const QString &errorMessage); |
69 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); | 69 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); |
diff --git a/common/inspector.cpp b/common/inspector.cpp new file mode 100644 index 0000000..8b4c93a --- /dev/null +++ b/common/inspector.cpp | |||
@@ -0,0 +1,85 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #include "inspector.h" | ||
21 | |||
22 | #include "resourcecontext.h" | ||
23 | #include "inspection_generated.h" | ||
24 | #include "bufferutils.h" | ||
25 | |||
26 | #include <QDataStream> | ||
27 | |||
28 | using namespace Sink; | ||
29 | |||
30 | Inspector::Inspector(const ResourceContext &context) | ||
31 | : QObject(), | ||
32 | mResourceContext(context) | ||
33 | // mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)), | ||
34 | // mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite) | ||
35 | { | ||
36 | // SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); | ||
37 | } | ||
38 | |||
39 | Inspector::~Inspector() | ||
40 | { | ||
41 | |||
42 | } | ||
43 | |||
44 | KAsync::Job<void> Inspector::processCommand(void const *command, size_t size) | ||
45 | { | ||
46 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | ||
47 | if (Sink::Commands::VerifyInspectionBuffer(verifier)) { | ||
48 | auto buffer = Sink::Commands::GetInspection(command); | ||
49 | int inspectionType = buffer->type(); | ||
50 | |||
51 | QByteArray inspectionId = BufferUtils::extractBuffer(buffer->id()); | ||
52 | QByteArray entityId = BufferUtils::extractBuffer(buffer->entityId()); | ||
53 | QByteArray domainType = BufferUtils::extractBuffer(buffer->domainType()); | ||
54 | QByteArray property = BufferUtils::extractBuffer(buffer->property()); | ||
55 | QByteArray expectedValueString = BufferUtils::extractBuffer(buffer->expectedValue()); | ||
56 | QDataStream s(expectedValueString); | ||
57 | QVariant expectedValue; | ||
58 | s >> expectedValue; | ||
59 | inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue) | ||
60 | .then<void>( | ||
61 | [=](const KAsync::Error &error) { | ||
62 | Sink::Notification n; | ||
63 | n.type = Sink::Notification::Inspection; | ||
64 | n.id = inspectionId; | ||
65 | if (error) { | ||
66 | Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << error.errorMessage; | ||
67 | n.code = Sink::Notification::Failure; | ||
68 | } else { | ||
69 | Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; | ||
70 | n.code = Sink::Notification::Success; | ||
71 | } | ||
72 | emit notify(n); | ||
73 | return KAsync::null(); | ||
74 | }) | ||
75 | .exec(); | ||
76 | return KAsync::null<void>(); | ||
77 | } | ||
78 | return KAsync::error<void>(-1, "Invalid inspection command."); | ||
79 | } | ||
80 | |||
81 | KAsync::Job<void> Inspector::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) | ||
82 | { | ||
83 | return KAsync::error(-1, "Inspection not implemented."); | ||
84 | } | ||
85 | |||
diff --git a/common/inspector.h b/common/inspector.h new file mode 100644 index 0000000..ff167b1 --- /dev/null +++ b/common/inspector.h | |||
@@ -0,0 +1,52 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #pragma once | ||
21 | |||
22 | #include "sink_export.h" | ||
23 | #include <QObject> | ||
24 | #include <Async/Async> | ||
25 | |||
26 | #include "notification.h" | ||
27 | #include "resourcecontext.h" | ||
28 | |||
29 | namespace Sink { | ||
30 | |||
31 | /** | ||
32 | * Synchronize and add what we don't already have to local queue | ||
33 | */ | ||
34 | class SINK_EXPORT Inspector : public QObject | ||
35 | { | ||
36 | Q_OBJECT | ||
37 | public: | ||
38 | Inspector(const ResourceContext &resourceContext); | ||
39 | virtual ~Inspector(); | ||
40 | |||
41 | KAsync::Job<void> processCommand(void const *command, size_t size); | ||
42 | |||
43 | signals: | ||
44 | void notify(Notification); | ||
45 | |||
46 | protected: | ||
47 | virtual KAsync::Job<void> inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue); | ||
48 | |||
49 | Sink::ResourceContext mResourceContext; | ||
50 | }; | ||
51 | |||
52 | } | ||