From db477b2f35411a55c051d59588b6fabd153e4013 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 19 Jan 2015 03:16:29 +0100 Subject: Fixed pipeline. Steps are now finally processed as they should be and a job tracks the processing progress. --- common/pipeline.cpp | 38 +++++++++++++++++++++++++------------- common/pipeline.h | 9 ++++++--- dummyresource/resourcefactory.cpp | 18 +++++++++++++----- 3 files changed, 44 insertions(+), 21 deletions(-) diff --git a/common/pipeline.cpp b/common/pipeline.cpp index dda7671..8f15fef 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -168,32 +168,30 @@ void Pipeline::pipelineStepped(const PipelineState &state) void Pipeline::scheduleStep() { - // if (!d->stepScheduled) { - // d->stepScheduled = true; - // QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); - // } - //FIXME make async again. For some reason the job in newEntity crashes if pipeline processing is async. - stepPipelines(); + if (!d->stepScheduled) { + d->stepScheduled = true; + QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); + } } void Pipeline::stepPipelines() { + d->stepScheduled = false; for (PipelineState &state: d->activePipelines) { if (state.isIdle()) { state.step(); } } - - d->stepScheduled = false; } -void Pipeline::pipelineCompleted(const PipelineState &state) +void Pipeline::pipelineCompleted(PipelineState state) { //TODO finalize the datastore, inform clients of the new rev const int index = d->activePipelines.indexOf(state); if (index > -1) { d->activePipelines.remove(index); } + state.callback(); if (state.type() != NullPipeline) { //TODO what revision is finalized? @@ -281,20 +279,23 @@ Pipeline::Type PipelineState::type() const void PipelineState::step() { if (!d->pipeline) { + Q_ASSERT(false); return; } d->idle = false; if (d->filterIt.hasNext()) { //TODO skip step if already processed - d->pipeline->storage().scan(d->key.toStdString(), [this](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { + //FIXME error handling if no result is found + auto preprocessor = d->filterIt.next(); + d->pipeline->storage().scan(d->key.toStdString(), [this, preprocessor](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { auto entity = Akonadi2::GetEntity(dataValue); - d->filterIt.next()->process(*this, *entity); + preprocessor->process(*this, *entity); return false; }); } else { + //This object becomes invalid after this call d->pipeline->pipelineCompleted(*this); - d->callback(); } } @@ -307,6 +308,12 @@ void PipelineState::processingCompleted(Preprocessor *filter) } } +void PipelineState::callback() +{ + d->callback(); +} + + Preprocessor::Preprocessor() : d(0) { @@ -316,7 +323,7 @@ Preprocessor::~Preprocessor() { } -void Preprocessor::process(PipelineState state, const Akonadi2::Entity &) +void Preprocessor::process(const PipelineState &state, const Akonadi2::Entity &) { processingCompleted(state); } @@ -326,5 +333,10 @@ void Preprocessor::processingCompleted(PipelineState state) state.processingCompleted(this); } +QString Preprocessor::id() const +{ + return QLatin1String("unknown processor"); +} + } // namespace Akonadi2 diff --git a/common/pipeline.h b/common/pipeline.h index 918d21e..a574d27 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -66,7 +66,8 @@ private Q_SLOTS: private: void pipelineStepped(const PipelineState &state); - void pipelineCompleted(const PipelineState &state); + //Don't use a reference here (it would invalidate itself) + void pipelineCompleted(PipelineState state); void scheduleStep(); friend class PipelineState; @@ -94,6 +95,8 @@ public: void step(); void processingCompleted(Preprocessor *filter); + void callback(); + private: class Private; QExplicitlySharedDataPointer d; @@ -106,9 +109,9 @@ public: virtual ~Preprocessor(); //TODO pass actual command as well, for changerecording - virtual void process(PipelineState state, const Akonadi2::Entity &); + virtual void process(const PipelineState &state, const Akonadi2::Entity &); //TODO to record progress - // virtual QString id(); + virtual QString id() const; protected: void processingCompleted(PipelineState state); diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index dc716ef..6fe10ec 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp @@ -43,19 +43,27 @@ class SimpleProcessor : public Akonadi2::Preprocessor { public: - SimpleProcessor(const std::function &f) + SimpleProcessor(const QString &id, const std::function &f) : Akonadi2::Preprocessor(), - mFunction(f) + mFunction(f), + mId(id) { } - void process(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e) { + void process(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e) Q_DECL_OVERRIDE + { mFunction(state, e); processingCompleted(state); } + QString id() const + { + return mId; + } + protected: std::function mFunction; + QString mId; }; // template @@ -166,7 +174,7 @@ private slots: break; case Akonadi2::Commands::CreateEntityCommand: { //TODO job lifetime management - mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then([&messageQueueCallback, whileCallback](Async::Future &future) { + mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then([messageQueueCallback, whileCallback](Async::Future &future) { messageQueueCallback(true); whileCallback(false); future.setFinished(); @@ -239,7 +247,7 @@ void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) //i.e. If a resource stores tags as part of each message it needs to update the tag index //TODO setup preprocessors for each domain type and pipeline type allowing full customization //Eventually the order should be self configuring, for now it's hardcoded. - auto eventIndexer = new SimpleProcessor([eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { + auto eventIndexer = new SimpleProcessor("summaryprocessor", [eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { auto adaptor = eventFactory->createAdaptor(entity); qDebug() << "Summary preprocessor: " << adaptor->getProperty("summary").toString(); }); -- cgit v1.2.3