From 6b1cf550608c2f17cbed9e375f15a4c14bfe8ace Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 22 Dec 2016 22:05:40 +0100 Subject: More Log::Context --- common/commandprocessor.cpp | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) (limited to 'common/commandprocessor.cpp') diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index efcd077..7cd4a5f 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp @@ -43,8 +43,9 @@ static int sCommitInterval = 10; using namespace Sink; using namespace Sink::Storage; -CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId) +CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId, const Sink::Log::Context &ctx) : QObject(), + mLogCtx(ctx.subContext("commandprocessor")), mPipeline(pipeline), mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"), mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"), @@ -130,7 +131,7 @@ void CommandProcessor::processSynchronizeCommand(const QByteArray &data) } mSynchronizer->synchronize(query); } else { - SinkWarning() << "received invalid command"; + SinkWarningCtx(mLogCtx) << "received invalid command"; } } @@ -141,7 +142,7 @@ void CommandProcessor::processSynchronizeCommand(const QByteArray &data) // auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); // client.currentRevision = buffer->revision(); // } else { -// SinkWarning() << "received invalid command"; +// SinkWarningCtx(mLogCtx) << "received invalid command"; // } // loadResource().setLowerBoundRevision(lowerBoundRevision()); // } @@ -179,7 +180,7 @@ void CommandProcessor::process() KAsync::Job CommandProcessor::processQueuedCommand(const Sink::QueuedCommand *queuedCommand) { - SinkTrace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); + SinkTraceCtx(mLogCtx) << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); const auto data = queuedCommand->command()->Data(); const auto size = queuedCommand->command()->size(); switch (queuedCommand->commandId()) { @@ -205,7 +206,7 @@ KAsync::Job CommandProcessor::processQueuedCommand(const QByteArray &dat { flatbuffers::Verifier verifyer(reinterpret_cast(data.constData()), data.size()); if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { - SinkWarning() << "invalid buffer"; + SinkWarningCtx(mLogCtx) << "invalid buffer"; // return KAsync::error(1, "Invalid Buffer"); } auto queuedCommand = Sink::GetQueuedCommand(data.constData()); @@ -214,10 +215,10 @@ KAsync::Job CommandProcessor::processQueuedCommand(const QByteArray &dat .then( [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job { if (error) { - SinkWarning() << "Error while processing queue command: " << error.errorMessage; + SinkWarningCtx(mLogCtx) << "Error while processing queue command: " << error.errorMessage; return KAsync::error(error); } - SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId); + SinkTraceCtx(mLogCtx) << "Command pipeline processed: " << Sink::Commands::name(commandId); return KAsync::value(createdRevision); }); } @@ -234,13 +235,13 @@ KAsync::Job CommandProcessor::processQueue(MessageQueue *queue) time->start(); return processQueuedCommand(data) .syncThen([this, time](qint64 createdRevision) { - SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); + SinkTraceCtx(mLogCtx) << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); }); }) - .then([queue](const KAsync::Error &error) { + .then([queue, this](const KAsync::Error &error) { if (error) { if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) { - SinkWarning() << "Error while getting message from messagequeue: " << error.errorMessage; + SinkWarningCtx(mLogCtx) << "Error while getting message from messagequeue: " << error.errorMessage; } } if (queue->isEmpty()) { @@ -258,7 +259,7 @@ KAsync::Job CommandProcessor::processPipeline() auto time = QSharedPointer::create(); time->start(); mPipeline->cleanupRevisions(mLowerBoundRevision); - SinkTrace() << "Cleanup done." << Log::TraceTime(time->elapsed()); + SinkTraceCtx(mLogCtx) << "Cleanup done." << Log::TraceTime(time->elapsed()); // Go through all message queues if (mCommandQueues.isEmpty()) { @@ -273,7 +274,7 @@ KAsync::Job CommandProcessor::processPipeline() auto queue = it->next(); return processQueue(queue) .syncThen([this, time, it]() { - SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed()); + SinkTraceCtx(mLogCtx) << "Queue processed." << Log::TraceTime(time->elapsed()); if (it->hasNext()) { return KAsync::Continue; } @@ -325,11 +326,11 @@ KAsync::Job CommandProcessor::flush(void const *command, size_t size) const QByteArray flushId = BufferUtils::extractBufferCopy(buffer->id()); Q_ASSERT(!flushId.isEmpty()); if (flushType == Sink::Flush::FlushReplayQueue) { - SinkTrace() << "Flushing synchronizer "; + SinkTraceCtx(mLogCtx) << "Flushing synchronizer "; Q_ASSERT(mSynchronizer); mSynchronizer->flush(flushType, flushId); } else { - SinkTrace() << "Emitting flush completion" << flushId; + SinkTraceCtx(mLogCtx) << "Emitting flush completion" << flushId; Sink::Notification n; n.type = Sink::Notification::FlushCompletion; n.id = flushId; -- cgit v1.2.3