summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/domainadaptor.h9
-rw-r--r--common/domaintypeadaptorfactoryinterface.h1
-rw-r--r--common/indexupdater.h8
-rw-r--r--common/pipeline.cpp58
-rw-r--r--common/pipeline.h4
-rw-r--r--tests/pipelinetest.cpp8
6 files changed, 40 insertions, 48 deletions
diff --git a/common/domainadaptor.h b/common/domainadaptor.h
index 99afb60..c620f91 100644
--- a/common/domainadaptor.h
+++ b/common/domainadaptor.h
@@ -178,6 +178,15 @@ public:
178 Sink::EntityBuffer::assembleEntityBuffer(fbb, metadataData, metadataSize, resFbb.GetBufferPointer(), resFbb.GetSize(), localFbb.GetBufferPointer(), localFbb.GetSize()); 178 Sink::EntityBuffer::assembleEntityBuffer(fbb, metadataData, metadataSize, resFbb.GetBufferPointer(), resFbb.GetSize(), localFbb.GetBufferPointer(), localFbb.GetSize());
179 } 179 }
180 180
181 virtual void createBuffer(const QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> &bufferAdaptor, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) Q_DECL_OVERRIDE
182 {
183 //TODO rewrite the unterlying functions so we don't have to wrap the bufferAdaptor
184 auto newObject = Sink::ApplicationDomain::ApplicationDomainType("", "", 0, bufferAdaptor);
185 //Serialize all properties
186 newObject.setChangedProperties(bufferAdaptor->availableProperties().toSet());
187 createBuffer(newObject, fbb, metadataData, metadataSize);
188 }
189
181 190
182protected: 191protected:
183 QSharedPointer<ReadPropertyMapper<LocalBuffer>> mLocalMapper; 192 QSharedPointer<ReadPropertyMapper<LocalBuffer>> mLocalMapper;
diff --git a/common/domaintypeadaptorfactoryinterface.h b/common/domaintypeadaptorfactoryinterface.h
index 72aa9b9..b498796 100644
--- a/common/domaintypeadaptorfactoryinterface.h
+++ b/common/domaintypeadaptorfactoryinterface.h
@@ -46,4 +46,5 @@ public:
46 */ 46 */
47 virtual void 47 virtual void
48 createBuffer(const Sink::ApplicationDomain::ApplicationDomainType &domainType, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0; 48 createBuffer(const Sink::ApplicationDomain::ApplicationDomainType &domainType, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0;
49 virtual void createBuffer(const QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> &bufferAdaptor, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0;
49}; 50};
diff --git a/common/indexupdater.h b/common/indexupdater.h
index deaaa16..936d03a 100644
--- a/common/indexupdater.h
+++ b/common/indexupdater.h
@@ -28,12 +28,12 @@ public:
28 { 28 {
29 } 29 }
30 30
31 void newEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE 31 void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE
32 { 32 {
33 add(newEntity.getProperty(mProperty), uid, transaction); 33 add(newEntity.getProperty(mProperty), uid, transaction);
34 } 34 }
35 35
36 void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, const Sink::ApplicationDomain::BufferAdaptor &newEntity, 36 void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity,
37 Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE 37 Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE
38 { 38 {
39 remove(oldEntity.getProperty(mProperty), uid, transaction); 39 remove(oldEntity.getProperty(mProperty), uid, transaction);
@@ -68,12 +68,12 @@ template <typename DomainType>
68class DefaultIndexUpdater : public Sink::Preprocessor 68class DefaultIndexUpdater : public Sink::Preprocessor
69{ 69{
70public: 70public:
71 void newEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE 71 void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE
72 { 72 {
73 Sink::ApplicationDomain::TypeImplementation<DomainType>::index(uid, newEntity, transaction); 73 Sink::ApplicationDomain::TypeImplementation<DomainType>::index(uid, newEntity, transaction);
74 } 74 }
75 75
76 void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, const Sink::ApplicationDomain::BufferAdaptor &newEntity, 76 void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity,
77 Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE 77 Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE
78 { 78 {
79 Sink::ApplicationDomain::TypeImplementation<DomainType>::removeIndex(uid, oldEntity, transaction); 79 Sink::ApplicationDomain::TypeImplementation<DomainType>::removeIndex(uid, oldEntity, transaction);
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 65a2f5b..637a1b8 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -189,31 +189,23 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
189 auto metadataBuffer = metadataBuilder.Finish(); 189 auto metadataBuffer = metadataBuilder.Finish();
190 FinishMetadataBuffer(metadataFbb, metadataBuffer); 190 FinishMetadataBuffer(metadataFbb, metadataBuffer);
191 191
192 flatbuffers::FlatBufferBuilder fbb;
193 EntityBuffer::assembleEntityBuffer(
194 fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
195
196 d->storeNewRevision(newRevision, fbb, bufferType, key);
197
198 auto adaptorFactory = d->adaptorFactory.value(bufferType); 192 auto adaptorFactory = d->adaptorFactory.value(bufferType);
199 if (!adaptorFactory) { 193 if (!adaptorFactory) {
200 Warning() << "no adaptor factory for type " << bufferType; 194 Warning() << "no adaptor factory for type " << bufferType;
201 return KAsync::error<qint64>(0); 195 return KAsync::error<qint64>(0);
202 } 196 }
203 197
198 auto adaptor = adaptorFactory->createAdaptor(*entity);
199 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties());
200 for (auto processor : d->processors[bufferType]) {
201 processor->newEntity(key, newRevision, *memoryAdaptor, d->transaction);
202 }
203 flatbuffers::FlatBufferBuilder fbb;
204 adaptorFactory->createBuffer(memoryAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
205
206 d->storeNewRevision(newRevision, fbb, bufferType, key);
207
204 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; 208 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
205 Storage::mainDatabase(d->transaction, bufferType)
206 .scan(Storage::assembleKey(key, newRevision),
207 [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool {
208 auto entity = GetEntity(value);
209 Q_ASSERT(entity->resource() || entity->local());
210 auto adaptor = adaptorFactory->createAdaptor(*entity);
211 for (auto processor : d->processors[bufferType]) {
212 processor->newEntity(key, newRevision, *adaptor, d->transaction);
213 }
214 return false;
215 },
216 [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; });
217 return KAsync::start<qint64>([newRevision]() { return newRevision; }); 209 return KAsync::start<qint64>([newRevision]() { return newRevision; });
218} 210}
219 211
@@ -281,9 +273,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
281 return KAsync::error<qint64>(0); 273 return KAsync::error<qint64>(0);
282 } 274 }
283 275
284 // resource and uid don't matter at this point 276 auto newAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(current), current->availableProperties());
285 const ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current);
286 auto newObject = ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(existingObject);
287 277
288 // Apply diff 278 // Apply diff
289 // FIXME only apply the properties that are available in the buffer 279 // FIXME only apply the properties that are available in the buffer
@@ -293,19 +283,21 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
293 changeset << property; 283 changeset << property;
294 const auto value = diff->getProperty(property); 284 const auto value = diff->getProperty(property);
295 if (value.isValid()) { 285 if (value.isValid()) {
296 newObject->setProperty(property, value); 286 newAdaptor->setProperty(property, value);
297 } 287 }
298 } 288 }
299 // Altough we only set some properties, we want all to be serialized
300 newObject->setChangedProperties(changeset);
301 289
302 // Remove deletions 290 // Remove deletions
303 if (modifyEntity->deletions()) { 291 if (modifyEntity->deletions()) {
304 for (const auto &property : *modifyEntity->deletions()) { 292 for (const auto &property : *modifyEntity->deletions()) {
305 newObject->setProperty(BufferUtils::extractBuffer(property), QVariant()); 293 newAdaptor->setProperty(BufferUtils::extractBuffer(property), QVariant());
306 } 294 }
307 } 295 }
308 296
297 for (auto processor : d->processors[bufferType]) {
298 processor->modifiedEntity(key, newRevision, *current, *newAdaptor, d->transaction);
299 }
300
309 // Add metadata buffer 301 // Add metadata buffer
310 flatbuffers::FlatBufferBuilder metadataFbb; 302 flatbuffers::FlatBufferBuilder metadataFbb;
311 auto metadataBuilder = MetadataBuilder(metadataFbb); 303 auto metadataBuilder = MetadataBuilder(metadataFbb);
@@ -316,24 +308,10 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
316 FinishMetadataBuffer(metadataFbb, metadataBuffer); 308 FinishMetadataBuffer(metadataFbb, metadataBuffer);
317 309
318 flatbuffers::FlatBufferBuilder fbb; 310 flatbuffers::FlatBufferBuilder fbb;
319 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 311 adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
320 312
321 d->storeNewRevision(newRevision, fbb, bufferType, key); 313 d->storeNewRevision(newRevision, fbb, bufferType, key);
322 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; 314 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType;
323 Storage::mainDatabase(d->transaction, bufferType)
324 .scan(Storage::assembleKey(key, newRevision),
325 [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool {
326 if (value.isEmpty()) {
327 ErrorMsg() << "Read buffer is empty.";
328 }
329 auto entity = GetEntity(value.data());
330 auto newEntity = adaptorFactory->createAdaptor(*entity);
331 for (auto processor : d->processors[bufferType]) {
332 processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction);
333 }
334 return false;
335 },
336 [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; });
337 return KAsync::start<qint64>([newRevision]() { return newRevision; }); 315 return KAsync::start<qint64>([newRevision]() { return newRevision; });
338} 316}
339 317
diff --git a/common/pipeline.h b/common/pipeline.h
index dc2cc4d..c65cbfd 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -83,9 +83,9 @@ public:
83 virtual ~Preprocessor(); 83 virtual ~Preprocessor();
84 84
85 virtual void startBatch(); 85 virtual void startBatch();
86 virtual void newEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) = 0; 86 virtual void newEntity(const QByteArray &key, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) = 0;
87 virtual void modifiedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, 87 virtual void modifiedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity,
88 const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) = 0; 88 Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) = 0;
89 virtual void deletedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::Transaction &transaction) = 0; 89 virtual void deletedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::Transaction &transaction) = 0;
90 virtual void finalize(); 90 virtual void finalize();
91 91
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp
index cdd260d..9290050 100644
--- a/tests/pipelinetest.cpp
+++ b/tests/pipelinetest.cpp
@@ -146,13 +146,13 @@ QByteArray deleteEntityCommand(const QByteArray &uid, qint64 revision)
146class TestProcessor : public Sink::Preprocessor 146class TestProcessor : public Sink::Preprocessor
147{ 147{
148public: 148public:
149 void newEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE 149 void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE
150 { 150 {
151 newUids << uid; 151 newUids << uid;
152 newRevisions << revision; 152 newRevisions << revision;
153 } 153 }
154 154
155 void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, const Sink::ApplicationDomain::BufferAdaptor &newEntity, 155 void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity,
156 Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE 156 Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE
157 { 157 {
158 modifiedUids << uid; 158 modifiedUids << uid;
@@ -198,6 +198,10 @@ private slots:
198 auto command = createEntityCommand(createEvent(entityFbb)); 198 auto command = createEntityCommand(createEvent(entityFbb));
199 199
200 Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); 200 Sink::Pipeline pipeline("org.kde.pipelinetest.instance1");
201
202 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create();
203 pipeline.setAdaptorFactory("event", adaptorFactory);
204
201 pipeline.startTransaction(); 205 pipeline.startTransaction();
202 pipeline.newEntity(command.constData(), command.size()); 206 pipeline.newEntity(command.constData(), command.size());
203 pipeline.commit(); 207 pipeline.commit();