From 6b1cf550608c2f17cbed9e375f15a4c14bfe8ace Mon Sep 17 00:00:00 2001
From: Christian Mollekopf
Date: Thu, 22 Dec 2016 22:05:40 +0100
Subject: More Log::Context

---
 common/commandprocessor.cpp | 29 +++++++++++++++--------------
 common/commandprocessor.h   |  4 ++--
 common/genericresource.cpp  |  4 ++--
 common/log.h                |  3 +++
 common/modelresult.cpp      | 30 +++++++++++++++---------------
 common/modelresult.h        |  4 +++-
 common/pipeline.cpp         | 45 +++++++++++++++++++++++----------------------
 common/pipeline.h           |  2 +-
 common/store.cpp            | 14 +++++++-------
 9 files changed, 71 insertions(+), 64 deletions(-)

diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp
index efcd077..7cd4a5f 100644
--- a/common/commandprocessor.cpp
+++ b/common/commandprocessor.cpp
@@ -43,8 +43,9 @@ static int sCommitInterval = 10;
 using namespace Sink;
 using namespace Sink::Storage;
 
-CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId)
+CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId, const Sink::Log::Context &ctx)
     : QObject(),
+    mLogCtx(ctx.subContext("commandprocessor")),
     mPipeline(pipeline),
     mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"),
     mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"),
@@ -130,7 +131,7 @@ void CommandProcessor::processSynchronizeCommand(const QByteArray &data)
         }
         mSynchronizer->synchronize(query);
     } else {
-        SinkWarning() << "received invalid command";
+        SinkWarningCtx(mLogCtx) << "received invalid command";
     }
 }
 
@@ -141,7 +142,7 @@
 //         auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData());
 //         client.currentRevision = buffer->revision();
 //     } else {
-//         SinkWarning() << "received invalid command";
+//         SinkWarningCtx(mLogCtx) << "received invalid command";
 //     }
 //     loadResource().setLowerBoundRevision(lowerBoundRevision());
 // }
@@ -179,7 +180,7 @@ void CommandProcessor::process()
 
 KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const Sink::QueuedCommand *queuedCommand)
 {
-    SinkTrace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId());
+    SinkTraceCtx(mLogCtx) << "Processing command: " << Sink::Commands::name(queuedCommand->commandId());
     const auto data = queuedCommand->command()->Data();
     const auto size = queuedCommand->command()->size();
     switch (queuedCommand->commandId()) {
@@ -205,7 +206,7 @@ KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const QByteArray &dat
 {
     flatbuffers::Verifier verifyer(reinterpret_cast<const unsigned char *>(data.constData()), data.size());
     if (!Sink::VerifyQueuedCommandBuffer(verifyer)) {
-        SinkWarning() << "invalid buffer";
+        SinkWarningCtx(mLogCtx) << "invalid buffer";
         // return KAsync::error(1, "Invalid Buffer");
     }
     auto queuedCommand = Sink::GetQueuedCommand(data.constData());
@@ -214,10 +215,10 @@ KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const QByteArray &dat
         .then(
             [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job<qint64> {
                 if (error) {
-                    SinkWarning() << "Error while processing queue command: " << error.errorMessage;
+                    SinkWarningCtx(mLogCtx) << "Error while processing queue command: " << error.errorMessage;
                     return KAsync::error<qint64>(error);
                 }
-                SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId);
+                SinkTraceCtx(mLogCtx) << "Command pipeline processed: " << Sink::Commands::name(commandId);
                 return KAsync::value(createdRevision);
             });
 }
@@ -234,13 +235,13 @@ KAsync::Job<void> CommandProcessor::processQueue(MessageQueue *queue)
                 time->start();
                 return processQueuedCommand(data)
                     .syncThen<void, qint64>([this, time](qint64 createdRevision) {
-                        SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed());
+                        SinkTraceCtx(mLogCtx) << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed());
                     });
             })
-            .then([queue](const KAsync::Error &error) {
+            .then([queue, this](const KAsync::Error &error) {
                 if (error) {
                     if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) {
-                        SinkWarning() << "Error while getting message from messagequeue: " << error.errorMessage;
+                        SinkWarningCtx(mLogCtx) << "Error while getting message from messagequeue: " << error.errorMessage;
                     }
                 }
                 if (queue->isEmpty()) {
@@ -258,7 +259,7 @@ KAsync::Job<void> CommandProcessor::processPipeline()
     auto time = QSharedPointer<QTime>::create();
     time->start();
     mPipeline->cleanupRevisions(mLowerBoundRevision);
-    SinkTrace() << "Cleanup done." << Log::TraceTime(time->elapsed());
+    SinkTraceCtx(mLogCtx) << "Cleanup done." << Log::TraceTime(time->elapsed());
 
     // Go through all message queues
     if (mCommandQueues.isEmpty()) {
@@ -273,7 +274,7 @@ KAsync::Job<void> CommandProcessor::processPipeline()
             auto queue = it->next();
             return processQueue(queue)
                 .syncThen<KAsync::ControlFlowFlag>([this, time, it]() {
-                    SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed());
+                    SinkTraceCtx(mLogCtx) << "Queue processed." << Log::TraceTime(time->elapsed());
                     if (it->hasNext()) {
                         return KAsync::Continue;
                     }
@@ -325,11 +326,11 @@ KAsync::Job<void> CommandProcessor::flush(void const *command, size_t size)
         const QByteArray flushId = BufferUtils::extractBufferCopy(buffer->id());
         Q_ASSERT(!flushId.isEmpty());
         if (flushType == Sink::Flush::FlushReplayQueue) {
-            SinkTrace() << "Flushing synchronizer ";
+            SinkTraceCtx(mLogCtx) << "Flushing synchronizer ";
             Q_ASSERT(mSynchronizer);
             mSynchronizer->flush(flushType, flushId);
         } else {
-            SinkTrace() << "Emitting flush completion" << flushId;
+            SinkTraceCtx(mLogCtx) << "Emitting flush completion" << flushId;
             Sink::Notification n;
             n.type = Sink::Notification::FlushCompletion;
             n.id = flushId;
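Every commandprocessor.cpp change above follows the same pattern: the constructor derives a member context once via ctx.subContext("commandprocessor"), and each former SinkTrace()/SinkWarning() call is routed through the *Ctx macro variants so its output carries that name. A minimal self-contained sketch of the pattern, with a stand-in Context mirroring the log.h addition further down and a hypothetical TRACE_CTX macro in place of Sink's real SinkTraceCtx:

#include <QByteArray>
#include <QDebug>

// Stand-in mirroring Sink::Log::Context as extended in log.h below.
struct Context
{
    Context() = default;
    Context(const QByteArray &n) : name(n) {}
    Context(const char *n) : name(n) {}
    QByteArray name;
    Context subContext(const QByteArray &sub) const { return Context{name + "." + sub}; }
};

// Hypothetical stand-in for SinkTraceCtx: tag output with the context name.
#define TRACE_CTX(ctx) qDebug().noquote() << "[" + (ctx).name + "]"

class Processor
{
public:
    // Derive the component context once; every later message inherits it.
    Processor(const Context &ctx) : mLogCtx(ctx.subContext("commandprocessor")) {}

    void process() { TRACE_CTX(mLogCtx) << "Processing command"; }

private:
    Context mLogCtx;
};

int main()
{
    Processor processor(Context{"resource.instance1"});
    processor.process(); // prints: [resource.instance1.commandprocessor] Processing command
    return 0;
}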
diff --git a/common/commandprocessor.h b/common/commandprocessor.h
index 81f93e5..eeb7ecf 100644
--- a/common/commandprocessor.h
+++ b/common/commandprocessor.h
@@ -43,10 +43,9 @@ namespace Sink {
 class CommandProcessor : public QObject
 {
     Q_OBJECT
-    SINK_DEBUG_AREA("commandprocessor")
 
 public:
-    CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId);
+    CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId, const Sink::Log::Context &ctx);
 
     void setOldestUsedRevision(qint64 revision);
 
@@ -79,6 +78,7 @@ private:
     KAsync::Job<void> flush(void const *command, size_t size);
 
+    Sink::Log::Context mLogCtx;
     Sink::Pipeline *mPipeline;
     MessageQueue mUserQueue;
     MessageQueue mSynchronizerQueue;
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index c11e899..5ba9e5d 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -31,8 +31,8 @@ using namespace Sink::Storage;
 GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline )
     : Sink::Resource(),
     mResourceContext(resourceContext),
-    mPipeline(pipeline ? pipeline : QSharedPointer<Pipeline>::create(resourceContext)),
-    mProcessor(QSharedPointer<CommandProcessor>::create(mPipeline.data(), resourceContext.instanceId())),
+    mPipeline(pipeline ? pipeline : QSharedPointer<Pipeline>::create(resourceContext, "resource." + resourceContext.instanceId())),
+    mProcessor(QSharedPointer<CommandProcessor>::create(mPipeline.data(), resourceContext.instanceId(), "resource." + resourceContext.instanceId())),
     mError(0),
     mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
 {
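The GenericResource hunk is where the root context is minted: "resource." + resourceContext.instanceId() converts implicitly to a Sink::Log::Context through the constructors added in log.h below, and the same root is handed to both the pipeline and the command processor so their traces correlate. A fragment (not a standalone program) restating that wiring with an explicit intermediate variable:

// Equivalent to the constructor initializers above, spelled out. The implicit
// QByteArray -> Context conversion comes from the new log.h constructors.
const Sink::Log::Context rootCtx{"resource." + resourceContext.instanceId()};
auto pipeline = QSharedPointer<Sink::Pipeline>::create(resourceContext, rootCtx);
auto processor = QSharedPointer<Sink::CommandProcessor>::create(
    pipeline.data(), resourceContext.instanceId(), rootCtx);
// The trace areas then nest as "resource.<id>.pipeline" and
// "resource.<id>.commandprocessor".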
diff --git a/common/log.h b/common/log.h
index 9063ac8..fc2a3ed 100644
--- a/common/log.h
+++ b/common/log.h
@@ -8,6 +8,9 @@ namespace Log {
 
 struct Context {
     Context() = default;
+    Context(const QByteArray &n) : name(n) {}
+    Context(const char *n) : name(n) {}
+
     QByteArray name;
     Context subContext(const QByteArray &sub) const {
         return Context{name + "." + sub};
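The whole patch hinges on these three added lines: with converting constructors from QByteArray and const char *, any call site can pass a plain string wherever a Context is expected, and subContext() chains dotted names. A short program exercising just this struct (assuming log.h is on the include path and Context lives in Sink::Log, as the surrounding namespace suggests):

#include <QDebug>
#include "log.h"

int main()
{
    using Sink::Log::Context;

    Context root = "resource.instance1"; // const char * constructor
    Context pipeline = root.subContext("pipeline");
    Context processor = root.subContext("commandprocessor");

    qDebug() << root.name;      // "resource.instance1"
    qDebug() << pipeline.name;  // "resource.instance1.pipeline"
    qDebug() << processor.name; // "resource.instance1.commandprocessor"
    return 0;
}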
SinkWarningCtx(mLogCtx) << "No way to fetch entities"; } } template void ModelResult::setFetcher(const std::function &fetcher) { - SinkTrace() << "Setting fetcher"; + SinkTraceCtx(mLogCtx) << "Setting fetcher"; loadEntities = fetcher; } @@ -262,7 +262,7 @@ void ModelResult::setEmitter(const typename Sink::ResultEmitter::Pt }); }); emitter->onModified([this](const Ptr &value) { - SinkTrace() << "Received modification: " << value->identifier(); + SinkTraceCtx(mLogCtx) << "Received modification: " << value->identifier(); threadBoundary.callInMainThread([this, value]() { modify(value); }); @@ -273,7 +273,7 @@ void ModelResult::setEmitter(const typename Sink::ResultEmitter::Pt }); }); emitter->onInitialResultSetComplete([this](const Ptr &parent, bool fetchedAll) { - SinkTrace() << "Initial result set complete"; + SinkTraceCtx(mLogCtx) << "Initial result set complete"; const qint64 parentId = parent ? qHash(*parent) : 0; const auto parentIndex = createIndexFromId(parentId); mEntityChildrenFetchComplete.insert(parentId); @@ -297,7 +297,7 @@ void ModelResult::modify(const Ptr &value) auto childId = qHash(*value); if (!mEntities.contains(childId)) { //Happens because the DatabaseQuery emits modifiations also if the item used to be filtered. - SinkTrace() << "Tried to modify a value that is not yet part of the model"; + SinkTraceCtx(mLogCtx) << "Tried to modify a value that is not yet part of the model"; add(value); return; } @@ -307,7 +307,7 @@ void ModelResult::modify(const Ptr &value) return; } auto parent = createIndexFromId(id); - SinkTrace() << "Modified entity" << childId; + SinkTraceCtx(mLogCtx) << "Modified entity" << childId; auto i = mTree[id].indexOf(childId); Q_ASSERT(i >= 0); mEntities.remove(childId); diff --git a/common/modelresult.h b/common/modelresult.h index b7fc0ec..daa48bd 100644 --- a/common/modelresult.h +++ b/common/modelresult.h @@ -26,6 +26,7 @@ #include #include #include "query.h" +#include "log.h" #include "resultprovider.h" #include "threadboundary.h" @@ -40,7 +41,7 @@ public: DomainObjectBaseRole }; - ModelResult(const Sink::Query &query, const QList &propertyColumns); + ModelResult(const Sink::Query &query, const QList &propertyColumns, const Sink::Log::Context &); void setEmitter(const typename Sink::ResultEmitter::Ptr &); @@ -67,6 +68,7 @@ private: QModelIndex createIndexFromId(const qint64 &id) const; void fetchEntities(const QModelIndex &parent); + Sink::Log::Context mLogCtx; // TODO we should be able to directly use T as index, with an appropriate hash function, and thus have a QMap and QList QMap mEntities; QMap /* child entity id*/> mTree; diff --git a/common/pipeline.cpp b/common/pipeline.cpp index c9a8092..4cb5f21 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -49,10 +49,11 @@ using namespace Sink::Storage; class Pipeline::Private { public: - Private(const ResourceContext &context) : resourceContext(context), entityStore(context, {"pipeline"}), revisionChanged(false) + Private(const ResourceContext &context, const Sink::Log::Context &ctx) : logCtx{ctx.subContext("pipeline")}, resourceContext(context), entityStore(context, ctx), revisionChanged(false) { } + Sink::Log::Context logCtx; ResourceContext resourceContext; Storage::EntityStore entityStore; QHash>> processors; @@ -62,7 +63,7 @@ public: }; -Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context)) +Pipeline::Pipeline(const ResourceContext &context, const Sink::Log::Context &ctx) : QObject(nullptr), d(new Private(context, ctx)) { //Create main store 
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index c9a8092..4cb5f21 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -49,10 +49,11 @@ using namespace Sink::Storage;
 class Pipeline::Private
 {
 public:
-    Private(const ResourceContext &context) : resourceContext(context), entityStore(context, {"pipeline"}), revisionChanged(false)
+    Private(const ResourceContext &context, const Sink::Log::Context &ctx) : logCtx{ctx.subContext("pipeline")}, resourceContext(context), entityStore(context, ctx), revisionChanged(false)
     {
     }
 
+    Sink::Log::Context logCtx;
     ResourceContext resourceContext;
     Storage::EntityStore entityStore;
     QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors;
@@ -62,7 +63,7 @@ public:
 };
 
-Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context))
+Pipeline::Pipeline(const ResourceContext &context, const Sink::Log::Context &ctx) : QObject(nullptr), d(new Private(context, ctx))
 {
     //Create main store immediately on first start
     d->entityStore.startTransaction(DataStore::ReadWrite);
@@ -90,7 +91,7 @@ void Pipeline::startTransaction()
     // for (auto processor : d->processors[bufferType]) {
     //     processor->startBatch();
     // }
-    SinkTrace() << "Starting transaction.";
+    SinkTraceCtx(d->logCtx) << "Starting transaction.";
     d->transactionTime.start();
     d->transactionItemCount = 0;
     d->entityStore.startTransaction(DataStore::ReadWrite);
@@ -109,7 +110,7 @@ void Pipeline::commit()
     }
     const auto revision = d->entityStore.maxRevision();
     const auto elapsed = d->transactionTime.elapsed();
-    SinkTrace() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " "
+    SinkTraceCtx(d->logCtx) << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " "
                 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]";
     d->entityStore.commitTransaction();
     if (d->revisionChanged) {
@@ -125,7 +126,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
     {
         flatbuffers::Verifier verifyer(reinterpret_cast<const unsigned char *>(command), size);
         if (!Commands::VerifyCreateEntityBuffer(verifyer)) {
-            SinkWarning() << "invalid buffer, not a create entity buffer";
+            SinkWarningCtx(d->logCtx) << "invalid buffer, not a create entity buffer";
             return KAsync::error<qint64>(0);
         }
     }
@@ -137,7 +138,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
     if (createEntity->entityId()) {
         key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size());
         if (d->entityStore.contains(bufferType, key)) {
-            SinkError() << "An entity with this id already exists: " << key;
+            SinkErrorCtx(d->logCtx) << "An entity with this id already exists: " << key;
             return KAsync::error<qint64>(0);
         }
     }
@@ -145,25 +146,25 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
     if (key.isEmpty()) {
         key = DataStore::generateUid();
     }
-    SinkTrace() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
+    SinkTraceCtx(d->logCtx) << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
     Q_ASSERT(!key.isEmpty());
 
     {
         flatbuffers::Verifier verifyer(reinterpret_cast<const unsigned char *>(createEntity->delta()->Data()), createEntity->delta()->size());
         if (!VerifyEntityBuffer(verifyer)) {
-            SinkWarning() << "invalid buffer, not an entity buffer";
+            SinkWarningCtx(d->logCtx) << "invalid buffer, not an entity buffer";
             return KAsync::error<qint64>(0);
         }
     }
 
     auto entity = GetEntity(createEntity->delta()->Data());
     if (!entity->resource()->size() && !entity->local()->size()) {
-        SinkWarning() << "No local and no resource buffer while trying to create entity.";
+        SinkWarningCtx(d->logCtx) << "No local and no resource buffer while trying to create entity.";
         return KAsync::error<qint64>(0);
     }
 
     auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType);
     if (!adaptorFactory) {
-        SinkWarning() << "no adaptor factory for type " << bufferType;
+        SinkWarningCtx(d->logCtx) << "no adaptor factory for type " << bufferType;
         return KAsync::error<qint64>(0);
     }
Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; Q_ASSERT(!key.isEmpty()); { flatbuffers::Verifier verifyer(reinterpret_cast(createEntity->delta()->Data()), createEntity->delta()->size()); if (!VerifyEntityBuffer(verifyer)) { - SinkWarning() << "invalid buffer, not an entity buffer"; + SinkWarningCtx(d->logCtx) << "invalid buffer, not an entity buffer"; return KAsync::error(0); } } auto entity = GetEntity(createEntity->delta()->Data()); if (!entity->resource()->size() && !entity->local()->size()) { - SinkWarning() << "No local and no resource buffer while trying to create entity."; + SinkWarningCtx(d->logCtx) << "No local and no resource buffer while trying to create entity."; return KAsync::error(0); } auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); if (!adaptorFactory) { - SinkWarning() << "no adaptor factory for type " << bufferType; + SinkWarningCtx(d->logCtx) << "no adaptor factory for type " << bufferType; return KAsync::error(0); } @@ -203,7 +204,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) { flatbuffers::Verifier verifyer(reinterpret_cast(command), size); if (!Commands::VerifyModifyEntityBuffer(verifyer)) { - SinkWarning() << "invalid buffer, not a modify entity buffer"; + SinkWarningCtx(d->logCtx) << "invalid buffer, not a modify entity buffer"; return KAsync::error(0); } } @@ -213,29 +214,29 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) if (modifyEntity->modifiedProperties()) { changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties()); } else { - SinkWarning() << "No changeset available"; + SinkWarningCtx(d->logCtx) << "No changeset available"; } const qint64 baseRevision = modifyEntity->revision(); const bool replayToSource = modifyEntity->replayToSource(); const QByteArray bufferType = QByteArray(reinterpret_cast(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); const QByteArray key = QByteArray(reinterpret_cast(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); - SinkTrace() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; + SinkTraceCtx(d->logCtx) << "Modified Entity. 
Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; if (bufferType.isEmpty() || key.isEmpty()) { - SinkWarning() << "entity type or key " << bufferType << key; + SinkWarningCtx(d->logCtx) << "entity type or key " << bufferType << key; return KAsync::error(0); } { flatbuffers::Verifier verifyer(reinterpret_cast(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); if (!VerifyEntityBuffer(verifyer)) { - SinkWarning() << "invalid buffer, not an entity buffer"; + SinkWarningCtx(d->logCtx) << "invalid buffer, not an entity buffer"; return KAsync::error(0); } } auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); if (!adaptorFactory) { - SinkWarning() << "no adaptor factory for type " << bufferType; + SinkWarningCtx(d->logCtx) << "no adaptor factory for type " << bufferType; return KAsync::error(0); } @@ -255,7 +256,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) auto changeset = diff.changedProperties(); const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); if (current.identifier().isEmpty()) { - SinkWarning() << "Failed to read current version: " << diff.identifier(); + SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); return KAsync::error(0); } @@ -276,11 +277,11 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) newEntity.setResource(targetResource); newEntity.setChangedProperties(newEntity.availableProperties().toSet()); - SinkTrace() << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource; + SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource; auto job = TypeHelper{bufferType}.operator(), ApplicationDomain::ApplicationDomainType&>(newEntity); job = job.syncThen([this, current, isMove, targetResource, bufferType](const KAsync::Error &error) { if (!error) { - SinkTrace() << "Move of " << current.identifier() << "was successfull"; + SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; if (isMove) { startTransaction(); flatbuffers::FlatBufferBuilder fbb; @@ -293,7 +294,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) commit(); } } else { - SinkError() << "Failed to move entity " << targetResource << " to resource " << current.identifier(); + SinkErrorCtx(d->logCtx) << "Failed to move entity " << targetResource << " to resource " << current.identifier(); } }); job.exec(); @@ -321,7 +322,7 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) { flatbuffers::Verifier verifyer(reinterpret_cast(command), size); if (!Commands::VerifyDeleteEntityBuffer(verifyer)) { - SinkWarning() << "invalid buffer, not a delete entity buffer"; + SinkWarningCtx(d->logCtx) << "invalid buffer, not a delete entity buffer"; return KAsync::error(0); } } @@ -330,7 +331,7 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) const bool replayToSource = deleteEntity->replayToSource(); const QByteArray bufferType = QByteArray(reinterpret_cast(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); const QByteArray key = QByteArray(reinterpret_cast(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); - SinkTrace() << "Deleted Entity. 
Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; + SinkTraceCtx(d->logCtx) << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity) { foreach (const auto &processor, d->processors[bufferType]) { diff --git a/common/pipeline.h b/common/pipeline.h index b663dea..c9982b7 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -46,7 +46,7 @@ class SINK_EXPORT Pipeline : public QObject Q_OBJECT public: - Pipeline(const ResourceContext &context); + Pipeline(const ResourceContext &context, const Sink::Log::Context &ctx); ~Pipeline(); void setPreprocessors(const QString &entityType, const QVector &preprocessors); diff --git a/common/store.cpp b/common/store.cpp index 8007626..554f540 100644 --- a/common/store.cpp +++ b/common/store.cpp @@ -57,7 +57,7 @@ QString Store::getTemporaryFilePath() /* * Returns a map of resource instance identifiers and resource type */ -static QMap getResources(const Sink::Query::Filter &query, const QByteArray &type = QByteArray()) +static QMap getResources(const Sink::Query::Filter &query, const QByteArray &type, const Sink::Log::Context &ctx) { const QList resourceFilter = query.ids; @@ -97,11 +97,11 @@ static QMap getResources(const Sink::Query::Filter &quer } resources.insert(res, configuredResources.value(res)); } else { - SinkWarning() << "Resource is not existing: " << res; + SinkWarningCtx(ctx) << "Resource is not existing: " << res; } } } - SinkTrace() << "Found resources: " << resources; + SinkTraceCtx(ctx) << "Found resources: " << resources; return resources; } @@ -133,7 +133,7 @@ QSharedPointer Store::loadModel(Query query) Log::Context ctx{query.id()}; query.setType(ApplicationDomain::getTypeName()); SinkTraceCtx(ctx) << "Loading model: " << query; - auto model = QSharedPointer>::create(query, query.requestedProperties); + auto model = QSharedPointer>::create(query, query.requestedProperties, ctx); //* Client defines lifetime of model //* The model lifetime defines the duration of live-queries @@ -142,7 +142,7 @@ QSharedPointer Store::loadModel(Query query) //* The result provider needs to live for as long as results are provided (until the last thread exits). 
diff --git a/common/store.cpp b/common/store.cpp
index 8007626..554f540 100644
--- a/common/store.cpp
+++ b/common/store.cpp
@@ -57,7 +57,7 @@ QString Store::getTemporaryFilePath()
 /*
  * Returns a map of resource instance identifiers and resource type
  */
-static QMap<QByteArray, QByteArray> getResources(const Sink::Query::Filter &query, const QByteArray &type = QByteArray())
+static QMap<QByteArray, QByteArray> getResources(const Sink::Query::Filter &query, const QByteArray &type, const Sink::Log::Context &ctx)
 {
     const QList<QByteArray> resourceFilter = query.ids;
 
@@ -97,11 +97,11 @@ static QMap<QByteArray, QByteArray> getResources(const Sink::Query::Filter &quer
             }
             resources.insert(res, configuredResources.value(res));
         } else {
-            SinkWarning() << "Resource is not existing: " << res;
+            SinkWarningCtx(ctx) << "Resource is not existing: " << res;
         }
     }
 
-    SinkTrace() << "Found resources: " << resources;
+    SinkTraceCtx(ctx) << "Found resources: " << resources;
     return resources;
 }
 
@@ -133,7 +133,7 @@ QSharedPointer<QAbstractItemModel> Store::loadModel(Query query)
     Log::Context ctx{query.id()};
     query.setType(ApplicationDomain::getTypeName<DomainType>());
     SinkTraceCtx(ctx) << "Loading model: " << query;
-    auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr>>::create(query, query.requestedProperties);
+    auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr>>::create(query, query.requestedProperties, ctx);
 
     //* Client defines lifetime of model
     //* The model lifetime defines the duration of live-queries
@@ -142,7 +142,7 @@ QSharedPointer<QAbstractItemModel> Store::loadModel(Query query)
     //* The result provider needs to live for as long as results are provided (until the last thread exits).
 
     // Query all resources and aggregate results
-    auto resources = getResources(query.getResourceFilter(), ApplicationDomain::getTypeName<DomainType>());
+    auto resources = getResources(query.getResourceFilter(), ApplicationDomain::getTypeName<DomainType>(), ctx);
     auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create();
     model->setEmitter(aggregatingEmitter);
 
@@ -297,7 +297,7 @@ KAsync::Job<void> Store::synchronize(const Sink::Query &query)
 
 KAsync::Job<void> Store::synchronize(const Sink::SyncScope &scope)
 {
-    auto resources = getResources(scope.getResourceFilter()).keys();
+    auto resources = getResources(scope.getResourceFilter(), {}, {}).keys();
     SinkLog() << "Synchronize" << resources;
     return KAsync::value(resources)
         .template each([scope](const QByteArray &resource) {
@@ -377,7 +377,7 @@ QList<DomainType> Store::read(const Sink::Query &q)
     auto query = q;
     query.setFlags(Query::SynchronousQuery);
     QList<DomainType> list;
-    auto resources = getResources(query.getResourceFilter(), ApplicationDomain::getTypeName<DomainType>());
+    auto resources = getResources(query.getResourceFilter(), ApplicationDomain::getTypeName<DomainType>(), ctx);
     auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create();
     aggregatingEmitter->onAdded([&list, ctx](const typename DomainType::Ptr &value){
         SinkTraceCtx(ctx) << "Found value: " << value->identifier();
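Seen from the client side, the net effect of the patch is that one Store::loadModel() call now produces consistently nested trace areas rooted in the query id. A hypothetical usage sketch (the Mail domain type stands in for any Sink application domain type):

// Hypothetical client code; Store::loadModel is the entry point touched above.
Sink::Query query;
auto model = Sink::Store::loadModel<Sink::ApplicationDomain::Mail>(query);
// While the model lives, related messages log under:
//   "<query-id>"             - Store::loadModel itself (SinkTraceCtx(ctx) above)
//   "<query-id>.modelresult" - the ModelResult constructed with that ctx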