summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-10-28 16:39:16 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-10-28 16:39:16 +0100
commit129333371d28c06d85f75ca579ce17798e615e84 (patch)
tree2ae01db9d26f6f72a74fa77e6937e03304e81a2c
parent20f049b65c4bd8c3d0c16bbf398641675648a93f (diff)
downloadsink-129333371d28c06d85f75ca579ce17798e615e84.tar.gz
sink-129333371d28c06d85f75ca579ce17798e615e84.zip
Made pipeline preprocessing synchronous.
Instead of having the asynchronous preprocessor concept with different pipelines for new/modify/delete we have a single pipeline with synchronous preprocessors that act upon new/modify/delete. This keeps the code simpler due to lack of asynchronity and keeps the new/modify/delete operations together (which at least for the indexing makes a lot of sense). Not supporting asynchronity is ok because the tasks done in preprocessing are not cpu intensive (if they were we had a problem since they are directly involved in the round-trip time), and the main cost comes from i/o, meaning we don't gain much by doing multithreading. Costly tasks (such as full-text indexing) should rather be implemented as post-processing, since that doesn't increase the round-trip time directly, and eventually consistent is typically good enough for that.
-rw-r--r--common/domain/event.cpp6
-rw-r--r--common/domain/event.h2
-rw-r--r--common/domain/mail.cpp7
-rw-r--r--common/domain/mail.h2
-rw-r--r--common/genericresource.cpp10
-rw-r--r--common/genericresource.h1
-rw-r--r--common/pipeline.cpp286
-rw-r--r--common/pipeline.h90
-rw-r--r--examples/dummyresource/resourcefactory.cpp89
-rw-r--r--tests/genericresourcebenchmark.cpp35
-rw-r--r--tests/genericresourcetest.cpp4
-rw-r--r--tests/pipelinetest.cpp75
12 files changed, 251 insertions, 356 deletions
diff --git a/common/domain/event.cpp b/common/domain/event.cpp
index 9759fc3..83a6906 100644
--- a/common/domain/event.cpp
+++ b/common/domain/event.cpp
@@ -50,11 +50,11 @@ ResultSet TypeImplementation<Event>::queryIndexes(const Akonadi2::Query &query,
50 return ResultSet(keys); 50 return ResultSet(keys);
51} 51}
52 52
53void TypeImplementation<Event>::index(const Event &type, Akonadi2::Storage::Transaction &transaction) 53void TypeImplementation<Event>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction)
54{ 54{
55 const auto uid = type.getProperty("uid"); 55 const auto uid = bufferAdaptor.getProperty("uid");
56 if (uid.isValid()) { 56 if (uid.isValid()) {
57 Index("event.index.uid", transaction).add(uid.toByteArray(), type.identifier()); 57 Index("event.index.uid", transaction).add(uid.toByteArray(), identifier);
58 } 58 }
59} 59}
60 60
diff --git a/common/domain/event.h b/common/domain/event.h
index f21cd34..e9ba52a 100644
--- a/common/domain/event.h
+++ b/common/domain/event.h
@@ -56,7 +56,7 @@ public:
56 * An empty result set indicates that a full scan is required. 56 * An empty result set indicates that a full scan is required.
57 */ 57 */
58 static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction); 58 static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction);
59 static void index(const Event &type, Akonadi2::Storage::Transaction &transaction); 59 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction);
60 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); 60 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper();
61 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); 61 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper();
62}; 62};
diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp
index d40dde9..ffe322e 100644
--- a/common/domain/mail.cpp
+++ b/common/domain/mail.cpp
@@ -50,12 +50,11 @@ ResultSet TypeImplementation<Mail>::queryIndexes(const Akonadi2::Query &query, c
50 return ResultSet(keys); 50 return ResultSet(keys);
51} 51}
52 52
53void TypeImplementation<Mail>::index(const Mail &type, Akonadi2::Storage::Transaction &transaction) 53void TypeImplementation<Mail>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction)
54{ 54{
55 const auto uid = type.getProperty("uid"); 55 const auto uid = bufferAdaptor.getProperty("uid");
56 if (uid.isValid()) { 56 if (uid.isValid()) {
57 Index uidIndex("mail.index.uid", transaction); 57 Index("mail.index.uid", transaction).add(uid.toByteArray(), identifier);
58 uidIndex.add(uid.toByteArray(), type.identifier());
59 } 58 }
60} 59}
61 60
diff --git a/common/domain/mail.h b/common/domain/mail.h
index b58ce44..38f1d03 100644
--- a/common/domain/mail.h
+++ b/common/domain/mail.h
@@ -51,7 +51,7 @@ public:
51 * An empty result set indicates that a full scan is required. 51 * An empty result set indicates that a full scan is required.
52 */ 52 */
53 static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction); 53 static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction);
54 static void index(const Mail &type, Akonadi2::Storage::Transaction &transaction); 54 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction);
55 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); 55 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper();
56 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); 56 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper();
57}; 57};
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index dcae43d..ec68f33 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -285,10 +285,12 @@ GenericResource::~GenericResource()
285 delete mSourceChangeReplay; 285 delete mSourceChangeReplay;
286} 286}
287 287
288// void GenericResource::revisionChanged() 288void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier)
289// { 289{
290// //TODO replay revision 290 Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier, Akonadi2::Storage::ReadWrite).removeFromDisk();
291// } 291 Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".userqueue", Akonadi2::Storage::ReadWrite).removeFromDisk();
292 Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".synchronizerqueue", Akonadi2::Storage::ReadWrite).removeFromDisk();
293}
292 294
293void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) 295void GenericResource::onProcessorError(int errorCode, const QString &errorMessage)
294{ 296{
diff --git a/common/genericresource.h b/common/genericresource.h
index cfc6653..33de0e7 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -48,6 +48,7 @@ public:
48 48
49 int error() const; 49 int error() const;
50 50
51 static void removeFromDisk(const QByteArray &instanceIdentifier);
51private Q_SLOTS: 52private Q_SLOTS:
52 void updateLowerBoundRevision(); 53 void updateLowerBoundRevision();
53 54
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 15d2401..de63288 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -1,5 +1,6 @@
1/* 1/*
2 * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org> 2 * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org>
3 * Copyright (C) 2015 Christian Mollekopf <mollekopf@kolabsys.com>
3 * 4 *
4 * This library is free software; you can redistribute it and/or 5 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public 6 * modify it under the terms of the GNU Lesser General Public
@@ -41,19 +42,13 @@ class Pipeline::Private
41{ 42{
42public: 43public:
43 Private(const QString &resourceName) 44 Private(const QString &resourceName)
44 : storage(Akonadi2::storageLocation(), resourceName, Storage::ReadWrite), 45 : storage(Akonadi2::storageLocation(), resourceName, Storage::ReadWrite)
45 stepScheduled(false)
46 { 46 {
47 } 47 }
48 48
49 Storage storage; 49 Storage storage;
50 Storage::Transaction transaction; 50 Storage::Transaction transaction;
51 QHash<QString, QVector<Preprocessor *> > nullPipeline; 51 QHash<QString, QVector<Preprocessor *> > processors;
52 QHash<QString, QVector<Preprocessor *> > newPipeline;
53 QHash<QString, QVector<Preprocessor *> > modifiedPipeline;
54 QHash<QString, QVector<Preprocessor *> > deletedPipeline;
55 QVector<PipelineState> activePipelines;
56 bool stepScheduled;
57 QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; 52 QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory;
58}; 53};
59 54
@@ -68,21 +63,9 @@ Pipeline::~Pipeline()
68 delete d; 63 delete d;
69} 64}
70 65
71void Pipeline::setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors) 66void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors)
72{ 67{
73 switch (pipelineType) { 68 d->processors[entityType] = processors;
74 case NewPipeline:
75 d->newPipeline[entityType] = preprocessors;
76 break;
77 case ModifiedPipeline:
78 d->modifiedPipeline[entityType] = preprocessors;
79 break;
80 case DeletedPipeline:
81 d->deletedPipeline[entityType] = preprocessors;
82 break;
83 default:
84 break;
85 };
86} 69}
87 70
88void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) 71void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory)
@@ -92,6 +75,11 @@ void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFac
92 75
93void Pipeline::startTransaction() 76void Pipeline::startTransaction()
94{ 77{
78 //TODO call for all types
79 //But avoid doing it during cleanup
80 // for (auto processor : d->processors[bufferType]) {
81 // processor->startBatch();
82 // }
95 if (d->transaction) { 83 if (d->transaction) {
96 return; 84 return;
97 } 85 }
@@ -100,7 +88,13 @@ void Pipeline::startTransaction()
100 88
101void Pipeline::commit() 89void Pipeline::commit()
102{ 90{
91 //TODO call for all types
92 //But avoid doing it during cleanup
93 // for (auto processor : d->processors[bufferType]) {
94 // processor->finalize();
95 // }
103 const auto revision = Akonadi2::Storage::maxRevision(d->transaction); 96 const auto revision = Akonadi2::Storage::maxRevision(d->transaction);
97 Trace() << "Committing " << revision;
104 if (d->transaction) { 98 if (d->transaction) {
105 d->transaction.commit(); 99 d->transaction.commit();
106 } 100 }
@@ -118,14 +112,6 @@ Storage &Pipeline::storage() const
118 return d->storage; 112 return d->storage;
119} 113}
120 114
121void Pipeline::null()
122{
123 //TODO: is there really any need for the null pipeline? if so, it should be doing something ;)
124 // PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline);
125 // d->activePipelines << state;
126 // state.step();
127}
128
129void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) 115void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
130{ 116{
131 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(uid, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()), 117 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(uid, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()),
@@ -181,15 +167,25 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
181 167
182 storeNewRevision(newRevision, fbb, bufferType, key); 168 storeNewRevision(newRevision, fbb, bufferType, key);
183 169
184 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; 170 auto adaptorFactory = d->adaptorFactory.value(bufferType);
171 if (!adaptorFactory) {
172 Warning() << "no adaptor factory for type " << bufferType;
173 return KAsync::error<qint64>(0);
174 }
185 175
186 return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { 176 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
187 PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future, newRevision]() { 177 d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool {
188 future.setValue(newRevision); 178 auto entity = Akonadi2::GetEntity(value);
189 future.setFinished(); 179 auto adaptor = adaptorFactory->createAdaptor(*entity);
190 }, bufferType); 180 for (auto processor : d->processors[bufferType]) {
191 d->activePipelines << state; 181 processor->newEntity(key, newRevision, *adaptor, d->transaction);
192 state.step(); 182 }
183 return false;
184 }, [this](const Akonadi2::Storage::Error &error) {
185 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
186 });
187 return KAsync::start<qint64>([newRevision](){
188 return newRevision;
193 }); 189 });
194} 190}
195 191
@@ -287,14 +283,18 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
287 283
288 storeNewRevision(newRevision, fbb, bufferType, key); 284 storeNewRevision(newRevision, fbb, bufferType, key);
289 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; 285 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType;
290 286 d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &, const QByteArray &value) -> bool {
291 return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { 287 auto entity = Akonadi2::GetEntity(value);
292 PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future, newRevision]() { 288 auto newEntity = adaptorFactory->createAdaptor(*entity);
293 future.setValue(newRevision); 289 for (auto processor : d->processors[bufferType]) {
294 future.setFinished(); 290 processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction);
295 }, bufferType); 291 }
296 d->activePipelines << state; 292 return false;
297 state.step(); 293 }, [this](const Akonadi2::Storage::Error &error) {
294 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
295 });
296 return KAsync::start<qint64>([newRevision](){
297 return newRevision;
298 }); 298 });
299} 299}
300 300
@@ -331,13 +331,24 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
331 storeNewRevision(newRevision, fbb, bufferType, key); 331 storeNewRevision(newRevision, fbb, bufferType, key);
332 Log() << "Pipeline: deleted entity: "<< newRevision; 332 Log() << "Pipeline: deleted entity: "<< newRevision;
333 333
334 return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { 334 auto adaptorFactory = d->adaptorFactory.value(bufferType);
335 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[bufferType], newRevision, [&future, newRevision](){ 335 if (!adaptorFactory) {
336 future.setValue(newRevision); 336 Warning() << "no adaptor factory for type " << bufferType;
337 future.setFinished(); 337 return KAsync::error<qint64>(0);
338 }, bufferType); 338 }
339 d->activePipelines << state; 339
340 state.step(); 340 // d->transaction.openDatabase(bufferType + ".main").scan(key, [this, bufferType, newRevision, adaptorFactory](const QByteArray &, const QByteArray &value) -> bool {
341 // auto entity = Akonadi2::GetEntity(value);
342 // auto newEntity = adaptorFactory->createAdaptor(*entity);
343 for (auto processor : d->processors[bufferType]) {
344 processor->deletedEntity(key, newRevision, Akonadi2::ApplicationDomain::BufferAdaptor(), d->transaction);
345 }
346 // return false;
347 // }, [this](const Akonadi2::Storage::Error &error) {
348 // ErrorMsg() << "Failed to find value in pipeline: " << error.message;
349 // });
350 return KAsync::start<qint64>([newRevision](){
351 return newRevision;
341 }); 352 });
342} 353}
343 354
@@ -372,164 +383,6 @@ qint64 Pipeline::cleanedUpRevision()
372 return Akonadi2::Storage::cleanedUpRevision(d->transaction); 383 return Akonadi2::Storage::cleanedUpRevision(d->transaction);
373} 384}
374 385
375void Pipeline::pipelineStepped(const PipelineState &state)
376{
377 scheduleStep();
378}
379
380void Pipeline::scheduleStep()
381{
382 if (!d->stepScheduled) {
383 d->stepScheduled = true;
384 QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection);
385 }
386}
387
388void Pipeline::stepPipelines()
389{
390 d->stepScheduled = false;
391 for (PipelineState &state: d->activePipelines) {
392 if (state.isIdle()) {
393 state.step();
394 }
395 }
396}
397
398void Pipeline::pipelineCompleted(PipelineState state)
399{
400 //TODO finalize the datastore, inform clients of the new rev
401 const int index = d->activePipelines.indexOf(state);
402 if (index > -1) {
403 d->activePipelines.remove(index);
404 }
405 state.callback();
406
407 scheduleStep();
408 if (d->activePipelines.isEmpty()) {
409 emit pipelinesDrained();
410 }
411}
412
413
414class PipelineState::Private : public QSharedData
415{
416public:
417 Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters, const std::function<void()> &c, qint64 r, const QByteArray &b)
418 : pipeline(p),
419 type(t),
420 key(k),
421 filterIt(filters),
422 idle(true),
423 callback(c),
424 revision(r),
425 bufferType(b)
426 {}
427
428 Private()
429 : pipeline(0),
430 filterIt(QVector<Preprocessor *>()),
431 idle(true),
432 revision(-1)
433 {}
434
435 Pipeline *pipeline;
436 Pipeline::Type type;
437 QByteArray key;
438 QVectorIterator<Preprocessor *> filterIt;
439 bool idle;
440 std::function<void()> callback;
441 qint64 revision;
442 QByteArray bufferType;
443};
444
445PipelineState::PipelineState()
446 : d(new Private())
447{
448
449}
450
451PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, qint64 revision, const std::function<void()> &callback, const QByteArray &bufferType)
452 : d(new Private(pipeline, type, key, filters, callback, revision, bufferType))
453{
454}
455
456PipelineState::PipelineState(const PipelineState &other)
457 : d(other.d)
458{
459}
460
461PipelineState::~PipelineState()
462{
463}
464
465PipelineState &PipelineState::operator=(const PipelineState &rhs)
466{
467 d = rhs.d;
468 return *this;
469}
470
471bool PipelineState::operator==(const PipelineState &rhs)
472{
473 return d == rhs.d;
474}
475
476bool PipelineState::isIdle() const
477{
478 return d->idle;
479}
480
481QByteArray PipelineState::key() const
482{
483 return d->key;
484}
485
486Pipeline::Type PipelineState::type() const
487{
488 return d->type;
489}
490
491qint64 PipelineState::revision() const
492{
493 return d->revision;
494}
495
496QByteArray PipelineState::bufferType() const
497{
498 return d->bufferType;
499}
500
501void PipelineState::step()
502{
503 if (!d->pipeline) {
504 Q_ASSERT(false);
505 return;
506 }
507
508 d->idle = false;
509 if (d->filterIt.hasNext()) {
510 //TODO skip step if already processed
511 auto preprocessor = d->filterIt.next();
512 preprocessor->process(*this, d->pipeline->transaction());
513 } else {
514 //This object becomes invalid after this call
515 d->pipeline->pipelineCompleted(*this);
516 }
517}
518
519void PipelineState::processingCompleted(Preprocessor *filter)
520{
521 if (d->pipeline && filter == d->filterIt.peekPrevious()) {
522 d->idle = true;
523 d->pipeline->pipelineStepped(*this);
524 }
525}
526
527void PipelineState::callback()
528{
529 d->callback();
530}
531
532
533Preprocessor::Preprocessor() 386Preprocessor::Preprocessor()
534 : d(0) 387 : d(0)
535{ 388{
@@ -539,19 +392,12 @@ Preprocessor::~Preprocessor()
539{ 392{
540} 393}
541 394
542void Preprocessor::process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) 395void Preprocessor::startBatch()
543{
544 processingCompleted(state);
545}
546
547void Preprocessor::processingCompleted(PipelineState state)
548{ 396{
549 state.processingCompleted(this);
550} 397}
551 398
552QString Preprocessor::id() const 399void Preprocessor::finalize()
553{ 400{
554 return QLatin1String("unknown processor");
555} 401}
556 402
557} // namespace Akonadi2 403} // namespace Akonadi2
diff --git a/common/pipeline.h b/common/pipeline.h
index c8d9ddc..f11d880 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -35,7 +35,6 @@
35namespace Akonadi2 35namespace Akonadi2
36{ 36{
37 37
38class PipelineState;
39class Preprocessor; 38class Preprocessor;
40 39
41class AKONADI2COMMON_EXPORT Pipeline : public QObject 40class AKONADI2COMMON_EXPORT Pipeline : public QObject
@@ -43,19 +42,16 @@ class AKONADI2COMMON_EXPORT Pipeline : public QObject
43 Q_OBJECT 42 Q_OBJECT
44 43
45public: 44public:
46 enum Type { NullPipeline, NewPipeline, ModifiedPipeline, DeletedPipeline };
47
48 Pipeline(const QString &storagePath, QObject *parent = 0); 45 Pipeline(const QString &storagePath, QObject *parent = 0);
49 ~Pipeline(); 46 ~Pipeline();
50 47
51 Storage &storage() const; 48 Storage &storage() const;
52 49
53 void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors); 50 void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors);
54 void startTransaction(); 51 void startTransaction();
55 void commit(); 52 void commit();
56 Storage::Transaction &transaction(); 53 Storage::Transaction &transaction();
57 54
58 void null();
59 void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); 55 void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory);
60 56
61 KAsync::Job<qint64> newEntity(void const *command, size_t size); 57 KAsync::Job<qint64> newEntity(void const *command, size_t size);
@@ -75,104 +71,30 @@ public:
75 71
76Q_SIGNALS: 72Q_SIGNALS:
77 void revisionUpdated(qint64); 73 void revisionUpdated(qint64);
78 void pipelinesDrained();
79
80private Q_SLOTS:
81 void stepPipelines();
82 74
83private: 75private:
84 void pipelineStepped(const PipelineState &state);
85 //Don't use a reference here (it would invalidate itself)
86 void pipelineCompleted(PipelineState state);
87 void scheduleStep();
88 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); 76 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid);
89 77
90 friend class PipelineState;
91
92 class Private; 78 class Private;
93 Private * const d; 79 Private * const d;
94}; 80};
95 81
96class AKONADI2COMMON_EXPORT PipelineState
97{
98public:
99 PipelineState();
100 PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, qint64 revision, const std::function<void()> &callback, const QByteArray &bufferType);
101 PipelineState(const PipelineState &other);
102 ~PipelineState();
103
104 PipelineState &operator=(const PipelineState &rhs);
105 bool operator==(const PipelineState &rhs);
106
107 bool isIdle() const;
108 QByteArray key() const;
109 Pipeline::Type type() const;
110 qint64 revision() const;
111 QByteArray bufferType() const;
112
113 void step();
114 void processingCompleted(Preprocessor *filter);
115
116 void callback();
117
118private:
119 class Private;
120 QExplicitlySharedDataPointer<Private> d;
121};
122
123class AKONADI2COMMON_EXPORT Preprocessor 82class AKONADI2COMMON_EXPORT Preprocessor
124{ 83{
125public: 84public:
126 Preprocessor(); 85 Preprocessor();
127 virtual ~Preprocessor(); 86 virtual ~Preprocessor();
128 87
129 virtual void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction); 88 virtual void startBatch();
130 //TODO to record progress 89 virtual void newEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) = 0;
131 virtual QString id() const; 90 virtual void modifiedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) = 0;
132 91 virtual void deletedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) = 0;
133protected: 92 virtual void finalize();
134 void processingCompleted(PipelineState state);
135 93
136private: 94private:
137 class Private; 95 class Private;
138 Private * const d; 96 Private * const d;
139}; 97};
140 98
141/**
142 * A simple processor that takes a single function
143 */
144class AKONADI2COMMON_EXPORT SimpleProcessor : public Akonadi2::Preprocessor
145{
146public:
147 SimpleProcessor(const QString &id, const std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e, Akonadi2::Storage::Transaction &transaction)> &f)
148 : Akonadi2::Preprocessor(),
149 mFunction(f),
150 mId(id)
151 {
152 }
153
154 void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE
155 {
156 transaction.openDatabase(state.bufferType() + ".main").scan(state.key(), [this, &state, &transaction](const QByteArray &key, const QByteArray &value) -> bool {
157 auto entity = Akonadi2::GetEntity(value);
158 mFunction(state, *entity, transaction);
159 processingCompleted(state);
160 return false;
161 }, [this, state](const Akonadi2::Storage::Error &error) {
162 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
163 processingCompleted(state);
164 });
165 }
166
167 QString id() const Q_DECL_OVERRIDE
168 {
169 return mId;
170 }
171
172protected:
173 std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e, Akonadi2::Storage::Transaction &transaction)> mFunction;
174 QString mId;
175};
176
177} // namespace Akonadi2 99} // namespace Akonadi2
178 100
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp
index 8e6bd42..0a2e90b 100644
--- a/examples/dummyresource/resourcefactory.cpp
+++ b/examples/dummyresource/resourcefactory.cpp
@@ -48,31 +48,76 @@ static void index(const QByteArray &index, const QVariant &value, const QByteArr
48 } 48 }
49} 49}
50 50
51DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer<Akonadi2::Pipeline> &pipeline) 51/**
52 : Akonadi2::GenericResource(instanceIdentifier, pipeline) 52 * Index types:
53{ 53 * * uid - property
54 auto eventFactory = QSharedPointer<DummyEventAdaptorFactory>::create(); 54 *
55 const auto resourceIdentifier = mResourceInstanceIdentifier; 55 * * Property can be:
56 * * fixed value like uid
57 * * fixed value where we want to do smaller/greater-than comparisons. (like start date)
58 * * range indexes like what date range an event affects.
59 * * group indexes like tree hierarchies as nested sets
60 */
61template<typename DomainType>
62class IndexUpdater : public Akonadi2::Preprocessor {
63public:
64 IndexUpdater(const QByteArray &index, const QByteArray &type)
65 :mIndexIdentifier(index),
66 mBufferType(type)
67 {
56 68
57 auto eventIndexer = new Akonadi2::SimpleProcessor("eventIndexer", [eventFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) { 69 }
58 Akonadi2::ApplicationDomain::Event event(resourceIdentifier, Akonadi2::Storage::uidFromKey(state.key()), -1, eventFactory->createAdaptor(entity)); 70
59 Akonadi2::ApplicationDomain::TypeImplementation<Akonadi2::ApplicationDomain::Event>::index(event, transaction); 71 void newEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE
60 index("event.index.rid", event.getProperty("remoteId"), event.identifier(), transaction); 72 {
61 }); 73 Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::index(uid, newEntity, transaction);
74 add(newEntity.getProperty("remoteId"), uid, transaction);
75 }
62 76
63 mPipeline->setPreprocessors(ENTITY_TYPE_EVENT, Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer); 77 void modifiedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE
64 mPipeline->setAdaptorFactory(ENTITY_TYPE_EVENT, eventFactory); 78 {
65 //TODO cleanup indexes during removal 79 }
66 80
81 void deletedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE
67 { 82 {
68 auto mailFactory = QSharedPointer<DummyMailAdaptorFactory>::create(); 83 }
69 auto mailIndexer = new Akonadi2::SimpleProcessor("mailIndexer", [mailFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) { 84private:
70 Akonadi2::ApplicationDomain::Mail mail(resourceIdentifier, Akonadi2::Storage::uidFromKey(state.key()), -1, mailFactory->createAdaptor(entity)); 85 void add(const QVariant &value, const QByteArray &uid, Akonadi2::Storage::Transaction &transaction)
71 Akonadi2::ApplicationDomain::TypeImplementation<Akonadi2::ApplicationDomain::Mail>::index(mail, transaction); 86 {
72 index("mail.index.rid", mail.getProperty("remoteId"), mail.identifier(), transaction); 87 if (value.isValid()) {
73 }); 88 Index(mIndexIdentifier, transaction).add(value.toByteArray(), uid);
89 }
90 }
91
92 void remove(const QByteArray &uid, Akonadi2::Storage::Transaction &transaction)
93 {
94 //Knowning the indexed value would probably help removing the uid efficiently. Otherwise we have to execute a full scan.
95 // Index(mIndexIdentifier, transaction).remove(uid);
96 }
97
98 void modify(Akonadi2::Storage::Transaction &transaction)
99 {
100 //Knowning the indexed value would probably help removing the uid efficiently. Otherwise we have to execute a full scan.
101 // Index(mIndexIdentifier, transaction).remove(uid);
102 }
74 103
75 mPipeline->setPreprocessors(ENTITY_TYPE_MAIL, Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << mailIndexer); 104 QByteArray mIndexIdentifier;
105 QByteArray mBufferType;
106};
107
108DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer<Akonadi2::Pipeline> &pipeline)
109 : Akonadi2::GenericResource(instanceIdentifier, pipeline)
110{
111 {
112 auto eventFactory = QSharedPointer<DummyEventAdaptorFactory>::create();
113 auto eventIndexer = new IndexUpdater<Akonadi2::ApplicationDomain::Event>("event.index.rid", ENTITY_TYPE_EVENT);
114 mPipeline->setPreprocessors(ENTITY_TYPE_EVENT, QVector<Akonadi2::Preprocessor*>() << eventIndexer);
115 mPipeline->setAdaptorFactory(ENTITY_TYPE_EVENT, eventFactory);
116 }
117 {
118 auto mailFactory = QSharedPointer<DummyMailAdaptorFactory>::create();
119 auto mailIndexer = new IndexUpdater<Akonadi2::ApplicationDomain::Mail>("mail.index.rid", ENTITY_TYPE_MAIL);
120 mPipeline->setPreprocessors(ENTITY_TYPE_MAIL, QVector<Akonadi2::Preprocessor*>() << mailIndexer);
76 mPipeline->setAdaptorFactory(ENTITY_TYPE_MAIL, mailFactory); 121 mPipeline->setAdaptorFactory(ENTITY_TYPE_MAIL, mailFactory);
77 } 122 }
78} 123}
@@ -171,9 +216,7 @@ KAsync::Job<void> DummyResource::synchronizeWithSource()
171 216
172void DummyResource::removeFromDisk(const QByteArray &instanceIdentifier) 217void DummyResource::removeFromDisk(const QByteArray &instanceIdentifier)
173{ 218{
174 Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier, Akonadi2::Storage::ReadWrite).removeFromDisk(); 219 GenericResource::removeFromDisk(instanceIdentifier);
175 Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".userqueue", Akonadi2::Storage::ReadWrite).removeFromDisk();
176 Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".synchronizerqueue", Akonadi2::Storage::ReadWrite).removeFromDisk();
177 Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".event.index.uid", Akonadi2::Storage::ReadWrite).removeFromDisk(); 220 Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".event.index.uid", Akonadi2::Storage::ReadWrite).removeFromDisk();
178} 221}
179 222
diff --git a/tests/genericresourcebenchmark.cpp b/tests/genericresourcebenchmark.cpp
index b8635d7..fbe0d12 100644
--- a/tests/genericresourcebenchmark.cpp
+++ b/tests/genericresourcebenchmark.cpp
@@ -60,6 +60,25 @@ static QByteArray createEntityBuffer()
60 return QByteArray(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); 60 return QByteArray(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize());
61} 61}
62 62
63class IndexUpdater : public Akonadi2::Preprocessor {
64public:
65 void newEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE
66 {
67 for (int i = 0; i < 10; i++) {
68 Index ridIndex(QString("index.index%1").arg(i).toLatin1(), transaction);
69 ridIndex.add("foo", uid);
70 }
71 }
72
73 void modifiedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE
74 {
75 }
76
77 void deletedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE
78 {
79 }
80};
81
63/** 82/**
64 * Benchmark write performance of generic resource implementation including queues and pipeline. 83 * Benchmark write performance of generic resource implementation including queues and pipeline.
65 */ 84 */
@@ -124,19 +143,9 @@ private Q_SLOTS:
124 143
125 auto eventFactory = QSharedPointer<TestEventAdaptorFactory>::create(); 144 auto eventFactory = QSharedPointer<TestEventAdaptorFactory>::create();
126 const QByteArray resourceIdentifier = "org.kde.test.instance1"; 145 const QByteArray resourceIdentifier = "org.kde.test.instance1";
127 auto eventIndexer = new Akonadi2::SimpleProcessor("eventIndexer", [eventFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) { 146 auto indexer = QSharedPointer<IndexUpdater>::create();
128 auto adaptor = eventFactory->createAdaptor(entity); 147
129 Akonadi2::ApplicationDomain::Event event(resourceIdentifier, state.key(), -1, adaptor); 148 pipeline->setPreprocessors("event", QVector<Akonadi2::Preprocessor*>() << indexer.data());
130 Akonadi2::ApplicationDomain::TypeImplementation<Akonadi2::ApplicationDomain::Event>::index(event, transaction);
131
132 //Create a bunch of indexes
133 for (int i = 0; i < 10; i++) {
134 Index ridIndex(QString("index.index%1").arg(i).toLatin1(), transaction);
135 ridIndex.add("foo", event.identifier());
136 }
137 });
138
139 pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer);
140 pipeline->setAdaptorFactory("event", eventFactory); 149 pipeline->setAdaptorFactory("event", eventFactory);
141 150
142 TestResource resource("org.kde.test.instance1", pipeline); 151 TestResource resource("org.kde.test.instance1", pipeline);
diff --git a/tests/genericresourcetest.cpp b/tests/genericresourcetest.cpp
index 6dd4108..141a5f8 100644
--- a/tests/genericresourcetest.cpp
+++ b/tests/genericresourcetest.cpp
@@ -32,9 +32,7 @@ private Q_SLOTS:
32 32
33 void init() 33 void init()
34 { 34 {
35 removeFromDisk("org.kde.test.instance1"); 35 Akonadi2::GenericResource::removeFromDisk("org.kde.test.instance1");
36 removeFromDisk("org.kde.test.instance1.userqueue");
37 removeFromDisk("org.kde.test.instance1.synchronizerqueue");
38 Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Trace); 36 Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Trace);
39 } 37 }
40 38
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp
index 7efba13..0b4c13e 100644
--- a/tests/pipelinetest.cpp
+++ b/tests/pipelinetest.cpp
@@ -139,6 +139,34 @@ QByteArray deleteEntityCommand(const QByteArray &uid, qint64 revision)
139 return command; 139 return command;
140} 140}
141 141
142class TestProcessor : public Akonadi2::Preprocessor {
143public:
144 void newEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE
145 {
146 newUids << uid;
147 newRevisions << revision;
148 }
149
150 void modifiedEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE
151 {
152 modifiedUids << uid;
153 modifiedRevisions << revision;
154 }
155
156 void deletedEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE
157 {
158 deletedUids << uid;
159 deletedRevisions << revision;
160 }
161
162 QList<QByteArray> newUids;
163 QList<qint64> newRevisions;
164 QList<QByteArray> modifiedUids;
165 QList<qint64> modifiedRevisions;
166 QList<QByteArray> deletedUids;
167 QList<qint64> deletedRevisions;
168};
169
142/** 170/**
143 * Test of the pipeline implementation to ensure new revisions are created correctly in the database. 171 * Test of the pipeline implementation to ensure new revisions are created correctly in the database.
144 */ 172 */
@@ -251,6 +279,53 @@ private Q_SLOTS:
251 //And all revisions are gone 279 //And all revisions are gone
252 QCOMPARE(getKeys("org.kde.pipelinetest.instance1", "event.main").size(), 0); 280 QCOMPARE(getKeys("org.kde.pipelinetest.instance1", "event.main").size(), 0);
253 } 281 }
282
283 void testPreprocessor()
284 {
285 flatbuffers::FlatBufferBuilder entityFbb;
286
287 TestProcessor testProcessor;
288
289 Akonadi2::Pipeline pipeline("org.kde.pipelinetest.instance1");
290 pipeline.setPreprocessors("event", QVector<Akonadi2::Preprocessor*>() << &testProcessor);
291 pipeline.startTransaction();
292 pipeline.setAdaptorFactory("event", QSharedPointer<TestEventAdaptorFactory>::create());
293
294 //Actual test
295 {
296 auto command = createEntityCommand(createEvent(entityFbb));
297 pipeline.newEntity(command.constData(), command.size());
298 QCOMPARE(testProcessor.newUids.size(), 1);
299 QCOMPARE(testProcessor.newRevisions.size(), 1);
300 //Key doesn't contain revision and is just the uid
301 QCOMPARE(testProcessor.newUids.at(0), Akonadi2::Storage::uidFromKey(testProcessor.newUids.at(0)));
302 }
303 pipeline.commit();
304 entityFbb.Clear();
305 pipeline.startTransaction();
306 auto keys = getKeys("org.kde.pipelinetest.instance1", "event.main");
307 QCOMPARE(keys.size(), 1);
308 const auto uid = Akonadi2::Storage::uidFromKey(keys.first());
309 {
310 auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summary2"), uid, 1);
311 pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size());
312 QCOMPARE(testProcessor.modifiedUids.size(), 1);
313 QCOMPARE(testProcessor.modifiedRevisions.size(), 1);
314 //Key doesn't contain revision and is just the uid
315 QCOMPARE(testProcessor.modifiedUids.at(0), Akonadi2::Storage::uidFromKey(testProcessor.modifiedUids.at(0)));
316 }
317 pipeline.commit();
318 entityFbb.Clear();
319 pipeline.startTransaction();
320 {
321 auto deleteCommand = deleteEntityCommand(uid, 1);
322 pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size());
323 QCOMPARE(testProcessor.deletedUids.size(), 1);
324 QCOMPARE(testProcessor.deletedUids.size(), 1);
325 //Key doesn't contain revision and is just the uid
326 QCOMPARE(testProcessor.deletedUids.at(0), Akonadi2::Storage::uidFromKey(testProcessor.deletedUids.at(0)));
327 }
328 }
254}; 329};
255 330
256QTEST_MAIN(PipelineTest) 331QTEST_MAIN(PipelineTest)