summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/pipeline.cpp33
-rw-r--r--common/pipeline.h3
2 files changed, 22 insertions, 14 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 27b9deb..ce4ad41 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -169,8 +169,8 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
169 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 169 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
170 Log() << "Pipeline: wrote entity: " << key << newRevision; 170 Log() << "Pipeline: wrote entity: " << key << newRevision;
171 171
172 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { 172 return KAsync::start<void>([this, key, entityType, newRevision](KAsync::Future<void> &future) {
173 PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { 173 PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], newRevision, [&future]() {
174 future.setFinished(); 174 future.setFinished();
175 }); 175 });
176 d->activePipelines << state; 176 d->activePipelines << state;
@@ -268,8 +268,8 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
268 d->transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 268 d->transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
269 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 269 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
270 270
271 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { 271 return KAsync::start<void>([this, key, entityType, newRevision](KAsync::Future<void> &future) {
272 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [&future]() { 272 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], newRevision, [&future]() {
273 future.setFinished(); 273 future.setFinished();
274 }); 274 });
275 d->activePipelines << state; 275 d->activePipelines << state;
@@ -300,8 +300,8 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
300 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 300 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
301 Log() << "Pipeline: deleted entity: "<< newRevision; 301 Log() << "Pipeline: deleted entity: "<< newRevision;
302 302
303 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { 303 return KAsync::start<void>([this, key, entityType, newRevision](KAsync::Future<void> &future) {
304 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType], [&future](){ 304 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType], newRevision, [&future](){
305 future.setFinished(); 305 future.setFinished();
306 }); 306 });
307 d->activePipelines << state; 307 d->activePipelines << state;
@@ -342,8 +342,7 @@ void Pipeline::pipelineCompleted(PipelineState state)
342 state.callback(); 342 state.callback();
343 343
344 if (state.type() != NullPipeline) { 344 if (state.type() != NullPipeline) {
345 //TODO what revision is finalized? 345 emit revisionUpdated(state.revision());
346 emit revisionUpdated(storage().maxRevision());
347 } 346 }
348 scheduleStep(); 347 scheduleStep();
349 if (d->activePipelines.isEmpty()) { 348 if (d->activePipelines.isEmpty()) {
@@ -355,19 +354,21 @@ void Pipeline::pipelineCompleted(PipelineState state)
355class PipelineState::Private : public QSharedData 354class PipelineState::Private : public QSharedData
356{ 355{
357public: 356public:
358 Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters, const std::function<void()> &c) 357 Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters, const std::function<void()> &c, qint64 r)
359 : pipeline(p), 358 : pipeline(p),
360 type(t), 359 type(t),
361 key(k), 360 key(k),
362 filterIt(filters), 361 filterIt(filters),
363 idle(true), 362 idle(true),
364 callback(c) 363 callback(c),
364 revision(r)
365 {} 365 {}
366 366
367 Private() 367 Private()
368 : pipeline(0), 368 : pipeline(0),
369 filterIt(QVector<Preprocessor *>()), 369 filterIt(QVector<Preprocessor *>()),
370 idle(true) 370 idle(true),
371 revision(-1)
371 {} 372 {}
372 373
373 Pipeline *pipeline; 374 Pipeline *pipeline;
@@ -376,6 +377,7 @@ public:
376 QVectorIterator<Preprocessor *> filterIt; 377 QVectorIterator<Preprocessor *> filterIt;
377 bool idle; 378 bool idle;
378 std::function<void()> callback; 379 std::function<void()> callback;
380 qint64 revision;
379}; 381};
380 382
381PipelineState::PipelineState() 383PipelineState::PipelineState()
@@ -384,8 +386,8 @@ PipelineState::PipelineState()
384 386
385} 387}
386 388
387PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, const std::function<void()> &callback) 389PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, qint64 revision, const std::function<void()> &callback)
388 : d(new Private(pipeline, type, key, filters, callback)) 390 : d(new Private(pipeline, type, key, filters, callback, revision))
389{ 391{
390} 392}
391 393
@@ -424,6 +426,11 @@ Pipeline::Type PipelineState::type() const
424 return d->type; 426 return d->type;
425} 427}
426 428
429qint64 PipelineState::revision() const
430{
431 return d->revision;
432}
433
427void PipelineState::step() 434void PipelineState::step()
428{ 435{
429 if (!d->pipeline) { 436 if (!d->pipeline) {
diff --git a/common/pipeline.h b/common/pipeline.h
index 7307b2e..496e037 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -85,7 +85,7 @@ class AKONADI2COMMON_EXPORT PipelineState
85{ 85{
86public: 86public:
87 PipelineState(); 87 PipelineState();
88 PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, const std::function<void()> &callback); 88 PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, qint64 revision, const std::function<void()> &callback);
89 PipelineState(const PipelineState &other); 89 PipelineState(const PipelineState &other);
90 ~PipelineState(); 90 ~PipelineState();
91 91
@@ -95,6 +95,7 @@ public:
95 bool isIdle() const; 95 bool isIdle() const;
96 QByteArray key() const; 96 QByteArray key() const;
97 Pipeline::Type type() const; 97 Pipeline::Type type() const;
98 qint64 revision() const;
98 //TODO expose command 99 //TODO expose command
99 100
100 void step(); 101 void step();