summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-05-28 00:24:53 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-05-28 00:24:53 +0200
commite9c75177590d8546ebd9425f16c4269a9c92f517 (patch)
tree8a953631e467d9df50657e22bd90954b7b71c990 /common/pipeline.cpp
parent8f01eb530262d1442fc4fa0782a41e052412d43b (diff)
downloadsink-e9c75177590d8546ebd9425f16c4269a9c92f517.tar.gz
sink-e9c75177590d8546ebd9425f16c4269a9c92f517.zip
Refactored the generic resource to use separate classes for
changereplay and synchronization. This cleans up the API and avoids the excessive passing around of transactions. It also provides more flexibility in eventually using different synchronization strategies for different resources.
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp17
1 files changed, 10 insertions, 7 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 637a1b8..7863f67 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -34,6 +34,7 @@
34#include "entitybuffer.h" 34#include "entitybuffer.h"
35#include "log.h" 35#include "log.h"
36#include "domain/applicationdomaintype.h" 36#include "domain/applicationdomaintype.h"
37#include "adaptorfactoryregistry.h"
37#include "definitions.h" 38#include "definitions.h"
38#include "bufferutils.h" 39#include "bufferutils.h"
39 40
@@ -52,11 +53,11 @@ public:
52 Storage storage; 53 Storage storage;
53 Storage::Transaction transaction; 54 Storage::Transaction transaction;
54 QHash<QString, QVector<Preprocessor *>> processors; 55 QHash<QString, QVector<Preprocessor *>> processors;
55 QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory;
56 bool revisionChanged; 56 bool revisionChanged;
57 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); 57 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid);
58 QTime transactionTime; 58 QTime transactionTime;
59 int transactionItemCount; 59 int transactionItemCount;
60 QByteArray resourceType;
60}; 61};
61 62
62void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) 63void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
@@ -84,9 +85,9 @@ void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preproc
84 d->processors[entityType] = processors; 85 d->processors[entityType] = processors;
85} 86}
86 87
87void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) 88void Pipeline::setResourceType(const QByteArray &resourceType)
88{ 89{
89 d->adaptorFactory.insert(entityType, factory); 90 d->resourceType = resourceType;
90} 91}
91 92
92void Pipeline::startTransaction() 93void Pipeline::startTransaction()
@@ -102,7 +103,9 @@ void Pipeline::startTransaction()
102 Trace() << "Starting transaction."; 103 Trace() << "Starting transaction.";
103 d->transactionTime.start(); 104 d->transactionTime.start();
104 d->transactionItemCount = 0; 105 d->transactionItemCount = 0;
105 d->transaction = std::move(storage().createTransaction(Storage::ReadWrite)); 106 d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) {
107 Warning() << error.message;
108 }));
106} 109}
107 110
108void Pipeline::commit() 111void Pipeline::commit()
@@ -189,7 +192,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
189 auto metadataBuffer = metadataBuilder.Finish(); 192 auto metadataBuffer = metadataBuilder.Finish();
190 FinishMetadataBuffer(metadataFbb, metadataBuffer); 193 FinishMetadataBuffer(metadataFbb, metadataBuffer);
191 194
192 auto adaptorFactory = d->adaptorFactory.value(bufferType); 195 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
193 if (!adaptorFactory) { 196 if (!adaptorFactory) {
194 Warning() << "no adaptor factory for type " << bufferType; 197 Warning() << "no adaptor factory for type " << bufferType;
195 return KAsync::error<qint64>(0); 198 return KAsync::error<qint64>(0);
@@ -244,7 +247,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
244 } 247 }
245 248
246 // TODO use only readPropertyMapper and writePropertyMapper 249 // TODO use only readPropertyMapper and writePropertyMapper
247 auto adaptorFactory = d->adaptorFactory.value(bufferType); 250 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
248 if (!adaptorFactory) { 251 if (!adaptorFactory) {
249 Warning() << "no adaptor factory for type " << bufferType; 252 Warning() << "no adaptor factory for type " << bufferType;
250 return KAsync::error<qint64>(0); 253 return KAsync::error<qint64>(0);
@@ -373,7 +376,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
373 flatbuffers::FlatBufferBuilder fbb; 376 flatbuffers::FlatBufferBuilder fbb;
374 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); 377 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0);
375 378
376 auto adaptorFactory = d->adaptorFactory.value(bufferType); 379 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
377 if (!adaptorFactory) { 380 if (!adaptorFactory) {
378 Warning() << "no adaptor factory for type " << bufferType; 381 Warning() << "no adaptor factory for type " << bufferType;
379 return KAsync::error<qint64>(0); 382 return KAsync::error<qint64>(0);