diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-05-08 10:32:01 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-05-08 10:32:01 +0200 |
commit | fdeb92ca128e6eb51bbcace0b47519b12a08ce93 (patch) | |
tree | f0b4162ca74d5ee1fe6209069c910e41c92de570 /common/pipeline.cpp | |
parent | 2df41a3a919f1a131d311112e6fc444eff12c229 (diff) | |
download | sink-fdeb92ca128e6eb51bbcace0b47519b12a08ce93.tar.gz sink-fdeb92ca128e6eb51bbcace0b47519b12a08ce93.zip |
Run preprocessors before persising the value.
And allow preprocessors to modify the result.
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 58 |
1 files changed, 18 insertions, 40 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 65a2f5b..637a1b8 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -189,31 +189,23 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
189 | auto metadataBuffer = metadataBuilder.Finish(); | 189 | auto metadataBuffer = metadataBuilder.Finish(); |
190 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | 190 | FinishMetadataBuffer(metadataFbb, metadataBuffer); |
191 | 191 | ||
192 | flatbuffers::FlatBufferBuilder fbb; | ||
193 | EntityBuffer::assembleEntityBuffer( | ||
194 | fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); | ||
195 | |||
196 | d->storeNewRevision(newRevision, fbb, bufferType, key); | ||
197 | |||
198 | auto adaptorFactory = d->adaptorFactory.value(bufferType); | 192 | auto adaptorFactory = d->adaptorFactory.value(bufferType); |
199 | if (!adaptorFactory) { | 193 | if (!adaptorFactory) { |
200 | Warning() << "no adaptor factory for type " << bufferType; | 194 | Warning() << "no adaptor factory for type " << bufferType; |
201 | return KAsync::error<qint64>(0); | 195 | return KAsync::error<qint64>(0); |
202 | } | 196 | } |
203 | 197 | ||
198 | auto adaptor = adaptorFactory->createAdaptor(*entity); | ||
199 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); | ||
200 | for (auto processor : d->processors[bufferType]) { | ||
201 | processor->newEntity(key, newRevision, *memoryAdaptor, d->transaction); | ||
202 | } | ||
203 | flatbuffers::FlatBufferBuilder fbb; | ||
204 | adaptorFactory->createBuffer(memoryAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | ||
205 | |||
206 | d->storeNewRevision(newRevision, fbb, bufferType, key); | ||
207 | |||
204 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; | 208 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; |
205 | Storage::mainDatabase(d->transaction, bufferType) | ||
206 | .scan(Storage::assembleKey(key, newRevision), | ||
207 | [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { | ||
208 | auto entity = GetEntity(value); | ||
209 | Q_ASSERT(entity->resource() || entity->local()); | ||
210 | auto adaptor = adaptorFactory->createAdaptor(*entity); | ||
211 | for (auto processor : d->processors[bufferType]) { | ||
212 | processor->newEntity(key, newRevision, *adaptor, d->transaction); | ||
213 | } | ||
214 | return false; | ||
215 | }, | ||
216 | [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; }); | ||
217 | return KAsync::start<qint64>([newRevision]() { return newRevision; }); | 209 | return KAsync::start<qint64>([newRevision]() { return newRevision; }); |
218 | } | 210 | } |
219 | 211 | ||
@@ -281,9 +273,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
281 | return KAsync::error<qint64>(0); | 273 | return KAsync::error<qint64>(0); |
282 | } | 274 | } |
283 | 275 | ||
284 | // resource and uid don't matter at this point | 276 | auto newAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(current), current->availableProperties()); |
285 | const ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current); | ||
286 | auto newObject = ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(existingObject); | ||
287 | 277 | ||
288 | // Apply diff | 278 | // Apply diff |
289 | // FIXME only apply the properties that are available in the buffer | 279 | // FIXME only apply the properties that are available in the buffer |
@@ -293,19 +283,21 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
293 | changeset << property; | 283 | changeset << property; |
294 | const auto value = diff->getProperty(property); | 284 | const auto value = diff->getProperty(property); |
295 | if (value.isValid()) { | 285 | if (value.isValid()) { |
296 | newObject->setProperty(property, value); | 286 | newAdaptor->setProperty(property, value); |
297 | } | 287 | } |
298 | } | 288 | } |
299 | // Altough we only set some properties, we want all to be serialized | ||
300 | newObject->setChangedProperties(changeset); | ||
301 | 289 | ||
302 | // Remove deletions | 290 | // Remove deletions |
303 | if (modifyEntity->deletions()) { | 291 | if (modifyEntity->deletions()) { |
304 | for (const auto &property : *modifyEntity->deletions()) { | 292 | for (const auto &property : *modifyEntity->deletions()) { |
305 | newObject->setProperty(BufferUtils::extractBuffer(property), QVariant()); | 293 | newAdaptor->setProperty(BufferUtils::extractBuffer(property), QVariant()); |
306 | } | 294 | } |
307 | } | 295 | } |
308 | 296 | ||
297 | for (auto processor : d->processors[bufferType]) { | ||
298 | processor->modifiedEntity(key, newRevision, *current, *newAdaptor, d->transaction); | ||
299 | } | ||
300 | |||
309 | // Add metadata buffer | 301 | // Add metadata buffer |
310 | flatbuffers::FlatBufferBuilder metadataFbb; | 302 | flatbuffers::FlatBufferBuilder metadataFbb; |
311 | auto metadataBuilder = MetadataBuilder(metadataFbb); | 303 | auto metadataBuilder = MetadataBuilder(metadataFbb); |
@@ -316,24 +308,10 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
316 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | 308 | FinishMetadataBuffer(metadataFbb, metadataBuffer); |
317 | 309 | ||
318 | flatbuffers::FlatBufferBuilder fbb; | 310 | flatbuffers::FlatBufferBuilder fbb; |
319 | adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | 311 | adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); |
320 | 312 | ||
321 | d->storeNewRevision(newRevision, fbb, bufferType, key); | 313 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
322 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; | 314 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; |
323 | Storage::mainDatabase(d->transaction, bufferType) | ||
324 | .scan(Storage::assembleKey(key, newRevision), | ||
325 | [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool { | ||
326 | if (value.isEmpty()) { | ||
327 | ErrorMsg() << "Read buffer is empty."; | ||
328 | } | ||
329 | auto entity = GetEntity(value.data()); | ||
330 | auto newEntity = adaptorFactory->createAdaptor(*entity); | ||
331 | for (auto processor : d->processors[bufferType]) { | ||
332 | processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); | ||
333 | } | ||
334 | return false; | ||
335 | }, | ||
336 | [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; }); | ||
337 | return KAsync::start<qint64>([newRevision]() { return newRevision; }); | 315 | return KAsync::start<qint64>([newRevision]() { return newRevision; }); |
338 | } | 316 | } |
339 | 317 | ||