summaryrefslogtreecommitdiffstats
path: root/common/commandprocessor.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-28 16:56:28 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-28 16:56:28 +0100
commit885f185f55249a2e97e9c7c238f89a5d0d99d1df (patch)
tree5c597d61d1d2e3d9044443539873c50fea8d9a19 /common/commandprocessor.cpp
parent9fe1d50d7ace50f1f7efc66412dff006f20a2062 (diff)
downloadsink-885f185f55249a2e97e9c7c238f89a5d0d99d1df.tar.gz
sink-885f185f55249a2e97e9c7c238f89a5d0d99d1df.zip
Move the commandprocessor to a separate file.
Diffstat (limited to 'common/commandprocessor.cpp')
-rw-r--r--common/commandprocessor.cpp193
1 files changed, 193 insertions, 0 deletions
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp
new file mode 100644
index 0000000..c9fca37
--- /dev/null
+++ b/common/commandprocessor.cpp
@@ -0,0 +1,193 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#include "commandprocessor.h"
21
22#include "commands.h"
23#include "messagequeue.h"
24#include "queuedcommand_generated.h"
25
26#include "pipeline.h"
27
28static int sBatchSize = 100;
29
30using namespace Sink;
31
32CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false), mLowerBoundRevision(0)
33{
34 for (auto queue : mCommandQueues) {
35 const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process);
36 Q_UNUSED(ret);
37 }
38}
39
40void CommandProcessor::setOldestUsedRevision(qint64 revision)
41{
42 mLowerBoundRevision = revision;
43}
44
45void CommandProcessor::setInspectionCommand(const InspectionFunction &f)
46{
47 mInspect = f;
48}
49
50void CommandProcessor::setFlushCommand(const FlushFunction &f)
51{
52 mFlush = f;
53}
54
55bool CommandProcessor::messagesToProcessAvailable()
56{
57 for (auto queue : mCommandQueues) {
58 if (!queue->isEmpty()) {
59 return true;
60 }
61 }
62 return false;
63}
64
65void CommandProcessor::process()
66{
67 if (mProcessingLock) {
68 return;
69 }
70 mProcessingLock = true;
71 auto job = processPipeline()
72 .syncThen<void>([this]() {
73 mProcessingLock = false;
74 if (messagesToProcessAvailable()) {
75 process();
76 }
77 })
78 .exec();
79}
80
81KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const Sink::QueuedCommand *queuedCommand)
82{
83 SinkTrace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId());
84 const auto data = queuedCommand->command()->Data();
85 const auto size = queuedCommand->command()->size();
86 switch (queuedCommand->commandId()) {
87 case Sink::Commands::DeleteEntityCommand:
88 return mPipeline->deletedEntity(data, size);
89 case Sink::Commands::ModifyEntityCommand:
90 return mPipeline->modifiedEntity(data, size);
91 case Sink::Commands::CreateEntityCommand:
92 return mPipeline->newEntity(data, size);
93 case Sink::Commands::InspectionCommand:
94 if (mInspect) {
95 return mInspect(data, size)
96 .syncThen<qint64>([]() { return -1; });
97 } else {
98 return KAsync::error<qint64>(-1, "Missing inspection command.");
99 }
100 case Sink::Commands::FlushCommand:
101 if (mFlush) {
102 return mFlush(data, size)
103 .syncThen<qint64>([]() { return -1; });
104 } else {
105 return KAsync::error<qint64>(-1, "Missing inspection command.");
106 }
107 default:
108 return KAsync::error<qint64>(-1, "Unhandled command");
109 }
110}
111
112KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const QByteArray &data)
113{
114 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size());
115 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) {
116 SinkWarning() << "invalid buffer";
117 // return KAsync::error<void, qint64>(1, "Invalid Buffer");
118 }
119 auto queuedCommand = Sink::GetQueuedCommand(data.constData());
120 const auto commandId = queuedCommand->commandId();
121 SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId);
122 return processQueuedCommand(queuedCommand)
123 .then<qint64, qint64>(
124 [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job<qint64> {
125 if (error) {
126 SinkWarning() << "Error while processing queue command: " << error.errorMessage;
127 return KAsync::error<qint64>(error);
128 }
129 SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId);
130 return KAsync::value<qint64>(createdRevision);
131 });
132}
133
134// Process all messages of this queue
135KAsync::Job<void> CommandProcessor::processQueue(MessageQueue *queue)
136{
137 auto time = QSharedPointer<QTime>::create();
138 return KAsync::syncStart<void>([this]() { mPipeline->startTransaction(); })
139 .then(KAsync::dowhile(
140 [this, queue, time]() -> KAsync::Job<KAsync::ControlFlowFlag> {
141 return queue->dequeueBatch(sBatchSize,
142 [this, time](const QByteArray &data) -> KAsync::Job<void> {
143 time->start();
144 return processQueuedCommand(data)
145 .syncThen<void, qint64>([this, time](qint64 createdRevision) {
146 SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed());
147 });
148 })
149 .then<KAsync::ControlFlowFlag>([queue](const KAsync::Error &error) {
150 if (error) {
151 if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) {
152 SinkWarning() << "Error while getting message from messagequeue: " << error.errorMessage;
153 }
154 }
155 if (queue->isEmpty()) {
156 return KAsync::value<KAsync::ControlFlowFlag>(KAsync::Break);
157 } else {
158 return KAsync::value<KAsync::ControlFlowFlag>(KAsync::Continue);
159 }
160 });
161 }))
162 .syncThen<void>([this](const KAsync::Error &) { mPipeline->commit(); });
163}
164
165KAsync::Job<void> CommandProcessor::processPipeline()
166{
167 auto time = QSharedPointer<QTime>::create();
168 time->start();
169 mPipeline->cleanupRevisions(mLowerBoundRevision);
170 SinkTrace() << "Cleanup done." << Log::TraceTime(time->elapsed());
171
172 // Go through all message queues
173 if (mCommandQueues.isEmpty()) {
174 return KAsync::null<void>();
175 }
176 auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues);
177 return KAsync::dowhile(
178 [it, this]() {
179 auto time = QSharedPointer<QTime>::create();
180 time->start();
181
182 auto queue = it->next();
183 return processQueue(queue)
184 .syncThen<KAsync::ControlFlowFlag>([this, time, it]() {
185 SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed());
186 if (it->hasNext()) {
187 return KAsync::Continue;
188 }
189 return KAsync::Break;
190 });
191 });
192}
193