From db477b2f35411a55c051d59588b6fabd153e4013 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 19 Jan 2015 03:16:29 +0100 Subject: Fixed pipeline. Steps are now finally processed as they should be and a job tracks the processing progress. --- common/pipeline.cpp | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) (limited to 'common/pipeline.cpp') diff --git a/common/pipeline.cpp b/common/pipeline.cpp index dda7671..8f15fef 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -168,32 +168,30 @@ void Pipeline::pipelineStepped(const PipelineState &state) void Pipeline::scheduleStep() { - // if (!d->stepScheduled) { - // d->stepScheduled = true; - // QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); - // } - //FIXME make async again. For some reason the job in newEntity crashes if pipeline processing is async. - stepPipelines(); + if (!d->stepScheduled) { + d->stepScheduled = true; + QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); + } } void Pipeline::stepPipelines() { + d->stepScheduled = false; for (PipelineState &state: d->activePipelines) { if (state.isIdle()) { state.step(); } } - - d->stepScheduled = false; } -void Pipeline::pipelineCompleted(const PipelineState &state) +void Pipeline::pipelineCompleted(PipelineState state) { //TODO finalize the datastore, inform clients of the new rev const int index = d->activePipelines.indexOf(state); if (index > -1) { d->activePipelines.remove(index); } + state.callback(); if (state.type() != NullPipeline) { //TODO what revision is finalized? @@ -281,20 +279,23 @@ Pipeline::Type PipelineState::type() const void PipelineState::step() { if (!d->pipeline) { + Q_ASSERT(false); return; } d->idle = false; if (d->filterIt.hasNext()) { //TODO skip step if already processed - d->pipeline->storage().scan(d->key.toStdString(), [this](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { + //FIXME error handling if no result is found + auto preprocessor = d->filterIt.next(); + d->pipeline->storage().scan(d->key.toStdString(), [this, preprocessor](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { auto entity = Akonadi2::GetEntity(dataValue); - d->filterIt.next()->process(*this, *entity); + preprocessor->process(*this, *entity); return false; }); } else { + //This object becomes invalid after this call d->pipeline->pipelineCompleted(*this); - d->callback(); } } @@ -307,6 +308,12 @@ void PipelineState::processingCompleted(Preprocessor *filter) } } +void PipelineState::callback() +{ + d->callback(); +} + + Preprocessor::Preprocessor() : d(0) { @@ -316,7 +323,7 @@ Preprocessor::~Preprocessor() { } -void Preprocessor::process(PipelineState state, const Akonadi2::Entity &) +void Preprocessor::process(const PipelineState &state, const Akonadi2::Entity &) { processingCompleted(state); } @@ -326,5 +333,10 @@ void Preprocessor::processingCompleted(PipelineState state) state.processingCompleted(this); } +QString Preprocessor::id() const +{ + return QLatin1String("unknown processor"); +} + } // namespace Akonadi2 -- cgit v1.2.3