summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp45
1 files changed, 34 insertions, 11 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 049d228..2c08aa0 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -181,16 +181,6 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
181 key = Sink::Storage::generateUid(); 181 key = Sink::Storage::generateUid();
182 } 182 }
183 Q_ASSERT(!key.isEmpty()); 183 Q_ASSERT(!key.isEmpty());
184 const qint64 newRevision = Storage::maxRevision(d->transaction) + 1;
185
186 // Add metadata buffer
187 flatbuffers::FlatBufferBuilder metadataFbb;
188 auto metadataBuilder = MetadataBuilder(metadataFbb);
189 metadataBuilder.add_revision(newRevision);
190 metadataBuilder.add_operation(Operation_Creation);
191 metadataBuilder.add_replayToSource(replayToSource);
192 auto metadataBuffer = metadataBuilder.Finish();
193 FinishMetadataBuffer(metadataFbb, metadataBuffer);
194 184
195 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); 185 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
196 if (!adaptorFactory) { 186 if (!adaptorFactory) {
@@ -201,8 +191,22 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
201 auto adaptor = adaptorFactory->createAdaptor(*entity); 191 auto adaptor = adaptorFactory->createAdaptor(*entity);
202 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); 192 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties());
203 for (auto processor : d->processors[bufferType]) { 193 for (auto processor : d->processors[bufferType]) {
204 processor->newEntity(key, newRevision, *memoryAdaptor, d->transaction); 194 processor->resourceType = d->resourceType;
195 processor->pipeline = this;
196 processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction);
205 } 197 }
198 //The maxRevision may have changed meanwhile if the entity created sub-entities
199 const qint64 newRevision = Storage::maxRevision(d->transaction) + 1;
200
201 // Add metadata buffer
202 flatbuffers::FlatBufferBuilder metadataFbb;
203 auto metadataBuilder = MetadataBuilder(metadataFbb);
204 metadataBuilder.add_revision(newRevision);
205 metadataBuilder.add_operation(Operation_Creation);
206 metadataBuilder.add_replayToSource(replayToSource);
207 auto metadataBuffer = metadataBuilder.Finish();
208 FinishMetadataBuffer(metadataFbb, metadataBuffer);
209
206 flatbuffers::FlatBufferBuilder fbb; 210 flatbuffers::FlatBufferBuilder fbb;
207 adaptorFactory->createBuffer(memoryAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 211 adaptorFactory->createBuffer(memoryAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
208 212
@@ -461,6 +465,25 @@ void Preprocessor::finalize()
461{ 465{
462} 466}
463 467
468void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomainType &entity, const QByteArray &typeName)
469{
470 flatbuffers::FlatBufferBuilder entityFbb;
471 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(resourceType, typeName);
472 adaptorFactory->createBuffer(entity, entityFbb);
473 const auto entityBuffer = BufferUtils::extractBuffer(entityFbb);
474
475 flatbuffers::FlatBufferBuilder fbb;
476 auto entityId = fbb.CreateString(entity.identifier());
477 // This is the resource buffer type and not the domain type
478 auto type = fbb.CreateString(typeName);
479 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityBuffer.constData(), entityBuffer.size());
480 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta);
481 Sink::Commands::FinishCreateEntityBuffer(fbb, location);
482
483 const auto data = BufferUtils::extractBuffer(fbb);
484 pipeline->newEntity(data, data.size()).exec();
485}
486
464} // namespace Sink 487} // namespace Sink
465 488
466#pragma clang diagnostic push 489#pragma clang diagnostic push