diff options
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 45 |
1 files changed, 23 insertions, 22 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index c9a8092..4cb5f21 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -49,10 +49,11 @@ using namespace Sink::Storage; | |||
49 | class Pipeline::Private | 49 | class Pipeline::Private |
50 | { | 50 | { |
51 | public: | 51 | public: |
52 | Private(const ResourceContext &context) : resourceContext(context), entityStore(context, {"pipeline"}), revisionChanged(false) | 52 | Private(const ResourceContext &context, const Sink::Log::Context &ctx) : logCtx{ctx.subContext("pipeline")}, resourceContext(context), entityStore(context, ctx), revisionChanged(false) |
53 | { | 53 | { |
54 | } | 54 | } |
55 | 55 | ||
56 | Sink::Log::Context logCtx; | ||
56 | ResourceContext resourceContext; | 57 | ResourceContext resourceContext; |
57 | Storage::EntityStore entityStore; | 58 | Storage::EntityStore entityStore; |
58 | QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; | 59 | QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; |
@@ -62,7 +63,7 @@ public: | |||
62 | }; | 63 | }; |
63 | 64 | ||
64 | 65 | ||
65 | Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context)) | 66 | Pipeline::Pipeline(const ResourceContext &context, const Sink::Log::Context &ctx) : QObject(nullptr), d(new Private(context, ctx)) |
66 | { | 67 | { |
67 | //Create main store immediately on first start | 68 | //Create main store immediately on first start |
68 | d->entityStore.startTransaction(DataStore::ReadWrite); | 69 | d->entityStore.startTransaction(DataStore::ReadWrite); |
@@ -90,7 +91,7 @@ void Pipeline::startTransaction() | |||
90 | // for (auto processor : d->processors[bufferType]) { | 91 | // for (auto processor : d->processors[bufferType]) { |
91 | // processor->startBatch(); | 92 | // processor->startBatch(); |
92 | // } | 93 | // } |
93 | SinkTrace() << "Starting transaction."; | 94 | SinkTraceCtx(d->logCtx) << "Starting transaction."; |
94 | d->transactionTime.start(); | 95 | d->transactionTime.start(); |
95 | d->transactionItemCount = 0; | 96 | d->transactionItemCount = 0; |
96 | d->entityStore.startTransaction(DataStore::ReadWrite); | 97 | d->entityStore.startTransaction(DataStore::ReadWrite); |
@@ -109,7 +110,7 @@ void Pipeline::commit() | |||
109 | } | 110 | } |
110 | const auto revision = d->entityStore.maxRevision(); | 111 | const auto revision = d->entityStore.maxRevision(); |
111 | const auto elapsed = d->transactionTime.elapsed(); | 112 | const auto elapsed = d->transactionTime.elapsed(); |
112 | SinkTrace() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " | 113 | SinkTraceCtx(d->logCtx) << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " |
113 | << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; | 114 | << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; |
114 | d->entityStore.commitTransaction(); | 115 | d->entityStore.commitTransaction(); |
115 | if (d->revisionChanged) { | 116 | if (d->revisionChanged) { |
@@ -125,7 +126,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
125 | { | 126 | { |
126 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 127 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
127 | if (!Commands::VerifyCreateEntityBuffer(verifyer)) { | 128 | if (!Commands::VerifyCreateEntityBuffer(verifyer)) { |
128 | SinkWarning() << "invalid buffer, not a create entity buffer"; | 129 | SinkWarningCtx(d->logCtx) << "invalid buffer, not a create entity buffer"; |
129 | return KAsync::error<qint64>(0); | 130 | return KAsync::error<qint64>(0); |
130 | } | 131 | } |
131 | } | 132 | } |
@@ -137,7 +138,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
137 | if (createEntity->entityId()) { | 138 | if (createEntity->entityId()) { |
138 | key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); | 139 | key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); |
139 | if (d->entityStore.contains(bufferType, key)) { | 140 | if (d->entityStore.contains(bufferType, key)) { |
140 | SinkError() << "An entity with this id already exists: " << key; | 141 | SinkErrorCtx(d->logCtx) << "An entity with this id already exists: " << key; |
141 | return KAsync::error<qint64>(0); | 142 | return KAsync::error<qint64>(0); |
142 | } | 143 | } |
143 | } | 144 | } |
@@ -145,25 +146,25 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
145 | if (key.isEmpty()) { | 146 | if (key.isEmpty()) { |
146 | key = DataStore::generateUid(); | 147 | key = DataStore::generateUid(); |
147 | } | 148 | } |
148 | SinkTrace() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 149 | SinkTraceCtx(d->logCtx) << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
149 | Q_ASSERT(!key.isEmpty()); | 150 | Q_ASSERT(!key.isEmpty()); |
150 | 151 | ||
151 | { | 152 | { |
152 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); | 153 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); |
153 | if (!VerifyEntityBuffer(verifyer)) { | 154 | if (!VerifyEntityBuffer(verifyer)) { |
154 | SinkWarning() << "invalid buffer, not an entity buffer"; | 155 | SinkWarningCtx(d->logCtx) << "invalid buffer, not an entity buffer"; |
155 | return KAsync::error<qint64>(0); | 156 | return KAsync::error<qint64>(0); |
156 | } | 157 | } |
157 | } | 158 | } |
158 | auto entity = GetEntity(createEntity->delta()->Data()); | 159 | auto entity = GetEntity(createEntity->delta()->Data()); |
159 | if (!entity->resource()->size() && !entity->local()->size()) { | 160 | if (!entity->resource()->size() && !entity->local()->size()) { |
160 | SinkWarning() << "No local and no resource buffer while trying to create entity."; | 161 | SinkWarningCtx(d->logCtx) << "No local and no resource buffer while trying to create entity."; |
161 | return KAsync::error<qint64>(0); | 162 | return KAsync::error<qint64>(0); |
162 | } | 163 | } |
163 | 164 | ||
164 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); | 165 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); |
165 | if (!adaptorFactory) { | 166 | if (!adaptorFactory) { |
166 | SinkWarning() << "no adaptor factory for type " << bufferType; | 167 | SinkWarningCtx(d->logCtx) << "no adaptor factory for type " << bufferType; |
167 | return KAsync::error<qint64>(0); | 168 | return KAsync::error<qint64>(0); |
168 | } | 169 | } |
169 | 170 | ||
@@ -203,7 +204,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
203 | { | 204 | { |
204 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 205 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
205 | if (!Commands::VerifyModifyEntityBuffer(verifyer)) { | 206 | if (!Commands::VerifyModifyEntityBuffer(verifyer)) { |
206 | SinkWarning() << "invalid buffer, not a modify entity buffer"; | 207 | SinkWarningCtx(d->logCtx) << "invalid buffer, not a modify entity buffer"; |
207 | return KAsync::error<qint64>(0); | 208 | return KAsync::error<qint64>(0); |
208 | } | 209 | } |
209 | } | 210 | } |
@@ -213,29 +214,29 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
213 | if (modifyEntity->modifiedProperties()) { | 214 | if (modifyEntity->modifiedProperties()) { |
214 | changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties()); | 215 | changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties()); |
215 | } else { | 216 | } else { |
216 | SinkWarning() << "No changeset available"; | 217 | SinkWarningCtx(d->logCtx) << "No changeset available"; |
217 | } | 218 | } |
218 | const qint64 baseRevision = modifyEntity->revision(); | 219 | const qint64 baseRevision = modifyEntity->revision(); |
219 | const bool replayToSource = modifyEntity->replayToSource(); | 220 | const bool replayToSource = modifyEntity->replayToSource(); |
220 | 221 | ||
221 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); | 222 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); |
222 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); | 223 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); |
223 | SinkTrace() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 224 | SinkTraceCtx(d->logCtx) << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
224 | if (bufferType.isEmpty() || key.isEmpty()) { | 225 | if (bufferType.isEmpty() || key.isEmpty()) { |
225 | SinkWarning() << "entity type or key " << bufferType << key; | 226 | SinkWarningCtx(d->logCtx) << "entity type or key " << bufferType << key; |
226 | return KAsync::error<qint64>(0); | 227 | return KAsync::error<qint64>(0); |
227 | } | 228 | } |
228 | { | 229 | { |
229 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); | 230 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); |
230 | if (!VerifyEntityBuffer(verifyer)) { | 231 | if (!VerifyEntityBuffer(verifyer)) { |
231 | SinkWarning() << "invalid buffer, not an entity buffer"; | 232 | SinkWarningCtx(d->logCtx) << "invalid buffer, not an entity buffer"; |
232 | return KAsync::error<qint64>(0); | 233 | return KAsync::error<qint64>(0); |
233 | } | 234 | } |
234 | } | 235 | } |
235 | 236 | ||
236 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); | 237 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); |
237 | if (!adaptorFactory) { | 238 | if (!adaptorFactory) { |
238 | SinkWarning() << "no adaptor factory for type " << bufferType; | 239 | SinkWarningCtx(d->logCtx) << "no adaptor factory for type " << bufferType; |
239 | return KAsync::error<qint64>(0); | 240 | return KAsync::error<qint64>(0); |
240 | } | 241 | } |
241 | 242 | ||
@@ -255,7 +256,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
255 | auto changeset = diff.changedProperties(); | 256 | auto changeset = diff.changedProperties(); |
256 | const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); | 257 | const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); |
257 | if (current.identifier().isEmpty()) { | 258 | if (current.identifier().isEmpty()) { |
258 | SinkWarning() << "Failed to read current version: " << diff.identifier(); | 259 | SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); |
259 | return KAsync::error<qint64>(0); | 260 | return KAsync::error<qint64>(0); |
260 | } | 261 | } |
261 | 262 | ||
@@ -276,11 +277,11 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
276 | newEntity.setResource(targetResource); | 277 | newEntity.setResource(targetResource); |
277 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); | 278 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); |
278 | 279 | ||
279 | SinkTrace() << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource; | 280 | SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource; |
280 | auto job = TypeHelper<CreateHelper>{bufferType}.operator()<KAsync::Job<void>, ApplicationDomain::ApplicationDomainType&>(newEntity); | 281 | auto job = TypeHelper<CreateHelper>{bufferType}.operator()<KAsync::Job<void>, ApplicationDomain::ApplicationDomainType&>(newEntity); |
281 | job = job.syncThen<void>([this, current, isMove, targetResource, bufferType](const KAsync::Error &error) { | 282 | job = job.syncThen<void>([this, current, isMove, targetResource, bufferType](const KAsync::Error &error) { |
282 | if (!error) { | 283 | if (!error) { |
283 | SinkTrace() << "Move of " << current.identifier() << "was successfull"; | 284 | SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; |
284 | if (isMove) { | 285 | if (isMove) { |
285 | startTransaction(); | 286 | startTransaction(); |
286 | flatbuffers::FlatBufferBuilder fbb; | 287 | flatbuffers::FlatBufferBuilder fbb; |
@@ -293,7 +294,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
293 | commit(); | 294 | commit(); |
294 | } | 295 | } |
295 | } else { | 296 | } else { |
296 | SinkError() << "Failed to move entity " << targetResource << " to resource " << current.identifier(); | 297 | SinkErrorCtx(d->logCtx) << "Failed to move entity " << targetResource << " to resource " << current.identifier(); |
297 | } | 298 | } |
298 | }); | 299 | }); |
299 | job.exec(); | 300 | job.exec(); |
@@ -321,7 +322,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
321 | { | 322 | { |
322 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 323 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
323 | if (!Commands::VerifyDeleteEntityBuffer(verifyer)) { | 324 | if (!Commands::VerifyDeleteEntityBuffer(verifyer)) { |
324 | SinkWarning() << "invalid buffer, not a delete entity buffer"; | 325 | SinkWarningCtx(d->logCtx) << "invalid buffer, not a delete entity buffer"; |
325 | return KAsync::error<qint64>(0); | 326 | return KAsync::error<qint64>(0); |
326 | } | 327 | } |
327 | } | 328 | } |
@@ -330,7 +331,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
330 | const bool replayToSource = deleteEntity->replayToSource(); | 331 | const bool replayToSource = deleteEntity->replayToSource(); |
331 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); | 332 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); |
332 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); | 333 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); |
333 | SinkTrace() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 334 | SinkTraceCtx(d->logCtx) << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
334 | 335 | ||
335 | auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity) { | 336 | auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity) { |
336 | foreach (const auto &processor, d->processors[bufferType]) { | 337 | foreach (const auto &processor, d->processors[bufferType]) { |