diff options
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 45 |
1 files changed, 34 insertions, 11 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 049d228..2c08aa0 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -181,16 +181,6 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
181 | key = Sink::Storage::generateUid(); | 181 | key = Sink::Storage::generateUid(); |
182 | } | 182 | } |
183 | Q_ASSERT(!key.isEmpty()); | 183 | Q_ASSERT(!key.isEmpty()); |
184 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; | ||
185 | |||
186 | // Add metadata buffer | ||
187 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
188 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
189 | metadataBuilder.add_revision(newRevision); | ||
190 | metadataBuilder.add_operation(Operation_Creation); | ||
191 | metadataBuilder.add_replayToSource(replayToSource); | ||
192 | auto metadataBuffer = metadataBuilder.Finish(); | ||
193 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
194 | 184 | ||
195 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); | 185 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); |
196 | if (!adaptorFactory) { | 186 | if (!adaptorFactory) { |
@@ -201,8 +191,22 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
201 | auto adaptor = adaptorFactory->createAdaptor(*entity); | 191 | auto adaptor = adaptorFactory->createAdaptor(*entity); |
202 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); | 192 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); |
203 | for (auto processor : d->processors[bufferType]) { | 193 | for (auto processor : d->processors[bufferType]) { |
204 | processor->newEntity(key, newRevision, *memoryAdaptor, d->transaction); | 194 | processor->resourceType = d->resourceType; |
195 | processor->pipeline = this; | ||
196 | processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); | ||
205 | } | 197 | } |
198 | //The maxRevision may have changed meanwhile if the entity created sub-entities | ||
199 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; | ||
200 | |||
201 | // Add metadata buffer | ||
202 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
203 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
204 | metadataBuilder.add_revision(newRevision); | ||
205 | metadataBuilder.add_operation(Operation_Creation); | ||
206 | metadataBuilder.add_replayToSource(replayToSource); | ||
207 | auto metadataBuffer = metadataBuilder.Finish(); | ||
208 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
209 | |||
206 | flatbuffers::FlatBufferBuilder fbb; | 210 | flatbuffers::FlatBufferBuilder fbb; |
207 | adaptorFactory->createBuffer(memoryAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | 211 | adaptorFactory->createBuffer(memoryAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); |
208 | 212 | ||
@@ -461,6 +465,25 @@ void Preprocessor::finalize() | |||
461 | { | 465 | { |
462 | } | 466 | } |
463 | 467 | ||
468 | void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomainType &entity, const QByteArray &typeName) | ||
469 | { | ||
470 | flatbuffers::FlatBufferBuilder entityFbb; | ||
471 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(resourceType, typeName); | ||
472 | adaptorFactory->createBuffer(entity, entityFbb); | ||
473 | const auto entityBuffer = BufferUtils::extractBuffer(entityFbb); | ||
474 | |||
475 | flatbuffers::FlatBufferBuilder fbb; | ||
476 | auto entityId = fbb.CreateString(entity.identifier()); | ||
477 | // This is the resource buffer type and not the domain type | ||
478 | auto type = fbb.CreateString(typeName); | ||
479 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityBuffer.constData(), entityBuffer.size()); | ||
480 | auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta); | ||
481 | Sink::Commands::FinishCreateEntityBuffer(fbb, location); | ||
482 | |||
483 | const auto data = BufferUtils::extractBuffer(fbb); | ||
484 | pipeline->newEntity(data, data.size()).exec(); | ||
485 | } | ||
486 | |||
464 | } // namespace Sink | 487 | } // namespace Sink |
465 | 488 | ||
466 | #pragma clang diagnostic push | 489 | #pragma clang diagnostic push |