summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp17
1 files changed, 10 insertions, 7 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 637a1b8..7863f67 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -34,6 +34,7 @@
34#include "entitybuffer.h" 34#include "entitybuffer.h"
35#include "log.h" 35#include "log.h"
36#include "domain/applicationdomaintype.h" 36#include "domain/applicationdomaintype.h"
37#include "adaptorfactoryregistry.h"
37#include "definitions.h" 38#include "definitions.h"
38#include "bufferutils.h" 39#include "bufferutils.h"
39 40
@@ -52,11 +53,11 @@ public:
52 Storage storage; 53 Storage storage;
53 Storage::Transaction transaction; 54 Storage::Transaction transaction;
54 QHash<QString, QVector<Preprocessor *>> processors; 55 QHash<QString, QVector<Preprocessor *>> processors;
55 QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory;
56 bool revisionChanged; 56 bool revisionChanged;
57 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); 57 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid);
58 QTime transactionTime; 58 QTime transactionTime;
59 int transactionItemCount; 59 int transactionItemCount;
60 QByteArray resourceType;
60}; 61};
61 62
62void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) 63void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
@@ -84,9 +85,9 @@ void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preproc
84 d->processors[entityType] = processors; 85 d->processors[entityType] = processors;
85} 86}
86 87
87void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) 88void Pipeline::setResourceType(const QByteArray &resourceType)
88{ 89{
89 d->adaptorFactory.insert(entityType, factory); 90 d->resourceType = resourceType;
90} 91}
91 92
92void Pipeline::startTransaction() 93void Pipeline::startTransaction()
@@ -102,7 +103,9 @@ void Pipeline::startTransaction()
102 Trace() << "Starting transaction."; 103 Trace() << "Starting transaction.";
103 d->transactionTime.start(); 104 d->transactionTime.start();
104 d->transactionItemCount = 0; 105 d->transactionItemCount = 0;
105 d->transaction = std::move(storage().createTransaction(Storage::ReadWrite)); 106 d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) {
107 Warning() << error.message;
108 }));
106} 109}
107 110
108void Pipeline::commit() 111void Pipeline::commit()
@@ -189,7 +192,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
189 auto metadataBuffer = metadataBuilder.Finish(); 192 auto metadataBuffer = metadataBuilder.Finish();
190 FinishMetadataBuffer(metadataFbb, metadataBuffer); 193 FinishMetadataBuffer(metadataFbb, metadataBuffer);
191 194
192 auto adaptorFactory = d->adaptorFactory.value(bufferType); 195 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
193 if (!adaptorFactory) { 196 if (!adaptorFactory) {
194 Warning() << "no adaptor factory for type " << bufferType; 197 Warning() << "no adaptor factory for type " << bufferType;
195 return KAsync::error<qint64>(0); 198 return KAsync::error<qint64>(0);
@@ -244,7 +247,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
244 } 247 }
245 248
246 // TODO use only readPropertyMapper and writePropertyMapper 249 // TODO use only readPropertyMapper and writePropertyMapper
247 auto adaptorFactory = d->adaptorFactory.value(bufferType); 250 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
248 if (!adaptorFactory) { 251 if (!adaptorFactory) {
249 Warning() << "no adaptor factory for type " << bufferType; 252 Warning() << "no adaptor factory for type " << bufferType;
250 return KAsync::error<qint64>(0); 253 return KAsync::error<qint64>(0);
@@ -373,7 +376,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
373 flatbuffers::FlatBufferBuilder fbb; 376 flatbuffers::FlatBufferBuilder fbb;
374 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); 377 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0);
375 378
376 auto adaptorFactory = d->adaptorFactory.value(bufferType); 379 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
377 if (!adaptorFactory) { 380 if (!adaptorFactory) {
378 Warning() << "no adaptor factory for type " << bufferType; 381 Warning() << "no adaptor factory for type " << bufferType;
379 return KAsync::error<qint64>(0); 382 return KAsync::error<qint64>(0);