summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-12-22 22:05:40 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-12-22 22:05:40 +0100
commit6b1cf550608c2f17cbed9e375f15a4c14bfe8ace (patch)
tree2e693a731bb2e9ce1a793b14cef98cdc13e382cd
parentb2ad8f785e801a35cadf254d827f56d648be510c (diff)
downloadsink-6b1cf550608c2f17cbed9e375f15a4c14bfe8ace.tar.gz
sink-6b1cf550608c2f17cbed9e375f15a4c14bfe8ace.zip
More Log::Context
-rw-r--r--common/commandprocessor.cpp29
-rw-r--r--common/commandprocessor.h4
-rw-r--r--common/genericresource.cpp4
-rw-r--r--common/log.h3
-rw-r--r--common/modelresult.cpp30
-rw-r--r--common/modelresult.h4
-rw-r--r--common/pipeline.cpp45
-rw-r--r--common/pipeline.h2
-rw-r--r--common/store.cpp14
-rw-r--r--tests/mailquerybenchmark.cpp2
-rw-r--r--tests/pipelinebenchmark.cpp2
-rw-r--r--tests/pipelinetest.cpp10
12 files changed, 78 insertions, 71 deletions
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp
index efcd077..7cd4a5f 100644
--- a/common/commandprocessor.cpp
+++ b/common/commandprocessor.cpp
@@ -43,8 +43,9 @@ static int sCommitInterval = 10;
43using namespace Sink; 43using namespace Sink;
44using namespace Sink::Storage; 44using namespace Sink::Storage;
45 45
46CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId) 46CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId, const Sink::Log::Context &ctx)
47 : QObject(), 47 : QObject(),
48 mLogCtx(ctx.subContext("commandprocessor")),
48 mPipeline(pipeline), 49 mPipeline(pipeline),
49 mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"), 50 mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"),
50 mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"), 51 mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"),
@@ -130,7 +131,7 @@ void CommandProcessor::processSynchronizeCommand(const QByteArray &data)
130 } 131 }
131 mSynchronizer->synchronize(query); 132 mSynchronizer->synchronize(query);
132 } else { 133 } else {
133 SinkWarning() << "received invalid command"; 134 SinkWarningCtx(mLogCtx) << "received invalid command";
134 } 135 }
135} 136}
136 137
@@ -141,7 +142,7 @@ void CommandProcessor::processSynchronizeCommand(const QByteArray &data)
141// auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); 142// auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData());
142// client.currentRevision = buffer->revision(); 143// client.currentRevision = buffer->revision();
143// } else { 144// } else {
144// SinkWarning() << "received invalid command"; 145// SinkWarningCtx(mLogCtx) << "received invalid command";
145// } 146// }
146// loadResource().setLowerBoundRevision(lowerBoundRevision()); 147// loadResource().setLowerBoundRevision(lowerBoundRevision());
147// } 148// }
@@ -179,7 +180,7 @@ void CommandProcessor::process()
179 180
180KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const Sink::QueuedCommand *queuedCommand) 181KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const Sink::QueuedCommand *queuedCommand)
181{ 182{
182 SinkTrace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); 183 SinkTraceCtx(mLogCtx) << "Processing command: " << Sink::Commands::name(queuedCommand->commandId());
183 const auto data = queuedCommand->command()->Data(); 184 const auto data = queuedCommand->command()->Data();
184 const auto size = queuedCommand->command()->size(); 185 const auto size = queuedCommand->command()->size();
185 switch (queuedCommand->commandId()) { 186 switch (queuedCommand->commandId()) {
@@ -205,7 +206,7 @@ KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const QByteArray &dat
205{ 206{
206 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); 207 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size());
207 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { 208 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) {
208 SinkWarning() << "invalid buffer"; 209 SinkWarningCtx(mLogCtx) << "invalid buffer";
209 // return KAsync::error<void, qint64>(1, "Invalid Buffer"); 210 // return KAsync::error<void, qint64>(1, "Invalid Buffer");
210 } 211 }
211 auto queuedCommand = Sink::GetQueuedCommand(data.constData()); 212 auto queuedCommand = Sink::GetQueuedCommand(data.constData());
@@ -214,10 +215,10 @@ KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const QByteArray &dat
214 .then<qint64, qint64>( 215 .then<qint64, qint64>(
215 [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job<qint64> { 216 [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job<qint64> {
216 if (error) { 217 if (error) {
217 SinkWarning() << "Error while processing queue command: " << error.errorMessage; 218 SinkWarningCtx(mLogCtx) << "Error while processing queue command: " << error.errorMessage;
218 return KAsync::error<qint64>(error); 219 return KAsync::error<qint64>(error);
219 } 220 }
220 SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId); 221 SinkTraceCtx(mLogCtx) << "Command pipeline processed: " << Sink::Commands::name(commandId);
221 return KAsync::value<qint64>(createdRevision); 222 return KAsync::value<qint64>(createdRevision);
222 }); 223 });
223} 224}
@@ -234,13 +235,13 @@ KAsync::Job<void> CommandProcessor::processQueue(MessageQueue *queue)
234 time->start(); 235 time->start();
235 return processQueuedCommand(data) 236 return processQueuedCommand(data)
236 .syncThen<void, qint64>([this, time](qint64 createdRevision) { 237 .syncThen<void, qint64>([this, time](qint64 createdRevision) {
237 SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); 238 SinkTraceCtx(mLogCtx) << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed());
238 }); 239 });
239 }) 240 })
240 .then<KAsync::ControlFlowFlag>([queue](const KAsync::Error &error) { 241 .then<KAsync::ControlFlowFlag>([queue, this](const KAsync::Error &error) {
241 if (error) { 242 if (error) {
242 if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) { 243 if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) {
243 SinkWarning() << "Error while getting message from messagequeue: " << error.errorMessage; 244 SinkWarningCtx(mLogCtx) << "Error while getting message from messagequeue: " << error.errorMessage;
244 } 245 }
245 } 246 }
246 if (queue->isEmpty()) { 247 if (queue->isEmpty()) {
@@ -258,7 +259,7 @@ KAsync::Job<void> CommandProcessor::processPipeline()
258 auto time = QSharedPointer<QTime>::create(); 259 auto time = QSharedPointer<QTime>::create();
259 time->start(); 260 time->start();
260 mPipeline->cleanupRevisions(mLowerBoundRevision); 261 mPipeline->cleanupRevisions(mLowerBoundRevision);
261 SinkTrace() << "Cleanup done." << Log::TraceTime(time->elapsed()); 262 SinkTraceCtx(mLogCtx) << "Cleanup done." << Log::TraceTime(time->elapsed());
262 263
263 // Go through all message queues 264 // Go through all message queues
264 if (mCommandQueues.isEmpty()) { 265 if (mCommandQueues.isEmpty()) {
@@ -273,7 +274,7 @@ KAsync::Job<void> CommandProcessor::processPipeline()
273 auto queue = it->next(); 274 auto queue = it->next();
274 return processQueue(queue) 275 return processQueue(queue)
275 .syncThen<KAsync::ControlFlowFlag>([this, time, it]() { 276 .syncThen<KAsync::ControlFlowFlag>([this, time, it]() {
276 SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed()); 277 SinkTraceCtx(mLogCtx) << "Queue processed." << Log::TraceTime(time->elapsed());
277 if (it->hasNext()) { 278 if (it->hasNext()) {
278 return KAsync::Continue; 279 return KAsync::Continue;
279 } 280 }
@@ -325,11 +326,11 @@ KAsync::Job<void> CommandProcessor::flush(void const *command, size_t size)
325 const QByteArray flushId = BufferUtils::extractBufferCopy(buffer->id()); 326 const QByteArray flushId = BufferUtils::extractBufferCopy(buffer->id());
326 Q_ASSERT(!flushId.isEmpty()); 327 Q_ASSERT(!flushId.isEmpty());
327 if (flushType == Sink::Flush::FlushReplayQueue) { 328 if (flushType == Sink::Flush::FlushReplayQueue) {
328 SinkTrace() << "Flushing synchronizer "; 329 SinkTraceCtx(mLogCtx) << "Flushing synchronizer ";
329 Q_ASSERT(mSynchronizer); 330 Q_ASSERT(mSynchronizer);
330 mSynchronizer->flush(flushType, flushId); 331 mSynchronizer->flush(flushType, flushId);
331 } else { 332 } else {
332 SinkTrace() << "Emitting flush completion" << flushId; 333 SinkTraceCtx(mLogCtx) << "Emitting flush completion" << flushId;
333 Sink::Notification n; 334 Sink::Notification n;
334 n.type = Sink::Notification::FlushCompletion; 335 n.type = Sink::Notification::FlushCompletion;
335 n.id = flushId; 336 n.id = flushId;
diff --git a/common/commandprocessor.h b/common/commandprocessor.h
index 81f93e5..eeb7ecf 100644
--- a/common/commandprocessor.h
+++ b/common/commandprocessor.h
@@ -43,10 +43,9 @@ namespace Sink {
43class CommandProcessor : public QObject 43class CommandProcessor : public QObject
44{ 44{
45 Q_OBJECT 45 Q_OBJECT
46 SINK_DEBUG_AREA("commandprocessor")
47 46
48public: 47public:
49 CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId); 48 CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId, const Sink::Log::Context &ctx);
50 49
51 void setOldestUsedRevision(qint64 revision); 50 void setOldestUsedRevision(qint64 revision);
52 51
@@ -79,6 +78,7 @@ private:
79 78
80 KAsync::Job<void> flush(void const *command, size_t size); 79 KAsync::Job<void> flush(void const *command, size_t size);
81 80
81 Sink::Log::Context mLogCtx;
82 Sink::Pipeline *mPipeline; 82 Sink::Pipeline *mPipeline;
83 MessageQueue mUserQueue; 83 MessageQueue mUserQueue;
84 MessageQueue mSynchronizerQueue; 84 MessageQueue mSynchronizerQueue;
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index c11e899..5ba9e5d 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -31,8 +31,8 @@ using namespace Sink::Storage;
31GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline ) 31GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline )
32 : Sink::Resource(), 32 : Sink::Resource(),
33 mResourceContext(resourceContext), 33 mResourceContext(resourceContext),
34 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceContext)), 34 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceContext, "resource." + resourceContext.instanceId())),
35 mProcessor(QSharedPointer<CommandProcessor>::create(mPipeline.data(), resourceContext.instanceId())), 35 mProcessor(QSharedPointer<CommandProcessor>::create(mPipeline.data(), resourceContext.instanceId(), "resource." + resourceContext.instanceId())),
36 mError(0), 36 mError(0),
37 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 37 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
38{ 38{
diff --git a/common/log.h b/common/log.h
index 9063ac8..fc2a3ed 100644
--- a/common/log.h
+++ b/common/log.h
@@ -8,6 +8,9 @@ namespace Log {
8 8
9struct Context { 9struct Context {
10 Context() = default; 10 Context() = default;
11 Context(const QByteArray &n) : name(n) {}
12 Context(const char *n) : name(n) {}
13
11 QByteArray name; 14 QByteArray name;
12 Context subContext(const QByteArray &sub) const { 15 Context subContext(const QByteArray &sub) const {
13 return Context{name + "." + sub}; 16 return Context{name + "." + sub};
diff --git a/common/modelresult.cpp b/common/modelresult.cpp
index 8e92365..6695484 100644
--- a/common/modelresult.cpp
+++ b/common/modelresult.cpp
@@ -34,8 +34,8 @@ static uint qHash(const Sink::ApplicationDomain::ApplicationDomainType &type)
34} 34}
35 35
36template <class T, class Ptr> 36template <class T, class Ptr>
37ModelResult<T, Ptr>::ModelResult(const Sink::Query &query, const QList<QByteArray> &propertyColumns) 37ModelResult<T, Ptr>::ModelResult(const Sink::Query &query, const QList<QByteArray> &propertyColumns, const Sink::Log::Context &ctx)
38 : QAbstractItemModel(), mPropertyColumns(propertyColumns), mQuery(query) 38 : QAbstractItemModel(), mLogCtx(ctx.subContext("modelresult")), mPropertyColumns(propertyColumns), mQuery(query)
39{ 39{
40} 40}
41 41
@@ -176,7 +176,7 @@ bool ModelResult<T, Ptr>::canFetchMore(const QModelIndex &parent) const
176template <class T, class Ptr> 176template <class T, class Ptr>
177void ModelResult<T, Ptr>::fetchMore(const QModelIndex &parent) 177void ModelResult<T, Ptr>::fetchMore(const QModelIndex &parent)
178{ 178{
179 SinkTrace() << "Fetching more: " << parent; 179 SinkTraceCtx(mLogCtx) << "Fetching more: " << parent;
180 fetchEntities(parent); 180 fetchEntities(parent);
181} 181}
182 182
@@ -187,7 +187,7 @@ void ModelResult<T, Ptr>::add(const Ptr &value)
187 const auto id = parentId(value); 187 const auto id = parentId(value);
188 // Ignore updates we get before the initial fetch is done 188 // Ignore updates we get before the initial fetch is done
189 if (!mEntityChildrenFetched.contains(id)) { 189 if (!mEntityChildrenFetched.contains(id)) {
190 SinkTrace() << "Too early" << id; 190 SinkTraceCtx(mLogCtx) << "Too early" << id;
191 return; 191 return;
192 } 192 }
193 if (mEntities.contains(childId)) { 193 if (mEntities.contains(childId)) {
@@ -195,7 +195,7 @@ void ModelResult<T, Ptr>::add(const Ptr &value)
195 return; 195 return;
196 } 196 }
197 auto parent = createIndexFromId(id); 197 auto parent = createIndexFromId(id);
198 SinkTrace() << "Added entity " << childId << "id: " << value->identifier() << "parent: " << id; 198 SinkTraceCtx(mLogCtx) << "Added entity " << childId << "id: " << value->identifier() << "parent: " << id;
199 const auto keys = mTree[id]; 199 const auto keys = mTree[id];
200 int index = 0; 200 int index = 0;
201 for (; index < keys.size(); index++) { 201 for (; index < keys.size(); index++) {
@@ -203,13 +203,13 @@ void ModelResult<T, Ptr>::add(const Ptr &value)
203 break; 203 break;
204 } 204 }
205 } 205 }
206 // SinkTrace() << "Inserting rows " << index << parent; 206 // SinkTraceCtx(mLogCtx) << "Inserting rows " << index << parent;
207 beginInsertRows(parent, index, index); 207 beginInsertRows(parent, index, index);
208 mEntities.insert(childId, value); 208 mEntities.insert(childId, value);
209 mTree[id].insert(index, childId); 209 mTree[id].insert(index, childId);
210 mParents.insert(childId, id); 210 mParents.insert(childId, id);
211 endInsertRows(); 211 endInsertRows();
212 // SinkTrace() << "Inserted rows " << mTree[id].size(); 212 // SinkTraceCtx(mLogCtx) << "Inserted rows " << mTree[id].size();
213} 213}
214 214
215 215
@@ -219,7 +219,7 @@ void ModelResult<T, Ptr>::remove(const Ptr &value)
219 auto childId = qHash(*value); 219 auto childId = qHash(*value);
220 auto id = parentId(value); 220 auto id = parentId(value);
221 auto parent = createIndexFromId(id); 221 auto parent = createIndexFromId(id);
222 SinkTrace() << "Removed entity" << childId; 222 SinkTraceCtx(mLogCtx) << "Removed entity" << childId;
223 auto index = mTree[id].indexOf(childId); 223 auto index = mTree[id].indexOf(childId);
224 beginRemoveRows(parent, index, index); 224 beginRemoveRows(parent, index, index);
225 mEntities.remove(childId); 225 mEntities.remove(childId);
@@ -236,18 +236,18 @@ void ModelResult<T, Ptr>::fetchEntities(const QModelIndex &parent)
236 const auto id = getIdentifier(parent); 236 const auto id = getIdentifier(parent);
237 mEntityChildrenFetchComplete.remove(id); 237 mEntityChildrenFetchComplete.remove(id);
238 mEntityChildrenFetched.insert(id); 238 mEntityChildrenFetched.insert(id);
239 SinkTrace() << "Loading child entities of parent " << id; 239 SinkTraceCtx(mLogCtx) << "Loading child entities of parent " << id;
240 if (loadEntities) { 240 if (loadEntities) {
241 loadEntities(parent.data(DomainObjectRole).template value<Ptr>()); 241 loadEntities(parent.data(DomainObjectRole).template value<Ptr>());
242 } else { 242 } else {
243 SinkWarning() << "No way to fetch entities"; 243 SinkWarningCtx(mLogCtx) << "No way to fetch entities";
244 } 244 }
245} 245}
246 246
247template <class T, class Ptr> 247template <class T, class Ptr>
248void ModelResult<T, Ptr>::setFetcher(const std::function<void(const Ptr &parent)> &fetcher) 248void ModelResult<T, Ptr>::setFetcher(const std::function<void(const Ptr &parent)> &fetcher)
249{ 249{
250 SinkTrace() << "Setting fetcher"; 250 SinkTraceCtx(mLogCtx) << "Setting fetcher";
251 loadEntities = fetcher; 251 loadEntities = fetcher;
252} 252}
253 253
@@ -262,7 +262,7 @@ void ModelResult<T, Ptr>::setEmitter(const typename Sink::ResultEmitter<Ptr>::Pt
262 }); 262 });
263 }); 263 });
264 emitter->onModified([this](const Ptr &value) { 264 emitter->onModified([this](const Ptr &value) {
265 SinkTrace() << "Received modification: " << value->identifier(); 265 SinkTraceCtx(mLogCtx) << "Received modification: " << value->identifier();
266 threadBoundary.callInMainThread([this, value]() { 266 threadBoundary.callInMainThread([this, value]() {
267 modify(value); 267 modify(value);
268 }); 268 });
@@ -273,7 +273,7 @@ void ModelResult<T, Ptr>::setEmitter(const typename Sink::ResultEmitter<Ptr>::Pt
273 }); 273 });
274 }); 274 });
275 emitter->onInitialResultSetComplete([this](const Ptr &parent, bool fetchedAll) { 275 emitter->onInitialResultSetComplete([this](const Ptr &parent, bool fetchedAll) {
276 SinkTrace() << "Initial result set complete"; 276 SinkTraceCtx(mLogCtx) << "Initial result set complete";
277 const qint64 parentId = parent ? qHash(*parent) : 0; 277 const qint64 parentId = parent ? qHash(*parent) : 0;
278 const auto parentIndex = createIndexFromId(parentId); 278 const auto parentIndex = createIndexFromId(parentId);
279 mEntityChildrenFetchComplete.insert(parentId); 279 mEntityChildrenFetchComplete.insert(parentId);
@@ -297,7 +297,7 @@ void ModelResult<T, Ptr>::modify(const Ptr &value)
297 auto childId = qHash(*value); 297 auto childId = qHash(*value);
298 if (!mEntities.contains(childId)) { 298 if (!mEntities.contains(childId)) {
299 //Happens because the DatabaseQuery emits modifiations also if the item used to be filtered. 299 //Happens because the DatabaseQuery emits modifiations also if the item used to be filtered.
300 SinkTrace() << "Tried to modify a value that is not yet part of the model"; 300 SinkTraceCtx(mLogCtx) << "Tried to modify a value that is not yet part of the model";
301 add(value); 301 add(value);
302 return; 302 return;
303 } 303 }
@@ -307,7 +307,7 @@ void ModelResult<T, Ptr>::modify(const Ptr &value)
307 return; 307 return;
308 } 308 }
309 auto parent = createIndexFromId(id); 309 auto parent = createIndexFromId(id);
310 SinkTrace() << "Modified entity" << childId; 310 SinkTraceCtx(mLogCtx) << "Modified entity" << childId;
311 auto i = mTree[id].indexOf(childId); 311 auto i = mTree[id].indexOf(childId);
312 Q_ASSERT(i >= 0); 312 Q_ASSERT(i >= 0);
313 mEntities.remove(childId); 313 mEntities.remove(childId);
diff --git a/common/modelresult.h b/common/modelresult.h
index b7fc0ec..daa48bd 100644
--- a/common/modelresult.h
+++ b/common/modelresult.h
@@ -26,6 +26,7 @@
26#include <QSharedPointer> 26#include <QSharedPointer>
27#include <functional> 27#include <functional>
28#include "query.h" 28#include "query.h"
29#include "log.h"
29#include "resultprovider.h" 30#include "resultprovider.h"
30#include "threadboundary.h" 31#include "threadboundary.h"
31 32
@@ -40,7 +41,7 @@ public:
40 DomainObjectBaseRole 41 DomainObjectBaseRole
41 }; 42 };
42 43
43 ModelResult(const Sink::Query &query, const QList<QByteArray> &propertyColumns); 44 ModelResult(const Sink::Query &query, const QList<QByteArray> &propertyColumns, const Sink::Log::Context &);
44 45
45 void setEmitter(const typename Sink::ResultEmitter<Ptr>::Ptr &); 46 void setEmitter(const typename Sink::ResultEmitter<Ptr>::Ptr &);
46 47
@@ -67,6 +68,7 @@ private:
67 QModelIndex createIndexFromId(const qint64 &id) const; 68 QModelIndex createIndexFromId(const qint64 &id) const;
68 void fetchEntities(const QModelIndex &parent); 69 void fetchEntities(const QModelIndex &parent);
69 70
71 Sink::Log::Context mLogCtx;
70 // TODO we should be able to directly use T as index, with an appropriate hash function, and thus have a QMap<T, T> and QList<T> 72 // TODO we should be able to directly use T as index, with an appropriate hash function, and thus have a QMap<T, T> and QList<T>
71 QMap<qint64 /* entity id */, Ptr> mEntities; 73 QMap<qint64 /* entity id */, Ptr> mEntities;
72 QMap<qint64 /* parent entity id */, QList<qint64> /* child entity id*/> mTree; 74 QMap<qint64 /* parent entity id */, QList<qint64> /* child entity id*/> mTree;
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index c9a8092..4cb5f21 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -49,10 +49,11 @@ using namespace Sink::Storage;
49class Pipeline::Private 49class Pipeline::Private
50{ 50{
51public: 51public:
52 Private(const ResourceContext &context) : resourceContext(context), entityStore(context, {"pipeline"}), revisionChanged(false) 52 Private(const ResourceContext &context, const Sink::Log::Context &ctx) : logCtx{ctx.subContext("pipeline")}, resourceContext(context), entityStore(context, ctx), revisionChanged(false)
53 { 53 {
54 } 54 }
55 55
56 Sink::Log::Context logCtx;
56 ResourceContext resourceContext; 57 ResourceContext resourceContext;
57 Storage::EntityStore entityStore; 58 Storage::EntityStore entityStore;
58 QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; 59 QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors;
@@ -62,7 +63,7 @@ public:
62}; 63};
63 64
64 65
65Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context)) 66Pipeline::Pipeline(const ResourceContext &context, const Sink::Log::Context &ctx) : QObject(nullptr), d(new Private(context, ctx))
66{ 67{
67 //Create main store immediately on first start 68 //Create main store immediately on first start
68 d->entityStore.startTransaction(DataStore::ReadWrite); 69 d->entityStore.startTransaction(DataStore::ReadWrite);
@@ -90,7 +91,7 @@ void Pipeline::startTransaction()
90 // for (auto processor : d->processors[bufferType]) { 91 // for (auto processor : d->processors[bufferType]) {
91 // processor->startBatch(); 92 // processor->startBatch();
92 // } 93 // }
93 SinkTrace() << "Starting transaction."; 94 SinkTraceCtx(d->logCtx) << "Starting transaction.";
94 d->transactionTime.start(); 95 d->transactionTime.start();
95 d->transactionItemCount = 0; 96 d->transactionItemCount = 0;
96 d->entityStore.startTransaction(DataStore::ReadWrite); 97 d->entityStore.startTransaction(DataStore::ReadWrite);
@@ -109,7 +110,7 @@ void Pipeline::commit()
109 } 110 }
110 const auto revision = d->entityStore.maxRevision(); 111 const auto revision = d->entityStore.maxRevision();
111 const auto elapsed = d->transactionTime.elapsed(); 112 const auto elapsed = d->transactionTime.elapsed();
112 SinkTrace() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " 113 SinkTraceCtx(d->logCtx) << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " "
113 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; 114 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]";
114 d->entityStore.commitTransaction(); 115 d->entityStore.commitTransaction();
115 if (d->revisionChanged) { 116 if (d->revisionChanged) {
@@ -125,7 +126,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
125 { 126 {
126 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 127 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
127 if (!Commands::VerifyCreateEntityBuffer(verifyer)) { 128 if (!Commands::VerifyCreateEntityBuffer(verifyer)) {
128 SinkWarning() << "invalid buffer, not a create entity buffer"; 129 SinkWarningCtx(d->logCtx) << "invalid buffer, not a create entity buffer";
129 return KAsync::error<qint64>(0); 130 return KAsync::error<qint64>(0);
130 } 131 }
131 } 132 }
@@ -137,7 +138,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
137 if (createEntity->entityId()) { 138 if (createEntity->entityId()) {
138 key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); 139 key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size());
139 if (d->entityStore.contains(bufferType, key)) { 140 if (d->entityStore.contains(bufferType, key)) {
140 SinkError() << "An entity with this id already exists: " << key; 141 SinkErrorCtx(d->logCtx) << "An entity with this id already exists: " << key;
141 return KAsync::error<qint64>(0); 142 return KAsync::error<qint64>(0);
142 } 143 }
143 } 144 }
@@ -145,25 +146,25 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
145 if (key.isEmpty()) { 146 if (key.isEmpty()) {
146 key = DataStore::generateUid(); 147 key = DataStore::generateUid();
147 } 148 }
148 SinkTrace() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 149 SinkTraceCtx(d->logCtx) << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
149 Q_ASSERT(!key.isEmpty()); 150 Q_ASSERT(!key.isEmpty());
150 151
151 { 152 {
152 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); 153 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
153 if (!VerifyEntityBuffer(verifyer)) { 154 if (!VerifyEntityBuffer(verifyer)) {
154 SinkWarning() << "invalid buffer, not an entity buffer"; 155 SinkWarningCtx(d->logCtx) << "invalid buffer, not an entity buffer";
155 return KAsync::error<qint64>(0); 156 return KAsync::error<qint64>(0);
156 } 157 }
157 } 158 }
158 auto entity = GetEntity(createEntity->delta()->Data()); 159 auto entity = GetEntity(createEntity->delta()->Data());
159 if (!entity->resource()->size() && !entity->local()->size()) { 160 if (!entity->resource()->size() && !entity->local()->size()) {
160 SinkWarning() << "No local and no resource buffer while trying to create entity."; 161 SinkWarningCtx(d->logCtx) << "No local and no resource buffer while trying to create entity.";
161 return KAsync::error<qint64>(0); 162 return KAsync::error<qint64>(0);
162 } 163 }
163 164
164 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); 165 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType);
165 if (!adaptorFactory) { 166 if (!adaptorFactory) {
166 SinkWarning() << "no adaptor factory for type " << bufferType; 167 SinkWarningCtx(d->logCtx) << "no adaptor factory for type " << bufferType;
167 return KAsync::error<qint64>(0); 168 return KAsync::error<qint64>(0);
168 } 169 }
169 170
@@ -203,7 +204,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
203 { 204 {
204 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 205 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
205 if (!Commands::VerifyModifyEntityBuffer(verifyer)) { 206 if (!Commands::VerifyModifyEntityBuffer(verifyer)) {
206 SinkWarning() << "invalid buffer, not a modify entity buffer"; 207 SinkWarningCtx(d->logCtx) << "invalid buffer, not a modify entity buffer";
207 return KAsync::error<qint64>(0); 208 return KAsync::error<qint64>(0);
208 } 209 }
209 } 210 }
@@ -213,29 +214,29 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
213 if (modifyEntity->modifiedProperties()) { 214 if (modifyEntity->modifiedProperties()) {
214 changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties()); 215 changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties());
215 } else { 216 } else {
216 SinkWarning() << "No changeset available"; 217 SinkWarningCtx(d->logCtx) << "No changeset available";
217 } 218 }
218 const qint64 baseRevision = modifyEntity->revision(); 219 const qint64 baseRevision = modifyEntity->revision();
219 const bool replayToSource = modifyEntity->replayToSource(); 220 const bool replayToSource = modifyEntity->replayToSource();
220 221
221 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); 222 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size());
222 const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); 223 const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
223 SinkTrace() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 224 SinkTraceCtx(d->logCtx) << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
224 if (bufferType.isEmpty() || key.isEmpty()) { 225 if (bufferType.isEmpty() || key.isEmpty()) {
225 SinkWarning() << "entity type or key " << bufferType << key; 226 SinkWarningCtx(d->logCtx) << "entity type or key " << bufferType << key;
226 return KAsync::error<qint64>(0); 227 return KAsync::error<qint64>(0);
227 } 228 }
228 { 229 {
229 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); 230 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size());
230 if (!VerifyEntityBuffer(verifyer)) { 231 if (!VerifyEntityBuffer(verifyer)) {
231 SinkWarning() << "invalid buffer, not an entity buffer"; 232 SinkWarningCtx(d->logCtx) << "invalid buffer, not an entity buffer";
232 return KAsync::error<qint64>(0); 233 return KAsync::error<qint64>(0);
233 } 234 }
234 } 235 }
235 236
236 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); 237 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType);
237 if (!adaptorFactory) { 238 if (!adaptorFactory) {
238 SinkWarning() << "no adaptor factory for type " << bufferType; 239 SinkWarningCtx(d->logCtx) << "no adaptor factory for type " << bufferType;
239 return KAsync::error<qint64>(0); 240 return KAsync::error<qint64>(0);
240 } 241 }
241 242
@@ -255,7 +256,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
255 auto changeset = diff.changedProperties(); 256 auto changeset = diff.changedProperties();
256 const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); 257 const auto current = d->entityStore.readLatest(bufferType, diff.identifier());
257 if (current.identifier().isEmpty()) { 258 if (current.identifier().isEmpty()) {
258 SinkWarning() << "Failed to read current version: " << diff.identifier(); 259 SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier();
259 return KAsync::error<qint64>(0); 260 return KAsync::error<qint64>(0);
260 } 261 }
261 262
@@ -276,11 +277,11 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
276 newEntity.setResource(targetResource); 277 newEntity.setResource(targetResource);
277 newEntity.setChangedProperties(newEntity.availableProperties().toSet()); 278 newEntity.setChangedProperties(newEntity.availableProperties().toSet());
278 279
279 SinkTrace() << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource; 280 SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource;
280 auto job = TypeHelper<CreateHelper>{bufferType}.operator()<KAsync::Job<void>, ApplicationDomain::ApplicationDomainType&>(newEntity); 281 auto job = TypeHelper<CreateHelper>{bufferType}.operator()<KAsync::Job<void>, ApplicationDomain::ApplicationDomainType&>(newEntity);
281 job = job.syncThen<void>([this, current, isMove, targetResource, bufferType](const KAsync::Error &error) { 282 job = job.syncThen<void>([this, current, isMove, targetResource, bufferType](const KAsync::Error &error) {
282 if (!error) { 283 if (!error) {
283 SinkTrace() << "Move of " << current.identifier() << "was successfull"; 284 SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull";
284 if (isMove) { 285 if (isMove) {
285 startTransaction(); 286 startTransaction();
286 flatbuffers::FlatBufferBuilder fbb; 287 flatbuffers::FlatBufferBuilder fbb;
@@ -293,7 +294,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
293 commit(); 294 commit();
294 } 295 }
295 } else { 296 } else {
296 SinkError() << "Failed to move entity " << targetResource << " to resource " << current.identifier(); 297 SinkErrorCtx(d->logCtx) << "Failed to move entity " << targetResource << " to resource " << current.identifier();
297 } 298 }
298 }); 299 });
299 job.exec(); 300 job.exec();
@@ -321,7 +322,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
321 { 322 {
322 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 323 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
323 if (!Commands::VerifyDeleteEntityBuffer(verifyer)) { 324 if (!Commands::VerifyDeleteEntityBuffer(verifyer)) {
324 SinkWarning() << "invalid buffer, not a delete entity buffer"; 325 SinkWarningCtx(d->logCtx) << "invalid buffer, not a delete entity buffer";
325 return KAsync::error<qint64>(0); 326 return KAsync::error<qint64>(0);
326 } 327 }
327 } 328 }
@@ -330,7 +331,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
330 const bool replayToSource = deleteEntity->replayToSource(); 331 const bool replayToSource = deleteEntity->replayToSource();
331 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); 332 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
332 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 333 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
333 SinkTrace() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 334 SinkTraceCtx(d->logCtx) << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
334 335
335 auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity) { 336 auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity) {
336 foreach (const auto &processor, d->processors[bufferType]) { 337 foreach (const auto &processor, d->processors[bufferType]) {
diff --git a/common/pipeline.h b/common/pipeline.h
index b663dea..c9982b7 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -46,7 +46,7 @@ class SINK_EXPORT Pipeline : public QObject
46 Q_OBJECT 46 Q_OBJECT
47 47
48public: 48public:
49 Pipeline(const ResourceContext &context); 49 Pipeline(const ResourceContext &context, const Sink::Log::Context &ctx);
50 ~Pipeline(); 50 ~Pipeline();
51 51
52 void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors); 52 void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors);
diff --git a/common/store.cpp b/common/store.cpp
index 8007626..554f540 100644
--- a/common/store.cpp
+++ b/common/store.cpp
@@ -57,7 +57,7 @@ QString Store::getTemporaryFilePath()
57/* 57/*
58 * Returns a map of resource instance identifiers and resource type 58 * Returns a map of resource instance identifiers and resource type
59 */ 59 */
60static QMap<QByteArray, QByteArray> getResources(const Sink::Query::Filter &query, const QByteArray &type = QByteArray()) 60static QMap<QByteArray, QByteArray> getResources(const Sink::Query::Filter &query, const QByteArray &type, const Sink::Log::Context &ctx)
61{ 61{
62 const QList<QByteArray> resourceFilter = query.ids; 62 const QList<QByteArray> resourceFilter = query.ids;
63 63
@@ -97,11 +97,11 @@ static QMap<QByteArray, QByteArray> getResources(const Sink::Query::Filter &quer
97 } 97 }
98 resources.insert(res, configuredResources.value(res)); 98 resources.insert(res, configuredResources.value(res));
99 } else { 99 } else {
100 SinkWarning() << "Resource is not existing: " << res; 100 SinkWarningCtx(ctx) << "Resource is not existing: " << res;
101 } 101 }
102 } 102 }
103 } 103 }
104 SinkTrace() << "Found resources: " << resources; 104 SinkTraceCtx(ctx) << "Found resources: " << resources;
105 return resources; 105 return resources;
106} 106}
107 107
@@ -133,7 +133,7 @@ QSharedPointer<QAbstractItemModel> Store::loadModel(Query query)
133 Log::Context ctx{query.id()}; 133 Log::Context ctx{query.id()};
134 query.setType(ApplicationDomain::getTypeName<DomainType>()); 134 query.setType(ApplicationDomain::getTypeName<DomainType>());
135 SinkTraceCtx(ctx) << "Loading model: " << query; 135 SinkTraceCtx(ctx) << "Loading model: " << query;
136 auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr>>::create(query, query.requestedProperties); 136 auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr>>::create(query, query.requestedProperties, ctx);
137 137
138 //* Client defines lifetime of model 138 //* Client defines lifetime of model
139 //* The model lifetime defines the duration of live-queries 139 //* The model lifetime defines the duration of live-queries
@@ -142,7 +142,7 @@ QSharedPointer<QAbstractItemModel> Store::loadModel(Query query)
142 //* The result provider needs to live for as long as results are provided (until the last thread exits). 142 //* The result provider needs to live for as long as results are provided (until the last thread exits).
143 143
144 // Query all resources and aggregate results 144 // Query all resources and aggregate results
145 auto resources = getResources(query.getResourceFilter(), ApplicationDomain::getTypeName<DomainType>()); 145 auto resources = getResources(query.getResourceFilter(), ApplicationDomain::getTypeName<DomainType>(), ctx);
146 auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create(); 146 auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create();
147 model->setEmitter(aggregatingEmitter); 147 model->setEmitter(aggregatingEmitter);
148 148
@@ -297,7 +297,7 @@ KAsync::Job<void> Store::synchronize(const Sink::Query &query)
297 297
298KAsync::Job<void> Store::synchronize(const Sink::SyncScope &scope) 298KAsync::Job<void> Store::synchronize(const Sink::SyncScope &scope)
299{ 299{
300 auto resources = getResources(scope.getResourceFilter()).keys(); 300 auto resources = getResources(scope.getResourceFilter(), {}, {}).keys();
301 SinkLog() << "Synchronize" << resources; 301 SinkLog() << "Synchronize" << resources;
302 return KAsync::value(resources) 302 return KAsync::value(resources)
303 .template each([scope](const QByteArray &resource) { 303 .template each([scope](const QByteArray &resource) {
@@ -377,7 +377,7 @@ QList<DomainType> Store::read(const Sink::Query &q)
377 auto query = q; 377 auto query = q;
378 query.setFlags(Query::SynchronousQuery); 378 query.setFlags(Query::SynchronousQuery);
379 QList<DomainType> list; 379 QList<DomainType> list;
380 auto resources = getResources(query.getResourceFilter(), ApplicationDomain::getTypeName<DomainType>()); 380 auto resources = getResources(query.getResourceFilter(), ApplicationDomain::getTypeName<DomainType>(), ctx);
381 auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create(); 381 auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create();
382 aggregatingEmitter->onAdded([&list, ctx](const typename DomainType::Ptr &value){ 382 aggregatingEmitter->onAdded([&list, ctx](const typename DomainType::Ptr &value){
383 SinkTraceCtx(ctx) << "Found value: " << value->identifier(); 383 SinkTraceCtx(ctx) << "Found value: " << value->identifier();
diff --git a/tests/mailquerybenchmark.cpp b/tests/mailquerybenchmark.cpp
index e55e744..080d30e 100644
--- a/tests/mailquerybenchmark.cpp
+++ b/tests/mailquerybenchmark.cpp
@@ -61,7 +61,7 @@ class MailQueryBenchmark : public QObject
61 { 61 {
62 TestResource::removeFromDisk(resourceIdentifier); 62 TestResource::removeFromDisk(resourceIdentifier);
63 63
64 auto pipeline = QSharedPointer<Sink::Pipeline>::create(Sink::ResourceContext{resourceIdentifier, "test"}); 64 auto pipeline = QSharedPointer<Sink::Pipeline>::create(Sink::ResourceContext{resourceIdentifier, "test"}, "test");
65 65
66 auto domainTypeAdaptorFactory = QSharedPointer<TestMailAdaptorFactory>::create(); 66 auto domainTypeAdaptorFactory = QSharedPointer<TestMailAdaptorFactory>::create();
67 67
diff --git a/tests/pipelinebenchmark.cpp b/tests/pipelinebenchmark.cpp
index 341c733..7052c4c 100644
--- a/tests/pipelinebenchmark.cpp
+++ b/tests/pipelinebenchmark.cpp
@@ -61,7 +61,7 @@ class PipelineBenchmark : public QObject
61 { 61 {
62 TestResource::removeFromDisk(resourceIdentifier); 62 TestResource::removeFromDisk(resourceIdentifier);
63 63
64 auto pipeline = QSharedPointer<Sink::Pipeline>::create(Sink::ResourceContext{resourceIdentifier, "test"}); 64 auto pipeline = QSharedPointer<Sink::Pipeline>::create(Sink::ResourceContext{resourceIdentifier, "test"}, "test");
65 pipeline->setPreprocessors("mail", preprocessors); 65 pipeline->setPreprocessors("mail", preprocessors);
66 66
67 QTime time; 67 QTime time;
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp
index e68aa53..0268ec5 100644
--- a/tests/pipelinetest.cpp
+++ b/tests/pipelinetest.cpp
@@ -213,7 +213,7 @@ private slots:
213 flatbuffers::FlatBufferBuilder entityFbb; 213 flatbuffers::FlatBufferBuilder entityFbb;
214 auto command = createEntityCommand(createEvent(entityFbb)); 214 auto command = createEntityCommand(createEvent(entityFbb));
215 215
216 Sink::Pipeline pipeline(getContext()); 216 Sink::Pipeline pipeline(getContext(), {"test"});
217 217
218 pipeline.startTransaction(); 218 pipeline.startTransaction();
219 pipeline.newEntity(command.constData(), command.size()); 219 pipeline.newEntity(command.constData(), command.size());
@@ -236,7 +236,7 @@ private slots:
236 flatbuffers::FlatBufferBuilder entityFbb; 236 flatbuffers::FlatBufferBuilder entityFbb;
237 auto command = createEntityCommand(createEvent(entityFbb, "summary", "description")); 237 auto command = createEntityCommand(createEvent(entityFbb, "summary", "description"));
238 238
239 Sink::Pipeline pipeline(getContext()); 239 Sink::Pipeline pipeline(getContext(), {"test"});
240 240
241 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); 241 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create();
242 242
@@ -282,7 +282,7 @@ private slots:
282 flatbuffers::FlatBufferBuilder entityFbb; 282 flatbuffers::FlatBufferBuilder entityFbb;
283 auto command = createEntityCommand(createEvent(entityFbb)); 283 auto command = createEntityCommand(createEvent(entityFbb));
284 284
285 Sink::Pipeline pipeline(getContext()); 285 Sink::Pipeline pipeline(getContext(), {"test"});
286 286
287 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); 287 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create();
288 288
@@ -325,7 +325,7 @@ private slots:
325 { 325 {
326 flatbuffers::FlatBufferBuilder entityFbb; 326 flatbuffers::FlatBufferBuilder entityFbb;
327 auto command = createEntityCommand(createEvent(entityFbb)); 327 auto command = createEntityCommand(createEvent(entityFbb));
328 Sink::Pipeline pipeline(getContext()); 328 Sink::Pipeline pipeline(getContext(), {"test"});
329 329
330 // Create the initial revision 330 // Create the initial revision
331 pipeline.startTransaction(); 331 pipeline.startTransaction();
@@ -359,7 +359,7 @@ private slots:
359 359
360 auto testProcessor = new TestProcessor; 360 auto testProcessor = new TestProcessor;
361 361
362 Sink::Pipeline pipeline(getContext()); 362 Sink::Pipeline pipeline(getContext(), {"test"});
363 pipeline.setPreprocessors("event", QVector<Sink::Preprocessor *>() << testProcessor); 363 pipeline.setPreprocessors("event", QVector<Sink::Preprocessor *>() << testProcessor);
364 pipeline.startTransaction(); 364 pipeline.startTransaction();
365 // pipeline.setAdaptorFactory("event", QSharedPointer<TestEventAdaptorFactory>::create()); 365 // pipeline.setAdaptorFactory("event", QSharedPointer<TestEventAdaptorFactory>::create());