summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-06-19 12:27:38 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-06-19 12:27:38 +0200
commitb289dd9bf618a2512cbf15b2a6fc4ae77e4792c8 (patch)
treef303b59e64026e841308d654c60aa8d1475e07ff /common/pipeline.cpp
parent207effdd7112141ad4fc5cdd46f332870a0c065c (diff)
downloadsink-b289dd9bf618a2512cbf15b2a6fc4ae77e4792c8.tar.gz
sink-b289dd9bf618a2512cbf15b2a6fc4ae77e4792c8.zip
Moved mailpreprocessors to a shared location
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp36
1 files changed, 28 insertions, 8 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 5d8a34c..63a60ce 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -46,7 +46,7 @@ namespace Sink {
46class Pipeline::Private 46class Pipeline::Private
47{ 47{
48public: 48public:
49 Private(const QString &resourceName) : storage(Sink::storageLocation(), resourceName, Storage::ReadWrite), revisionChanged(false) 49 Private(const QString &resourceName) : storage(Sink::storageLocation(), resourceName, Storage::ReadWrite), revisionChanged(false), resourceInstanceIdentifier(resourceName.toUtf8())
50 { 50 {
51 } 51 }
52 52
@@ -58,6 +58,7 @@ public:
58 QTime transactionTime; 58 QTime transactionTime;
59 int transactionItemCount; 59 int transactionItemCount;
60 QByteArray resourceType; 60 QByteArray resourceType;
61 QByteArray resourceInstanceIdentifier;
61}; 62};
62 63
63void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) 64void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
@@ -82,6 +83,9 @@ Pipeline::~Pipeline()
82 83
83void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) 84void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors)
84{ 85{
86 for (auto p : processors) {
87 p->setup(d->resourceType, d->resourceInstanceIdentifier, this);
88 }
85 d->processors[entityType] = processors; 89 d->processors[entityType] = processors;
86} 90}
87 91
@@ -191,8 +195,6 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
191 auto adaptor = adaptorFactory->createAdaptor(*entity); 195 auto adaptor = adaptorFactory->createAdaptor(*entity);
192 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); 196 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties());
193 for (auto processor : d->processors[bufferType]) { 197 for (auto processor : d->processors[bufferType]) {
194 processor->resourceType = d->resourceType;
195 processor->pipeline = this;
196 processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); 198 processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction);
197 } 199 }
198 //The maxRevision may have changed meanwhile if the entity created sub-entities 200 //The maxRevision may have changed meanwhile if the entity created sub-entities
@@ -303,8 +305,6 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
303 } 305 }
304 306
305 for (auto processor : d->processors[bufferType]) { 307 for (auto processor : d->processors[bufferType]) {
306 processor->resourceType = d->resourceType;
307 processor->pipeline = this;
308 processor->modifiedEntity(key, Storage::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); 308 processor->modifiedEntity(key, Storage::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction);
309 } 309 }
310 //The maxRevision may have changed meanwhile if the entity created sub-entities 310 //The maxRevision may have changed meanwhile if the entity created sub-entities
@@ -452,12 +452,27 @@ qint64 Pipeline::cleanedUpRevision()
452 return Storage::cleanedUpRevision(d->transaction); 452 return Storage::cleanedUpRevision(d->transaction);
453} 453}
454 454
455Preprocessor::Preprocessor() : d(0) 455class Preprocessor::Private {
456public:
457 QByteArray resourceType;
458 QByteArray resourceInstanceIdentifier;
459 Pipeline *pipeline;
460};
461
462Preprocessor::Preprocessor() : d(new Preprocessor::Private)
456{ 463{
457} 464}
458 465
459Preprocessor::~Preprocessor() 466Preprocessor::~Preprocessor()
460{ 467{
468 delete d;
469}
470
471void Preprocessor::setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *pipeline)
472{
473 d->resourceType = resourceType;
474 d->resourceInstanceIdentifier = resourceInstanceIdentifier;
475 d->pipeline = pipeline;
461} 476}
462 477
463void Preprocessor::startBatch() 478void Preprocessor::startBatch()
@@ -468,10 +483,15 @@ void Preprocessor::finalize()
468{ 483{
469} 484}
470 485
486QByteArray Preprocessor::resourceInstanceIdentifier() const
487{
488 return d->resourceInstanceIdentifier;
489}
490
471void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomainType &entity, const QByteArray &typeName) 491void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomainType &entity, const QByteArray &typeName)
472{ 492{
473 flatbuffers::FlatBufferBuilder entityFbb; 493 flatbuffers::FlatBufferBuilder entityFbb;
474 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(resourceType, typeName); 494 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, typeName);
475 adaptorFactory->createBuffer(entity, entityFbb); 495 adaptorFactory->createBuffer(entity, entityFbb);
476 const auto entityBuffer = BufferUtils::extractBuffer(entityFbb); 496 const auto entityBuffer = BufferUtils::extractBuffer(entityFbb);
477 497
@@ -484,7 +504,7 @@ void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomain
484 Sink::Commands::FinishCreateEntityBuffer(fbb, location); 504 Sink::Commands::FinishCreateEntityBuffer(fbb, location);
485 505
486 const auto data = BufferUtils::extractBuffer(fbb); 506 const auto data = BufferUtils::extractBuffer(fbb);
487 pipeline->newEntity(data, data.size()).exec(); 507 d->pipeline->newEntity(data, data.size()).exec();
488} 508}
489 509
490} // namespace Sink 510} // namespace Sink