summaryrefslogtreecommitdiffstats
path: root/common/commandprocessor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/commandprocessor.cpp')
-rw-r--r--common/commandprocessor.cpp29
1 files changed, 15 insertions, 14 deletions
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp
index efcd077..7cd4a5f 100644
--- a/common/commandprocessor.cpp
+++ b/common/commandprocessor.cpp
@@ -43,8 +43,9 @@ static int sCommitInterval = 10;
43using namespace Sink; 43using namespace Sink;
44using namespace Sink::Storage; 44using namespace Sink::Storage;
45 45
46CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId) 46CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId, const Sink::Log::Context &ctx)
47 : QObject(), 47 : QObject(),
48 mLogCtx(ctx.subContext("commandprocessor")),
48 mPipeline(pipeline), 49 mPipeline(pipeline),
49 mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"), 50 mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"),
50 mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"), 51 mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"),
@@ -130,7 +131,7 @@ void CommandProcessor::processSynchronizeCommand(const QByteArray &data)
130 } 131 }
131 mSynchronizer->synchronize(query); 132 mSynchronizer->synchronize(query);
132 } else { 133 } else {
133 SinkWarning() << "received invalid command"; 134 SinkWarningCtx(mLogCtx) << "received invalid command";
134 } 135 }
135} 136}
136 137
@@ -141,7 +142,7 @@ void CommandProcessor::processSynchronizeCommand(const QByteArray &data)
141// auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); 142// auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData());
142// client.currentRevision = buffer->revision(); 143// client.currentRevision = buffer->revision();
143// } else { 144// } else {
144// SinkWarning() << "received invalid command"; 145// SinkWarningCtx(mLogCtx) << "received invalid command";
145// } 146// }
146// loadResource().setLowerBoundRevision(lowerBoundRevision()); 147// loadResource().setLowerBoundRevision(lowerBoundRevision());
147// } 148// }
@@ -179,7 +180,7 @@ void CommandProcessor::process()
179 180
180KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const Sink::QueuedCommand *queuedCommand) 181KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const Sink::QueuedCommand *queuedCommand)
181{ 182{
182 SinkTrace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); 183 SinkTraceCtx(mLogCtx) << "Processing command: " << Sink::Commands::name(queuedCommand->commandId());
183 const auto data = queuedCommand->command()->Data(); 184 const auto data = queuedCommand->command()->Data();
184 const auto size = queuedCommand->command()->size(); 185 const auto size = queuedCommand->command()->size();
185 switch (queuedCommand->commandId()) { 186 switch (queuedCommand->commandId()) {
@@ -205,7 +206,7 @@ KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const QByteArray &dat
205{ 206{
206 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); 207 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size());
207 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { 208 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) {
208 SinkWarning() << "invalid buffer"; 209 SinkWarningCtx(mLogCtx) << "invalid buffer";
209 // return KAsync::error<void, qint64>(1, "Invalid Buffer"); 210 // return KAsync::error<void, qint64>(1, "Invalid Buffer");
210 } 211 }
211 auto queuedCommand = Sink::GetQueuedCommand(data.constData()); 212 auto queuedCommand = Sink::GetQueuedCommand(data.constData());
@@ -214,10 +215,10 @@ KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const QByteArray &dat
214 .then<qint64, qint64>( 215 .then<qint64, qint64>(
215 [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job<qint64> { 216 [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job<qint64> {
216 if (error) { 217 if (error) {
217 SinkWarning() << "Error while processing queue command: " << error.errorMessage; 218 SinkWarningCtx(mLogCtx) << "Error while processing queue command: " << error.errorMessage;
218 return KAsync::error<qint64>(error); 219 return KAsync::error<qint64>(error);
219 } 220 }
220 SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId); 221 SinkTraceCtx(mLogCtx) << "Command pipeline processed: " << Sink::Commands::name(commandId);
221 return KAsync::value<qint64>(createdRevision); 222 return KAsync::value<qint64>(createdRevision);
222 }); 223 });
223} 224}
@@ -234,13 +235,13 @@ KAsync::Job<void> CommandProcessor::processQueue(MessageQueue *queue)
234 time->start(); 235 time->start();
235 return processQueuedCommand(data) 236 return processQueuedCommand(data)
236 .syncThen<void, qint64>([this, time](qint64 createdRevision) { 237 .syncThen<void, qint64>([this, time](qint64 createdRevision) {
237 SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); 238 SinkTraceCtx(mLogCtx) << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed());
238 }); 239 });
239 }) 240 })
240 .then<KAsync::ControlFlowFlag>([queue](const KAsync::Error &error) { 241 .then<KAsync::ControlFlowFlag>([queue, this](const KAsync::Error &error) {
241 if (error) { 242 if (error) {
242 if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) { 243 if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) {
243 SinkWarning() << "Error while getting message from messagequeue: " << error.errorMessage; 244 SinkWarningCtx(mLogCtx) << "Error while getting message from messagequeue: " << error.errorMessage;
244 } 245 }
245 } 246 }
246 if (queue->isEmpty()) { 247 if (queue->isEmpty()) {
@@ -258,7 +259,7 @@ KAsync::Job<void> CommandProcessor::processPipeline()
258 auto time = QSharedPointer<QTime>::create(); 259 auto time = QSharedPointer<QTime>::create();
259 time->start(); 260 time->start();
260 mPipeline->cleanupRevisions(mLowerBoundRevision); 261 mPipeline->cleanupRevisions(mLowerBoundRevision);
261 SinkTrace() << "Cleanup done." << Log::TraceTime(time->elapsed()); 262 SinkTraceCtx(mLogCtx) << "Cleanup done." << Log::TraceTime(time->elapsed());
262 263
263 // Go through all message queues 264 // Go through all message queues
264 if (mCommandQueues.isEmpty()) { 265 if (mCommandQueues.isEmpty()) {
@@ -273,7 +274,7 @@ KAsync::Job<void> CommandProcessor::processPipeline()
273 auto queue = it->next(); 274 auto queue = it->next();
274 return processQueue(queue) 275 return processQueue(queue)
275 .syncThen<KAsync::ControlFlowFlag>([this, time, it]() { 276 .syncThen<KAsync::ControlFlowFlag>([this, time, it]() {
276 SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed()); 277 SinkTraceCtx(mLogCtx) << "Queue processed." << Log::TraceTime(time->elapsed());
277 if (it->hasNext()) { 278 if (it->hasNext()) {
278 return KAsync::Continue; 279 return KAsync::Continue;
279 } 280 }
@@ -325,11 +326,11 @@ KAsync::Job<void> CommandProcessor::flush(void const *command, size_t size)
325 const QByteArray flushId = BufferUtils::extractBufferCopy(buffer->id()); 326 const QByteArray flushId = BufferUtils::extractBufferCopy(buffer->id());
326 Q_ASSERT(!flushId.isEmpty()); 327 Q_ASSERT(!flushId.isEmpty());
327 if (flushType == Sink::Flush::FlushReplayQueue) { 328 if (flushType == Sink::Flush::FlushReplayQueue) {
328 SinkTrace() << "Flushing synchronizer "; 329 SinkTraceCtx(mLogCtx) << "Flushing synchronizer ";
329 Q_ASSERT(mSynchronizer); 330 Q_ASSERT(mSynchronizer);
330 mSynchronizer->flush(flushType, flushId); 331 mSynchronizer->flush(flushType, flushId);
331 } else { 332 } else {
332 SinkTrace() << "Emitting flush completion" << flushId; 333 SinkTraceCtx(mLogCtx) << "Emitting flush completion" << flushId;
333 Sink::Notification n; 334 Sink::Notification n;
334 n.type = Sink::Notification::FlushCompletion; 335 n.type = Sink::Notification::FlushCompletion;
335 n.id = flushId; 336 n.id = flushId;