diff options
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 48 |
1 files changed, 48 insertions, 0 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 1eea631..8ace855 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -34,10 +34,12 @@ | |||
34 | #include "entitybuffer.h" | 34 | #include "entitybuffer.h" |
35 | #include "log.h" | 35 | #include "log.h" |
36 | #include "domain/applicationdomaintype.h" | 36 | #include "domain/applicationdomaintype.h" |
37 | #include "domain/applicationdomaintype_p.h" | ||
37 | #include "adaptorfactoryregistry.h" | 38 | #include "adaptorfactoryregistry.h" |
38 | #include "definitions.h" | 39 | #include "definitions.h" |
39 | #include "bufferutils.h" | 40 | #include "bufferutils.h" |
40 | #include "storage/entitystore.h" | 41 | #include "storage/entitystore.h" |
42 | #include "store.h" | ||
41 | 43 | ||
42 | SINK_DEBUG_AREA("pipeline") | 44 | SINK_DEBUG_AREA("pipeline") |
43 | 45 | ||
@@ -186,6 +188,13 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
186 | return KAsync::value(d->entityStore.maxRevision()); | 188 | return KAsync::value(d->entityStore.maxRevision()); |
187 | } | 189 | } |
188 | 190 | ||
191 | template <class T> | ||
192 | struct CreateHelper { | ||
193 | KAsync::Job<void> operator()(const ApplicationDomain::ApplicationDomainType &arg) const { | ||
194 | return Sink::Store::create<T>(arg); | ||
195 | } | ||
196 | }; | ||
197 | |||
189 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | 198 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) |
190 | { | 199 | { |
191 | d->transactionItemCount++; | 200 | d->transactionItemCount++; |
@@ -239,6 +248,45 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
239 | deletions = BufferUtils::fromVector(*modifyEntity->deletions()); | 248 | deletions = BufferUtils::fromVector(*modifyEntity->deletions()); |
240 | } | 249 | } |
241 | 250 | ||
251 | if (modifyEntity->targetResource()) { | ||
252 | auto targetResource = BufferUtils::extractBuffer(modifyEntity->targetResource()); | ||
253 | auto changeset = diff.changedProperties(); | ||
254 | const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); | ||
255 | if (current.identifier().isEmpty()) { | ||
256 | SinkWarning() << "Failed to read current version: " << diff.identifier(); | ||
257 | return KAsync::error<qint64>(0); | ||
258 | } | ||
259 | |||
260 | auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties()); | ||
261 | |||
262 | // Apply diff | ||
263 | for (const auto &property : changeset) { | ||
264 | const auto value = diff.getProperty(property); | ||
265 | if (value.isValid()) { | ||
266 | newEntity.setProperty(property, value); | ||
267 | } | ||
268 | } | ||
269 | |||
270 | // Remove deletions | ||
271 | for (const auto property : deletions) { | ||
272 | newEntity.setProperty(property, QVariant()); | ||
273 | } | ||
274 | newEntity.setResource(targetResource); | ||
275 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); | ||
276 | |||
277 | SinkTrace() << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource; | ||
278 | auto job = TypeHelper<CreateHelper>{bufferType}.operator()<KAsync::Job<void>, ApplicationDomain::ApplicationDomainType&>(newEntity); | ||
279 | job = job.syncThen<void>([=](const KAsync::Error &error) { | ||
280 | if (!error) { | ||
281 | SinkTrace() << "Move of " << newEntity.identifier() << "was successfull"; | ||
282 | } else { | ||
283 | SinkError() << "Failed to move entity " << targetResource << " to resource " << newEntity.identifier(); | ||
284 | } | ||
285 | }); | ||
286 | job.exec(); | ||
287 | return KAsync::value<qint64>(0); | ||
288 | } | ||
289 | |||
242 | auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) { | 290 | auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) { |
243 | foreach (const auto &processor, d->processors[bufferType]) { | 291 | foreach (const auto &processor, d->processors[bufferType]) { |
244 | processor->modifiedEntity(oldEntity, newEntity); | 292 | processor->modifiedEntity(oldEntity, newEntity); |