summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/pipeline.cpp33
-rw-r--r--tests/pipelinetest.cpp53
2 files changed, 71 insertions, 15 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index ae4cc3d..0ce478b 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -233,7 +233,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
233 auto diff = adaptorFactory->createAdaptor(*diffEntity); 233 auto diff = adaptorFactory->createAdaptor(*diffEntity);
234 234
235 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; 235 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current;
236 d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, baseRevision), [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { 236 d->transaction.openDatabase(bufferType + ".main").findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
237 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 237 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
238 if (!buffer.isValid()) { 238 if (!buffer.isValid()) {
239 Warning() << "Read invalid buffer from disk"; 239 Warning() << "Read invalid buffer from disk";
@@ -328,25 +328,32 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
328 flatbuffers::FlatBufferBuilder fbb; 328 flatbuffers::FlatBufferBuilder fbb;
329 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); 329 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0);
330 330
331 storeNewRevision(newRevision, fbb, bufferType, key);
332 Log() << "Pipeline: deleted entity: "<< newRevision;
333
334 auto adaptorFactory = d->adaptorFactory.value(bufferType); 331 auto adaptorFactory = d->adaptorFactory.value(bufferType);
335 if (!adaptorFactory) { 332 if (!adaptorFactory) {
336 Warning() << "no adaptor factory for type " << bufferType; 333 Warning() << "no adaptor factory for type " << bufferType;
337 return KAsync::error<qint64>(0); 334 return KAsync::error<qint64>(0);
338 } 335 }
339 336
340 // d->transaction.openDatabase(bufferType + ".main").scan(key, [this, bufferType, newRevision, adaptorFactory](const QByteArray &, const QByteArray &value) -> bool { 337 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current;
341 // auto entity = Akonadi2::GetEntity(value); 338 d->transaction.openDatabase(bufferType + ".main").findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool {
342 // auto newEntity = adaptorFactory->createAdaptor(*entity); 339 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
343 for (auto processor : d->processors[bufferType]) { 340 if (!buffer.isValid()) {
344 processor->deletedEntity(key, newRevision, Akonadi2::ApplicationDomain::BufferAdaptor(), d->transaction); 341 Warning() << "Read invalid buffer from disk";
342 } else {
343 current = adaptorFactory->createAdaptor(buffer.entity());
345 } 344 }
346 // return false; 345 return false;
347 // }, [this](const Akonadi2::Storage::Error &error) { 346 }, [this](const Akonadi2::Storage::Error &error) {
348 // ErrorMsg() << "Failed to find value in pipeline: " << error.message; 347 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
349 // }); 348 });
349
350 storeNewRevision(newRevision, fbb, bufferType, key);
351 Log() << "Pipeline: deleted entity: "<< newRevision;
352
353 for (auto processor : d->processors[bufferType]) {
354 processor->deletedEntity(key, newRevision, *current, d->transaction);
355 }
356
350 return KAsync::start<qint64>([newRevision](){ 357 return KAsync::start<qint64>([newRevision](){
351 return newRevision; 358 return newRevision;
352 }); 359 });
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp
index 0b4c13e..47090a8 100644
--- a/tests/pipelinetest.cpp
+++ b/tests/pipelinetest.cpp
@@ -157,6 +157,7 @@ public:
157 { 157 {
158 deletedUids << uid; 158 deletedUids << uid;
159 deletedRevisions << revision; 159 deletedRevisions << revision;
160 deletedSummaries << oldEntity.getProperty("summary").toByteArray();
160 } 161 }
161 162
162 QList<QByteArray> newUids; 163 QList<QByteArray> newUids;
@@ -165,6 +166,7 @@ public:
165 QList<qint64> modifiedRevisions; 166 QList<qint64> modifiedRevisions;
166 QList<QByteArray> deletedUids; 167 QList<QByteArray> deletedUids;
167 QList<qint64> deletedRevisions; 168 QList<qint64> deletedRevisions;
169 QList<QByteArray> deletedSummaries;
168}; 170};
169 171
170/** 172/**
@@ -245,18 +247,63 @@ private Q_SLOTS:
245 QCOMPARE(getKeys("org.kde.pipelinetest.instance1", "event.main").size(), 1); 247 QCOMPARE(getKeys("org.kde.pipelinetest.instance1", "event.main").size(), 1);
246 } 248 }
247 249
248 void testDelete() 250 void testModifyWithUnrelatedOperationInbetween()
249 { 251 {
250 flatbuffers::FlatBufferBuilder entityFbb; 252 flatbuffers::FlatBufferBuilder entityFbb;
251 auto command = createEntityCommand(createEvent(entityFbb)); 253 auto command = createEntityCommand(createEvent(entityFbb));
252 254
255 Akonadi2::Pipeline pipeline("org.kde.pipelinetest.instance1");
256
257 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create();
258 pipeline.setAdaptorFactory("event", adaptorFactory);
259
253 //Create the initial revision 260 //Create the initial revision
261 pipeline.startTransaction();
262 pipeline.newEntity(command.constData(), command.size());
263 pipeline.commit();
264
265 //Get uid of written entity
266 auto keys = getKeys("org.kde.pipelinetest.instance1", "event.main");
267 QCOMPARE(keys.size(), 1);
268 const auto uid = Akonadi2::Storage::uidFromKey(keys.first());
269
270
271 //Create another operation inbetween
272 {
273 entityFbb.Clear();
274 auto command = createEntityCommand(createEvent(entityFbb));
275 pipeline.startTransaction();
276 pipeline.newEntity(command.constData(), command.size());
277 pipeline.commit();
278 }
279
280 //Execute the modification on revision 2
281 entityFbb.Clear();
282 auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summary2"), uid, 2);
283 pipeline.startTransaction();
284 pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size());
285 pipeline.commit();
286
287 //Ensure we've got the new revision with the modification
288 auto buffer = getEntity("org.kde.pipelinetest.instance1", "event.main", Akonadi2::Storage::assembleKey(uid, 3));
289 QVERIFY(!buffer.isEmpty());
290 Akonadi2::EntityBuffer entityBuffer(buffer.data(), buffer.size());
291 auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity());
292 QCOMPARE(adaptor->getProperty("summary").toString(), QString("summary2"));
293 }
294
295 void testDelete()
296 {
297 flatbuffers::FlatBufferBuilder entityFbb;
298 auto command = createEntityCommand(createEvent(entityFbb));
254 Akonadi2::Pipeline pipeline("org.kde.pipelinetest.instance1"); 299 Akonadi2::Pipeline pipeline("org.kde.pipelinetest.instance1");
300 pipeline.setAdaptorFactory("event", QSharedPointer<TestEventAdaptorFactory>::create());
301
302 //Create the initial revision
255 pipeline.startTransaction(); 303 pipeline.startTransaction();
256 pipeline.newEntity(command.constData(), command.size()); 304 pipeline.newEntity(command.constData(), command.size());
257 pipeline.commit(); 305 pipeline.commit();
258 306
259 // const auto uid = Akonadi2::Storage::uidFromKey(key);
260 auto result = getKeys("org.kde.pipelinetest.instance1", "event.main"); 307 auto result = getKeys("org.kde.pipelinetest.instance1", "event.main");
261 QCOMPARE(result.size(), 1); 308 QCOMPARE(result.size(), 1);
262 309
@@ -322,8 +369,10 @@ private Q_SLOTS:
322 pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size()); 369 pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size());
323 QCOMPARE(testProcessor.deletedUids.size(), 1); 370 QCOMPARE(testProcessor.deletedUids.size(), 1);
324 QCOMPARE(testProcessor.deletedUids.size(), 1); 371 QCOMPARE(testProcessor.deletedUids.size(), 1);
372 QCOMPARE(testProcessor.deletedSummaries.size(), 1);
325 //Key doesn't contain revision and is just the uid 373 //Key doesn't contain revision and is just the uid
326 QCOMPARE(testProcessor.deletedUids.at(0), Akonadi2::Storage::uidFromKey(testProcessor.deletedUids.at(0))); 374 QCOMPARE(testProcessor.deletedUids.at(0), Akonadi2::Storage::uidFromKey(testProcessor.deletedUids.at(0)));
375 QCOMPARE(testProcessor.deletedSummaries.at(0), QByteArray("summary2"));
327 } 376 }
328 } 377 }
329}; 378};