diff options
Diffstat (limited to 'common')
-rw-r--r-- | common/domainadaptor.h | 9 | ||||
-rw-r--r-- | common/domaintypeadaptorfactoryinterface.h | 1 | ||||
-rw-r--r-- | common/indexupdater.h | 8 | ||||
-rw-r--r-- | common/pipeline.cpp | 58 | ||||
-rw-r--r-- | common/pipeline.h | 4 |
5 files changed, 34 insertions, 46 deletions
diff --git a/common/domainadaptor.h b/common/domainadaptor.h index 99afb60..c620f91 100644 --- a/common/domainadaptor.h +++ b/common/domainadaptor.h | |||
@@ -178,6 +178,15 @@ public: | |||
178 | Sink::EntityBuffer::assembleEntityBuffer(fbb, metadataData, metadataSize, resFbb.GetBufferPointer(), resFbb.GetSize(), localFbb.GetBufferPointer(), localFbb.GetSize()); | 178 | Sink::EntityBuffer::assembleEntityBuffer(fbb, metadataData, metadataSize, resFbb.GetBufferPointer(), resFbb.GetSize(), localFbb.GetBufferPointer(), localFbb.GetSize()); |
179 | } | 179 | } |
180 | 180 | ||
181 | virtual void createBuffer(const QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> &bufferAdaptor, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) Q_DECL_OVERRIDE | ||
182 | { | ||
183 | //TODO rewrite the unterlying functions so we don't have to wrap the bufferAdaptor | ||
184 | auto newObject = Sink::ApplicationDomain::ApplicationDomainType("", "", 0, bufferAdaptor); | ||
185 | //Serialize all properties | ||
186 | newObject.setChangedProperties(bufferAdaptor->availableProperties().toSet()); | ||
187 | createBuffer(newObject, fbb, metadataData, metadataSize); | ||
188 | } | ||
189 | |||
181 | 190 | ||
182 | protected: | 191 | protected: |
183 | QSharedPointer<ReadPropertyMapper<LocalBuffer>> mLocalMapper; | 192 | QSharedPointer<ReadPropertyMapper<LocalBuffer>> mLocalMapper; |
diff --git a/common/domaintypeadaptorfactoryinterface.h b/common/domaintypeadaptorfactoryinterface.h index 72aa9b9..b498796 100644 --- a/common/domaintypeadaptorfactoryinterface.h +++ b/common/domaintypeadaptorfactoryinterface.h | |||
@@ -46,4 +46,5 @@ public: | |||
46 | */ | 46 | */ |
47 | virtual void | 47 | virtual void |
48 | createBuffer(const Sink::ApplicationDomain::ApplicationDomainType &domainType, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0; | 48 | createBuffer(const Sink::ApplicationDomain::ApplicationDomainType &domainType, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0; |
49 | virtual void createBuffer(const QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> &bufferAdaptor, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0; | ||
49 | }; | 50 | }; |
diff --git a/common/indexupdater.h b/common/indexupdater.h index deaaa16..936d03a 100644 --- a/common/indexupdater.h +++ b/common/indexupdater.h | |||
@@ -28,12 +28,12 @@ public: | |||
28 | { | 28 | { |
29 | } | 29 | } |
30 | 30 | ||
31 | void newEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE | 31 | void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE |
32 | { | 32 | { |
33 | add(newEntity.getProperty(mProperty), uid, transaction); | 33 | add(newEntity.getProperty(mProperty), uid, transaction); |
34 | } | 34 | } |
35 | 35 | ||
36 | void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, const Sink::ApplicationDomain::BufferAdaptor &newEntity, | 36 | void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, |
37 | Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE | 37 | Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE |
38 | { | 38 | { |
39 | remove(oldEntity.getProperty(mProperty), uid, transaction); | 39 | remove(oldEntity.getProperty(mProperty), uid, transaction); |
@@ -68,12 +68,12 @@ template <typename DomainType> | |||
68 | class DefaultIndexUpdater : public Sink::Preprocessor | 68 | class DefaultIndexUpdater : public Sink::Preprocessor |
69 | { | 69 | { |
70 | public: | 70 | public: |
71 | void newEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE | 71 | void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE |
72 | { | 72 | { |
73 | Sink::ApplicationDomain::TypeImplementation<DomainType>::index(uid, newEntity, transaction); | 73 | Sink::ApplicationDomain::TypeImplementation<DomainType>::index(uid, newEntity, transaction); |
74 | } | 74 | } |
75 | 75 | ||
76 | void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, const Sink::ApplicationDomain::BufferAdaptor &newEntity, | 76 | void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, |
77 | Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE | 77 | Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE |
78 | { | 78 | { |
79 | Sink::ApplicationDomain::TypeImplementation<DomainType>::removeIndex(uid, oldEntity, transaction); | 79 | Sink::ApplicationDomain::TypeImplementation<DomainType>::removeIndex(uid, oldEntity, transaction); |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 65a2f5b..637a1b8 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -189,31 +189,23 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
189 | auto metadataBuffer = metadataBuilder.Finish(); | 189 | auto metadataBuffer = metadataBuilder.Finish(); |
190 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | 190 | FinishMetadataBuffer(metadataFbb, metadataBuffer); |
191 | 191 | ||
192 | flatbuffers::FlatBufferBuilder fbb; | ||
193 | EntityBuffer::assembleEntityBuffer( | ||
194 | fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); | ||
195 | |||
196 | d->storeNewRevision(newRevision, fbb, bufferType, key); | ||
197 | |||
198 | auto adaptorFactory = d->adaptorFactory.value(bufferType); | 192 | auto adaptorFactory = d->adaptorFactory.value(bufferType); |
199 | if (!adaptorFactory) { | 193 | if (!adaptorFactory) { |
200 | Warning() << "no adaptor factory for type " << bufferType; | 194 | Warning() << "no adaptor factory for type " << bufferType; |
201 | return KAsync::error<qint64>(0); | 195 | return KAsync::error<qint64>(0); |
202 | } | 196 | } |
203 | 197 | ||
198 | auto adaptor = adaptorFactory->createAdaptor(*entity); | ||
199 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); | ||
200 | for (auto processor : d->processors[bufferType]) { | ||
201 | processor->newEntity(key, newRevision, *memoryAdaptor, d->transaction); | ||
202 | } | ||
203 | flatbuffers::FlatBufferBuilder fbb; | ||
204 | adaptorFactory->createBuffer(memoryAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | ||
205 | |||
206 | d->storeNewRevision(newRevision, fbb, bufferType, key); | ||
207 | |||
204 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; | 208 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; |
205 | Storage::mainDatabase(d->transaction, bufferType) | ||
206 | .scan(Storage::assembleKey(key, newRevision), | ||
207 | [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { | ||
208 | auto entity = GetEntity(value); | ||
209 | Q_ASSERT(entity->resource() || entity->local()); | ||
210 | auto adaptor = adaptorFactory->createAdaptor(*entity); | ||
211 | for (auto processor : d->processors[bufferType]) { | ||
212 | processor->newEntity(key, newRevision, *adaptor, d->transaction); | ||
213 | } | ||
214 | return false; | ||
215 | }, | ||
216 | [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; }); | ||
217 | return KAsync::start<qint64>([newRevision]() { return newRevision; }); | 209 | return KAsync::start<qint64>([newRevision]() { return newRevision; }); |
218 | } | 210 | } |
219 | 211 | ||
@@ -281,9 +273,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
281 | return KAsync::error<qint64>(0); | 273 | return KAsync::error<qint64>(0); |
282 | } | 274 | } |
283 | 275 | ||
284 | // resource and uid don't matter at this point | 276 | auto newAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(current), current->availableProperties()); |
285 | const ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current); | ||
286 | auto newObject = ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(existingObject); | ||
287 | 277 | ||
288 | // Apply diff | 278 | // Apply diff |
289 | // FIXME only apply the properties that are available in the buffer | 279 | // FIXME only apply the properties that are available in the buffer |
@@ -293,19 +283,21 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
293 | changeset << property; | 283 | changeset << property; |
294 | const auto value = diff->getProperty(property); | 284 | const auto value = diff->getProperty(property); |
295 | if (value.isValid()) { | 285 | if (value.isValid()) { |
296 | newObject->setProperty(property, value); | 286 | newAdaptor->setProperty(property, value); |
297 | } | 287 | } |
298 | } | 288 | } |
299 | // Altough we only set some properties, we want all to be serialized | ||
300 | newObject->setChangedProperties(changeset); | ||
301 | 289 | ||
302 | // Remove deletions | 290 | // Remove deletions |
303 | if (modifyEntity->deletions()) { | 291 | if (modifyEntity->deletions()) { |
304 | for (const auto &property : *modifyEntity->deletions()) { | 292 | for (const auto &property : *modifyEntity->deletions()) { |
305 | newObject->setProperty(BufferUtils::extractBuffer(property), QVariant()); | 293 | newAdaptor->setProperty(BufferUtils::extractBuffer(property), QVariant()); |
306 | } | 294 | } |
307 | } | 295 | } |
308 | 296 | ||
297 | for (auto processor : d->processors[bufferType]) { | ||
298 | processor->modifiedEntity(key, newRevision, *current, *newAdaptor, d->transaction); | ||
299 | } | ||
300 | |||
309 | // Add metadata buffer | 301 | // Add metadata buffer |
310 | flatbuffers::FlatBufferBuilder metadataFbb; | 302 | flatbuffers::FlatBufferBuilder metadataFbb; |
311 | auto metadataBuilder = MetadataBuilder(metadataFbb); | 303 | auto metadataBuilder = MetadataBuilder(metadataFbb); |
@@ -316,24 +308,10 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
316 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | 308 | FinishMetadataBuffer(metadataFbb, metadataBuffer); |
317 | 309 | ||
318 | flatbuffers::FlatBufferBuilder fbb; | 310 | flatbuffers::FlatBufferBuilder fbb; |
319 | adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | 311 | adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); |
320 | 312 | ||
321 | d->storeNewRevision(newRevision, fbb, bufferType, key); | 313 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
322 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; | 314 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; |
323 | Storage::mainDatabase(d->transaction, bufferType) | ||
324 | .scan(Storage::assembleKey(key, newRevision), | ||
325 | [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool { | ||
326 | if (value.isEmpty()) { | ||
327 | ErrorMsg() << "Read buffer is empty."; | ||
328 | } | ||
329 | auto entity = GetEntity(value.data()); | ||
330 | auto newEntity = adaptorFactory->createAdaptor(*entity); | ||
331 | for (auto processor : d->processors[bufferType]) { | ||
332 | processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); | ||
333 | } | ||
334 | return false; | ||
335 | }, | ||
336 | [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; }); | ||
337 | return KAsync::start<qint64>([newRevision]() { return newRevision; }); | 315 | return KAsync::start<qint64>([newRevision]() { return newRevision; }); |
338 | } | 316 | } |
339 | 317 | ||
diff --git a/common/pipeline.h b/common/pipeline.h index dc2cc4d..c65cbfd 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -83,9 +83,9 @@ public: | |||
83 | virtual ~Preprocessor(); | 83 | virtual ~Preprocessor(); |
84 | 84 | ||
85 | virtual void startBatch(); | 85 | virtual void startBatch(); |
86 | virtual void newEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) = 0; | 86 | virtual void newEntity(const QByteArray &key, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) = 0; |
87 | virtual void modifiedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, | 87 | virtual void modifiedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, |
88 | const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) = 0; | 88 | Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) = 0; |
89 | virtual void deletedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::Transaction &transaction) = 0; | 89 | virtual void deletedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::Transaction &transaction) = 0; |
90 | virtual void finalize(); | 90 | virtual void finalize(); |
91 | 91 | ||