diff options
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 286 |
1 files changed, 66 insertions, 220 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 15d2401..de63288 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -1,5 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org> | 2 | * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org> |
3 | * Copyright (C) 2015 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | 4 | * |
4 | * This library is free software; you can redistribute it and/or | 5 | * This library is free software; you can redistribute it and/or |
5 | * modify it under the terms of the GNU Lesser General Public | 6 | * modify it under the terms of the GNU Lesser General Public |
@@ -41,19 +42,13 @@ class Pipeline::Private | |||
41 | { | 42 | { |
42 | public: | 43 | public: |
43 | Private(const QString &resourceName) | 44 | Private(const QString &resourceName) |
44 | : storage(Akonadi2::storageLocation(), resourceName, Storage::ReadWrite), | 45 | : storage(Akonadi2::storageLocation(), resourceName, Storage::ReadWrite) |
45 | stepScheduled(false) | ||
46 | { | 46 | { |
47 | } | 47 | } |
48 | 48 | ||
49 | Storage storage; | 49 | Storage storage; |
50 | Storage::Transaction transaction; | 50 | Storage::Transaction transaction; |
51 | QHash<QString, QVector<Preprocessor *> > nullPipeline; | 51 | QHash<QString, QVector<Preprocessor *> > processors; |
52 | QHash<QString, QVector<Preprocessor *> > newPipeline; | ||
53 | QHash<QString, QVector<Preprocessor *> > modifiedPipeline; | ||
54 | QHash<QString, QVector<Preprocessor *> > deletedPipeline; | ||
55 | QVector<PipelineState> activePipelines; | ||
56 | bool stepScheduled; | ||
57 | QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; | 52 | QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; |
58 | }; | 53 | }; |
59 | 54 | ||
@@ -68,21 +63,9 @@ Pipeline::~Pipeline() | |||
68 | delete d; | 63 | delete d; |
69 | } | 64 | } |
70 | 65 | ||
71 | void Pipeline::setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors) | 66 | void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) |
72 | { | 67 | { |
73 | switch (pipelineType) { | 68 | d->processors[entityType] = processors; |
74 | case NewPipeline: | ||
75 | d->newPipeline[entityType] = preprocessors; | ||
76 | break; | ||
77 | case ModifiedPipeline: | ||
78 | d->modifiedPipeline[entityType] = preprocessors; | ||
79 | break; | ||
80 | case DeletedPipeline: | ||
81 | d->deletedPipeline[entityType] = preprocessors; | ||
82 | break; | ||
83 | default: | ||
84 | break; | ||
85 | }; | ||
86 | } | 69 | } |
87 | 70 | ||
88 | void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) | 71 | void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) |
@@ -92,6 +75,11 @@ void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFac | |||
92 | 75 | ||
93 | void Pipeline::startTransaction() | 76 | void Pipeline::startTransaction() |
94 | { | 77 | { |
78 | //TODO call for all types | ||
79 | //But avoid doing it during cleanup | ||
80 | // for (auto processor : d->processors[bufferType]) { | ||
81 | // processor->startBatch(); | ||
82 | // } | ||
95 | if (d->transaction) { | 83 | if (d->transaction) { |
96 | return; | 84 | return; |
97 | } | 85 | } |
@@ -100,7 +88,13 @@ void Pipeline::startTransaction() | |||
100 | 88 | ||
101 | void Pipeline::commit() | 89 | void Pipeline::commit() |
102 | { | 90 | { |
91 | //TODO call for all types | ||
92 | //But avoid doing it during cleanup | ||
93 | // for (auto processor : d->processors[bufferType]) { | ||
94 | // processor->finalize(); | ||
95 | // } | ||
103 | const auto revision = Akonadi2::Storage::maxRevision(d->transaction); | 96 | const auto revision = Akonadi2::Storage::maxRevision(d->transaction); |
97 | Trace() << "Committing " << revision; | ||
104 | if (d->transaction) { | 98 | if (d->transaction) { |
105 | d->transaction.commit(); | 99 | d->transaction.commit(); |
106 | } | 100 | } |
@@ -118,14 +112,6 @@ Storage &Pipeline::storage() const | |||
118 | return d->storage; | 112 | return d->storage; |
119 | } | 113 | } |
120 | 114 | ||
121 | void Pipeline::null() | ||
122 | { | ||
123 | //TODO: is there really any need for the null pipeline? if so, it should be doing something ;) | ||
124 | // PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline); | ||
125 | // d->activePipelines << state; | ||
126 | // state.step(); | ||
127 | } | ||
128 | |||
129 | void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | 115 | void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) |
130 | { | 116 | { |
131 | d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(uid, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()), | 117 | d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(uid, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()), |
@@ -181,15 +167,25 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
181 | 167 | ||
182 | storeNewRevision(newRevision, fbb, bufferType, key); | 168 | storeNewRevision(newRevision, fbb, bufferType, key); |
183 | 169 | ||
184 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; | 170 | auto adaptorFactory = d->adaptorFactory.value(bufferType); |
171 | if (!adaptorFactory) { | ||
172 | Warning() << "no adaptor factory for type " << bufferType; | ||
173 | return KAsync::error<qint64>(0); | ||
174 | } | ||
185 | 175 | ||
186 | return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { | 176 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; |
187 | PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future, newRevision]() { | 177 | d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { |
188 | future.setValue(newRevision); | 178 | auto entity = Akonadi2::GetEntity(value); |
189 | future.setFinished(); | 179 | auto adaptor = adaptorFactory->createAdaptor(*entity); |
190 | }, bufferType); | 180 | for (auto processor : d->processors[bufferType]) { |
191 | d->activePipelines << state; | 181 | processor->newEntity(key, newRevision, *adaptor, d->transaction); |
192 | state.step(); | 182 | } |
183 | return false; | ||
184 | }, [this](const Akonadi2::Storage::Error &error) { | ||
185 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | ||
186 | }); | ||
187 | return KAsync::start<qint64>([newRevision](){ | ||
188 | return newRevision; | ||
193 | }); | 189 | }); |
194 | } | 190 | } |
195 | 191 | ||
@@ -287,14 +283,18 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
287 | 283 | ||
288 | storeNewRevision(newRevision, fbb, bufferType, key); | 284 | storeNewRevision(newRevision, fbb, bufferType, key); |
289 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; | 285 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; |
290 | 286 | d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &, const QByteArray &value) -> bool { | |
291 | return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { | 287 | auto entity = Akonadi2::GetEntity(value); |
292 | PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future, newRevision]() { | 288 | auto newEntity = adaptorFactory->createAdaptor(*entity); |
293 | future.setValue(newRevision); | 289 | for (auto processor : d->processors[bufferType]) { |
294 | future.setFinished(); | 290 | processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); |
295 | }, bufferType); | 291 | } |
296 | d->activePipelines << state; | 292 | return false; |
297 | state.step(); | 293 | }, [this](const Akonadi2::Storage::Error &error) { |
294 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | ||
295 | }); | ||
296 | return KAsync::start<qint64>([newRevision](){ | ||
297 | return newRevision; | ||
298 | }); | 298 | }); |
299 | } | 299 | } |
300 | 300 | ||
@@ -331,13 +331,24 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
331 | storeNewRevision(newRevision, fbb, bufferType, key); | 331 | storeNewRevision(newRevision, fbb, bufferType, key); |
332 | Log() << "Pipeline: deleted entity: "<< newRevision; | 332 | Log() << "Pipeline: deleted entity: "<< newRevision; |
333 | 333 | ||
334 | return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { | 334 | auto adaptorFactory = d->adaptorFactory.value(bufferType); |
335 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[bufferType], newRevision, [&future, newRevision](){ | 335 | if (!adaptorFactory) { |
336 | future.setValue(newRevision); | 336 | Warning() << "no adaptor factory for type " << bufferType; |
337 | future.setFinished(); | 337 | return KAsync::error<qint64>(0); |
338 | }, bufferType); | 338 | } |
339 | d->activePipelines << state; | 339 | |
340 | state.step(); | 340 | // d->transaction.openDatabase(bufferType + ".main").scan(key, [this, bufferType, newRevision, adaptorFactory](const QByteArray &, const QByteArray &value) -> bool { |
341 | // auto entity = Akonadi2::GetEntity(value); | ||
342 | // auto newEntity = adaptorFactory->createAdaptor(*entity); | ||
343 | for (auto processor : d->processors[bufferType]) { | ||
344 | processor->deletedEntity(key, newRevision, Akonadi2::ApplicationDomain::BufferAdaptor(), d->transaction); | ||
345 | } | ||
346 | // return false; | ||
347 | // }, [this](const Akonadi2::Storage::Error &error) { | ||
348 | // ErrorMsg() << "Failed to find value in pipeline: " << error.message; | ||
349 | // }); | ||
350 | return KAsync::start<qint64>([newRevision](){ | ||
351 | return newRevision; | ||
341 | }); | 352 | }); |
342 | } | 353 | } |
343 | 354 | ||
@@ -372,164 +383,6 @@ qint64 Pipeline::cleanedUpRevision() | |||
372 | return Akonadi2::Storage::cleanedUpRevision(d->transaction); | 383 | return Akonadi2::Storage::cleanedUpRevision(d->transaction); |
373 | } | 384 | } |
374 | 385 | ||
375 | void Pipeline::pipelineStepped(const PipelineState &state) | ||
376 | { | ||
377 | scheduleStep(); | ||
378 | } | ||
379 | |||
380 | void Pipeline::scheduleStep() | ||
381 | { | ||
382 | if (!d->stepScheduled) { | ||
383 | d->stepScheduled = true; | ||
384 | QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); | ||
385 | } | ||
386 | } | ||
387 | |||
388 | void Pipeline::stepPipelines() | ||
389 | { | ||
390 | d->stepScheduled = false; | ||
391 | for (PipelineState &state: d->activePipelines) { | ||
392 | if (state.isIdle()) { | ||
393 | state.step(); | ||
394 | } | ||
395 | } | ||
396 | } | ||
397 | |||
398 | void Pipeline::pipelineCompleted(PipelineState state) | ||
399 | { | ||
400 | //TODO finalize the datastore, inform clients of the new rev | ||
401 | const int index = d->activePipelines.indexOf(state); | ||
402 | if (index > -1) { | ||
403 | d->activePipelines.remove(index); | ||
404 | } | ||
405 | state.callback(); | ||
406 | |||
407 | scheduleStep(); | ||
408 | if (d->activePipelines.isEmpty()) { | ||
409 | emit pipelinesDrained(); | ||
410 | } | ||
411 | } | ||
412 | |||
413 | |||
414 | class PipelineState::Private : public QSharedData | ||
415 | { | ||
416 | public: | ||
417 | Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters, const std::function<void()> &c, qint64 r, const QByteArray &b) | ||
418 | : pipeline(p), | ||
419 | type(t), | ||
420 | key(k), | ||
421 | filterIt(filters), | ||
422 | idle(true), | ||
423 | callback(c), | ||
424 | revision(r), | ||
425 | bufferType(b) | ||
426 | {} | ||
427 | |||
428 | Private() | ||
429 | : pipeline(0), | ||
430 | filterIt(QVector<Preprocessor *>()), | ||
431 | idle(true), | ||
432 | revision(-1) | ||
433 | {} | ||
434 | |||
435 | Pipeline *pipeline; | ||
436 | Pipeline::Type type; | ||
437 | QByteArray key; | ||
438 | QVectorIterator<Preprocessor *> filterIt; | ||
439 | bool idle; | ||
440 | std::function<void()> callback; | ||
441 | qint64 revision; | ||
442 | QByteArray bufferType; | ||
443 | }; | ||
444 | |||
445 | PipelineState::PipelineState() | ||
446 | : d(new Private()) | ||
447 | { | ||
448 | |||
449 | } | ||
450 | |||
451 | PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, qint64 revision, const std::function<void()> &callback, const QByteArray &bufferType) | ||
452 | : d(new Private(pipeline, type, key, filters, callback, revision, bufferType)) | ||
453 | { | ||
454 | } | ||
455 | |||
456 | PipelineState::PipelineState(const PipelineState &other) | ||
457 | : d(other.d) | ||
458 | { | ||
459 | } | ||
460 | |||
461 | PipelineState::~PipelineState() | ||
462 | { | ||
463 | } | ||
464 | |||
465 | PipelineState &PipelineState::operator=(const PipelineState &rhs) | ||
466 | { | ||
467 | d = rhs.d; | ||
468 | return *this; | ||
469 | } | ||
470 | |||
471 | bool PipelineState::operator==(const PipelineState &rhs) | ||
472 | { | ||
473 | return d == rhs.d; | ||
474 | } | ||
475 | |||
476 | bool PipelineState::isIdle() const | ||
477 | { | ||
478 | return d->idle; | ||
479 | } | ||
480 | |||
481 | QByteArray PipelineState::key() const | ||
482 | { | ||
483 | return d->key; | ||
484 | } | ||
485 | |||
486 | Pipeline::Type PipelineState::type() const | ||
487 | { | ||
488 | return d->type; | ||
489 | } | ||
490 | |||
491 | qint64 PipelineState::revision() const | ||
492 | { | ||
493 | return d->revision; | ||
494 | } | ||
495 | |||
496 | QByteArray PipelineState::bufferType() const | ||
497 | { | ||
498 | return d->bufferType; | ||
499 | } | ||
500 | |||
501 | void PipelineState::step() | ||
502 | { | ||
503 | if (!d->pipeline) { | ||
504 | Q_ASSERT(false); | ||
505 | return; | ||
506 | } | ||
507 | |||
508 | d->idle = false; | ||
509 | if (d->filterIt.hasNext()) { | ||
510 | //TODO skip step if already processed | ||
511 | auto preprocessor = d->filterIt.next(); | ||
512 | preprocessor->process(*this, d->pipeline->transaction()); | ||
513 | } else { | ||
514 | //This object becomes invalid after this call | ||
515 | d->pipeline->pipelineCompleted(*this); | ||
516 | } | ||
517 | } | ||
518 | |||
519 | void PipelineState::processingCompleted(Preprocessor *filter) | ||
520 | { | ||
521 | if (d->pipeline && filter == d->filterIt.peekPrevious()) { | ||
522 | d->idle = true; | ||
523 | d->pipeline->pipelineStepped(*this); | ||
524 | } | ||
525 | } | ||
526 | |||
527 | void PipelineState::callback() | ||
528 | { | ||
529 | d->callback(); | ||
530 | } | ||
531 | |||
532 | |||
533 | Preprocessor::Preprocessor() | 386 | Preprocessor::Preprocessor() |
534 | : d(0) | 387 | : d(0) |
535 | { | 388 | { |
@@ -539,19 +392,12 @@ Preprocessor::~Preprocessor() | |||
539 | { | 392 | { |
540 | } | 393 | } |
541 | 394 | ||
542 | void Preprocessor::process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) | 395 | void Preprocessor::startBatch() |
543 | { | ||
544 | processingCompleted(state); | ||
545 | } | ||
546 | |||
547 | void Preprocessor::processingCompleted(PipelineState state) | ||
548 | { | 396 | { |
549 | state.processingCompleted(this); | ||
550 | } | 397 | } |
551 | 398 | ||
552 | QString Preprocessor::id() const | 399 | void Preprocessor::finalize() |
553 | { | 400 | { |
554 | return QLatin1String("unknown processor"); | ||
555 | } | 401 | } |
556 | 402 | ||
557 | } // namespace Akonadi2 | 403 | } // namespace Akonadi2 |