summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-06-10 15:28:06 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-06-10 15:28:06 +0200
commitce0feb3ef62c9438b0aedd601461cbb340faa021 (patch)
treea29643b36b5306decefb216b06071dab80d76ddf
parent555c373a0c4dfe386dcd2c88ae9548d95e307409 (diff)
downloadsink-ce0feb3ef62c9438b0aedd601461cbb340faa021.tar.gz
sink-ce0feb3ef62c9438b0aedd601461cbb340faa021.zip
Allow preprocessors to inject entities.
Currently only working when creating an entity, the new entity is always preprended in the store.
-rw-r--r--common/pipeline.cpp45
-rw-r--r--common/pipeline.h17
2 files changed, 48 insertions, 14 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 049d228..2c08aa0 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -181,16 +181,6 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
181 key = Sink::Storage::generateUid(); 181 key = Sink::Storage::generateUid();
182 } 182 }
183 Q_ASSERT(!key.isEmpty()); 183 Q_ASSERT(!key.isEmpty());
184 const qint64 newRevision = Storage::maxRevision(d->transaction) + 1;
185
186 // Add metadata buffer
187 flatbuffers::FlatBufferBuilder metadataFbb;
188 auto metadataBuilder = MetadataBuilder(metadataFbb);
189 metadataBuilder.add_revision(newRevision);
190 metadataBuilder.add_operation(Operation_Creation);
191 metadataBuilder.add_replayToSource(replayToSource);
192 auto metadataBuffer = metadataBuilder.Finish();
193 FinishMetadataBuffer(metadataFbb, metadataBuffer);
194 184
195 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); 185 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
196 if (!adaptorFactory) { 186 if (!adaptorFactory) {
@@ -201,8 +191,22 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
201 auto adaptor = adaptorFactory->createAdaptor(*entity); 191 auto adaptor = adaptorFactory->createAdaptor(*entity);
202 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); 192 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties());
203 for (auto processor : d->processors[bufferType]) { 193 for (auto processor : d->processors[bufferType]) {
204 processor->newEntity(key, newRevision, *memoryAdaptor, d->transaction); 194 processor->resourceType = d->resourceType;
195 processor->pipeline = this;
196 processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction);
205 } 197 }
198 //The maxRevision may have changed meanwhile if the entity created sub-entities
199 const qint64 newRevision = Storage::maxRevision(d->transaction) + 1;
200
201 // Add metadata buffer
202 flatbuffers::FlatBufferBuilder metadataFbb;
203 auto metadataBuilder = MetadataBuilder(metadataFbb);
204 metadataBuilder.add_revision(newRevision);
205 metadataBuilder.add_operation(Operation_Creation);
206 metadataBuilder.add_replayToSource(replayToSource);
207 auto metadataBuffer = metadataBuilder.Finish();
208 FinishMetadataBuffer(metadataFbb, metadataBuffer);
209
206 flatbuffers::FlatBufferBuilder fbb; 210 flatbuffers::FlatBufferBuilder fbb;
207 adaptorFactory->createBuffer(memoryAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 211 adaptorFactory->createBuffer(memoryAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
208 212
@@ -461,6 +465,25 @@ void Preprocessor::finalize()
461{ 465{
462} 466}
463 467
468void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomainType &entity, const QByteArray &typeName)
469{
470 flatbuffers::FlatBufferBuilder entityFbb;
471 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(resourceType, typeName);
472 adaptorFactory->createBuffer(entity, entityFbb);
473 const auto entityBuffer = BufferUtils::extractBuffer(entityFbb);
474
475 flatbuffers::FlatBufferBuilder fbb;
476 auto entityId = fbb.CreateString(entity.identifier());
477 // This is the resource buffer type and not the domain type
478 auto type = fbb.CreateString(typeName);
479 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityBuffer.constData(), entityBuffer.size());
480 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta);
481 Sink::Commands::FinishCreateEntityBuffer(fbb, location);
482
483 const auto data = BufferUtils::extractBuffer(fbb);
484 pipeline->newEntity(data, data.size()).exec();
485}
486
464} // namespace Sink 487} // namespace Sink
465 488
466#pragma clang diagnostic push 489#pragma clang diagnostic push
diff --git a/common/pipeline.h b/common/pipeline.h
index 2ca87a4..c3abcf6 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -82,15 +82,26 @@ public:
82 virtual ~Preprocessor(); 82 virtual ~Preprocessor();
83 83
84 virtual void startBatch(); 84 virtual void startBatch();
85 virtual void newEntity(const QByteArray &key, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) = 0; 85 virtual void newEntity(const QByteArray &key, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) {};
86 virtual void modifiedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, 86 virtual void modifiedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity,
87 Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) = 0; 87 Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) {};
88 virtual void deletedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::Transaction &transaction) = 0; 88 virtual void deletedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::Transaction &transaction) {};
89 virtual void finalize(); 89 virtual void finalize();
90 90
91protected:
92 template <typename DomainType>
93 void createEntity(const DomainType &entity)
94 {
95 createEntity(entity, Sink::ApplicationDomain::getTypeName<DomainType>());
96 }
97 void createEntity(const Sink::ApplicationDomain::ApplicationDomainType &entity, const QByteArray &type);
98
91private: 99private:
100 friend class Pipeline;
92 class Private; 101 class Private;
93 Private *const d; 102 Private *const d;
103 Pipeline *pipeline;
104 QByteArray resourceType;
94}; 105};
95 106
96} // namespace Sink 107} // namespace Sink