diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-06-19 12:27:38 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-06-19 12:27:38 +0200 |
commit | b289dd9bf618a2512cbf15b2a6fc4ae77e4792c8 (patch) | |
tree | f303b59e64026e841308d654c60aa8d1475e07ff /common/pipeline.cpp | |
parent | 207effdd7112141ad4fc5cdd46f332870a0c065c (diff) | |
download | sink-b289dd9bf618a2512cbf15b2a6fc4ae77e4792c8.tar.gz sink-b289dd9bf618a2512cbf15b2a6fc4ae77e4792c8.zip |
Moved mailpreprocessors to a shared location
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 36 |
1 files changed, 28 insertions, 8 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 5d8a34c..63a60ce 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -46,7 +46,7 @@ namespace Sink { | |||
46 | class Pipeline::Private | 46 | class Pipeline::Private |
47 | { | 47 | { |
48 | public: | 48 | public: |
49 | Private(const QString &resourceName) : storage(Sink::storageLocation(), resourceName, Storage::ReadWrite), revisionChanged(false) | 49 | Private(const QString &resourceName) : storage(Sink::storageLocation(), resourceName, Storage::ReadWrite), revisionChanged(false), resourceInstanceIdentifier(resourceName.toUtf8()) |
50 | { | 50 | { |
51 | } | 51 | } |
52 | 52 | ||
@@ -58,6 +58,7 @@ public: | |||
58 | QTime transactionTime; | 58 | QTime transactionTime; |
59 | int transactionItemCount; | 59 | int transactionItemCount; |
60 | QByteArray resourceType; | 60 | QByteArray resourceType; |
61 | QByteArray resourceInstanceIdentifier; | ||
61 | }; | 62 | }; |
62 | 63 | ||
63 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | 64 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) |
@@ -82,6 +83,9 @@ Pipeline::~Pipeline() | |||
82 | 83 | ||
83 | void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) | 84 | void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) |
84 | { | 85 | { |
86 | for (auto p : processors) { | ||
87 | p->setup(d->resourceType, d->resourceInstanceIdentifier, this); | ||
88 | } | ||
85 | d->processors[entityType] = processors; | 89 | d->processors[entityType] = processors; |
86 | } | 90 | } |
87 | 91 | ||
@@ -191,8 +195,6 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
191 | auto adaptor = adaptorFactory->createAdaptor(*entity); | 195 | auto adaptor = adaptorFactory->createAdaptor(*entity); |
192 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); | 196 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); |
193 | for (auto processor : d->processors[bufferType]) { | 197 | for (auto processor : d->processors[bufferType]) { |
194 | processor->resourceType = d->resourceType; | ||
195 | processor->pipeline = this; | ||
196 | processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); | 198 | processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); |
197 | } | 199 | } |
198 | //The maxRevision may have changed meanwhile if the entity created sub-entities | 200 | //The maxRevision may have changed meanwhile if the entity created sub-entities |
@@ -303,8 +305,6 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
303 | } | 305 | } |
304 | 306 | ||
305 | for (auto processor : d->processors[bufferType]) { | 307 | for (auto processor : d->processors[bufferType]) { |
306 | processor->resourceType = d->resourceType; | ||
307 | processor->pipeline = this; | ||
308 | processor->modifiedEntity(key, Storage::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); | 308 | processor->modifiedEntity(key, Storage::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); |
309 | } | 309 | } |
310 | //The maxRevision may have changed meanwhile if the entity created sub-entities | 310 | //The maxRevision may have changed meanwhile if the entity created sub-entities |
@@ -452,12 +452,27 @@ qint64 Pipeline::cleanedUpRevision() | |||
452 | return Storage::cleanedUpRevision(d->transaction); | 452 | return Storage::cleanedUpRevision(d->transaction); |
453 | } | 453 | } |
454 | 454 | ||
455 | Preprocessor::Preprocessor() : d(0) | 455 | class Preprocessor::Private { |
456 | public: | ||
457 | QByteArray resourceType; | ||
458 | QByteArray resourceInstanceIdentifier; | ||
459 | Pipeline *pipeline; | ||
460 | }; | ||
461 | |||
462 | Preprocessor::Preprocessor() : d(new Preprocessor::Private) | ||
456 | { | 463 | { |
457 | } | 464 | } |
458 | 465 | ||
459 | Preprocessor::~Preprocessor() | 466 | Preprocessor::~Preprocessor() |
460 | { | 467 | { |
468 | delete d; | ||
469 | } | ||
470 | |||
471 | void Preprocessor::setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *pipeline) | ||
472 | { | ||
473 | d->resourceType = resourceType; | ||
474 | d->resourceInstanceIdentifier = resourceInstanceIdentifier; | ||
475 | d->pipeline = pipeline; | ||
461 | } | 476 | } |
462 | 477 | ||
463 | void Preprocessor::startBatch() | 478 | void Preprocessor::startBatch() |
@@ -468,10 +483,15 @@ void Preprocessor::finalize() | |||
468 | { | 483 | { |
469 | } | 484 | } |
470 | 485 | ||
486 | QByteArray Preprocessor::resourceInstanceIdentifier() const | ||
487 | { | ||
488 | return d->resourceInstanceIdentifier; | ||
489 | } | ||
490 | |||
471 | void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomainType &entity, const QByteArray &typeName) | 491 | void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomainType &entity, const QByteArray &typeName) |
472 | { | 492 | { |
473 | flatbuffers::FlatBufferBuilder entityFbb; | 493 | flatbuffers::FlatBufferBuilder entityFbb; |
474 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(resourceType, typeName); | 494 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, typeName); |
475 | adaptorFactory->createBuffer(entity, entityFbb); | 495 | adaptorFactory->createBuffer(entity, entityFbb); |
476 | const auto entityBuffer = BufferUtils::extractBuffer(entityFbb); | 496 | const auto entityBuffer = BufferUtils::extractBuffer(entityFbb); |
477 | 497 | ||
@@ -484,7 +504,7 @@ void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomain | |||
484 | Sink::Commands::FinishCreateEntityBuffer(fbb, location); | 504 | Sink::Commands::FinishCreateEntityBuffer(fbb, location); |
485 | 505 | ||
486 | const auto data = BufferUtils::extractBuffer(fbb); | 506 | const auto data = BufferUtils::extractBuffer(fbb); |
487 | pipeline->newEntity(data, data.size()).exec(); | 507 | d->pipeline->newEntity(data, data.size()).exec(); |
488 | } | 508 | } |
489 | 509 | ||
490 | } // namespace Sink | 510 | } // namespace Sink |