diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-12-22 22:05:40 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-12-22 22:05:40 +0100 |
commit | 6b1cf550608c2f17cbed9e375f15a4c14bfe8ace (patch) | |
tree | 2e693a731bb2e9ce1a793b14cef98cdc13e382cd | |
parent | b2ad8f785e801a35cadf254d827f56d648be510c (diff) | |
download | sink-6b1cf550608c2f17cbed9e375f15a4c14bfe8ace.tar.gz sink-6b1cf550608c2f17cbed9e375f15a4c14bfe8ace.zip |
More Log::Context
-rw-r--r-- | common/commandprocessor.cpp | 29 | ||||
-rw-r--r-- | common/commandprocessor.h | 4 | ||||
-rw-r--r-- | common/genericresource.cpp | 4 | ||||
-rw-r--r-- | common/log.h | 3 | ||||
-rw-r--r-- | common/modelresult.cpp | 30 | ||||
-rw-r--r-- | common/modelresult.h | 4 | ||||
-rw-r--r-- | common/pipeline.cpp | 45 | ||||
-rw-r--r-- | common/pipeline.h | 2 | ||||
-rw-r--r-- | common/store.cpp | 14 | ||||
-rw-r--r-- | tests/mailquerybenchmark.cpp | 2 | ||||
-rw-r--r-- | tests/pipelinebenchmark.cpp | 2 | ||||
-rw-r--r-- | tests/pipelinetest.cpp | 10 |
12 files changed, 78 insertions, 71 deletions
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index efcd077..7cd4a5f 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp | |||
@@ -43,8 +43,9 @@ static int sCommitInterval = 10; | |||
43 | using namespace Sink; | 43 | using namespace Sink; |
44 | using namespace Sink::Storage; | 44 | using namespace Sink::Storage; |
45 | 45 | ||
46 | CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId) | 46 | CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId, const Sink::Log::Context &ctx) |
47 | : QObject(), | 47 | : QObject(), |
48 | mLogCtx(ctx.subContext("commandprocessor")), | ||
48 | mPipeline(pipeline), | 49 | mPipeline(pipeline), |
49 | mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"), | 50 | mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"), |
50 | mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"), | 51 | mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"), |
@@ -130,7 +131,7 @@ void CommandProcessor::processSynchronizeCommand(const QByteArray &data) | |||
130 | } | 131 | } |
131 | mSynchronizer->synchronize(query); | 132 | mSynchronizer->synchronize(query); |
132 | } else { | 133 | } else { |
133 | SinkWarning() << "received invalid command"; | 134 | SinkWarningCtx(mLogCtx) << "received invalid command"; |
134 | } | 135 | } |
135 | } | 136 | } |
136 | 137 | ||
@@ -141,7 +142,7 @@ void CommandProcessor::processSynchronizeCommand(const QByteArray &data) | |||
141 | // auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); | 142 | // auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); |
142 | // client.currentRevision = buffer->revision(); | 143 | // client.currentRevision = buffer->revision(); |
143 | // } else { | 144 | // } else { |
144 | // SinkWarning() << "received invalid command"; | 145 | // SinkWarningCtx(mLogCtx) << "received invalid command"; |
145 | // } | 146 | // } |
146 | // loadResource().setLowerBoundRevision(lowerBoundRevision()); | 147 | // loadResource().setLowerBoundRevision(lowerBoundRevision()); |
147 | // } | 148 | // } |
@@ -179,7 +180,7 @@ void CommandProcessor::process() | |||
179 | 180 | ||
180 | KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const Sink::QueuedCommand *queuedCommand) | 181 | KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const Sink::QueuedCommand *queuedCommand) |
181 | { | 182 | { |
182 | SinkTrace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); | 183 | SinkTraceCtx(mLogCtx) << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); |
183 | const auto data = queuedCommand->command()->Data(); | 184 | const auto data = queuedCommand->command()->Data(); |
184 | const auto size = queuedCommand->command()->size(); | 185 | const auto size = queuedCommand->command()->size(); |
185 | switch (queuedCommand->commandId()) { | 186 | switch (queuedCommand->commandId()) { |
@@ -205,7 +206,7 @@ KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const QByteArray &dat | |||
205 | { | 206 | { |
206 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); | 207 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); |
207 | if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { | 208 | if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { |
208 | SinkWarning() << "invalid buffer"; | 209 | SinkWarningCtx(mLogCtx) << "invalid buffer"; |
209 | // return KAsync::error<void, qint64>(1, "Invalid Buffer"); | 210 | // return KAsync::error<void, qint64>(1, "Invalid Buffer"); |
210 | } | 211 | } |
211 | auto queuedCommand = Sink::GetQueuedCommand(data.constData()); | 212 | auto queuedCommand = Sink::GetQueuedCommand(data.constData()); |
@@ -214,10 +215,10 @@ KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const QByteArray &dat | |||
214 | .then<qint64, qint64>( | 215 | .then<qint64, qint64>( |
215 | [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job<qint64> { | 216 | [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job<qint64> { |
216 | if (error) { | 217 | if (error) { |
217 | SinkWarning() << "Error while processing queue command: " << error.errorMessage; | 218 | SinkWarningCtx(mLogCtx) << "Error while processing queue command: " << error.errorMessage; |
218 | return KAsync::error<qint64>(error); | 219 | return KAsync::error<qint64>(error); |
219 | } | 220 | } |
220 | SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId); | 221 | SinkTraceCtx(mLogCtx) << "Command pipeline processed: " << Sink::Commands::name(commandId); |
221 | return KAsync::value<qint64>(createdRevision); | 222 | return KAsync::value<qint64>(createdRevision); |
222 | }); | 223 | }); |
223 | } | 224 | } |
@@ -234,13 +235,13 @@ KAsync::Job<void> CommandProcessor::processQueue(MessageQueue *queue) | |||
234 | time->start(); | 235 | time->start(); |
235 | return processQueuedCommand(data) | 236 | return processQueuedCommand(data) |
236 | .syncThen<void, qint64>([this, time](qint64 createdRevision) { | 237 | .syncThen<void, qint64>([this, time](qint64 createdRevision) { |
237 | SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); | 238 | SinkTraceCtx(mLogCtx) << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); |
238 | }); | 239 | }); |
239 | }) | 240 | }) |
240 | .then<KAsync::ControlFlowFlag>([queue](const KAsync::Error &error) { | 241 | .then<KAsync::ControlFlowFlag>([queue, this](const KAsync::Error &error) { |
241 | if (error) { | 242 | if (error) { |
242 | if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) { | 243 | if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) { |
243 | SinkWarning() << "Error while getting message from messagequeue: " << error.errorMessage; | 244 | SinkWarningCtx(mLogCtx) << "Error while getting message from messagequeue: " << error.errorMessage; |
244 | } | 245 | } |
245 | } | 246 | } |
246 | if (queue->isEmpty()) { | 247 | if (queue->isEmpty()) { |
@@ -258,7 +259,7 @@ KAsync::Job<void> CommandProcessor::processPipeline() | |||
258 | auto time = QSharedPointer<QTime>::create(); | 259 | auto time = QSharedPointer<QTime>::create(); |
259 | time->start(); | 260 | time->start(); |
260 | mPipeline->cleanupRevisions(mLowerBoundRevision); | 261 | mPipeline->cleanupRevisions(mLowerBoundRevision); |
261 | SinkTrace() << "Cleanup done." << Log::TraceTime(time->elapsed()); | 262 | SinkTraceCtx(mLogCtx) << "Cleanup done." << Log::TraceTime(time->elapsed()); |
262 | 263 | ||
263 | // Go through all message queues | 264 | // Go through all message queues |
264 | if (mCommandQueues.isEmpty()) { | 265 | if (mCommandQueues.isEmpty()) { |
@@ -273,7 +274,7 @@ KAsync::Job<void> CommandProcessor::processPipeline() | |||
273 | auto queue = it->next(); | 274 | auto queue = it->next(); |
274 | return processQueue(queue) | 275 | return processQueue(queue) |
275 | .syncThen<KAsync::ControlFlowFlag>([this, time, it]() { | 276 | .syncThen<KAsync::ControlFlowFlag>([this, time, it]() { |
276 | SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed()); | 277 | SinkTraceCtx(mLogCtx) << "Queue processed." << Log::TraceTime(time->elapsed()); |
277 | if (it->hasNext()) { | 278 | if (it->hasNext()) { |
278 | return KAsync::Continue; | 279 | return KAsync::Continue; |
279 | } | 280 | } |
@@ -325,11 +326,11 @@ KAsync::Job<void> CommandProcessor::flush(void const *command, size_t size) | |||
325 | const QByteArray flushId = BufferUtils::extractBufferCopy(buffer->id()); | 326 | const QByteArray flushId = BufferUtils::extractBufferCopy(buffer->id()); |
326 | Q_ASSERT(!flushId.isEmpty()); | 327 | Q_ASSERT(!flushId.isEmpty()); |
327 | if (flushType == Sink::Flush::FlushReplayQueue) { | 328 | if (flushType == Sink::Flush::FlushReplayQueue) { |
328 | SinkTrace() << "Flushing synchronizer "; | 329 | SinkTraceCtx(mLogCtx) << "Flushing synchronizer "; |
329 | Q_ASSERT(mSynchronizer); | 330 | Q_ASSERT(mSynchronizer); |
330 | mSynchronizer->flush(flushType, flushId); | 331 | mSynchronizer->flush(flushType, flushId); |
331 | } else { | 332 | } else { |
332 | SinkTrace() << "Emitting flush completion" << flushId; | 333 | SinkTraceCtx(mLogCtx) << "Emitting flush completion" << flushId; |
333 | Sink::Notification n; | 334 | Sink::Notification n; |
334 | n.type = Sink::Notification::FlushCompletion; | 335 | n.type = Sink::Notification::FlushCompletion; |
335 | n.id = flushId; | 336 | n.id = flushId; |
diff --git a/common/commandprocessor.h b/common/commandprocessor.h index 81f93e5..eeb7ecf 100644 --- a/common/commandprocessor.h +++ b/common/commandprocessor.h | |||
@@ -43,10 +43,9 @@ namespace Sink { | |||
43 | class CommandProcessor : public QObject | 43 | class CommandProcessor : public QObject |
44 | { | 44 | { |
45 | Q_OBJECT | 45 | Q_OBJECT |
46 | SINK_DEBUG_AREA("commandprocessor") | ||
47 | 46 | ||
48 | public: | 47 | public: |
49 | CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId); | 48 | CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId, const Sink::Log::Context &ctx); |
50 | 49 | ||
51 | void setOldestUsedRevision(qint64 revision); | 50 | void setOldestUsedRevision(qint64 revision); |
52 | 51 | ||
@@ -79,6 +78,7 @@ private: | |||
79 | 78 | ||
80 | KAsync::Job<void> flush(void const *command, size_t size); | 79 | KAsync::Job<void> flush(void const *command, size_t size); |
81 | 80 | ||
81 | Sink::Log::Context mLogCtx; | ||
82 | Sink::Pipeline *mPipeline; | 82 | Sink::Pipeline *mPipeline; |
83 | MessageQueue mUserQueue; | 83 | MessageQueue mUserQueue; |
84 | MessageQueue mSynchronizerQueue; | 84 | MessageQueue mSynchronizerQueue; |
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index c11e899..5ba9e5d 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -31,8 +31,8 @@ using namespace Sink::Storage; | |||
31 | GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline ) | 31 | GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline ) |
32 | : Sink::Resource(), | 32 | : Sink::Resource(), |
33 | mResourceContext(resourceContext), | 33 | mResourceContext(resourceContext), |
34 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceContext)), | 34 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceContext, "resource." + resourceContext.instanceId())), |
35 | mProcessor(QSharedPointer<CommandProcessor>::create(mPipeline.data(), resourceContext.instanceId())), | 35 | mProcessor(QSharedPointer<CommandProcessor>::create(mPipeline.data(), resourceContext.instanceId(), "resource." + resourceContext.instanceId())), |
36 | mError(0), | 36 | mError(0), |
37 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | 37 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) |
38 | { | 38 | { |
diff --git a/common/log.h b/common/log.h index 9063ac8..fc2a3ed 100644 --- a/common/log.h +++ b/common/log.h | |||
@@ -8,6 +8,9 @@ namespace Log { | |||
8 | 8 | ||
9 | struct Context { | 9 | struct Context { |
10 | Context() = default; | 10 | Context() = default; |
11 | Context(const QByteArray &n) : name(n) {} | ||
12 | Context(const char *n) : name(n) {} | ||
13 | |||
11 | QByteArray name; | 14 | QByteArray name; |
12 | Context subContext(const QByteArray &sub) const { | 15 | Context subContext(const QByteArray &sub) const { |
13 | return Context{name + "." + sub}; | 16 | return Context{name + "." + sub}; |
diff --git a/common/modelresult.cpp b/common/modelresult.cpp index 8e92365..6695484 100644 --- a/common/modelresult.cpp +++ b/common/modelresult.cpp | |||
@@ -34,8 +34,8 @@ static uint qHash(const Sink::ApplicationDomain::ApplicationDomainType &type) | |||
34 | } | 34 | } |
35 | 35 | ||
36 | template <class T, class Ptr> | 36 | template <class T, class Ptr> |
37 | ModelResult<T, Ptr>::ModelResult(const Sink::Query &query, const QList<QByteArray> &propertyColumns) | 37 | ModelResult<T, Ptr>::ModelResult(const Sink::Query &query, const QList<QByteArray> &propertyColumns, const Sink::Log::Context &ctx) |
38 | : QAbstractItemModel(), mPropertyColumns(propertyColumns), mQuery(query) | 38 | : QAbstractItemModel(), mLogCtx(ctx.subContext("modelresult")), mPropertyColumns(propertyColumns), mQuery(query) |
39 | { | 39 | { |
40 | } | 40 | } |
41 | 41 | ||
@@ -176,7 +176,7 @@ bool ModelResult<T, Ptr>::canFetchMore(const QModelIndex &parent) const | |||
176 | template <class T, class Ptr> | 176 | template <class T, class Ptr> |
177 | void ModelResult<T, Ptr>::fetchMore(const QModelIndex &parent) | 177 | void ModelResult<T, Ptr>::fetchMore(const QModelIndex &parent) |
178 | { | 178 | { |
179 | SinkTrace() << "Fetching more: " << parent; | 179 | SinkTraceCtx(mLogCtx) << "Fetching more: " << parent; |
180 | fetchEntities(parent); | 180 | fetchEntities(parent); |
181 | } | 181 | } |
182 | 182 | ||
@@ -187,7 +187,7 @@ void ModelResult<T, Ptr>::add(const Ptr &value) | |||
187 | const auto id = parentId(value); | 187 | const auto id = parentId(value); |
188 | // Ignore updates we get before the initial fetch is done | 188 | // Ignore updates we get before the initial fetch is done |
189 | if (!mEntityChildrenFetched.contains(id)) { | 189 | if (!mEntityChildrenFetched.contains(id)) { |
190 | SinkTrace() << "Too early" << id; | 190 | SinkTraceCtx(mLogCtx) << "Too early" << id; |
191 | return; | 191 | return; |
192 | } | 192 | } |
193 | if (mEntities.contains(childId)) { | 193 | if (mEntities.contains(childId)) { |
@@ -195,7 +195,7 @@ void ModelResult<T, Ptr>::add(const Ptr &value) | |||
195 | return; | 195 | return; |
196 | } | 196 | } |
197 | auto parent = createIndexFromId(id); | 197 | auto parent = createIndexFromId(id); |
198 | SinkTrace() << "Added entity " << childId << "id: " << value->identifier() << "parent: " << id; | 198 | SinkTraceCtx(mLogCtx) << "Added entity " << childId << "id: " << value->identifier() << "parent: " << id; |
199 | const auto keys = mTree[id]; | 199 | const auto keys = mTree[id]; |
200 | int index = 0; | 200 | int index = 0; |
201 | for (; index < keys.size(); index++) { | 201 | for (; index < keys.size(); index++) { |
@@ -203,13 +203,13 @@ void ModelResult<T, Ptr>::add(const Ptr &value) | |||
203 | break; | 203 | break; |
204 | } | 204 | } |
205 | } | 205 | } |
206 | // SinkTrace() << "Inserting rows " << index << parent; | 206 | // SinkTraceCtx(mLogCtx) << "Inserting rows " << index << parent; |
207 | beginInsertRows(parent, index, index); | 207 | beginInsertRows(parent, index, index); |
208 | mEntities.insert(childId, value); | 208 | mEntities.insert(childId, value); |
209 | mTree[id].insert(index, childId); | 209 | mTree[id].insert(index, childId); |
210 | mParents.insert(childId, id); | 210 | mParents.insert(childId, id); |
211 | endInsertRows(); | 211 | endInsertRows(); |
212 | // SinkTrace() << "Inserted rows " << mTree[id].size(); | 212 | // SinkTraceCtx(mLogCtx) << "Inserted rows " << mTree[id].size(); |
213 | } | 213 | } |
214 | 214 | ||
215 | 215 | ||
@@ -219,7 +219,7 @@ void ModelResult<T, Ptr>::remove(const Ptr &value) | |||
219 | auto childId = qHash(*value); | 219 | auto childId = qHash(*value); |
220 | auto id = parentId(value); | 220 | auto id = parentId(value); |
221 | auto parent = createIndexFromId(id); | 221 | auto parent = createIndexFromId(id); |
222 | SinkTrace() << "Removed entity" << childId; | 222 | SinkTraceCtx(mLogCtx) << "Removed entity" << childId; |
223 | auto index = mTree[id].indexOf(childId); | 223 | auto index = mTree[id].indexOf(childId); |
224 | beginRemoveRows(parent, index, index); | 224 | beginRemoveRows(parent, index, index); |
225 | mEntities.remove(childId); | 225 | mEntities.remove(childId); |
@@ -236,18 +236,18 @@ void ModelResult<T, Ptr>::fetchEntities(const QModelIndex &parent) | |||
236 | const auto id = getIdentifier(parent); | 236 | const auto id = getIdentifier(parent); |
237 | mEntityChildrenFetchComplete.remove(id); | 237 | mEntityChildrenFetchComplete.remove(id); |
238 | mEntityChildrenFetched.insert(id); | 238 | mEntityChildrenFetched.insert(id); |
239 | SinkTrace() << "Loading child entities of parent " << id; | 239 | SinkTraceCtx(mLogCtx) << "Loading child entities of parent " << id; |
240 | if (loadEntities) { | 240 | if (loadEntities) { |
241 | loadEntities(parent.data(DomainObjectRole).template value<Ptr>()); | 241 | loadEntities(parent.data(DomainObjectRole).template value<Ptr>()); |
242 | } else { | 242 | } else { |
243 | SinkWarning() << "No way to fetch entities"; | 243 | SinkWarningCtx(mLogCtx) << "No way to fetch entities"; |
244 | } | 244 | } |
245 | } | 245 | } |
246 | 246 | ||
247 | template <class T, class Ptr> | 247 | template <class T, class Ptr> |
248 | void ModelResult<T, Ptr>::setFetcher(const std::function<void(const Ptr &parent)> &fetcher) | 248 | void ModelResult<T, Ptr>::setFetcher(const std::function<void(const Ptr &parent)> &fetcher) |
249 | { | 249 | { |
250 | SinkTrace() << "Setting fetcher"; | 250 | SinkTraceCtx(mLogCtx) << "Setting fetcher"; |
251 | loadEntities = fetcher; | 251 | loadEntities = fetcher; |
252 | } | 252 | } |
253 | 253 | ||
@@ -262,7 +262,7 @@ void ModelResult<T, Ptr>::setEmitter(const typename Sink::ResultEmitter<Ptr>::Pt | |||
262 | }); | 262 | }); |
263 | }); | 263 | }); |
264 | emitter->onModified([this](const Ptr &value) { | 264 | emitter->onModified([this](const Ptr &value) { |
265 | SinkTrace() << "Received modification: " << value->identifier(); | 265 | SinkTraceCtx(mLogCtx) << "Received modification: " << value->identifier(); |
266 | threadBoundary.callInMainThread([this, value]() { | 266 | threadBoundary.callInMainThread([this, value]() { |
267 | modify(value); | 267 | modify(value); |
268 | }); | 268 | }); |
@@ -273,7 +273,7 @@ void ModelResult<T, Ptr>::setEmitter(const typename Sink::ResultEmitter<Ptr>::Pt | |||
273 | }); | 273 | }); |
274 | }); | 274 | }); |
275 | emitter->onInitialResultSetComplete([this](const Ptr &parent, bool fetchedAll) { | 275 | emitter->onInitialResultSetComplete([this](const Ptr &parent, bool fetchedAll) { |
276 | SinkTrace() << "Initial result set complete"; | 276 | SinkTraceCtx(mLogCtx) << "Initial result set complete"; |
277 | const qint64 parentId = parent ? qHash(*parent) : 0; | 277 | const qint64 parentId = parent ? qHash(*parent) : 0; |
278 | const auto parentIndex = createIndexFromId(parentId); | 278 | const auto parentIndex = createIndexFromId(parentId); |
279 | mEntityChildrenFetchComplete.insert(parentId); | 279 | mEntityChildrenFetchComplete.insert(parentId); |
@@ -297,7 +297,7 @@ void ModelResult<T, Ptr>::modify(const Ptr &value) | |||
297 | auto childId = qHash(*value); | 297 | auto childId = qHash(*value); |
298 | if (!mEntities.contains(childId)) { | 298 | if (!mEntities.contains(childId)) { |
299 | //Happens because the DatabaseQuery emits modifiations also if the item used to be filtered. | 299 | //Happens because the DatabaseQuery emits modifiations also if the item used to be filtered. |
300 | SinkTrace() << "Tried to modify a value that is not yet part of the model"; | 300 | SinkTraceCtx(mLogCtx) << "Tried to modify a value that is not yet part of the model"; |
301 | add(value); | 301 | add(value); |
302 | return; | 302 | return; |
303 | } | 303 | } |
@@ -307,7 +307,7 @@ void ModelResult<T, Ptr>::modify(const Ptr &value) | |||
307 | return; | 307 | return; |
308 | } | 308 | } |
309 | auto parent = createIndexFromId(id); | 309 | auto parent = createIndexFromId(id); |
310 | SinkTrace() << "Modified entity" << childId; | 310 | SinkTraceCtx(mLogCtx) << "Modified entity" << childId; |
311 | auto i = mTree[id].indexOf(childId); | 311 | auto i = mTree[id].indexOf(childId); |
312 | Q_ASSERT(i >= 0); | 312 | Q_ASSERT(i >= 0); |
313 | mEntities.remove(childId); | 313 | mEntities.remove(childId); |
diff --git a/common/modelresult.h b/common/modelresult.h index b7fc0ec..daa48bd 100644 --- a/common/modelresult.h +++ b/common/modelresult.h | |||
@@ -26,6 +26,7 @@ | |||
26 | #include <QSharedPointer> | 26 | #include <QSharedPointer> |
27 | #include <functional> | 27 | #include <functional> |
28 | #include "query.h" | 28 | #include "query.h" |
29 | #include "log.h" | ||
29 | #include "resultprovider.h" | 30 | #include "resultprovider.h" |
30 | #include "threadboundary.h" | 31 | #include "threadboundary.h" |
31 | 32 | ||
@@ -40,7 +41,7 @@ public: | |||
40 | DomainObjectBaseRole | 41 | DomainObjectBaseRole |
41 | }; | 42 | }; |
42 | 43 | ||
43 | ModelResult(const Sink::Query &query, const QList<QByteArray> &propertyColumns); | 44 | ModelResult(const Sink::Query &query, const QList<QByteArray> &propertyColumns, const Sink::Log::Context &); |
44 | 45 | ||
45 | void setEmitter(const typename Sink::ResultEmitter<Ptr>::Ptr &); | 46 | void setEmitter(const typename Sink::ResultEmitter<Ptr>::Ptr &); |
46 | 47 | ||
@@ -67,6 +68,7 @@ private: | |||
67 | QModelIndex createIndexFromId(const qint64 &id) const; | 68 | QModelIndex createIndexFromId(const qint64 &id) const; |
68 | void fetchEntities(const QModelIndex &parent); | 69 | void fetchEntities(const QModelIndex &parent); |
69 | 70 | ||
71 | Sink::Log::Context mLogCtx; | ||
70 | // TODO we should be able to directly use T as index, with an appropriate hash function, and thus have a QMap<T, T> and QList<T> | 72 | // TODO we should be able to directly use T as index, with an appropriate hash function, and thus have a QMap<T, T> and QList<T> |
71 | QMap<qint64 /* entity id */, Ptr> mEntities; | 73 | QMap<qint64 /* entity id */, Ptr> mEntities; |
72 | QMap<qint64 /* parent entity id */, QList<qint64> /* child entity id*/> mTree; | 74 | QMap<qint64 /* parent entity id */, QList<qint64> /* child entity id*/> mTree; |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index c9a8092..4cb5f21 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -49,10 +49,11 @@ using namespace Sink::Storage; | |||
49 | class Pipeline::Private | 49 | class Pipeline::Private |
50 | { | 50 | { |
51 | public: | 51 | public: |
52 | Private(const ResourceContext &context) : resourceContext(context), entityStore(context, {"pipeline"}), revisionChanged(false) | 52 | Private(const ResourceContext &context, const Sink::Log::Context &ctx) : logCtx{ctx.subContext("pipeline")}, resourceContext(context), entityStore(context, ctx), revisionChanged(false) |
53 | { | 53 | { |
54 | } | 54 | } |
55 | 55 | ||
56 | Sink::Log::Context logCtx; | ||
56 | ResourceContext resourceContext; | 57 | ResourceContext resourceContext; |
57 | Storage::EntityStore entityStore; | 58 | Storage::EntityStore entityStore; |
58 | QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; | 59 | QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; |
@@ -62,7 +63,7 @@ public: | |||
62 | }; | 63 | }; |
63 | 64 | ||
64 | 65 | ||
65 | Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context)) | 66 | Pipeline::Pipeline(const ResourceContext &context, const Sink::Log::Context &ctx) : QObject(nullptr), d(new Private(context, ctx)) |
66 | { | 67 | { |
67 | //Create main store immediately on first start | 68 | //Create main store immediately on first start |
68 | d->entityStore.startTransaction(DataStore::ReadWrite); | 69 | d->entityStore.startTransaction(DataStore::ReadWrite); |
@@ -90,7 +91,7 @@ void Pipeline::startTransaction() | |||
90 | // for (auto processor : d->processors[bufferType]) { | 91 | // for (auto processor : d->processors[bufferType]) { |
91 | // processor->startBatch(); | 92 | // processor->startBatch(); |
92 | // } | 93 | // } |
93 | SinkTrace() << "Starting transaction."; | 94 | SinkTraceCtx(d->logCtx) << "Starting transaction."; |
94 | d->transactionTime.start(); | 95 | d->transactionTime.start(); |
95 | d->transactionItemCount = 0; | 96 | d->transactionItemCount = 0; |
96 | d->entityStore.startTransaction(DataStore::ReadWrite); | 97 | d->entityStore.startTransaction(DataStore::ReadWrite); |
@@ -109,7 +110,7 @@ void Pipeline::commit() | |||
109 | } | 110 | } |
110 | const auto revision = d->entityStore.maxRevision(); | 111 | const auto revision = d->entityStore.maxRevision(); |
111 | const auto elapsed = d->transactionTime.elapsed(); | 112 | const auto elapsed = d->transactionTime.elapsed(); |
112 | SinkTrace() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " | 113 | SinkTraceCtx(d->logCtx) << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " |
113 | << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; | 114 | << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; |
114 | d->entityStore.commitTransaction(); | 115 | d->entityStore.commitTransaction(); |
115 | if (d->revisionChanged) { | 116 | if (d->revisionChanged) { |
@@ -125,7 +126,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
125 | { | 126 | { |
126 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 127 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
127 | if (!Commands::VerifyCreateEntityBuffer(verifyer)) { | 128 | if (!Commands::VerifyCreateEntityBuffer(verifyer)) { |
128 | SinkWarning() << "invalid buffer, not a create entity buffer"; | 129 | SinkWarningCtx(d->logCtx) << "invalid buffer, not a create entity buffer"; |
129 | return KAsync::error<qint64>(0); | 130 | return KAsync::error<qint64>(0); |
130 | } | 131 | } |
131 | } | 132 | } |
@@ -137,7 +138,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
137 | if (createEntity->entityId()) { | 138 | if (createEntity->entityId()) { |
138 | key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); | 139 | key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); |
139 | if (d->entityStore.contains(bufferType, key)) { | 140 | if (d->entityStore.contains(bufferType, key)) { |
140 | SinkError() << "An entity with this id already exists: " << key; | 141 | SinkErrorCtx(d->logCtx) << "An entity with this id already exists: " << key; |
141 | return KAsync::error<qint64>(0); | 142 | return KAsync::error<qint64>(0); |
142 | } | 143 | } |
143 | } | 144 | } |
@@ -145,25 +146,25 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
145 | if (key.isEmpty()) { | 146 | if (key.isEmpty()) { |
146 | key = DataStore::generateUid(); | 147 | key = DataStore::generateUid(); |
147 | } | 148 | } |
148 | SinkTrace() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 149 | SinkTraceCtx(d->logCtx) << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
149 | Q_ASSERT(!key.isEmpty()); | 150 | Q_ASSERT(!key.isEmpty()); |
150 | 151 | ||
151 | { | 152 | { |
152 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); | 153 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); |
153 | if (!VerifyEntityBuffer(verifyer)) { | 154 | if (!VerifyEntityBuffer(verifyer)) { |
154 | SinkWarning() << "invalid buffer, not an entity buffer"; | 155 | SinkWarningCtx(d->logCtx) << "invalid buffer, not an entity buffer"; |
155 | return KAsync::error<qint64>(0); | 156 | return KAsync::error<qint64>(0); |
156 | } | 157 | } |
157 | } | 158 | } |
158 | auto entity = GetEntity(createEntity->delta()->Data()); | 159 | auto entity = GetEntity(createEntity->delta()->Data()); |
159 | if (!entity->resource()->size() && !entity->local()->size()) { | 160 | if (!entity->resource()->size() && !entity->local()->size()) { |
160 | SinkWarning() << "No local and no resource buffer while trying to create entity."; | 161 | SinkWarningCtx(d->logCtx) << "No local and no resource buffer while trying to create entity."; |
161 | return KAsync::error<qint64>(0); | 162 | return KAsync::error<qint64>(0); |
162 | } | 163 | } |
163 | 164 | ||
164 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); | 165 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); |
165 | if (!adaptorFactory) { | 166 | if (!adaptorFactory) { |
166 | SinkWarning() << "no adaptor factory for type " << bufferType; | 167 | SinkWarningCtx(d->logCtx) << "no adaptor factory for type " << bufferType; |
167 | return KAsync::error<qint64>(0); | 168 | return KAsync::error<qint64>(0); |
168 | } | 169 | } |
169 | 170 | ||
@@ -203,7 +204,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
203 | { | 204 | { |
204 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 205 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
205 | if (!Commands::VerifyModifyEntityBuffer(verifyer)) { | 206 | if (!Commands::VerifyModifyEntityBuffer(verifyer)) { |
206 | SinkWarning() << "invalid buffer, not a modify entity buffer"; | 207 | SinkWarningCtx(d->logCtx) << "invalid buffer, not a modify entity buffer"; |
207 | return KAsync::error<qint64>(0); | 208 | return KAsync::error<qint64>(0); |
208 | } | 209 | } |
209 | } | 210 | } |
@@ -213,29 +214,29 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
213 | if (modifyEntity->modifiedProperties()) { | 214 | if (modifyEntity->modifiedProperties()) { |
214 | changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties()); | 215 | changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties()); |
215 | } else { | 216 | } else { |
216 | SinkWarning() << "No changeset available"; | 217 | SinkWarningCtx(d->logCtx) << "No changeset available"; |
217 | } | 218 | } |
218 | const qint64 baseRevision = modifyEntity->revision(); | 219 | const qint64 baseRevision = modifyEntity->revision(); |
219 | const bool replayToSource = modifyEntity->replayToSource(); | 220 | const bool replayToSource = modifyEntity->replayToSource(); |
220 | 221 | ||
221 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); | 222 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); |
222 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); | 223 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); |
223 | SinkTrace() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 224 | SinkTraceCtx(d->logCtx) << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
224 | if (bufferType.isEmpty() || key.isEmpty()) { | 225 | if (bufferType.isEmpty() || key.isEmpty()) { |
225 | SinkWarning() << "entity type or key " << bufferType << key; | 226 | SinkWarningCtx(d->logCtx) << "entity type or key " << bufferType << key; |
226 | return KAsync::error<qint64>(0); | 227 | return KAsync::error<qint64>(0); |
227 | } | 228 | } |
228 | { | 229 | { |
229 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); | 230 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); |
230 | if (!VerifyEntityBuffer(verifyer)) { | 231 | if (!VerifyEntityBuffer(verifyer)) { |
231 | SinkWarning() << "invalid buffer, not an entity buffer"; | 232 | SinkWarningCtx(d->logCtx) << "invalid buffer, not an entity buffer"; |
232 | return KAsync::error<qint64>(0); | 233 | return KAsync::error<qint64>(0); |
233 | } | 234 | } |
234 | } | 235 | } |
235 | 236 | ||
236 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); | 237 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); |
237 | if (!adaptorFactory) { | 238 | if (!adaptorFactory) { |
238 | SinkWarning() << "no adaptor factory for type " << bufferType; | 239 | SinkWarningCtx(d->logCtx) << "no adaptor factory for type " << bufferType; |
239 | return KAsync::error<qint64>(0); | 240 | return KAsync::error<qint64>(0); |
240 | } | 241 | } |
241 | 242 | ||
@@ -255,7 +256,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
255 | auto changeset = diff.changedProperties(); | 256 | auto changeset = diff.changedProperties(); |
256 | const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); | 257 | const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); |
257 | if (current.identifier().isEmpty()) { | 258 | if (current.identifier().isEmpty()) { |
258 | SinkWarning() << "Failed to read current version: " << diff.identifier(); | 259 | SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); |
259 | return KAsync::error<qint64>(0); | 260 | return KAsync::error<qint64>(0); |
260 | } | 261 | } |
261 | 262 | ||
@@ -276,11 +277,11 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
276 | newEntity.setResource(targetResource); | 277 | newEntity.setResource(targetResource); |
277 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); | 278 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); |
278 | 279 | ||
279 | SinkTrace() << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource; | 280 | SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource; |
280 | auto job = TypeHelper<CreateHelper>{bufferType}.operator()<KAsync::Job<void>, ApplicationDomain::ApplicationDomainType&>(newEntity); | 281 | auto job = TypeHelper<CreateHelper>{bufferType}.operator()<KAsync::Job<void>, ApplicationDomain::ApplicationDomainType&>(newEntity); |
281 | job = job.syncThen<void>([this, current, isMove, targetResource, bufferType](const KAsync::Error &error) { | 282 | job = job.syncThen<void>([this, current, isMove, targetResource, bufferType](const KAsync::Error &error) { |
282 | if (!error) { | 283 | if (!error) { |
283 | SinkTrace() << "Move of " << current.identifier() << "was successfull"; | 284 | SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; |
284 | if (isMove) { | 285 | if (isMove) { |
285 | startTransaction(); | 286 | startTransaction(); |
286 | flatbuffers::FlatBufferBuilder fbb; | 287 | flatbuffers::FlatBufferBuilder fbb; |
@@ -293,7 +294,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
293 | commit(); | 294 | commit(); |
294 | } | 295 | } |
295 | } else { | 296 | } else { |
296 | SinkError() << "Failed to move entity " << targetResource << " to resource " << current.identifier(); | 297 | SinkErrorCtx(d->logCtx) << "Failed to move entity " << targetResource << " to resource " << current.identifier(); |
297 | } | 298 | } |
298 | }); | 299 | }); |
299 | job.exec(); | 300 | job.exec(); |
@@ -321,7 +322,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
321 | { | 322 | { |
322 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 323 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
323 | if (!Commands::VerifyDeleteEntityBuffer(verifyer)) { | 324 | if (!Commands::VerifyDeleteEntityBuffer(verifyer)) { |
324 | SinkWarning() << "invalid buffer, not a delete entity buffer"; | 325 | SinkWarningCtx(d->logCtx) << "invalid buffer, not a delete entity buffer"; |
325 | return KAsync::error<qint64>(0); | 326 | return KAsync::error<qint64>(0); |
326 | } | 327 | } |
327 | } | 328 | } |
@@ -330,7 +331,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
330 | const bool replayToSource = deleteEntity->replayToSource(); | 331 | const bool replayToSource = deleteEntity->replayToSource(); |
331 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); | 332 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); |
332 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); | 333 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); |
333 | SinkTrace() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 334 | SinkTraceCtx(d->logCtx) << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
334 | 335 | ||
335 | auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity) { | 336 | auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity) { |
336 | foreach (const auto &processor, d->processors[bufferType]) { | 337 | foreach (const auto &processor, d->processors[bufferType]) { |
diff --git a/common/pipeline.h b/common/pipeline.h index b663dea..c9982b7 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -46,7 +46,7 @@ class SINK_EXPORT Pipeline : public QObject | |||
46 | Q_OBJECT | 46 | Q_OBJECT |
47 | 47 | ||
48 | public: | 48 | public: |
49 | Pipeline(const ResourceContext &context); | 49 | Pipeline(const ResourceContext &context, const Sink::Log::Context &ctx); |
50 | ~Pipeline(); | 50 | ~Pipeline(); |
51 | 51 | ||
52 | void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors); | 52 | void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors); |
diff --git a/common/store.cpp b/common/store.cpp index 8007626..554f540 100644 --- a/common/store.cpp +++ b/common/store.cpp | |||
@@ -57,7 +57,7 @@ QString Store::getTemporaryFilePath() | |||
57 | /* | 57 | /* |
58 | * Returns a map of resource instance identifiers and resource type | 58 | * Returns a map of resource instance identifiers and resource type |
59 | */ | 59 | */ |
60 | static QMap<QByteArray, QByteArray> getResources(const Sink::Query::Filter &query, const QByteArray &type = QByteArray()) | 60 | static QMap<QByteArray, QByteArray> getResources(const Sink::Query::Filter &query, const QByteArray &type, const Sink::Log::Context &ctx) |
61 | { | 61 | { |
62 | const QList<QByteArray> resourceFilter = query.ids; | 62 | const QList<QByteArray> resourceFilter = query.ids; |
63 | 63 | ||
@@ -97,11 +97,11 @@ static QMap<QByteArray, QByteArray> getResources(const Sink::Query::Filter &quer | |||
97 | } | 97 | } |
98 | resources.insert(res, configuredResources.value(res)); | 98 | resources.insert(res, configuredResources.value(res)); |
99 | } else { | 99 | } else { |
100 | SinkWarning() << "Resource is not existing: " << res; | 100 | SinkWarningCtx(ctx) << "Resource is not existing: " << res; |
101 | } | 101 | } |
102 | } | 102 | } |
103 | } | 103 | } |
104 | SinkTrace() << "Found resources: " << resources; | 104 | SinkTraceCtx(ctx) << "Found resources: " << resources; |
105 | return resources; | 105 | return resources; |
106 | } | 106 | } |
107 | 107 | ||
@@ -133,7 +133,7 @@ QSharedPointer<QAbstractItemModel> Store::loadModel(Query query) | |||
133 | Log::Context ctx{query.id()}; | 133 | Log::Context ctx{query.id()}; |
134 | query.setType(ApplicationDomain::getTypeName<DomainType>()); | 134 | query.setType(ApplicationDomain::getTypeName<DomainType>()); |
135 | SinkTraceCtx(ctx) << "Loading model: " << query; | 135 | SinkTraceCtx(ctx) << "Loading model: " << query; |
136 | auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr>>::create(query, query.requestedProperties); | 136 | auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr>>::create(query, query.requestedProperties, ctx); |
137 | 137 | ||
138 | //* Client defines lifetime of model | 138 | //* Client defines lifetime of model |
139 | //* The model lifetime defines the duration of live-queries | 139 | //* The model lifetime defines the duration of live-queries |
@@ -142,7 +142,7 @@ QSharedPointer<QAbstractItemModel> Store::loadModel(Query query) | |||
142 | //* The result provider needs to live for as long as results are provided (until the last thread exits). | 142 | //* The result provider needs to live for as long as results are provided (until the last thread exits). |
143 | 143 | ||
144 | // Query all resources and aggregate results | 144 | // Query all resources and aggregate results |
145 | auto resources = getResources(query.getResourceFilter(), ApplicationDomain::getTypeName<DomainType>()); | 145 | auto resources = getResources(query.getResourceFilter(), ApplicationDomain::getTypeName<DomainType>(), ctx); |
146 | auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create(); | 146 | auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create(); |
147 | model->setEmitter(aggregatingEmitter); | 147 | model->setEmitter(aggregatingEmitter); |
148 | 148 | ||
@@ -297,7 +297,7 @@ KAsync::Job<void> Store::synchronize(const Sink::Query &query) | |||
297 | 297 | ||
298 | KAsync::Job<void> Store::synchronize(const Sink::SyncScope &scope) | 298 | KAsync::Job<void> Store::synchronize(const Sink::SyncScope &scope) |
299 | { | 299 | { |
300 | auto resources = getResources(scope.getResourceFilter()).keys(); | 300 | auto resources = getResources(scope.getResourceFilter(), {}, {}).keys(); |
301 | SinkLog() << "Synchronize" << resources; | 301 | SinkLog() << "Synchronize" << resources; |
302 | return KAsync::value(resources) | 302 | return KAsync::value(resources) |
303 | .template each([scope](const QByteArray &resource) { | 303 | .template each([scope](const QByteArray &resource) { |
@@ -377,7 +377,7 @@ QList<DomainType> Store::read(const Sink::Query &q) | |||
377 | auto query = q; | 377 | auto query = q; |
378 | query.setFlags(Query::SynchronousQuery); | 378 | query.setFlags(Query::SynchronousQuery); |
379 | QList<DomainType> list; | 379 | QList<DomainType> list; |
380 | auto resources = getResources(query.getResourceFilter(), ApplicationDomain::getTypeName<DomainType>()); | 380 | auto resources = getResources(query.getResourceFilter(), ApplicationDomain::getTypeName<DomainType>(), ctx); |
381 | auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create(); | 381 | auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create(); |
382 | aggregatingEmitter->onAdded([&list, ctx](const typename DomainType::Ptr &value){ | 382 | aggregatingEmitter->onAdded([&list, ctx](const typename DomainType::Ptr &value){ |
383 | SinkTraceCtx(ctx) << "Found value: " << value->identifier(); | 383 | SinkTraceCtx(ctx) << "Found value: " << value->identifier(); |
diff --git a/tests/mailquerybenchmark.cpp b/tests/mailquerybenchmark.cpp index e55e744..080d30e 100644 --- a/tests/mailquerybenchmark.cpp +++ b/tests/mailquerybenchmark.cpp | |||
@@ -61,7 +61,7 @@ class MailQueryBenchmark : public QObject | |||
61 | { | 61 | { |
62 | TestResource::removeFromDisk(resourceIdentifier); | 62 | TestResource::removeFromDisk(resourceIdentifier); |
63 | 63 | ||
64 | auto pipeline = QSharedPointer<Sink::Pipeline>::create(Sink::ResourceContext{resourceIdentifier, "test"}); | 64 | auto pipeline = QSharedPointer<Sink::Pipeline>::create(Sink::ResourceContext{resourceIdentifier, "test"}, "test"); |
65 | 65 | ||
66 | auto domainTypeAdaptorFactory = QSharedPointer<TestMailAdaptorFactory>::create(); | 66 | auto domainTypeAdaptorFactory = QSharedPointer<TestMailAdaptorFactory>::create(); |
67 | 67 | ||
diff --git a/tests/pipelinebenchmark.cpp b/tests/pipelinebenchmark.cpp index 341c733..7052c4c 100644 --- a/tests/pipelinebenchmark.cpp +++ b/tests/pipelinebenchmark.cpp | |||
@@ -61,7 +61,7 @@ class PipelineBenchmark : public QObject | |||
61 | { | 61 | { |
62 | TestResource::removeFromDisk(resourceIdentifier); | 62 | TestResource::removeFromDisk(resourceIdentifier); |
63 | 63 | ||
64 | auto pipeline = QSharedPointer<Sink::Pipeline>::create(Sink::ResourceContext{resourceIdentifier, "test"}); | 64 | auto pipeline = QSharedPointer<Sink::Pipeline>::create(Sink::ResourceContext{resourceIdentifier, "test"}, "test"); |
65 | pipeline->setPreprocessors("mail", preprocessors); | 65 | pipeline->setPreprocessors("mail", preprocessors); |
66 | 66 | ||
67 | QTime time; | 67 | QTime time; |
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp index e68aa53..0268ec5 100644 --- a/tests/pipelinetest.cpp +++ b/tests/pipelinetest.cpp | |||
@@ -213,7 +213,7 @@ private slots: | |||
213 | flatbuffers::FlatBufferBuilder entityFbb; | 213 | flatbuffers::FlatBufferBuilder entityFbb; |
214 | auto command = createEntityCommand(createEvent(entityFbb)); | 214 | auto command = createEntityCommand(createEvent(entityFbb)); |
215 | 215 | ||
216 | Sink::Pipeline pipeline(getContext()); | 216 | Sink::Pipeline pipeline(getContext(), {"test"}); |
217 | 217 | ||
218 | pipeline.startTransaction(); | 218 | pipeline.startTransaction(); |
219 | pipeline.newEntity(command.constData(), command.size()); | 219 | pipeline.newEntity(command.constData(), command.size()); |
@@ -236,7 +236,7 @@ private slots: | |||
236 | flatbuffers::FlatBufferBuilder entityFbb; | 236 | flatbuffers::FlatBufferBuilder entityFbb; |
237 | auto command = createEntityCommand(createEvent(entityFbb, "summary", "description")); | 237 | auto command = createEntityCommand(createEvent(entityFbb, "summary", "description")); |
238 | 238 | ||
239 | Sink::Pipeline pipeline(getContext()); | 239 | Sink::Pipeline pipeline(getContext(), {"test"}); |
240 | 240 | ||
241 | auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); | 241 | auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); |
242 | 242 | ||
@@ -282,7 +282,7 @@ private slots: | |||
282 | flatbuffers::FlatBufferBuilder entityFbb; | 282 | flatbuffers::FlatBufferBuilder entityFbb; |
283 | auto command = createEntityCommand(createEvent(entityFbb)); | 283 | auto command = createEntityCommand(createEvent(entityFbb)); |
284 | 284 | ||
285 | Sink::Pipeline pipeline(getContext()); | 285 | Sink::Pipeline pipeline(getContext(), {"test"}); |
286 | 286 | ||
287 | auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); | 287 | auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); |
288 | 288 | ||
@@ -325,7 +325,7 @@ private slots: | |||
325 | { | 325 | { |
326 | flatbuffers::FlatBufferBuilder entityFbb; | 326 | flatbuffers::FlatBufferBuilder entityFbb; |
327 | auto command = createEntityCommand(createEvent(entityFbb)); | 327 | auto command = createEntityCommand(createEvent(entityFbb)); |
328 | Sink::Pipeline pipeline(getContext()); | 328 | Sink::Pipeline pipeline(getContext(), {"test"}); |
329 | 329 | ||
330 | // Create the initial revision | 330 | // Create the initial revision |
331 | pipeline.startTransaction(); | 331 | pipeline.startTransaction(); |
@@ -359,7 +359,7 @@ private slots: | |||
359 | 359 | ||
360 | auto testProcessor = new TestProcessor; | 360 | auto testProcessor = new TestProcessor; |
361 | 361 | ||
362 | Sink::Pipeline pipeline(getContext()); | 362 | Sink::Pipeline pipeline(getContext(), {"test"}); |
363 | pipeline.setPreprocessors("event", QVector<Sink::Preprocessor *>() << testProcessor); | 363 | pipeline.setPreprocessors("event", QVector<Sink::Preprocessor *>() << testProcessor); |
364 | pipeline.startTransaction(); | 364 | pipeline.startTransaction(); |
365 | // pipeline.setAdaptorFactory("event", QSharedPointer<TestEventAdaptorFactory>::create()); | 365 | // pipeline.setAdaptorFactory("event", QSharedPointer<TestEventAdaptorFactory>::create()); |