summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-10-28 16:39:16 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-10-28 16:39:16 +0100
commit129333371d28c06d85f75ca579ce17798e615e84 (patch)
tree2ae01db9d26f6f72a74fa77e6937e03304e81a2c /common/pipeline.cpp
parent20f049b65c4bd8c3d0c16bbf398641675648a93f (diff)
downloadsink-129333371d28c06d85f75ca579ce17798e615e84.tar.gz
sink-129333371d28c06d85f75ca579ce17798e615e84.zip
Made pipeline preprocessing synchronous.
Instead of having the asynchronous preprocessor concept with different pipelines for new/modify/delete, we have a single pipeline with synchronous preprocessors that act upon new/modify/delete. This keeps the code simpler due to the lack of asynchronicity, and keeps the new/modify/delete operations together (which, at least for the indexing, makes a lot of sense). Not supporting asynchronicity is OK because the tasks done in preprocessing are not CPU intensive (if they were, we would have a problem, since they are directly involved in the round-trip time), and the main cost comes from I/O, meaning we don't gain much by doing multithreading. Costly tasks (such as full-text indexing) should rather be implemented as post-processing, since that doesn't increase the round-trip time directly, and eventual consistency is typically good enough for that.
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp286
1 files changed, 66 insertions, 220 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 15d2401..de63288 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -1,5 +1,6 @@
1/* 1/*
2 * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org> 2 * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org>
3 * Copyright (C) 2015 Christian Mollekopf <mollekopf@kolabsys.com>
3 * 4 *
4 * This library is free software; you can redistribute it and/or 5 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public 6 * modify it under the terms of the GNU Lesser General Public
@@ -41,19 +42,13 @@ class Pipeline::Private
41{ 42{
42public: 43public:
43 Private(const QString &resourceName) 44 Private(const QString &resourceName)
44 : storage(Akonadi2::storageLocation(), resourceName, Storage::ReadWrite), 45 : storage(Akonadi2::storageLocation(), resourceName, Storage::ReadWrite)
45 stepScheduled(false)
46 { 46 {
47 } 47 }
48 48
49 Storage storage; 49 Storage storage;
50 Storage::Transaction transaction; 50 Storage::Transaction transaction;
51 QHash<QString, QVector<Preprocessor *> > nullPipeline; 51 QHash<QString, QVector<Preprocessor *> > processors;
52 QHash<QString, QVector<Preprocessor *> > newPipeline;
53 QHash<QString, QVector<Preprocessor *> > modifiedPipeline;
54 QHash<QString, QVector<Preprocessor *> > deletedPipeline;
55 QVector<PipelineState> activePipelines;
56 bool stepScheduled;
57 QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; 52 QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory;
58}; 53};
59 54
@@ -68,21 +63,9 @@ Pipeline::~Pipeline()
68 delete d; 63 delete d;
69} 64}
70 65
71void Pipeline::setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors) 66void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors)
72{ 67{
73 switch (pipelineType) { 68 d->processors[entityType] = processors;
74 case NewPipeline:
75 d->newPipeline[entityType] = preprocessors;
76 break;
77 case ModifiedPipeline:
78 d->modifiedPipeline[entityType] = preprocessors;
79 break;
80 case DeletedPipeline:
81 d->deletedPipeline[entityType] = preprocessors;
82 break;
83 default:
84 break;
85 };
86} 69}
87 70
88void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) 71void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory)
@@ -92,6 +75,11 @@ void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFac
92 75
93void Pipeline::startTransaction() 76void Pipeline::startTransaction()
94{ 77{
78 //TODO call for all types
79 //But avoid doing it during cleanup
80 // for (auto processor : d->processors[bufferType]) {
81 // processor->startBatch();
82 // }
95 if (d->transaction) { 83 if (d->transaction) {
96 return; 84 return;
97 } 85 }
@@ -100,7 +88,13 @@ void Pipeline::startTransaction()
100 88
101void Pipeline::commit() 89void Pipeline::commit()
102{ 90{
91 //TODO call for all types
92 //But avoid doing it during cleanup
93 // for (auto processor : d->processors[bufferType]) {
94 // processor->finalize();
95 // }
103 const auto revision = Akonadi2::Storage::maxRevision(d->transaction); 96 const auto revision = Akonadi2::Storage::maxRevision(d->transaction);
97 Trace() << "Committing " << revision;
104 if (d->transaction) { 98 if (d->transaction) {
105 d->transaction.commit(); 99 d->transaction.commit();
106 } 100 }
@@ -118,14 +112,6 @@ Storage &Pipeline::storage() const
118 return d->storage; 112 return d->storage;
119} 113}
120 114
121void Pipeline::null()
122{
123 //TODO: is there really any need for the null pipeline? if so, it should be doing something ;)
124 // PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline);
125 // d->activePipelines << state;
126 // state.step();
127}
128
129void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) 115void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
130{ 116{
131 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(uid, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()), 117 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(uid, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()),
@@ -181,15 +167,25 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
181 167
182 storeNewRevision(newRevision, fbb, bufferType, key); 168 storeNewRevision(newRevision, fbb, bufferType, key);
183 169
184 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; 170 auto adaptorFactory = d->adaptorFactory.value(bufferType);
171 if (!adaptorFactory) {
172 Warning() << "no adaptor factory for type " << bufferType;
173 return KAsync::error<qint64>(0);
174 }
185 175
186 return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { 176 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
187 PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future, newRevision]() { 177 d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool {
188 future.setValue(newRevision); 178 auto entity = Akonadi2::GetEntity(value);
189 future.setFinished(); 179 auto adaptor = adaptorFactory->createAdaptor(*entity);
190 }, bufferType); 180 for (auto processor : d->processors[bufferType]) {
191 d->activePipelines << state; 181 processor->newEntity(key, newRevision, *adaptor, d->transaction);
192 state.step(); 182 }
183 return false;
184 }, [this](const Akonadi2::Storage::Error &error) {
185 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
186 });
187 return KAsync::start<qint64>([newRevision](){
188 return newRevision;
193 }); 189 });
194} 190}
195 191
@@ -287,14 +283,18 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
287 283
288 storeNewRevision(newRevision, fbb, bufferType, key); 284 storeNewRevision(newRevision, fbb, bufferType, key);
289 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; 285 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType;
290 286 d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &, const QByteArray &value) -> bool {
291 return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { 287 auto entity = Akonadi2::GetEntity(value);
292 PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future, newRevision]() { 288 auto newEntity = adaptorFactory->createAdaptor(*entity);
293 future.setValue(newRevision); 289 for (auto processor : d->processors[bufferType]) {
294 future.setFinished(); 290 processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction);
295 }, bufferType); 291 }
296 d->activePipelines << state; 292 return false;
297 state.step(); 293 }, [this](const Akonadi2::Storage::Error &error) {
294 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
295 });
296 return KAsync::start<qint64>([newRevision](){
297 return newRevision;
298 }); 298 });
299} 299}
300 300
@@ -331,13 +331,24 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
331 storeNewRevision(newRevision, fbb, bufferType, key); 331 storeNewRevision(newRevision, fbb, bufferType, key);
332 Log() << "Pipeline: deleted entity: "<< newRevision; 332 Log() << "Pipeline: deleted entity: "<< newRevision;
333 333
334 return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { 334 auto adaptorFactory = d->adaptorFactory.value(bufferType);
335 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[bufferType], newRevision, [&future, newRevision](){ 335 if (!adaptorFactory) {
336 future.setValue(newRevision); 336 Warning() << "no adaptor factory for type " << bufferType;
337 future.setFinished(); 337 return KAsync::error<qint64>(0);
338 }, bufferType); 338 }
339 d->activePipelines << state; 339
340 state.step(); 340 // d->transaction.openDatabase(bufferType + ".main").scan(key, [this, bufferType, newRevision, adaptorFactory](const QByteArray &, const QByteArray &value) -> bool {
341 // auto entity = Akonadi2::GetEntity(value);
342 // auto newEntity = adaptorFactory->createAdaptor(*entity);
343 for (auto processor : d->processors[bufferType]) {
344 processor->deletedEntity(key, newRevision, Akonadi2::ApplicationDomain::BufferAdaptor(), d->transaction);
345 }
346 // return false;
347 // }, [this](const Akonadi2::Storage::Error &error) {
348 // ErrorMsg() << "Failed to find value in pipeline: " << error.message;
349 // });
350 return KAsync::start<qint64>([newRevision](){
351 return newRevision;
341 }); 352 });
342} 353}
343 354
@@ -372,164 +383,6 @@ qint64 Pipeline::cleanedUpRevision()
372 return Akonadi2::Storage::cleanedUpRevision(d->transaction); 383 return Akonadi2::Storage::cleanedUpRevision(d->transaction);
373} 384}
374 385
375void Pipeline::pipelineStepped(const PipelineState &state)
376{
377 scheduleStep();
378}
379
380void Pipeline::scheduleStep()
381{
382 if (!d->stepScheduled) {
383 d->stepScheduled = true;
384 QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection);
385 }
386}
387
388void Pipeline::stepPipelines()
389{
390 d->stepScheduled = false;
391 for (PipelineState &state: d->activePipelines) {
392 if (state.isIdle()) {
393 state.step();
394 }
395 }
396}
397
398void Pipeline::pipelineCompleted(PipelineState state)
399{
400 //TODO finalize the datastore, inform clients of the new rev
401 const int index = d->activePipelines.indexOf(state);
402 if (index > -1) {
403 d->activePipelines.remove(index);
404 }
405 state.callback();
406
407 scheduleStep();
408 if (d->activePipelines.isEmpty()) {
409 emit pipelinesDrained();
410 }
411}
412
413
414class PipelineState::Private : public QSharedData
415{
416public:
417 Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters, const std::function<void()> &c, qint64 r, const QByteArray &b)
418 : pipeline(p),
419 type(t),
420 key(k),
421 filterIt(filters),
422 idle(true),
423 callback(c),
424 revision(r),
425 bufferType(b)
426 {}
427
428 Private()
429 : pipeline(0),
430 filterIt(QVector<Preprocessor *>()),
431 idle(true),
432 revision(-1)
433 {}
434
435 Pipeline *pipeline;
436 Pipeline::Type type;
437 QByteArray key;
438 QVectorIterator<Preprocessor *> filterIt;
439 bool idle;
440 std::function<void()> callback;
441 qint64 revision;
442 QByteArray bufferType;
443};
444
445PipelineState::PipelineState()
446 : d(new Private())
447{
448
449}
450
451PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, qint64 revision, const std::function<void()> &callback, const QByteArray &bufferType)
452 : d(new Private(pipeline, type, key, filters, callback, revision, bufferType))
453{
454}
455
456PipelineState::PipelineState(const PipelineState &other)
457 : d(other.d)
458{
459}
460
461PipelineState::~PipelineState()
462{
463}
464
465PipelineState &PipelineState::operator=(const PipelineState &rhs)
466{
467 d = rhs.d;
468 return *this;
469}
470
471bool PipelineState::operator==(const PipelineState &rhs)
472{
473 return d == rhs.d;
474}
475
476bool PipelineState::isIdle() const
477{
478 return d->idle;
479}
480
481QByteArray PipelineState::key() const
482{
483 return d->key;
484}
485
486Pipeline::Type PipelineState::type() const
487{
488 return d->type;
489}
490
491qint64 PipelineState::revision() const
492{
493 return d->revision;
494}
495
496QByteArray PipelineState::bufferType() const
497{
498 return d->bufferType;
499}
500
501void PipelineState::step()
502{
503 if (!d->pipeline) {
504 Q_ASSERT(false);
505 return;
506 }
507
508 d->idle = false;
509 if (d->filterIt.hasNext()) {
510 //TODO skip step if already processed
511 auto preprocessor = d->filterIt.next();
512 preprocessor->process(*this, d->pipeline->transaction());
513 } else {
514 //This object becomes invalid after this call
515 d->pipeline->pipelineCompleted(*this);
516 }
517}
518
519void PipelineState::processingCompleted(Preprocessor *filter)
520{
521 if (d->pipeline && filter == d->filterIt.peekPrevious()) {
522 d->idle = true;
523 d->pipeline->pipelineStepped(*this);
524 }
525}
526
527void PipelineState::callback()
528{
529 d->callback();
530}
531
532
533Preprocessor::Preprocessor() 386Preprocessor::Preprocessor()
534 : d(0) 387 : d(0)
535{ 388{
@@ -539,19 +392,12 @@ Preprocessor::~Preprocessor()
539{ 392{
540} 393}
541 394
542void Preprocessor::process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) 395void Preprocessor::startBatch()
543{
544 processingCompleted(state);
545}
546
547void Preprocessor::processingCompleted(PipelineState state)
548{ 396{
549 state.processingCompleted(this);
550} 397}
551 398
552QString Preprocessor::id() const 399void Preprocessor::finalize()
553{ 400{
554 return QLatin1String("unknown processor");
555} 401}
556 402
557} // namespace Akonadi2 403} // namespace Akonadi2