diff options
-rw-r--r-- | common/pipeline.cpp | 45 | ||||
-rw-r--r-- | common/pipeline.h | 17 |
2 files changed, 48 insertions, 14 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 049d228..2c08aa0 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -181,16 +181,6 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
181 | key = Sink::Storage::generateUid(); | 181 | key = Sink::Storage::generateUid(); |
182 | } | 182 | } |
183 | Q_ASSERT(!key.isEmpty()); | 183 | Q_ASSERT(!key.isEmpty()); |
184 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; | ||
185 | |||
186 | // Add metadata buffer | ||
187 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
188 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
189 | metadataBuilder.add_revision(newRevision); | ||
190 | metadataBuilder.add_operation(Operation_Creation); | ||
191 | metadataBuilder.add_replayToSource(replayToSource); | ||
192 | auto metadataBuffer = metadataBuilder.Finish(); | ||
193 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
194 | 184 | ||
195 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); | 185 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); |
196 | if (!adaptorFactory) { | 186 | if (!adaptorFactory) { |
@@ -201,8 +191,22 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
201 | auto adaptor = adaptorFactory->createAdaptor(*entity); | 191 | auto adaptor = adaptorFactory->createAdaptor(*entity); |
202 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); | 192 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); |
203 | for (auto processor : d->processors[bufferType]) { | 193 | for (auto processor : d->processors[bufferType]) { |
204 | processor->newEntity(key, newRevision, *memoryAdaptor, d->transaction); | 194 | processor->resourceType = d->resourceType; |
195 | processor->pipeline = this; | ||
196 | processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); | ||
205 | } | 197 | } |
198 | //The maxRevision may have changed meanwhile if the entity created sub-entities | ||
199 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; | ||
200 | |||
201 | // Add metadata buffer | ||
202 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
203 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
204 | metadataBuilder.add_revision(newRevision); | ||
205 | metadataBuilder.add_operation(Operation_Creation); | ||
206 | metadataBuilder.add_replayToSource(replayToSource); | ||
207 | auto metadataBuffer = metadataBuilder.Finish(); | ||
208 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
209 | |||
206 | flatbuffers::FlatBufferBuilder fbb; | 210 | flatbuffers::FlatBufferBuilder fbb; |
207 | adaptorFactory->createBuffer(memoryAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | 211 | adaptorFactory->createBuffer(memoryAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); |
208 | 212 | ||
@@ -461,6 +465,25 @@ void Preprocessor::finalize() | |||
461 | { | 465 | { |
462 | } | 466 | } |
463 | 467 | ||
468 | void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomainType &entity, const QByteArray &typeName) | ||
469 | { | ||
470 | flatbuffers::FlatBufferBuilder entityFbb; | ||
471 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(resourceType, typeName); | ||
472 | adaptorFactory->createBuffer(entity, entityFbb); | ||
473 | const auto entityBuffer = BufferUtils::extractBuffer(entityFbb); | ||
474 | |||
475 | flatbuffers::FlatBufferBuilder fbb; | ||
476 | auto entityId = fbb.CreateString(entity.identifier()); | ||
477 | // This is the resource buffer type and not the domain type | ||
478 | auto type = fbb.CreateString(typeName); | ||
479 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityBuffer.constData(), entityBuffer.size()); | ||
480 | auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta); | ||
481 | Sink::Commands::FinishCreateEntityBuffer(fbb, location); | ||
482 | |||
483 | const auto data = BufferUtils::extractBuffer(fbb); | ||
484 | pipeline->newEntity(data, data.size()).exec(); | ||
485 | } | ||
486 | |||
464 | } // namespace Sink | 487 | } // namespace Sink |
465 | 488 | ||
466 | #pragma clang diagnostic push | 489 | #pragma clang diagnostic push |
diff --git a/common/pipeline.h b/common/pipeline.h index 2ca87a4..c3abcf6 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -82,15 +82,26 @@ public: | |||
82 | virtual ~Preprocessor(); | 82 | virtual ~Preprocessor(); |
83 | 83 | ||
84 | virtual void startBatch(); | 84 | virtual void startBatch(); |
85 | virtual void newEntity(const QByteArray &key, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) = 0; | 85 | virtual void newEntity(const QByteArray &key, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) {}; |
86 | virtual void modifiedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, | 86 | virtual void modifiedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, |
87 | Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) = 0; | 87 | Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) {}; |
88 | virtual void deletedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::Transaction &transaction) = 0; | 88 | virtual void deletedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::Transaction &transaction) {}; |
89 | virtual void finalize(); | 89 | virtual void finalize(); |
90 | 90 | ||
91 | protected: | ||
92 | template <typename DomainType> | ||
93 | void createEntity(const DomainType &entity) | ||
94 | { | ||
95 | createEntity(entity, Sink::ApplicationDomain::getTypeName<DomainType>()); | ||
96 | } | ||
97 | void createEntity(const Sink::ApplicationDomain::ApplicationDomainType &entity, const QByteArray &type); | ||
98 | |||
91 | private: | 99 | private: |
100 | friend class Pipeline; | ||
92 | class Private; | 101 | class Private; |
93 | Private *const d; | 102 | Private *const d; |
103 | Pipeline *pipeline; | ||
104 | QByteArray resourceType; | ||
94 | }; | 105 | }; |
95 | 106 | ||
96 | } // namespace Sink | 107 | } // namespace Sink |