diff options
Diffstat (limited to 'common/commandprocessor.cpp')
-rw-r--r-- | common/commandprocessor.cpp | 29 |
1 files changed, 15 insertions, 14 deletions
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index efcd077..7cd4a5f 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp | |||
@@ -43,8 +43,9 @@ static int sCommitInterval = 10; | |||
43 | using namespace Sink; | 43 | using namespace Sink; |
44 | using namespace Sink::Storage; | 44 | using namespace Sink::Storage; |
45 | 45 | ||
46 | CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId) | 46 | CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId, const Sink::Log::Context &ctx) |
47 | : QObject(), | 47 | : QObject(), |
48 | mLogCtx(ctx.subContext("commandprocessor")), | ||
48 | mPipeline(pipeline), | 49 | mPipeline(pipeline), |
49 | mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"), | 50 | mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"), |
50 | mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"), | 51 | mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"), |
@@ -130,7 +131,7 @@ void CommandProcessor::processSynchronizeCommand(const QByteArray &data) | |||
130 | } | 131 | } |
131 | mSynchronizer->synchronize(query); | 132 | mSynchronizer->synchronize(query); |
132 | } else { | 133 | } else { |
133 | SinkWarning() << "received invalid command"; | 134 | SinkWarningCtx(mLogCtx) << "received invalid command"; |
134 | } | 135 | } |
135 | } | 136 | } |
136 | 137 | ||
@@ -141,7 +142,7 @@ void CommandProcessor::processSynchronizeCommand(const QByteArray &data) | |||
141 | // auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); | 142 | // auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); |
142 | // client.currentRevision = buffer->revision(); | 143 | // client.currentRevision = buffer->revision(); |
143 | // } else { | 144 | // } else { |
144 | // SinkWarning() << "received invalid command"; | 145 | // SinkWarningCtx(mLogCtx) << "received invalid command"; |
145 | // } | 146 | // } |
146 | // loadResource().setLowerBoundRevision(lowerBoundRevision()); | 147 | // loadResource().setLowerBoundRevision(lowerBoundRevision()); |
147 | // } | 148 | // } |
@@ -179,7 +180,7 @@ void CommandProcessor::process() | |||
179 | 180 | ||
180 | KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const Sink::QueuedCommand *queuedCommand) | 181 | KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const Sink::QueuedCommand *queuedCommand) |
181 | { | 182 | { |
182 | SinkTrace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); | 183 | SinkTraceCtx(mLogCtx) << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); |
183 | const auto data = queuedCommand->command()->Data(); | 184 | const auto data = queuedCommand->command()->Data(); |
184 | const auto size = queuedCommand->command()->size(); | 185 | const auto size = queuedCommand->command()->size(); |
185 | switch (queuedCommand->commandId()) { | 186 | switch (queuedCommand->commandId()) { |
@@ -205,7 +206,7 @@ KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const QByteArray &dat | |||
205 | { | 206 | { |
206 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); | 207 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); |
207 | if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { | 208 | if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { |
208 | SinkWarning() << "invalid buffer"; | 209 | SinkWarningCtx(mLogCtx) << "invalid buffer"; |
209 | // return KAsync::error<void, qint64>(1, "Invalid Buffer"); | 210 | // return KAsync::error<void, qint64>(1, "Invalid Buffer"); |
210 | } | 211 | } |
211 | auto queuedCommand = Sink::GetQueuedCommand(data.constData()); | 212 | auto queuedCommand = Sink::GetQueuedCommand(data.constData()); |
@@ -214,10 +215,10 @@ KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const QByteArray &dat | |||
214 | .then<qint64, qint64>( | 215 | .then<qint64, qint64>( |
215 | [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job<qint64> { | 216 | [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job<qint64> { |
216 | if (error) { | 217 | if (error) { |
217 | SinkWarning() << "Error while processing queue command: " << error.errorMessage; | 218 | SinkWarningCtx(mLogCtx) << "Error while processing queue command: " << error.errorMessage; |
218 | return KAsync::error<qint64>(error); | 219 | return KAsync::error<qint64>(error); |
219 | } | 220 | } |
220 | SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId); | 221 | SinkTraceCtx(mLogCtx) << "Command pipeline processed: " << Sink::Commands::name(commandId); |
221 | return KAsync::value<qint64>(createdRevision); | 222 | return KAsync::value<qint64>(createdRevision); |
222 | }); | 223 | }); |
223 | } | 224 | } |
@@ -234,13 +235,13 @@ KAsync::Job<void> CommandProcessor::processQueue(MessageQueue *queue) | |||
234 | time->start(); | 235 | time->start(); |
235 | return processQueuedCommand(data) | 236 | return processQueuedCommand(data) |
236 | .syncThen<void, qint64>([this, time](qint64 createdRevision) { | 237 | .syncThen<void, qint64>([this, time](qint64 createdRevision) { |
237 | SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); | 238 | SinkTraceCtx(mLogCtx) << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); |
238 | }); | 239 | }); |
239 | }) | 240 | }) |
240 | .then<KAsync::ControlFlowFlag>([queue](const KAsync::Error &error) { | 241 | .then<KAsync::ControlFlowFlag>([queue, this](const KAsync::Error &error) { |
241 | if (error) { | 242 | if (error) { |
242 | if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) { | 243 | if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) { |
243 | SinkWarning() << "Error while getting message from messagequeue: " << error.errorMessage; | 244 | SinkWarningCtx(mLogCtx) << "Error while getting message from messagequeue: " << error.errorMessage; |
244 | } | 245 | } |
245 | } | 246 | } |
246 | if (queue->isEmpty()) { | 247 | if (queue->isEmpty()) { |
@@ -258,7 +259,7 @@ KAsync::Job<void> CommandProcessor::processPipeline() | |||
258 | auto time = QSharedPointer<QTime>::create(); | 259 | auto time = QSharedPointer<QTime>::create(); |
259 | time->start(); | 260 | time->start(); |
260 | mPipeline->cleanupRevisions(mLowerBoundRevision); | 261 | mPipeline->cleanupRevisions(mLowerBoundRevision); |
261 | SinkTrace() << "Cleanup done." << Log::TraceTime(time->elapsed()); | 262 | SinkTraceCtx(mLogCtx) << "Cleanup done." << Log::TraceTime(time->elapsed()); |
262 | 263 | ||
263 | // Go through all message queues | 264 | // Go through all message queues |
264 | if (mCommandQueues.isEmpty()) { | 265 | if (mCommandQueues.isEmpty()) { |
@@ -273,7 +274,7 @@ KAsync::Job<void> CommandProcessor::processPipeline() | |||
273 | auto queue = it->next(); | 274 | auto queue = it->next(); |
274 | return processQueue(queue) | 275 | return processQueue(queue) |
275 | .syncThen<KAsync::ControlFlowFlag>([this, time, it]() { | 276 | .syncThen<KAsync::ControlFlowFlag>([this, time, it]() { |
276 | SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed()); | 277 | SinkTraceCtx(mLogCtx) << "Queue processed." << Log::TraceTime(time->elapsed()); |
277 | if (it->hasNext()) { | 278 | if (it->hasNext()) { |
278 | return KAsync::Continue; | 279 | return KAsync::Continue; |
279 | } | 280 | } |
@@ -325,11 +326,11 @@ KAsync::Job<void> CommandProcessor::flush(void const *command, size_t size) | |||
325 | const QByteArray flushId = BufferUtils::extractBufferCopy(buffer->id()); | 326 | const QByteArray flushId = BufferUtils::extractBufferCopy(buffer->id()); |
326 | Q_ASSERT(!flushId.isEmpty()); | 327 | Q_ASSERT(!flushId.isEmpty()); |
327 | if (flushType == Sink::Flush::FlushReplayQueue) { | 328 | if (flushType == Sink::Flush::FlushReplayQueue) { |
328 | SinkTrace() << "Flushing synchronizer "; | 329 | SinkTraceCtx(mLogCtx) << "Flushing synchronizer "; |
329 | Q_ASSERT(mSynchronizer); | 330 | Q_ASSERT(mSynchronizer); |
330 | mSynchronizer->flush(flushType, flushId); | 331 | mSynchronizer->flush(flushType, flushId); |
331 | } else { | 332 | } else { |
332 | SinkTrace() << "Emitting flush completion" << flushId; | 333 | SinkTraceCtx(mLogCtx) << "Emitting flush completion" << flushId; |
333 | Sink::Notification n; | 334 | Sink::Notification n; |
334 | n.type = Sink::Notification::FlushCompletion; | 335 | n.type = Sink::Notification::FlushCompletion; |
335 | n.id = flushId; | 336 | n.id = flushId; |