summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-12-22 22:05:40 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-12-22 22:05:40 +0100
commit6b1cf550608c2f17cbed9e375f15a4c14bfe8ace (patch)
tree2e693a731bb2e9ce1a793b14cef98cdc13e382cd /common/pipeline.cpp
parentb2ad8f785e801a35cadf254d827f56d648be510c (diff)
downloadsink-6b1cf550608c2f17cbed9e375f15a4c14bfe8ace.tar.gz
sink-6b1cf550608c2f17cbed9e375f15a4c14bfe8ace.zip
More Log::Context
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp45
1 files changed, 23 insertions, 22 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index c9a8092..4cb5f21 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -49,10 +49,11 @@ using namespace Sink::Storage;
49class Pipeline::Private 49class Pipeline::Private
50{ 50{
51public: 51public:
52 Private(const ResourceContext &context) : resourceContext(context), entityStore(context, {"pipeline"}), revisionChanged(false) 52 Private(const ResourceContext &context, const Sink::Log::Context &ctx) : logCtx{ctx.subContext("pipeline")}, resourceContext(context), entityStore(context, ctx), revisionChanged(false)
53 { 53 {
54 } 54 }
55 55
56 Sink::Log::Context logCtx;
56 ResourceContext resourceContext; 57 ResourceContext resourceContext;
57 Storage::EntityStore entityStore; 58 Storage::EntityStore entityStore;
58 QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; 59 QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors;
@@ -62,7 +63,7 @@ public:
62}; 63};
63 64
64 65
65Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context)) 66Pipeline::Pipeline(const ResourceContext &context, const Sink::Log::Context &ctx) : QObject(nullptr), d(new Private(context, ctx))
66{ 67{
67 //Create main store immediately on first start 68 //Create main store immediately on first start
68 d->entityStore.startTransaction(DataStore::ReadWrite); 69 d->entityStore.startTransaction(DataStore::ReadWrite);
@@ -90,7 +91,7 @@ void Pipeline::startTransaction()
90 // for (auto processor : d->processors[bufferType]) { 91 // for (auto processor : d->processors[bufferType]) {
91 // processor->startBatch(); 92 // processor->startBatch();
92 // } 93 // }
93 SinkTrace() << "Starting transaction."; 94 SinkTraceCtx(d->logCtx) << "Starting transaction.";
94 d->transactionTime.start(); 95 d->transactionTime.start();
95 d->transactionItemCount = 0; 96 d->transactionItemCount = 0;
96 d->entityStore.startTransaction(DataStore::ReadWrite); 97 d->entityStore.startTransaction(DataStore::ReadWrite);
@@ -109,7 +110,7 @@ void Pipeline::commit()
109 } 110 }
110 const auto revision = d->entityStore.maxRevision(); 111 const auto revision = d->entityStore.maxRevision();
111 const auto elapsed = d->transactionTime.elapsed(); 112 const auto elapsed = d->transactionTime.elapsed();
112 SinkTrace() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " 113 SinkTraceCtx(d->logCtx) << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " "
113 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; 114 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]";
114 d->entityStore.commitTransaction(); 115 d->entityStore.commitTransaction();
115 if (d->revisionChanged) { 116 if (d->revisionChanged) {
@@ -125,7 +126,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
125 { 126 {
126 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 127 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
127 if (!Commands::VerifyCreateEntityBuffer(verifyer)) { 128 if (!Commands::VerifyCreateEntityBuffer(verifyer)) {
128 SinkWarning() << "invalid buffer, not a create entity buffer"; 129 SinkWarningCtx(d->logCtx) << "invalid buffer, not a create entity buffer";
129 return KAsync::error<qint64>(0); 130 return KAsync::error<qint64>(0);
130 } 131 }
131 } 132 }
@@ -137,7 +138,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
137 if (createEntity->entityId()) { 138 if (createEntity->entityId()) {
138 key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); 139 key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size());
139 if (d->entityStore.contains(bufferType, key)) { 140 if (d->entityStore.contains(bufferType, key)) {
140 SinkError() << "An entity with this id already exists: " << key; 141 SinkErrorCtx(d->logCtx) << "An entity with this id already exists: " << key;
141 return KAsync::error<qint64>(0); 142 return KAsync::error<qint64>(0);
142 } 143 }
143 } 144 }
@@ -145,25 +146,25 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
145 if (key.isEmpty()) { 146 if (key.isEmpty()) {
146 key = DataStore::generateUid(); 147 key = DataStore::generateUid();
147 } 148 }
148 SinkTrace() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 149 SinkTraceCtx(d->logCtx) << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
149 Q_ASSERT(!key.isEmpty()); 150 Q_ASSERT(!key.isEmpty());
150 151
151 { 152 {
152 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); 153 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
153 if (!VerifyEntityBuffer(verifyer)) { 154 if (!VerifyEntityBuffer(verifyer)) {
154 SinkWarning() << "invalid buffer, not an entity buffer"; 155 SinkWarningCtx(d->logCtx) << "invalid buffer, not an entity buffer";
155 return KAsync::error<qint64>(0); 156 return KAsync::error<qint64>(0);
156 } 157 }
157 } 158 }
158 auto entity = GetEntity(createEntity->delta()->Data()); 159 auto entity = GetEntity(createEntity->delta()->Data());
159 if (!entity->resource()->size() && !entity->local()->size()) { 160 if (!entity->resource()->size() && !entity->local()->size()) {
160 SinkWarning() << "No local and no resource buffer while trying to create entity."; 161 SinkWarningCtx(d->logCtx) << "No local and no resource buffer while trying to create entity.";
161 return KAsync::error<qint64>(0); 162 return KAsync::error<qint64>(0);
162 } 163 }
163 164
164 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); 165 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType);
165 if (!adaptorFactory) { 166 if (!adaptorFactory) {
166 SinkWarning() << "no adaptor factory for type " << bufferType; 167 SinkWarningCtx(d->logCtx) << "no adaptor factory for type " << bufferType;
167 return KAsync::error<qint64>(0); 168 return KAsync::error<qint64>(0);
168 } 169 }
169 170
@@ -203,7 +204,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
203 { 204 {
204 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 205 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
205 if (!Commands::VerifyModifyEntityBuffer(verifyer)) { 206 if (!Commands::VerifyModifyEntityBuffer(verifyer)) {
206 SinkWarning() << "invalid buffer, not a modify entity buffer"; 207 SinkWarningCtx(d->logCtx) << "invalid buffer, not a modify entity buffer";
207 return KAsync::error<qint64>(0); 208 return KAsync::error<qint64>(0);
208 } 209 }
209 } 210 }
@@ -213,29 +214,29 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
213 if (modifyEntity->modifiedProperties()) { 214 if (modifyEntity->modifiedProperties()) {
214 changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties()); 215 changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties());
215 } else { 216 } else {
216 SinkWarning() << "No changeset available"; 217 SinkWarningCtx(d->logCtx) << "No changeset available";
217 } 218 }
218 const qint64 baseRevision = modifyEntity->revision(); 219 const qint64 baseRevision = modifyEntity->revision();
219 const bool replayToSource = modifyEntity->replayToSource(); 220 const bool replayToSource = modifyEntity->replayToSource();
220 221
221 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); 222 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size());
222 const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); 223 const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
223 SinkTrace() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 224 SinkTraceCtx(d->logCtx) << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
224 if (bufferType.isEmpty() || key.isEmpty()) { 225 if (bufferType.isEmpty() || key.isEmpty()) {
225 SinkWarning() << "entity type or key " << bufferType << key; 226 SinkWarningCtx(d->logCtx) << "entity type or key " << bufferType << key;
226 return KAsync::error<qint64>(0); 227 return KAsync::error<qint64>(0);
227 } 228 }
228 { 229 {
229 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); 230 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size());
230 if (!VerifyEntityBuffer(verifyer)) { 231 if (!VerifyEntityBuffer(verifyer)) {
231 SinkWarning() << "invalid buffer, not an entity buffer"; 232 SinkWarningCtx(d->logCtx) << "invalid buffer, not an entity buffer";
232 return KAsync::error<qint64>(0); 233 return KAsync::error<qint64>(0);
233 } 234 }
234 } 235 }
235 236
236 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); 237 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType);
237 if (!adaptorFactory) { 238 if (!adaptorFactory) {
238 SinkWarning() << "no adaptor factory for type " << bufferType; 239 SinkWarningCtx(d->logCtx) << "no adaptor factory for type " << bufferType;
239 return KAsync::error<qint64>(0); 240 return KAsync::error<qint64>(0);
240 } 241 }
241 242
@@ -255,7 +256,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
255 auto changeset = diff.changedProperties(); 256 auto changeset = diff.changedProperties();
256 const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); 257 const auto current = d->entityStore.readLatest(bufferType, diff.identifier());
257 if (current.identifier().isEmpty()) { 258 if (current.identifier().isEmpty()) {
258 SinkWarning() << "Failed to read current version: " << diff.identifier(); 259 SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier();
259 return KAsync::error<qint64>(0); 260 return KAsync::error<qint64>(0);
260 } 261 }
261 262
@@ -276,11 +277,11 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
276 newEntity.setResource(targetResource); 277 newEntity.setResource(targetResource);
277 newEntity.setChangedProperties(newEntity.availableProperties().toSet()); 278 newEntity.setChangedProperties(newEntity.availableProperties().toSet());
278 279
279 SinkTrace() << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource; 280 SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource;
280 auto job = TypeHelper<CreateHelper>{bufferType}.operator()<KAsync::Job<void>, ApplicationDomain::ApplicationDomainType&>(newEntity); 281 auto job = TypeHelper<CreateHelper>{bufferType}.operator()<KAsync::Job<void>, ApplicationDomain::ApplicationDomainType&>(newEntity);
281 job = job.syncThen<void>([this, current, isMove, targetResource, bufferType](const KAsync::Error &error) { 282 job = job.syncThen<void>([this, current, isMove, targetResource, bufferType](const KAsync::Error &error) {
282 if (!error) { 283 if (!error) {
283 SinkTrace() << "Move of " << current.identifier() << "was successfull"; 284 SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull";
284 if (isMove) { 285 if (isMove) {
285 startTransaction(); 286 startTransaction();
286 flatbuffers::FlatBufferBuilder fbb; 287 flatbuffers::FlatBufferBuilder fbb;
@@ -293,7 +294,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
293 commit(); 294 commit();
294 } 295 }
295 } else { 296 } else {
296 SinkError() << "Failed to move entity " << targetResource << " to resource " << current.identifier(); 297 SinkErrorCtx(d->logCtx) << "Failed to move entity " << targetResource << " to resource " << current.identifier();
297 } 298 }
298 }); 299 });
299 job.exec(); 300 job.exec();
@@ -321,7 +322,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
321 { 322 {
322 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 323 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
323 if (!Commands::VerifyDeleteEntityBuffer(verifyer)) { 324 if (!Commands::VerifyDeleteEntityBuffer(verifyer)) {
324 SinkWarning() << "invalid buffer, not a delete entity buffer"; 325 SinkWarningCtx(d->logCtx) << "invalid buffer, not a delete entity buffer";
325 return KAsync::error<qint64>(0); 326 return KAsync::error<qint64>(0);
326 } 327 }
327 } 328 }
@@ -330,7 +331,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
330 const bool replayToSource = deleteEntity->replayToSource(); 331 const bool replayToSource = deleteEntity->replayToSource();
331 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); 332 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
332 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 333 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
333 SinkTrace() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 334 SinkTraceCtx(d->logCtx) << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
334 335
335 auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity) { 336 auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity) {
336 foreach (const auto &processor, d->processors[bufferType]) { 337 foreach (const auto &processor, d->processors[bufferType]) {