summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-02-01 15:46:39 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-02-01 15:46:39 +0100
commit7654452afc4fa81fe4fbb26347b58c6ba202dcc9 (patch)
treeadf902752f6b442527d6263105384a665735c6bc /common/pipeline.cpp
parentb1571c2be7342a0f7474e6a94d9c55230241fa1c (diff)
downloadsink-7654452afc4fa81fe4fbb26347b58c6ba202dcc9.tar.gz
sink-7654452afc4fa81fe4fbb26347b58c6ba202dcc9.zip
Use pimpl
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp32
1 files changed, 17 insertions, 15 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 0a71c8d..db79dc8 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -53,8 +53,22 @@ public:
53 QHash<QString, QVector<Preprocessor *> > processors; 53 QHash<QString, QVector<Preprocessor *> > processors;
54 QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; 54 QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory;
55 bool revisionChanged; 55 bool revisionChanged;
56 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid);
56}; 57};
57 58
59void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
60{
61 Storage::mainDatabase(transaction, bufferType).write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
62 [uid, newRevision](const Storage::Error &error) {
63 Warning() << "Failed to write entity" << uid << newRevision;
64 }
65 );
66 revisionChanged = true;
67 Storage::setMaxRevision(transaction, newRevision);
68 Storage::recordRevision(transaction, newRevision, uid, bufferType);
69}
70
71
58Pipeline::Pipeline(const QString &resourceName, QObject *parent) 72Pipeline::Pipeline(const QString &resourceName, QObject *parent)
59 : QObject(parent), 73 : QObject(parent),
60 d(new Private(resourceName)) 74 d(new Private(resourceName))
@@ -118,18 +132,6 @@ Storage &Pipeline::storage() const
118 return d->storage; 132 return d->storage;
119} 133}
120 134
121void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
122{
123 Storage::mainDatabase(d->transaction, bufferType).write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
124 [](const Storage::Error &error) {
125 Warning() << "Failed to write entity";
126 }
127 );
128 d->revisionChanged = true;
129 Storage::setMaxRevision(d->transaction, newRevision);
130 Storage::recordRevision(d->transaction, newRevision, uid, bufferType);
131}
132
133KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) 135KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
134{ 136{
135 Trace() << "Pipeline: New Entity"; 137 Trace() << "Pipeline: New Entity";
@@ -185,7 +187,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
185 flatbuffers::FlatBufferBuilder fbb; 187 flatbuffers::FlatBufferBuilder fbb;
186 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); 188 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
187 189
188 storeNewRevision(newRevision, fbb, bufferType, key); 190 d->storeNewRevision(newRevision, fbb, bufferType, key);
189 191
190 auto adaptorFactory = d->adaptorFactory.value(bufferType); 192 auto adaptorFactory = d->adaptorFactory.value(bufferType);
191 if (!adaptorFactory) { 193 if (!adaptorFactory) {
@@ -309,7 +311,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
309 flatbuffers::FlatBufferBuilder fbb; 311 flatbuffers::FlatBufferBuilder fbb;
310 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 312 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
311 313
312 storeNewRevision(newRevision, fbb, bufferType, key); 314 d->storeNewRevision(newRevision, fbb, bufferType, key);
313 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; 315 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType;
314 Storage::mainDatabase(d->transaction, bufferType).scan(Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool { 316 Storage::mainDatabase(d->transaction, bufferType).scan(Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool {
315 if (value.isEmpty()) { 317 if (value.isEmpty()) {
@@ -406,7 +408,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
406 ErrorMsg() << "Failed to find value in pipeline: " << error.message; 408 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
407 }); 409 });
408 410
409 storeNewRevision(newRevision, fbb, bufferType, key); 411 d->storeNewRevision(newRevision, fbb, bufferType, key);
410 Log() << "Pipeline: deleted entity: "<< newRevision; 412 Log() << "Pipeline: deleted entity: "<< newRevision;
411 413
412 for (auto processor : d->processors[bufferType]) { 414 for (auto processor : d->processors[bufferType]) {