summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-28 16:56:28 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-28 16:56:28 +0100
commit885f185f55249a2e97e9c7c238f89a5d0d99d1df (patch)
tree5c597d61d1d2e3d9044443539873c50fea8d9a19 /common/genericresource.cpp
parent9fe1d50d7ace50f1f7efc66412dff006f20a2062 (diff)
downloadsink-885f185f55249a2e97e9c7c238f89a5d0d99d1df.tar.gz
sink-885f185f55249a2e97e9c7c238f89a5d0d99d1df.zip
Move the commandprocessor to a separate file.
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp189
1 files changed, 1 insertions, 188 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 7b83957..3aa4fce 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -36,6 +36,7 @@
36#include "bufferutils.h" 36#include "bufferutils.h"
37#include "adaptorfactoryregistry.h" 37#include "adaptorfactoryregistry.h"
38#include "synchronizer.h" 38#include "synchronizer.h"
39#include "commandprocessor.h"
39 40
40#include <QUuid> 41#include <QUuid>
41#include <QDataStream> 42#include <QDataStream>
@@ -48,194 +49,6 @@ static int sCommitInterval = 10;
48using namespace Sink; 49using namespace Sink;
49using namespace Sink::Storage; 50using namespace Sink::Storage;
50 51
51/**
52 * Drives the pipeline using the output from all command queues
53 */
54class CommandProcessor : public QObject
55{
56 Q_OBJECT
57 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction;
58 typedef std::function<KAsync::Job<void>(void const *, size_t)> FlushFunction;
59 SINK_DEBUG_AREA("commandprocessor")
60
61public:
62 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false), mLowerBoundRevision(0)
63 {
64 for (auto queue : mCommandQueues) {
65 const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process);
66 Q_UNUSED(ret);
67 }
68 }
69
70 void setOldestUsedRevision(qint64 revision)
71 {
72 mLowerBoundRevision = revision;
73 }
74
75 void setInspectionCommand(const InspectionFunction &f)
76 {
77 mInspect = f;
78 }
79
80 void setFlushCommand(const FlushFunction &f)
81 {
82 mFlush = f;
83 }
84
85signals:
86 void error(int errorCode, const QString &errorMessage);
87
88private:
89 bool messagesToProcessAvailable()
90 {
91 for (auto queue : mCommandQueues) {
92 if (!queue->isEmpty()) {
93 return true;
94 }
95 }
96 return false;
97 }
98
99private slots:
100 void process()
101 {
102 if (mProcessingLock) {
103 return;
104 }
105 mProcessingLock = true;
106 auto job = processPipeline()
107 .syncThen<void>([this]() {
108 mProcessingLock = false;
109 if (messagesToProcessAvailable()) {
110 process();
111 }
112 })
113 .exec();
114 }
115
116 KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand *queuedCommand)
117 {
118 SinkTrace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId());
119 // Throw command into appropriate pipeline
120 switch (queuedCommand->commandId()) {
121 case Sink::Commands::DeleteEntityCommand:
122 return mPipeline->deletedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
123 case Sink::Commands::ModifyEntityCommand:
124 return mPipeline->modifiedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
125 case Sink::Commands::CreateEntityCommand:
126 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
127 case Sink::Commands::InspectionCommand:
128 if (mInspect) {
129 return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size())
130 .syncThen<qint64>([]() { return -1; });
131 } else {
132 return KAsync::error<qint64>(-1, "Missing inspection command.");
133 }
134 case Sink::Commands::FlushCommand:
135 if (mFlush) {
136 return mFlush(queuedCommand->command()->Data(), queuedCommand->command()->size())
137 .syncThen<qint64>([]() { return -1; });
138 } else {
139 return KAsync::error<qint64>(-1, "Missing inspection command.");
140 }
141 default:
142 return KAsync::error<qint64>(-1, "Unhandled command");
143 }
144 }
145
146 KAsync::Job<qint64> processQueuedCommand(const QByteArray &data)
147 {
148 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size());
149 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) {
150 SinkWarning() << "invalid buffer";
151 // return KAsync::error<void, qint64>(1, "Invalid Buffer");
152 }
153 auto queuedCommand = Sink::GetQueuedCommand(data.constData());
154 const auto commandId = queuedCommand->commandId();
155 SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId);
156 return processQueuedCommand(queuedCommand)
157 .then<qint64, qint64>(
158 [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job<qint64> {
159 if (error) {
160 SinkWarning() << "Error while processing queue command: " << error.errorMessage;
161 return KAsync::error<qint64>(error);
162 }
163 SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId);
164 return KAsync::value<qint64>(createdRevision);
165 });
166 }
167
168 // Process all messages of this queue
169 KAsync::Job<void> processQueue(MessageQueue *queue)
170 {
171 auto time = QSharedPointer<QTime>::create();
172 return KAsync::syncStart<void>([this]() { mPipeline->startTransaction(); })
173 .then(KAsync::dowhile(
174 [this, queue, time]() -> KAsync::Job<KAsync::ControlFlowFlag> {
175 return queue->dequeueBatch(sBatchSize,
176 [this, time](const QByteArray &data) -> KAsync::Job<void> {
177 time->start();
178 return processQueuedCommand(data)
179 .syncThen<void, qint64>([this, time](qint64 createdRevision) {
180 SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed());
181 });
182 })
183 .then<KAsync::ControlFlowFlag>([queue](const KAsync::Error &error) {
184 if (error) {
185 if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) {
186 SinkWarning() << "Error while getting message from messagequeue: " << error.errorMessage;
187 }
188 }
189 if (queue->isEmpty()) {
190 return KAsync::value<KAsync::ControlFlowFlag>(KAsync::Break);
191 } else {
192 return KAsync::value<KAsync::ControlFlowFlag>(KAsync::Continue);
193 }
194 });
195 }))
196 .syncThen<void>([this](const KAsync::Error &) { mPipeline->commit(); });
197 }
198
199 KAsync::Job<void> processPipeline()
200 {
201 auto time = QSharedPointer<QTime>::create();
202 time->start();
203 mPipeline->cleanupRevisions(mLowerBoundRevision);
204 SinkTrace() << "Cleanup done." << Log::TraceTime(time->elapsed());
205
206 // Go through all message queues
207 if (mCommandQueues.isEmpty()) {
208 return KAsync::null<void>();
209 }
210 auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues);
211 return KAsync::dowhile(
212 [it, this]() {
213 auto time = QSharedPointer<QTime>::create();
214 time->start();
215
216 auto queue = it->next();
217 return processQueue(queue)
218 .syncThen<KAsync::ControlFlowFlag>([this, time, it]() {
219 SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed());
220 if (it->hasNext()) {
221 return KAsync::Continue;
222 }
223 return KAsync::Break;
224 });
225 });
226 }
227
228private:
229 Sink::Pipeline *mPipeline;
230 // Ordered by priority
231 QList<MessageQueue *> mCommandQueues;
232 bool mProcessingLock;
233 // The lowest revision we no longer need
234 qint64 mLowerBoundRevision;
235 InspectionFunction mInspect;
236 FlushFunction mFlush;
237};
238
239GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline ) 52GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline )
240 : Sink::Resource(), 53 : Sink::Resource(),
241 mResourceContext(resourceContext), 54 mResourceContext(resourceContext),