diff options
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 33 |
1 files changed, 20 insertions, 13 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 27b9deb..ce4ad41 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -169,8 +169,8 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
169 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); | 169 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); |
170 | Log() << "Pipeline: wrote entity: " << key << newRevision; | 170 | Log() << "Pipeline: wrote entity: " << key << newRevision; |
171 | 171 | ||
172 | return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { | 172 | return KAsync::start<void>([this, key, entityType, newRevision](KAsync::Future<void> &future) { |
173 | PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { | 173 | PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], newRevision, [&future]() { |
174 | future.setFinished(); | 174 | future.setFinished(); |
175 | }); | 175 | }); |
176 | d->activePipelines << state; | 176 | d->activePipelines << state; |
@@ -268,8 +268,8 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | |||
268 | d->transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | 268 | d->transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); |
269 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); | 269 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); |
270 | 270 | ||
271 | return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { | 271 | return KAsync::start<void>([this, key, entityType, newRevision](KAsync::Future<void> &future) { |
272 | PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [&future]() { | 272 | PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], newRevision, [&future]() { |
273 | future.setFinished(); | 273 | future.setFinished(); |
274 | }); | 274 | }); |
275 | d->activePipelines << state; | 275 | d->activePipelines << state; |
@@ -300,8 +300,8 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size) | |||
300 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); | 300 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); |
301 | Log() << "Pipeline: deleted entity: "<< newRevision; | 301 | Log() << "Pipeline: deleted entity: "<< newRevision; |
302 | 302 | ||
303 | return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { | 303 | return KAsync::start<void>([this, key, entityType, newRevision](KAsync::Future<void> &future) { |
304 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType], [&future](){ | 304 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType], newRevision, [&future](){ |
305 | future.setFinished(); | 305 | future.setFinished(); |
306 | }); | 306 | }); |
307 | d->activePipelines << state; | 307 | d->activePipelines << state; |
@@ -342,8 +342,7 @@ void Pipeline::pipelineCompleted(PipelineState state) | |||
342 | state.callback(); | 342 | state.callback(); |
343 | 343 | ||
344 | if (state.type() != NullPipeline) { | 344 | if (state.type() != NullPipeline) { |
345 | //TODO what revision is finalized? | 345 | emit revisionUpdated(state.revision()); |
346 | emit revisionUpdated(storage().maxRevision()); | ||
347 | } | 346 | } |
348 | scheduleStep(); | 347 | scheduleStep(); |
349 | if (d->activePipelines.isEmpty()) { | 348 | if (d->activePipelines.isEmpty()) { |
@@ -355,19 +354,21 @@ void Pipeline::pipelineCompleted(PipelineState state) | |||
355 | class PipelineState::Private : public QSharedData | 354 | class PipelineState::Private : public QSharedData |
356 | { | 355 | { |
357 | public: | 356 | public: |
358 | Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters, const std::function<void()> &c) | 357 | Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters, const std::function<void()> &c, qint64 r) |
359 | : pipeline(p), | 358 | : pipeline(p), |
360 | type(t), | 359 | type(t), |
361 | key(k), | 360 | key(k), |
362 | filterIt(filters), | 361 | filterIt(filters), |
363 | idle(true), | 362 | idle(true), |
364 | callback(c) | 363 | callback(c), |
364 | revision(r) | ||
365 | {} | 365 | {} |
366 | 366 | ||
367 | Private() | 367 | Private() |
368 | : pipeline(0), | 368 | : pipeline(0), |
369 | filterIt(QVector<Preprocessor *>()), | 369 | filterIt(QVector<Preprocessor *>()), |
370 | idle(true) | 370 | idle(true), |
371 | revision(-1) | ||
371 | {} | 372 | {} |
372 | 373 | ||
373 | Pipeline *pipeline; | 374 | Pipeline *pipeline; |
@@ -376,6 +377,7 @@ public: | |||
376 | QVectorIterator<Preprocessor *> filterIt; | 377 | QVectorIterator<Preprocessor *> filterIt; |
377 | bool idle; | 378 | bool idle; |
378 | std::function<void()> callback; | 379 | std::function<void()> callback; |
380 | qint64 revision; | ||
379 | }; | 381 | }; |
380 | 382 | ||
381 | PipelineState::PipelineState() | 383 | PipelineState::PipelineState() |
@@ -384,8 +386,8 @@ PipelineState::PipelineState() | |||
384 | 386 | ||
385 | } | 387 | } |
386 | 388 | ||
387 | PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, const std::function<void()> &callback) | 389 | PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, qint64 revision, const std::function<void()> &callback) |
388 | : d(new Private(pipeline, type, key, filters, callback)) | 390 | : d(new Private(pipeline, type, key, filters, callback, revision)) |
389 | { | 391 | { |
390 | } | 392 | } |
391 | 393 | ||
@@ -424,6 +426,11 @@ Pipeline::Type PipelineState::type() const | |||
424 | return d->type; | 426 | return d->type; |
425 | } | 427 | } |
426 | 428 | ||
429 | qint64 PipelineState::revision() const | ||
430 | { | ||
431 | return d->revision; | ||
432 | } | ||
433 | |||
427 | void PipelineState::step() | 434 | void PipelineState::step() |
428 | { | 435 | { |
429 | if (!d->pipeline) { | 436 | if (!d->pipeline) { |