diff options
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 38 |
1 files changed, 25 insertions, 13 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index dda7671..8f15fef 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -168,32 +168,30 @@ void Pipeline::pipelineStepped(const PipelineState &state) | |||
168 | 168 | ||
169 | void Pipeline::scheduleStep() | 169 | void Pipeline::scheduleStep() |
170 | { | 170 | { |
171 | // if (!d->stepScheduled) { | 171 | if (!d->stepScheduled) { |
172 | // d->stepScheduled = true; | 172 | d->stepScheduled = true; |
173 | // QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); | 173 | QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); |
174 | // } | 174 | } |
175 | //FIXME make async again. For some reason the job in newEntity crashes if pipeline processing is async. | ||
176 | stepPipelines(); | ||
177 | } | 175 | } |
178 | 176 | ||
179 | void Pipeline::stepPipelines() | 177 | void Pipeline::stepPipelines() |
180 | { | 178 | { |
179 | d->stepScheduled = false; | ||
181 | for (PipelineState &state: d->activePipelines) { | 180 | for (PipelineState &state: d->activePipelines) { |
182 | if (state.isIdle()) { | 181 | if (state.isIdle()) { |
183 | state.step(); | 182 | state.step(); |
184 | } | 183 | } |
185 | } | 184 | } |
186 | |||
187 | d->stepScheduled = false; | ||
188 | } | 185 | } |
189 | 186 | ||
190 | void Pipeline::pipelineCompleted(const PipelineState &state) | 187 | void Pipeline::pipelineCompleted(PipelineState state) |
191 | { | 188 | { |
192 | //TODO finalize the datastore, inform clients of the new rev | 189 | //TODO finalize the datastore, inform clients of the new rev |
193 | const int index = d->activePipelines.indexOf(state); | 190 | const int index = d->activePipelines.indexOf(state); |
194 | if (index > -1) { | 191 | if (index > -1) { |
195 | d->activePipelines.remove(index); | 192 | d->activePipelines.remove(index); |
196 | } | 193 | } |
194 | state.callback(); | ||
197 | 195 | ||
198 | if (state.type() != NullPipeline) { | 196 | if (state.type() != NullPipeline) { |
199 | //TODO what revision is finalized? | 197 | //TODO what revision is finalized? |
@@ -281,20 +279,23 @@ Pipeline::Type PipelineState::type() const | |||
281 | void PipelineState::step() | 279 | void PipelineState::step() |
282 | { | 280 | { |
283 | if (!d->pipeline) { | 281 | if (!d->pipeline) { |
282 | Q_ASSERT(false); | ||
284 | return; | 283 | return; |
285 | } | 284 | } |
286 | 285 | ||
287 | d->idle = false; | 286 | d->idle = false; |
288 | if (d->filterIt.hasNext()) { | 287 | if (d->filterIt.hasNext()) { |
289 | //TODO skip step if already processed | 288 | //TODO skip step if already processed |
290 | d->pipeline->storage().scan(d->key.toStdString(), [this](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { | 289 | //FIXME error handling if no result is found |
290 | auto preprocessor = d->filterIt.next(); | ||
291 | d->pipeline->storage().scan(d->key.toStdString(), [this, preprocessor](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { | ||
291 | auto entity = Akonadi2::GetEntity(dataValue); | 292 | auto entity = Akonadi2::GetEntity(dataValue); |
292 | d->filterIt.next()->process(*this, *entity); | 293 | preprocessor->process(*this, *entity); |
293 | return false; | 294 | return false; |
294 | }); | 295 | }); |
295 | } else { | 296 | } else { |
297 | //This object becomes invalid after this call | ||
296 | d->pipeline->pipelineCompleted(*this); | 298 | d->pipeline->pipelineCompleted(*this); |
297 | d->callback(); | ||
298 | } | 299 | } |
299 | } | 300 | } |
300 | 301 | ||
@@ -307,6 +308,12 @@ void PipelineState::processingCompleted(Preprocessor *filter) | |||
307 | } | 308 | } |
308 | } | 309 | } |
309 | 310 | ||
311 | void PipelineState::callback() | ||
312 | { | ||
313 | d->callback(); | ||
314 | } | ||
315 | |||
316 | |||
310 | Preprocessor::Preprocessor() | 317 | Preprocessor::Preprocessor() |
311 | : d(0) | 318 | : d(0) |
312 | { | 319 | { |
@@ -316,7 +323,7 @@ Preprocessor::~Preprocessor() | |||
316 | { | 323 | { |
317 | } | 324 | } |
318 | 325 | ||
319 | void Preprocessor::process(PipelineState state, const Akonadi2::Entity &) | 326 | void Preprocessor::process(const PipelineState &state, const Akonadi2::Entity &) |
320 | { | 327 | { |
321 | processingCompleted(state); | 328 | processingCompleted(state); |
322 | } | 329 | } |
@@ -326,5 +333,10 @@ void Preprocessor::processingCompleted(PipelineState state) | |||
326 | state.processingCompleted(this); | 333 | state.processingCompleted(this); |
327 | } | 334 | } |
328 | 335 | ||
336 | QString Preprocessor::id() const | ||
337 | { | ||
338 | return QLatin1String("unknown processor"); | ||
339 | } | ||
340 | |||
329 | } // namespace Akonadi2 | 341 | } // namespace Akonadi2 |
330 | 342 | ||