diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-02-01 15:46:39 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-02-01 15:46:39 +0100 |
commit | 7654452afc4fa81fe4fbb26347b58c6ba202dcc9 (patch) | |
tree | adf902752f6b442527d6263105384a665735c6bc /common/pipeline.cpp | |
parent | b1571c2be7342a0f7474e6a94d9c55230241fa1c (diff) | |
download | sink-7654452afc4fa81fe4fbb26347b58c6ba202dcc9.tar.gz sink-7654452afc4fa81fe4fbb26347b58c6ba202dcc9.zip |
Use pimpl
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 32 |
1 files changed, 17 insertions, 15 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 0a71c8d..db79dc8 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -53,8 +53,22 @@ public: | |||
53 | QHash<QString, QVector<Preprocessor *> > processors; | 53 | QHash<QString, QVector<Preprocessor *> > processors; |
54 | QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; | 54 | QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; |
55 | bool revisionChanged; | 55 | bool revisionChanged; |
56 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); | ||
56 | }; | 57 | }; |
57 | 58 | ||
59 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | ||
60 | { | ||
61 | Storage::mainDatabase(transaction, bufferType).write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), | ||
62 | [uid, newRevision](const Storage::Error &error) { | ||
63 | Warning() << "Failed to write entity" << uid << newRevision; | ||
64 | } | ||
65 | ); | ||
66 | revisionChanged = true; | ||
67 | Storage::setMaxRevision(transaction, newRevision); | ||
68 | Storage::recordRevision(transaction, newRevision, uid, bufferType); | ||
69 | } | ||
70 | |||
71 | |||
58 | Pipeline::Pipeline(const QString &resourceName, QObject *parent) | 72 | Pipeline::Pipeline(const QString &resourceName, QObject *parent) |
59 | : QObject(parent), | 73 | : QObject(parent), |
60 | d(new Private(resourceName)) | 74 | d(new Private(resourceName)) |
@@ -118,18 +132,6 @@ Storage &Pipeline::storage() const | |||
118 | return d->storage; | 132 | return d->storage; |
119 | } | 133 | } |
120 | 134 | ||
121 | void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | ||
122 | { | ||
123 | Storage::mainDatabase(d->transaction, bufferType).write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), | ||
124 | [](const Storage::Error &error) { | ||
125 | Warning() << "Failed to write entity"; | ||
126 | } | ||
127 | ); | ||
128 | d->revisionChanged = true; | ||
129 | Storage::setMaxRevision(d->transaction, newRevision); | ||
130 | Storage::recordRevision(d->transaction, newRevision, uid, bufferType); | ||
131 | } | ||
132 | |||
133 | KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | 135 | KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) |
134 | { | 136 | { |
135 | Trace() << "Pipeline: New Entity"; | 137 | Trace() << "Pipeline: New Entity"; |
@@ -185,7 +187,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
185 | flatbuffers::FlatBufferBuilder fbb; | 187 | flatbuffers::FlatBufferBuilder fbb; |
186 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); | 188 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); |
187 | 189 | ||
188 | storeNewRevision(newRevision, fbb, bufferType, key); | 190 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
189 | 191 | ||
190 | auto adaptorFactory = d->adaptorFactory.value(bufferType); | 192 | auto adaptorFactory = d->adaptorFactory.value(bufferType); |
191 | if (!adaptorFactory) { | 193 | if (!adaptorFactory) { |
@@ -309,7 +311,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
309 | flatbuffers::FlatBufferBuilder fbb; | 311 | flatbuffers::FlatBufferBuilder fbb; |
310 | adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | 312 | adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); |
311 | 313 | ||
312 | storeNewRevision(newRevision, fbb, bufferType, key); | 314 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
313 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; | 315 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; |
314 | Storage::mainDatabase(d->transaction, bufferType).scan(Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool { | 316 | Storage::mainDatabase(d->transaction, bufferType).scan(Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool { |
315 | if (value.isEmpty()) { | 317 | if (value.isEmpty()) { |
@@ -406,7 +408,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
406 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | 408 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; |
407 | }); | 409 | }); |
408 | 410 | ||
409 | storeNewRevision(newRevision, fbb, bufferType, key); | 411 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
410 | Log() << "Pipeline: deleted entity: "<< newRevision; | 412 | Log() << "Pipeline: deleted entity: "<< newRevision; |
411 | 413 | ||
412 | for (auto processor : d->processors[bufferType]) { | 414 | for (auto processor : d->processors[bufferType]) { |