diff options
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 17 |
1 files changed, 10 insertions, 7 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 637a1b8..7863f67 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -34,6 +34,7 @@ | |||
34 | #include "entitybuffer.h" | 34 | #include "entitybuffer.h" |
35 | #include "log.h" | 35 | #include "log.h" |
36 | #include "domain/applicationdomaintype.h" | 36 | #include "domain/applicationdomaintype.h" |
37 | #include "adaptorfactoryregistry.h" | ||
37 | #include "definitions.h" | 38 | #include "definitions.h" |
38 | #include "bufferutils.h" | 39 | #include "bufferutils.h" |
39 | 40 | ||
@@ -52,11 +53,11 @@ public: | |||
52 | Storage storage; | 53 | Storage storage; |
53 | Storage::Transaction transaction; | 54 | Storage::Transaction transaction; |
54 | QHash<QString, QVector<Preprocessor *>> processors; | 55 | QHash<QString, QVector<Preprocessor *>> processors; |
55 | QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; | ||
56 | bool revisionChanged; | 56 | bool revisionChanged; |
57 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); | 57 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); |
58 | QTime transactionTime; | 58 | QTime transactionTime; |
59 | int transactionItemCount; | 59 | int transactionItemCount; |
60 | QByteArray resourceType; | ||
60 | }; | 61 | }; |
61 | 62 | ||
62 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | 63 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) |
@@ -84,9 +85,9 @@ void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preproc | |||
84 | d->processors[entityType] = processors; | 85 | d->processors[entityType] = processors; |
85 | } | 86 | } |
86 | 87 | ||
87 | void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) | 88 | void Pipeline::setResourceType(const QByteArray &resourceType) |
88 | { | 89 | { |
89 | d->adaptorFactory.insert(entityType, factory); | 90 | d->resourceType = resourceType; |
90 | } | 91 | } |
91 | 92 | ||
92 | void Pipeline::startTransaction() | 93 | void Pipeline::startTransaction() |
@@ -102,7 +103,9 @@ void Pipeline::startTransaction() | |||
102 | Trace() << "Starting transaction."; | 103 | Trace() << "Starting transaction."; |
103 | d->transactionTime.start(); | 104 | d->transactionTime.start(); |
104 | d->transactionItemCount = 0; | 105 | d->transactionItemCount = 0; |
105 | d->transaction = std::move(storage().createTransaction(Storage::ReadWrite)); | 106 | d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { |
107 | Warning() << error.message; | ||
108 | })); | ||
106 | } | 109 | } |
107 | 110 | ||
108 | void Pipeline::commit() | 111 | void Pipeline::commit() |
@@ -189,7 +192,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
189 | auto metadataBuffer = metadataBuilder.Finish(); | 192 | auto metadataBuffer = metadataBuilder.Finish(); |
190 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | 193 | FinishMetadataBuffer(metadataFbb, metadataBuffer); |
191 | 194 | ||
192 | auto adaptorFactory = d->adaptorFactory.value(bufferType); | 195 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); |
193 | if (!adaptorFactory) { | 196 | if (!adaptorFactory) { |
194 | Warning() << "no adaptor factory for type " << bufferType; | 197 | Warning() << "no adaptor factory for type " << bufferType; |
195 | return KAsync::error<qint64>(0); | 198 | return KAsync::error<qint64>(0); |
@@ -244,7 +247,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
244 | } | 247 | } |
245 | 248 | ||
246 | // TODO use only readPropertyMapper and writePropertyMapper | 249 | // TODO use only readPropertyMapper and writePropertyMapper |
247 | auto adaptorFactory = d->adaptorFactory.value(bufferType); | 250 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); |
248 | if (!adaptorFactory) { | 251 | if (!adaptorFactory) { |
249 | Warning() << "no adaptor factory for type " << bufferType; | 252 | Warning() << "no adaptor factory for type " << bufferType; |
250 | return KAsync::error<qint64>(0); | 253 | return KAsync::error<qint64>(0); |
@@ -373,7 +376,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
373 | flatbuffers::FlatBufferBuilder fbb; | 376 | flatbuffers::FlatBufferBuilder fbb; |
374 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); | 377 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); |
375 | 378 | ||
376 | auto adaptorFactory = d->adaptorFactory.value(bufferType); | 379 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); |
377 | if (!adaptorFactory) { | 380 | if (!adaptorFactory) { |
378 | Warning() << "no adaptor factory for type " << bufferType; | 381 | Warning() << "no adaptor factory for type " << bufferType; |
379 | return KAsync::error<qint64>(0); | 382 | return KAsync::error<qint64>(0); |