summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-28 16:56:28 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-28 16:56:28 +0100
commit885f185f55249a2e97e9c7c238f89a5d0d99d1df (patch)
tree5c597d61d1d2e3d9044443539873c50fea8d9a19
parent9fe1d50d7ace50f1f7efc66412dff006f20a2062 (diff)
downloadsink-885f185f55249a2e97e9c7c238f89a5d0d99d1df.tar.gz
sink-885f185f55249a2e97e9c7c238f89a5d0d99d1df.zip
Move the commandprocessor to a separate file.
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/commandprocessor.cpp193
-rw-r--r--common/commandprocessor.h79
-rw-r--r--common/genericresource.cpp189
-rw-r--r--common/genericresource.h3
5 files changed, 275 insertions, 190 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 8a16af4..018fc22 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -77,6 +77,7 @@ set(command_SRCS
77 indexer.cpp 77 indexer.cpp
78 mail/threadindexer.cpp 78 mail/threadindexer.cpp
79 notification.cpp 79 notification.cpp
80 commandprocessor.cpp
80 ${storage_SRCS}) 81 ${storage_SRCS})
81 82
82add_library(${PROJECT_NAME} SHARED ${command_SRCS}) 83add_library(${PROJECT_NAME} SHARED ${command_SRCS})
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp
new file mode 100644
index 0000000..c9fca37
--- /dev/null
+++ b/common/commandprocessor.cpp
@@ -0,0 +1,193 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#include "commandprocessor.h"
21
22#include "commands.h"
23#include "messagequeue.h"
24#include "queuedcommand_generated.h"
25
26#include "pipeline.h"
27
28static int sBatchSize = 100;
29
30using namespace Sink;
31
32CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false), mLowerBoundRevision(0)
33{
34 for (auto queue : mCommandQueues) {
35 const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process);
36 Q_UNUSED(ret);
37 }
38}
39
40void CommandProcessor::setOldestUsedRevision(qint64 revision)
41{
42 mLowerBoundRevision = revision;
43}
44
45void CommandProcessor::setInspectionCommand(const InspectionFunction &f)
46{
47 mInspect = f;
48}
49
50void CommandProcessor::setFlushCommand(const FlushFunction &f)
51{
52 mFlush = f;
53}
54
55bool CommandProcessor::messagesToProcessAvailable()
56{
57 for (auto queue : mCommandQueues) {
58 if (!queue->isEmpty()) {
59 return true;
60 }
61 }
62 return false;
63}
64
65void CommandProcessor::process()
66{
67 if (mProcessingLock) {
68 return;
69 }
70 mProcessingLock = true;
71 auto job = processPipeline()
72 .syncThen<void>([this]() {
73 mProcessingLock = false;
74 if (messagesToProcessAvailable()) {
75 process();
76 }
77 })
78 .exec();
79}
80
81KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const Sink::QueuedCommand *queuedCommand)
82{
83 SinkTrace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId());
84 const auto data = queuedCommand->command()->Data();
85 const auto size = queuedCommand->command()->size();
86 switch (queuedCommand->commandId()) {
87 case Sink::Commands::DeleteEntityCommand:
88 return mPipeline->deletedEntity(data, size);
89 case Sink::Commands::ModifyEntityCommand:
90 return mPipeline->modifiedEntity(data, size);
91 case Sink::Commands::CreateEntityCommand:
92 return mPipeline->newEntity(data, size);
93 case Sink::Commands::InspectionCommand:
94 if (mInspect) {
95 return mInspect(data, size)
96 .syncThen<qint64>([]() { return -1; });
97 } else {
98 return KAsync::error<qint64>(-1, "Missing inspection command.");
99 }
100 case Sink::Commands::FlushCommand:
101 if (mFlush) {
102 return mFlush(data, size)
103 .syncThen<qint64>([]() { return -1; });
104 } else {
105 return KAsync::error<qint64>(-1, "Missing inspection command.");
106 }
107 default:
108 return KAsync::error<qint64>(-1, "Unhandled command");
109 }
110}
111
112KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const QByteArray &data)
113{
114 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size());
115 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) {
116 SinkWarning() << "invalid buffer";
117 // return KAsync::error<void, qint64>(1, "Invalid Buffer");
118 }
119 auto queuedCommand = Sink::GetQueuedCommand(data.constData());
120 const auto commandId = queuedCommand->commandId();
121 SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId);
122 return processQueuedCommand(queuedCommand)
123 .then<qint64, qint64>(
124 [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job<qint64> {
125 if (error) {
126 SinkWarning() << "Error while processing queue command: " << error.errorMessage;
127 return KAsync::error<qint64>(error);
128 }
129 SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId);
130 return KAsync::value<qint64>(createdRevision);
131 });
132}
133
134// Process all messages of this queue
135KAsync::Job<void> CommandProcessor::processQueue(MessageQueue *queue)
136{
137 auto time = QSharedPointer<QTime>::create();
138 return KAsync::syncStart<void>([this]() { mPipeline->startTransaction(); })
139 .then(KAsync::dowhile(
140 [this, queue, time]() -> KAsync::Job<KAsync::ControlFlowFlag> {
141 return queue->dequeueBatch(sBatchSize,
142 [this, time](const QByteArray &data) -> KAsync::Job<void> {
143 time->start();
144 return processQueuedCommand(data)
145 .syncThen<void, qint64>([this, time](qint64 createdRevision) {
146 SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed());
147 });
148 })
149 .then<KAsync::ControlFlowFlag>([queue](const KAsync::Error &error) {
150 if (error) {
151 if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) {
152 SinkWarning() << "Error while getting message from messagequeue: " << error.errorMessage;
153 }
154 }
155 if (queue->isEmpty()) {
156 return KAsync::value<KAsync::ControlFlowFlag>(KAsync::Break);
157 } else {
158 return KAsync::value<KAsync::ControlFlowFlag>(KAsync::Continue);
159 }
160 });
161 }))
162 .syncThen<void>([this](const KAsync::Error &) { mPipeline->commit(); });
163}
164
165KAsync::Job<void> CommandProcessor::processPipeline()
166{
167 auto time = QSharedPointer<QTime>::create();
168 time->start();
169 mPipeline->cleanupRevisions(mLowerBoundRevision);
170 SinkTrace() << "Cleanup done." << Log::TraceTime(time->elapsed());
171
172 // Go through all message queues
173 if (mCommandQueues.isEmpty()) {
174 return KAsync::null<void>();
175 }
176 auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues);
177 return KAsync::dowhile(
178 [it, this]() {
179 auto time = QSharedPointer<QTime>::create();
180 time->start();
181
182 auto queue = it->next();
183 return processQueue(queue)
184 .syncThen<KAsync::ControlFlowFlag>([this, time, it]() {
185 SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed());
186 if (it->hasNext()) {
187 return KAsync::Continue;
188 }
189 return KAsync::Break;
190 });
191 });
192}
193
diff --git a/common/commandprocessor.h b/common/commandprocessor.h
new file mode 100644
index 0000000..51d845e
--- /dev/null
+++ b/common/commandprocessor.h
@@ -0,0 +1,79 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#pragma once
21
22#include "sink_export.h"
23
24#include <QObject>
25#include <Async/Async>
26#include <functional>
27#include "log.h"
28
29class MessageQueue;
30
31namespace Sink {
32 class Pipeline;
33 class QueuedCommand;
34
35/**
36 * Drives the pipeline using the output from all command queues
37 */
38class CommandProcessor : public QObject
39{
40 Q_OBJECT
41 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction;
42 typedef std::function<KAsync::Job<void>(void const *, size_t)> FlushFunction;
43 SINK_DEBUG_AREA("commandprocessor")
44
45public:
46 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues);
47
48 void setOldestUsedRevision(qint64 revision);
49
50 void setInspectionCommand(const InspectionFunction &f);
51
52 void setFlushCommand(const FlushFunction &f);
53
54signals:
55 void error(int errorCode, const QString &errorMessage);
56
57private:
58 bool messagesToProcessAvailable();
59
60private slots:
61 void process();
62 KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand *queuedCommand);
63 KAsync::Job<qint64> processQueuedCommand(const QByteArray &data);
64 // Process all messages of this queue
65 KAsync::Job<void> processQueue(MessageQueue *queue);
66 KAsync::Job<void> processPipeline();
67
68private:
69 Sink::Pipeline *mPipeline;
70 // Ordered by priority
71 QList<MessageQueue *> mCommandQueues;
72 bool mProcessingLock;
73 // The lowest revision we no longer need
74 qint64 mLowerBoundRevision;
75 InspectionFunction mInspect;
76 FlushFunction mFlush;
77};
78
79};
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 7b83957..3aa4fce 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -36,6 +36,7 @@
36#include "bufferutils.h" 36#include "bufferutils.h"
37#include "adaptorfactoryregistry.h" 37#include "adaptorfactoryregistry.h"
38#include "synchronizer.h" 38#include "synchronizer.h"
39#include "commandprocessor.h"
39 40
40#include <QUuid> 41#include <QUuid>
41#include <QDataStream> 42#include <QDataStream>
@@ -48,194 +49,6 @@ static int sCommitInterval = 10;
48using namespace Sink; 49using namespace Sink;
49using namespace Sink::Storage; 50using namespace Sink::Storage;
50 51
51/**
52 * Drives the pipeline using the output from all command queues
53 */
54class CommandProcessor : public QObject
55{
56 Q_OBJECT
57 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction;
58 typedef std::function<KAsync::Job<void>(void const *, size_t)> FlushFunction;
59 SINK_DEBUG_AREA("commandprocessor")
60
61public:
62 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false), mLowerBoundRevision(0)
63 {
64 for (auto queue : mCommandQueues) {
65 const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process);
66 Q_UNUSED(ret);
67 }
68 }
69
70 void setOldestUsedRevision(qint64 revision)
71 {
72 mLowerBoundRevision = revision;
73 }
74
75 void setInspectionCommand(const InspectionFunction &f)
76 {
77 mInspect = f;
78 }
79
80 void setFlushCommand(const FlushFunction &f)
81 {
82 mFlush = f;
83 }
84
85signals:
86 void error(int errorCode, const QString &errorMessage);
87
88private:
89 bool messagesToProcessAvailable()
90 {
91 for (auto queue : mCommandQueues) {
92 if (!queue->isEmpty()) {
93 return true;
94 }
95 }
96 return false;
97 }
98
99private slots:
100 void process()
101 {
102 if (mProcessingLock) {
103 return;
104 }
105 mProcessingLock = true;
106 auto job = processPipeline()
107 .syncThen<void>([this]() {
108 mProcessingLock = false;
109 if (messagesToProcessAvailable()) {
110 process();
111 }
112 })
113 .exec();
114 }
115
116 KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand *queuedCommand)
117 {
118 SinkTrace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId());
119 // Throw command into appropriate pipeline
120 switch (queuedCommand->commandId()) {
121 case Sink::Commands::DeleteEntityCommand:
122 return mPipeline->deletedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
123 case Sink::Commands::ModifyEntityCommand:
124 return mPipeline->modifiedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
125 case Sink::Commands::CreateEntityCommand:
126 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
127 case Sink::Commands::InspectionCommand:
128 if (mInspect) {
129 return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size())
130 .syncThen<qint64>([]() { return -1; });
131 } else {
132 return KAsync::error<qint64>(-1, "Missing inspection command.");
133 }
134 case Sink::Commands::FlushCommand:
135 if (mFlush) {
136 return mFlush(queuedCommand->command()->Data(), queuedCommand->command()->size())
137 .syncThen<qint64>([]() { return -1; });
138 } else {
139 return KAsync::error<qint64>(-1, "Missing inspection command.");
140 }
141 default:
142 return KAsync::error<qint64>(-1, "Unhandled command");
143 }
144 }
145
146 KAsync::Job<qint64> processQueuedCommand(const QByteArray &data)
147 {
148 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size());
149 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) {
150 SinkWarning() << "invalid buffer";
151 // return KAsync::error<void, qint64>(1, "Invalid Buffer");
152 }
153 auto queuedCommand = Sink::GetQueuedCommand(data.constData());
154 const auto commandId = queuedCommand->commandId();
155 SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId);
156 return processQueuedCommand(queuedCommand)
157 .then<qint64, qint64>(
158 [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job<qint64> {
159 if (error) {
160 SinkWarning() << "Error while processing queue command: " << error.errorMessage;
161 return KAsync::error<qint64>(error);
162 }
163 SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId);
164 return KAsync::value<qint64>(createdRevision);
165 });
166 }
167
168 // Process all messages of this queue
169 KAsync::Job<void> processQueue(MessageQueue *queue)
170 {
171 auto time = QSharedPointer<QTime>::create();
172 return KAsync::syncStart<void>([this]() { mPipeline->startTransaction(); })
173 .then(KAsync::dowhile(
174 [this, queue, time]() -> KAsync::Job<KAsync::ControlFlowFlag> {
175 return queue->dequeueBatch(sBatchSize,
176 [this, time](const QByteArray &data) -> KAsync::Job<void> {
177 time->start();
178 return processQueuedCommand(data)
179 .syncThen<void, qint64>([this, time](qint64 createdRevision) {
180 SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed());
181 });
182 })
183 .then<KAsync::ControlFlowFlag>([queue](const KAsync::Error &error) {
184 if (error) {
185 if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) {
186 SinkWarning() << "Error while getting message from messagequeue: " << error.errorMessage;
187 }
188 }
189 if (queue->isEmpty()) {
190 return KAsync::value<KAsync::ControlFlowFlag>(KAsync::Break);
191 } else {
192 return KAsync::value<KAsync::ControlFlowFlag>(KAsync::Continue);
193 }
194 });
195 }))
196 .syncThen<void>([this](const KAsync::Error &) { mPipeline->commit(); });
197 }
198
199 KAsync::Job<void> processPipeline()
200 {
201 auto time = QSharedPointer<QTime>::create();
202 time->start();
203 mPipeline->cleanupRevisions(mLowerBoundRevision);
204 SinkTrace() << "Cleanup done." << Log::TraceTime(time->elapsed());
205
206 // Go through all message queues
207 if (mCommandQueues.isEmpty()) {
208 return KAsync::null<void>();
209 }
210 auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues);
211 return KAsync::dowhile(
212 [it, this]() {
213 auto time = QSharedPointer<QTime>::create();
214 time->start();
215
216 auto queue = it->next();
217 return processQueue(queue)
218 .syncThen<KAsync::ControlFlowFlag>([this, time, it]() {
219 SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed());
220 if (it->hasNext()) {
221 return KAsync::Continue;
222 }
223 return KAsync::Break;
224 });
225 });
226 }
227
228private:
229 Sink::Pipeline *mPipeline;
230 // Ordered by priority
231 QList<MessageQueue *> mCommandQueues;
232 bool mProcessingLock;
233 // The lowest revision we no longer need
234 qint64 mLowerBoundRevision;
235 InspectionFunction mInspect;
236 FlushFunction mFlush;
237};
238
239GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline ) 52GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline )
240 : Sink::Resource(), 53 : Sink::Resource(),
241 mResourceContext(resourceContext), 54 mResourceContext(resourceContext),
diff --git a/common/genericresource.h b/common/genericresource.h
index a3a58b9..12f15f3 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -28,12 +28,11 @@
28 28
29#include <QTimer> 29#include <QTimer>
30 30
31class CommandProcessor;
32
33namespace Sink { 31namespace Sink {
34class Pipeline; 32class Pipeline;
35class Preprocessor; 33class Preprocessor;
36class Synchronizer; 34class Synchronizer;
35class CommandProcessor;
37 36
38/** 37/**
39 * Generic Resource implementation. 38 * Generic Resource implementation.