summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/commandprocessor.cpp27
-rw-r--r--common/commandprocessor.h14
-rw-r--r--common/genericresource.cpp58
-rw-r--r--common/genericresource.h4
-rw-r--r--common/inspector.cpp85
-rw-r--r--common/inspector.h52
7 files changed, 174 insertions, 67 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 018fc22..df44ce5 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -78,6 +78,7 @@ set(command_SRCS
78 mail/threadindexer.cpp 78 mail/threadindexer.cpp
79 notification.cpp 79 notification.cpp
80 commandprocessor.cpp 80 commandprocessor.cpp
81 inspector.cpp
81 ${storage_SRCS}) 82 ${storage_SRCS})
82 83
83add_library(${PROJECT_NAME} SHARED ${command_SRCS}) 84add_library(${PROJECT_NAME} SHARED ${command_SRCS})
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp
index c9fca37..4ff352b 100644
--- a/common/commandprocessor.cpp
+++ b/common/commandprocessor.cpp
@@ -22,7 +22,8 @@
22#include "commands.h" 22#include "commands.h"
23#include "messagequeue.h" 23#include "messagequeue.h"
24#include "queuedcommand_generated.h" 24#include "queuedcommand_generated.h"
25 25#include "inspector.h"
26#include "synchronizer.h"
26#include "pipeline.h" 27#include "pipeline.h"
27 28
28static int sBatchSize = 100; 29static int sBatchSize = 100;
@@ -42,11 +43,6 @@ void CommandProcessor::setOldestUsedRevision(qint64 revision)
42 mLowerBoundRevision = revision; 43 mLowerBoundRevision = revision;
43} 44}
44 45
45void CommandProcessor::setInspectionCommand(const InspectionFunction &f)
46{
47 mInspect = f;
48}
49
50void CommandProcessor::setFlushCommand(const FlushFunction &f) 46void CommandProcessor::setFlushCommand(const FlushFunction &f)
51{ 47{
52 mFlush = f; 48 mFlush = f;
@@ -91,12 +87,9 @@ KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const Sink::QueuedCom
91 case Sink::Commands::CreateEntityCommand: 87 case Sink::Commands::CreateEntityCommand:
92 return mPipeline->newEntity(data, size); 88 return mPipeline->newEntity(data, size);
93 case Sink::Commands::InspectionCommand: 89 case Sink::Commands::InspectionCommand:
94 if (mInspect) { 90 Q_ASSERT(mInspector);
95 return mInspect(data, size) 91 return mInspector->processCommand(data, size)
96 .syncThen<qint64>([]() { return -1; }); 92 .syncThen<qint64>([]() { return -1; });
97 } else {
98 return KAsync::error<qint64>(-1, "Missing inspection command.");
99 }
100 case Sink::Commands::FlushCommand: 93 case Sink::Commands::FlushCommand:
101 if (mFlush) { 94 if (mFlush) {
102 return mFlush(data, size) 95 return mFlush(data, size)
@@ -191,3 +184,15 @@ KAsync::Job<void> CommandProcessor::processPipeline()
191 }); 184 });
192} 185}
193 186
187void CommandProcessor::setInspector(const QSharedPointer<Inspector> &inspector)
188{
189 mInspector = inspector;
190 QObject::connect(mInspector.data(), &Inspector::notify, this, &CommandProcessor::notify);
191}
192
193void CommandProcessor::setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer)
194{
195 mSynchronizer = synchronizer;
196 QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify);
197}
198
diff --git a/common/commandprocessor.h b/common/commandprocessor.h
index 51d845e..75ae37a 100644
--- a/common/commandprocessor.h
+++ b/common/commandprocessor.h
@@ -24,12 +24,16 @@
24#include <QObject> 24#include <QObject>
25#include <Async/Async> 25#include <Async/Async>
26#include <functional> 26#include <functional>
27
27#include "log.h" 28#include "log.h"
29#include "notification.h"
28 30
29class MessageQueue; 31class MessageQueue;
30 32
31namespace Sink { 33namespace Sink {
32 class Pipeline; 34 class Pipeline;
35 class Inspector;
36 class Synchronizer;
33 class QueuedCommand; 37 class QueuedCommand;
34 38
35/** 39/**
@@ -38,7 +42,6 @@ namespace Sink {
38class CommandProcessor : public QObject 42class CommandProcessor : public QObject
39{ 43{
40 Q_OBJECT 44 Q_OBJECT
41 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction;
42 typedef std::function<KAsync::Job<void>(void const *, size_t)> FlushFunction; 45 typedef std::function<KAsync::Job<void>(void const *, size_t)> FlushFunction;
43 SINK_DEBUG_AREA("commandprocessor") 46 SINK_DEBUG_AREA("commandprocessor")
44 47
@@ -47,11 +50,13 @@ public:
47 50
48 void setOldestUsedRevision(qint64 revision); 51 void setOldestUsedRevision(qint64 revision);
49 52
50 void setInspectionCommand(const InspectionFunction &f);
51
52 void setFlushCommand(const FlushFunction &f); 53 void setFlushCommand(const FlushFunction &f);
53 54
55 void setInspector(const QSharedPointer<Inspector> &inspector);
56 void setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer);
57
54signals: 58signals:
59 void notify(Notification);
55 void error(int errorCode, const QString &errorMessage); 60 void error(int errorCode, const QString &errorMessage);
56 61
57private: 62private:
@@ -72,8 +77,9 @@ private:
72 bool mProcessingLock; 77 bool mProcessingLock;
73 // The lowest revision we no longer need 78 // The lowest revision we no longer need
74 qint64 mLowerBoundRevision; 79 qint64 mLowerBoundRevision;
75 InspectionFunction mInspect;
76 FlushFunction mFlush; 80 FlushFunction mFlush;
81 QSharedPointer<Synchronizer> mSynchronizer;
82 QSharedPointer<Inspector> mInspector;
77}; 83};
78 84
79}; 85};
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 3aa4fce..80e59c9 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -59,41 +59,6 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q
59 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 59 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
60{ 60{
61 mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); 61 mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue));
62 mProcessor->setInspectionCommand([this](void const *command, size_t size) {
63 flatbuffers::Verifier verifier((const uint8_t *)command, size);
64 if (Sink::Commands::VerifyInspectionBuffer(verifier)) {
65 auto buffer = Sink::Commands::GetInspection(command);
66 int inspectionType = buffer->type();
67
68 QByteArray inspectionId = BufferUtils::extractBuffer(buffer->id());
69 QByteArray entityId = BufferUtils::extractBuffer(buffer->entityId());
70 QByteArray domainType = BufferUtils::extractBuffer(buffer->domainType());
71 QByteArray property = BufferUtils::extractBuffer(buffer->property());
72 QByteArray expectedValueString = BufferUtils::extractBuffer(buffer->expectedValue());
73 QDataStream s(expectedValueString);
74 QVariant expectedValue;
75 s >> expectedValue;
76 inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue)
77 .then<void>(
78 [=](const KAsync::Error &error) {
79 Sink::Notification n;
80 n.type = Sink::Notification::Inspection;
81 n.id = inspectionId;
82 if (error) {
83 Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << error.errorMessage;
84 n.code = Sink::Notification::Failure;
85 } else {
86 Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId;
87 n.code = Sink::Notification::Success;
88 }
89 emit notify(n);
90 return KAsync::null();
91 })
92 .exec();
93 return KAsync::null<void>();
94 }
95 return KAsync::error<void>(-1, "Invalid inspection command.");
96 });
97 mProcessor->setFlushCommand([this](void const *command, size_t size) { 62 mProcessor->setFlushCommand([this](void const *command, size_t size) {
98 flatbuffers::Verifier verifier((const uint8_t *)command, size); 63 flatbuffers::Verifier verifier((const uint8_t *)command, size);
99 if (Sink::Commands::VerifyFlushBuffer(verifier)) { 64 if (Sink::Commands::VerifyFlushBuffer(verifier)) {
@@ -114,14 +79,9 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q
114 } 79 }
115 return KAsync::error<void>(-1, "Invalid flush command."); 80 return KAsync::error<void>(-1, "Invalid flush command.");
116 }); 81 });
117 { 82 QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
118 auto ret = QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 83 QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify);
119 Q_ASSERT(ret); 84 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
120 }
121 {
122 auto ret = QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
123 Q_ASSERT(ret);
124 }
125 85
126 mCommitQueueTimer.setInterval(sCommitInterval); 86 mCommitQueueTimer.setInterval(sCommitInterval);
127 mCommitQueueTimer.setSingleShot(true); 87 mCommitQueueTimer.setSingleShot(true);
@@ -132,13 +92,6 @@ GenericResource::~GenericResource()
132{ 92{
133} 93}
134 94
135KAsync::Job<void> GenericResource::inspect(
136 int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue)
137{
138 SinkWarning() << "Inspection not implemented";
139 return KAsync::null<void>();
140}
141
142void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors) 95void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors)
143{ 96{
144 mPipeline->setPreprocessors(type, preprocessors); 97 mPipeline->setPreprocessors(type, preprocessors);
@@ -179,6 +132,11 @@ void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &sync
179 QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); 132 QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection);
180} 133}
181 134
135void GenericResource::setupInspector(const QSharedPointer<Inspector> &inspector)
136{
137 mProcessor->setInspector(inspector);
138}
139
182void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) 140void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier)
183{ 141{
184 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadWrite).removeFromDisk(); 142 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadWrite).removeFromDisk();
diff --git a/common/genericresource.h b/common/genericresource.h
index 12f15f3..0bc47da 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -32,6 +32,7 @@ namespace Sink {
32class Pipeline; 32class Pipeline;
33class Preprocessor; 33class Preprocessor;
34class Synchronizer; 34class Synchronizer;
35class Inspector;
35class CommandProcessor; 36class CommandProcessor;
36 37
37/** 38/**
@@ -50,8 +51,6 @@ public:
50 virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE; 51 virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE;
51 virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; 52 virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE;
52 virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; 53 virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE;
53 virtual KAsync::Job<void>
54 inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue);
55 54
56 int error() const; 55 int error() const;
57 56
@@ -64,6 +63,7 @@ private slots:
64protected: 63protected:
65 void setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors); 64 void setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors);
66 void setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer); 65 void setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer);
66 void setupInspector(const QSharedPointer<Inspector> &inspector);
67 67
68 void onProcessorError(int errorCode, const QString &errorMessage); 68 void onProcessorError(int errorCode, const QString &errorMessage);
69 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); 69 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data);
diff --git a/common/inspector.cpp b/common/inspector.cpp
new file mode 100644
index 0000000..8b4c93a
--- /dev/null
+++ b/common/inspector.cpp
@@ -0,0 +1,85 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#include "inspector.h"
21
22#include "resourcecontext.h"
23#include "inspection_generated.h"
24#include "bufferutils.h"
25
26#include <QDataStream>
27
28using namespace Sink;
29
30Inspector::Inspector(const ResourceContext &context)
31 : QObject(),
32 mResourceContext(context)
33 // mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)),
34 // mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite)
35{
36 // SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId();
37}
38
39Inspector::~Inspector()
40{
41
42}
43
44KAsync::Job<void> Inspector::processCommand(void const *command, size_t size)
45{
46 flatbuffers::Verifier verifier((const uint8_t *)command, size);
47 if (Sink::Commands::VerifyInspectionBuffer(verifier)) {
48 auto buffer = Sink::Commands::GetInspection(command);
49 int inspectionType = buffer->type();
50
51 QByteArray inspectionId = BufferUtils::extractBuffer(buffer->id());
52 QByteArray entityId = BufferUtils::extractBuffer(buffer->entityId());
53 QByteArray domainType = BufferUtils::extractBuffer(buffer->domainType());
54 QByteArray property = BufferUtils::extractBuffer(buffer->property());
55 QByteArray expectedValueString = BufferUtils::extractBuffer(buffer->expectedValue());
56 QDataStream s(expectedValueString);
57 QVariant expectedValue;
58 s >> expectedValue;
59 inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue)
60 .then<void>(
61 [=](const KAsync::Error &error) {
62 Sink::Notification n;
63 n.type = Sink::Notification::Inspection;
64 n.id = inspectionId;
65 if (error) {
66 Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << error.errorMessage;
67 n.code = Sink::Notification::Failure;
68 } else {
69 Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId;
70 n.code = Sink::Notification::Success;
71 }
72 emit notify(n);
73 return KAsync::null();
74 })
75 .exec();
76 return KAsync::null<void>();
77 }
78 return KAsync::error<void>(-1, "Invalid inspection command.");
79}
80
81KAsync::Job<void> Inspector::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue)
82{
83 return KAsync::error(-1, "Inspection not implemented.");
84}
85
diff --git a/common/inspector.h b/common/inspector.h
new file mode 100644
index 0000000..ff167b1
--- /dev/null
+++ b/common/inspector.h
@@ -0,0 +1,52 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#pragma once
21
22#include "sink_export.h"
23#include <QObject>
24#include <Async/Async>
25
26#include "notification.h"
27#include "resourcecontext.h"
28
29namespace Sink {
30
31/**
32 * Synchronize and add what we don't already have to local queue
33 */
34class SINK_EXPORT Inspector : public QObject
35{
36 Q_OBJECT
37public:
38 Inspector(const ResourceContext &resourceContext);
39 virtual ~Inspector();
40
41 KAsync::Job<void> processCommand(void const *command, size_t size);
42
43signals:
44 void notify(Notification);
45
46protected:
47 virtual KAsync::Job<void> inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue);
48
49 Sink::ResourceContext mResourceContext;
50};
51
52}