summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-10-29 00:43:15 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-10-29 00:43:15 +0100
commit70faf80be4146b17a59f9616404b21625d7400f6 (patch)
tree4fd4b44be533720f2804e8fd5f5434991138eb80 /common
parent20f049b65c4bd8c3d0c16bbf398641675648a93f (diff)
parentbb1b238d6982abe1e640fbf424234b2c5389642b (diff)
downloadsink-70faf80be4146b17a59f9616404b21625d7400f6.tar.gz
sink-70faf80be4146b17a59f9616404b21625d7400f6.zip
Merge branch 'feature/preprocessor' into develop
Diffstat (limited to 'common')
-rw-r--r--common/domain/event.cpp6
-rw-r--r--common/domain/event.h2
-rw-r--r--common/domain/mail.cpp7
-rw-r--r--common/domain/mail.h2
-rw-r--r--common/genericresource.cpp13
-rw-r--r--common/genericresource.h1
-rw-r--r--common/pipeline.cpp295
-rw-r--r--common/pipeline.h90
8 files changed, 97 insertions, 319 deletions
diff --git a/common/domain/event.cpp b/common/domain/event.cpp
index 9759fc3..83a6906 100644
--- a/common/domain/event.cpp
+++ b/common/domain/event.cpp
@@ -50,11 +50,11 @@ ResultSet TypeImplementation<Event>::queryIndexes(const Akonadi2::Query &query,
50 return ResultSet(keys); 50 return ResultSet(keys);
51} 51}
52 52
53void TypeImplementation<Event>::index(const Event &type, Akonadi2::Storage::Transaction &transaction) 53void TypeImplementation<Event>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction)
54{ 54{
55 const auto uid = type.getProperty("uid"); 55 const auto uid = bufferAdaptor.getProperty("uid");
56 if (uid.isValid()) { 56 if (uid.isValid()) {
57 Index("event.index.uid", transaction).add(uid.toByteArray(), type.identifier()); 57 Index("event.index.uid", transaction).add(uid.toByteArray(), identifier);
58 } 58 }
59} 59}
60 60
diff --git a/common/domain/event.h b/common/domain/event.h
index f21cd34..e9ba52a 100644
--- a/common/domain/event.h
+++ b/common/domain/event.h
@@ -56,7 +56,7 @@ public:
56 * An empty result set indicates that a full scan is required. 56 * An empty result set indicates that a full scan is required.
57 */ 57 */
58 static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction); 58 static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction);
59 static void index(const Event &type, Akonadi2::Storage::Transaction &transaction); 59 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction);
60 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); 60 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper();
61 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); 61 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper();
62}; 62};
diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp
index d40dde9..ffe322e 100644
--- a/common/domain/mail.cpp
+++ b/common/domain/mail.cpp
@@ -50,12 +50,11 @@ ResultSet TypeImplementation<Mail>::queryIndexes(const Akonadi2::Query &query, c
50 return ResultSet(keys); 50 return ResultSet(keys);
51} 51}
52 52
53void TypeImplementation<Mail>::index(const Mail &type, Akonadi2::Storage::Transaction &transaction) 53void TypeImplementation<Mail>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction)
54{ 54{
55 const auto uid = type.getProperty("uid"); 55 const auto uid = bufferAdaptor.getProperty("uid");
56 if (uid.isValid()) { 56 if (uid.isValid()) {
57 Index uidIndex("mail.index.uid", transaction); 57 Index("mail.index.uid", transaction).add(uid.toByteArray(), identifier);
58 uidIndex.add(uid.toByteArray(), type.identifier());
59 } 58 }
60} 59}
61 60
diff --git a/common/domain/mail.h b/common/domain/mail.h
index b58ce44..38f1d03 100644
--- a/common/domain/mail.h
+++ b/common/domain/mail.h
@@ -51,7 +51,7 @@ public:
51 * An empty result set indicates that a full scan is required. 51 * An empty result set indicates that a full scan is required.
52 */ 52 */
53 static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction); 53 static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction);
54 static void index(const Mail &type, Akonadi2::Storage::Transaction &transaction); 54 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction);
55 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); 55 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper();
56 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); 56 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper();
57}; 57};
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index dcae43d..652154d 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -54,7 +54,7 @@ public Q_SLOTS:
54 { 54 {
55 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly); 55 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly);
56 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite); 56 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite);
57 qint64 lastReplayedRevision = 0; 57 qint64 lastReplayedRevision = 1;
58 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { 58 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
59 lastReplayedRevision = value.toLongLong(); 59 lastReplayedRevision = value.toLongLong();
60 return false; 60 return false;
@@ -285,10 +285,13 @@ GenericResource::~GenericResource()
285 delete mSourceChangeReplay; 285 delete mSourceChangeReplay;
286} 286}
287 287
288// void GenericResource::revisionChanged() 288void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier)
289// { 289{
290// //TODO replay revision 290 Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier, Akonadi2::Storage::ReadWrite).removeFromDisk();
291// } 291 Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".userqueue", Akonadi2::Storage::ReadWrite).removeFromDisk();
292 Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".synchronizerqueue", Akonadi2::Storage::ReadWrite).removeFromDisk();
293 Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".changereplay", Akonadi2::Storage::ReadWrite).removeFromDisk();
294}
292 295
293void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) 296void GenericResource::onProcessorError(int errorCode, const QString &errorMessage)
294{ 297{
diff --git a/common/genericresource.h b/common/genericresource.h
index cfc6653..33de0e7 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -48,6 +48,7 @@ public:
48 48
49 int error() const; 49 int error() const;
50 50
51 static void removeFromDisk(const QByteArray &instanceIdentifier);
51private Q_SLOTS: 52private Q_SLOTS:
52 void updateLowerBoundRevision(); 53 void updateLowerBoundRevision();
53 54
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 15d2401..0ce478b 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -1,5 +1,6 @@
1/* 1/*
2 * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org> 2 * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org>
3 * Copyright (C) 2015 Christian Mollekopf <mollekopf@kolabsys.com>
3 * 4 *
4 * This library is free software; you can redistribute it and/or 5 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public 6 * modify it under the terms of the GNU Lesser General Public
@@ -41,19 +42,13 @@ class Pipeline::Private
41{ 42{
42public: 43public:
43 Private(const QString &resourceName) 44 Private(const QString &resourceName)
44 : storage(Akonadi2::storageLocation(), resourceName, Storage::ReadWrite), 45 : storage(Akonadi2::storageLocation(), resourceName, Storage::ReadWrite)
45 stepScheduled(false)
46 { 46 {
47 } 47 }
48 48
49 Storage storage; 49 Storage storage;
50 Storage::Transaction transaction; 50 Storage::Transaction transaction;
51 QHash<QString, QVector<Preprocessor *> > nullPipeline; 51 QHash<QString, QVector<Preprocessor *> > processors;
52 QHash<QString, QVector<Preprocessor *> > newPipeline;
53 QHash<QString, QVector<Preprocessor *> > modifiedPipeline;
54 QHash<QString, QVector<Preprocessor *> > deletedPipeline;
55 QVector<PipelineState> activePipelines;
56 bool stepScheduled;
57 QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; 52 QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory;
58}; 53};
59 54
@@ -68,21 +63,9 @@ Pipeline::~Pipeline()
68 delete d; 63 delete d;
69} 64}
70 65
71void Pipeline::setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors) 66void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors)
72{ 67{
73 switch (pipelineType) { 68 d->processors[entityType] = processors;
74 case NewPipeline:
75 d->newPipeline[entityType] = preprocessors;
76 break;
77 case ModifiedPipeline:
78 d->modifiedPipeline[entityType] = preprocessors;
79 break;
80 case DeletedPipeline:
81 d->deletedPipeline[entityType] = preprocessors;
82 break;
83 default:
84 break;
85 };
86} 69}
87 70
88void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) 71void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory)
@@ -92,6 +75,11 @@ void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFac
92 75
93void Pipeline::startTransaction() 76void Pipeline::startTransaction()
94{ 77{
78 //TODO call for all types
79 //But avoid doing it during cleanup
80 // for (auto processor : d->processors[bufferType]) {
81 // processor->startBatch();
82 // }
95 if (d->transaction) { 83 if (d->transaction) {
96 return; 84 return;
97 } 85 }
@@ -100,7 +88,13 @@ void Pipeline::startTransaction()
100 88
101void Pipeline::commit() 89void Pipeline::commit()
102{ 90{
91 //TODO call for all types
92 //But avoid doing it during cleanup
93 // for (auto processor : d->processors[bufferType]) {
94 // processor->finalize();
95 // }
103 const auto revision = Akonadi2::Storage::maxRevision(d->transaction); 96 const auto revision = Akonadi2::Storage::maxRevision(d->transaction);
97 Trace() << "Committing " << revision;
104 if (d->transaction) { 98 if (d->transaction) {
105 d->transaction.commit(); 99 d->transaction.commit();
106 } 100 }
@@ -118,14 +112,6 @@ Storage &Pipeline::storage() const
118 return d->storage; 112 return d->storage;
119} 113}
120 114
121void Pipeline::null()
122{
123 //TODO: is there really any need for the null pipeline? if so, it should be doing something ;)
124 // PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline);
125 // d->activePipelines << state;
126 // state.step();
127}
128
129void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) 115void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
130{ 116{
131 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(uid, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()), 117 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(uid, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()),
@@ -181,15 +167,25 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
181 167
182 storeNewRevision(newRevision, fbb, bufferType, key); 168 storeNewRevision(newRevision, fbb, bufferType, key);
183 169
184 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; 170 auto adaptorFactory = d->adaptorFactory.value(bufferType);
171 if (!adaptorFactory) {
172 Warning() << "no adaptor factory for type " << bufferType;
173 return KAsync::error<qint64>(0);
174 }
185 175
186 return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { 176 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
187 PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future, newRevision]() { 177 d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool {
188 future.setValue(newRevision); 178 auto entity = Akonadi2::GetEntity(value);
189 future.setFinished(); 179 auto adaptor = adaptorFactory->createAdaptor(*entity);
190 }, bufferType); 180 for (auto processor : d->processors[bufferType]) {
191 d->activePipelines << state; 181 processor->newEntity(key, newRevision, *adaptor, d->transaction);
192 state.step(); 182 }
183 return false;
184 }, [this](const Akonadi2::Storage::Error &error) {
185 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
186 });
187 return KAsync::start<qint64>([newRevision](){
188 return newRevision;
193 }); 189 });
194} 190}
195 191
@@ -237,7 +233,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
237 auto diff = adaptorFactory->createAdaptor(*diffEntity); 233 auto diff = adaptorFactory->createAdaptor(*diffEntity);
238 234
239 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; 235 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current;
240 storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, baseRevision), [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { 236 d->transaction.openDatabase(bufferType + ".main").findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
241 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 237 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
242 if (!buffer.isValid()) { 238 if (!buffer.isValid()) {
243 Warning() << "Read invalid buffer from disk"; 239 Warning() << "Read invalid buffer from disk";
@@ -287,14 +283,18 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
287 283
288 storeNewRevision(newRevision, fbb, bufferType, key); 284 storeNewRevision(newRevision, fbb, bufferType, key);
289 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; 285 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType;
290 286 d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &, const QByteArray &value) -> bool {
291 return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { 287 auto entity = Akonadi2::GetEntity(value);
292 PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future, newRevision]() { 288 auto newEntity = adaptorFactory->createAdaptor(*entity);
293 future.setValue(newRevision); 289 for (auto processor : d->processors[bufferType]) {
294 future.setFinished(); 290 processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction);
295 }, bufferType); 291 }
296 d->activePipelines << state; 292 return false;
297 state.step(); 293 }, [this](const Akonadi2::Storage::Error &error) {
294 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
295 });
296 return KAsync::start<qint64>([newRevision](){
297 return newRevision;
298 }); 298 });
299} 299}
300 300
@@ -328,16 +328,34 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
328 flatbuffers::FlatBufferBuilder fbb; 328 flatbuffers::FlatBufferBuilder fbb;
329 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); 329 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0);
330 330
331 auto adaptorFactory = d->adaptorFactory.value(bufferType);
332 if (!adaptorFactory) {
333 Warning() << "no adaptor factory for type " << bufferType;
334 return KAsync::error<qint64>(0);
335 }
336
337 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current;
338 d->transaction.openDatabase(bufferType + ".main").findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool {
339 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
340 if (!buffer.isValid()) {
341 Warning() << "Read invalid buffer from disk";
342 } else {
343 current = adaptorFactory->createAdaptor(buffer.entity());
344 }
345 return false;
346 }, [this](const Akonadi2::Storage::Error &error) {
347 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
348 });
349
331 storeNewRevision(newRevision, fbb, bufferType, key); 350 storeNewRevision(newRevision, fbb, bufferType, key);
332 Log() << "Pipeline: deleted entity: "<< newRevision; 351 Log() << "Pipeline: deleted entity: "<< newRevision;
333 352
334 return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { 353 for (auto processor : d->processors[bufferType]) {
335 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[bufferType], newRevision, [&future, newRevision](){ 354 processor->deletedEntity(key, newRevision, *current, d->transaction);
336 future.setValue(newRevision); 355 }
337 future.setFinished(); 356
338 }, bufferType); 357 return KAsync::start<qint64>([newRevision](){
339 d->activePipelines << state; 358 return newRevision;
340 state.step();
341 }); 359 });
342} 360}
343 361
@@ -372,164 +390,6 @@ qint64 Pipeline::cleanedUpRevision()
372 return Akonadi2::Storage::cleanedUpRevision(d->transaction); 390 return Akonadi2::Storage::cleanedUpRevision(d->transaction);
373} 391}
374 392
375void Pipeline::pipelineStepped(const PipelineState &state)
376{
377 scheduleStep();
378}
379
380void Pipeline::scheduleStep()
381{
382 if (!d->stepScheduled) {
383 d->stepScheduled = true;
384 QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection);
385 }
386}
387
388void Pipeline::stepPipelines()
389{
390 d->stepScheduled = false;
391 for (PipelineState &state: d->activePipelines) {
392 if (state.isIdle()) {
393 state.step();
394 }
395 }
396}
397
398void Pipeline::pipelineCompleted(PipelineState state)
399{
400 //TODO finalize the datastore, inform clients of the new rev
401 const int index = d->activePipelines.indexOf(state);
402 if (index > -1) {
403 d->activePipelines.remove(index);
404 }
405 state.callback();
406
407 scheduleStep();
408 if (d->activePipelines.isEmpty()) {
409 emit pipelinesDrained();
410 }
411}
412
413
414class PipelineState::Private : public QSharedData
415{
416public:
417 Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters, const std::function<void()> &c, qint64 r, const QByteArray &b)
418 : pipeline(p),
419 type(t),
420 key(k),
421 filterIt(filters),
422 idle(true),
423 callback(c),
424 revision(r),
425 bufferType(b)
426 {}
427
428 Private()
429 : pipeline(0),
430 filterIt(QVector<Preprocessor *>()),
431 idle(true),
432 revision(-1)
433 {}
434
435 Pipeline *pipeline;
436 Pipeline::Type type;
437 QByteArray key;
438 QVectorIterator<Preprocessor *> filterIt;
439 bool idle;
440 std::function<void()> callback;
441 qint64 revision;
442 QByteArray bufferType;
443};
444
445PipelineState::PipelineState()
446 : d(new Private())
447{
448
449}
450
451PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, qint64 revision, const std::function<void()> &callback, const QByteArray &bufferType)
452 : d(new Private(pipeline, type, key, filters, callback, revision, bufferType))
453{
454}
455
456PipelineState::PipelineState(const PipelineState &other)
457 : d(other.d)
458{
459}
460
461PipelineState::~PipelineState()
462{
463}
464
465PipelineState &PipelineState::operator=(const PipelineState &rhs)
466{
467 d = rhs.d;
468 return *this;
469}
470
471bool PipelineState::operator==(const PipelineState &rhs)
472{
473 return d == rhs.d;
474}
475
476bool PipelineState::isIdle() const
477{
478 return d->idle;
479}
480
481QByteArray PipelineState::key() const
482{
483 return d->key;
484}
485
486Pipeline::Type PipelineState::type() const
487{
488 return d->type;
489}
490
491qint64 PipelineState::revision() const
492{
493 return d->revision;
494}
495
496QByteArray PipelineState::bufferType() const
497{
498 return d->bufferType;
499}
500
501void PipelineState::step()
502{
503 if (!d->pipeline) {
504 Q_ASSERT(false);
505 return;
506 }
507
508 d->idle = false;
509 if (d->filterIt.hasNext()) {
510 //TODO skip step if already processed
511 auto preprocessor = d->filterIt.next();
512 preprocessor->process(*this, d->pipeline->transaction());
513 } else {
514 //This object becomes invalid after this call
515 d->pipeline->pipelineCompleted(*this);
516 }
517}
518
519void PipelineState::processingCompleted(Preprocessor *filter)
520{
521 if (d->pipeline && filter == d->filterIt.peekPrevious()) {
522 d->idle = true;
523 d->pipeline->pipelineStepped(*this);
524 }
525}
526
527void PipelineState::callback()
528{
529 d->callback();
530}
531
532
533Preprocessor::Preprocessor() 393Preprocessor::Preprocessor()
534 : d(0) 394 : d(0)
535{ 395{
@@ -539,19 +399,12 @@ Preprocessor::~Preprocessor()
539{ 399{
540} 400}
541 401
542void Preprocessor::process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) 402void Preprocessor::startBatch()
543{
544 processingCompleted(state);
545}
546
547void Preprocessor::processingCompleted(PipelineState state)
548{ 403{
549 state.processingCompleted(this);
550} 404}
551 405
552QString Preprocessor::id() const 406void Preprocessor::finalize()
553{ 407{
554 return QLatin1String("unknown processor");
555} 408}
556 409
557} // namespace Akonadi2 410} // namespace Akonadi2
diff --git a/common/pipeline.h b/common/pipeline.h
index c8d9ddc..f11d880 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -35,7 +35,6 @@
35namespace Akonadi2 35namespace Akonadi2
36{ 36{
37 37
38class PipelineState;
39class Preprocessor; 38class Preprocessor;
40 39
41class AKONADI2COMMON_EXPORT Pipeline : public QObject 40class AKONADI2COMMON_EXPORT Pipeline : public QObject
@@ -43,19 +42,16 @@ class AKONADI2COMMON_EXPORT Pipeline : public QObject
43 Q_OBJECT 42 Q_OBJECT
44 43
45public: 44public:
46 enum Type { NullPipeline, NewPipeline, ModifiedPipeline, DeletedPipeline };
47
48 Pipeline(const QString &storagePath, QObject *parent = 0); 45 Pipeline(const QString &storagePath, QObject *parent = 0);
49 ~Pipeline(); 46 ~Pipeline();
50 47
51 Storage &storage() const; 48 Storage &storage() const;
52 49
53 void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors); 50 void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors);
54 void startTransaction(); 51 void startTransaction();
55 void commit(); 52 void commit();
56 Storage::Transaction &transaction(); 53 Storage::Transaction &transaction();
57 54
58 void null();
59 void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); 55 void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory);
60 56
61 KAsync::Job<qint64> newEntity(void const *command, size_t size); 57 KAsync::Job<qint64> newEntity(void const *command, size_t size);
@@ -75,104 +71,30 @@ public:
75 71
76Q_SIGNALS: 72Q_SIGNALS:
77 void revisionUpdated(qint64); 73 void revisionUpdated(qint64);
78 void pipelinesDrained();
79
80private Q_SLOTS:
81 void stepPipelines();
82 74
83private: 75private:
84 void pipelineStepped(const PipelineState &state);
85 //Don't use a reference here (it would invalidate itself)
86 void pipelineCompleted(PipelineState state);
87 void scheduleStep();
88 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); 76 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid);
89 77
90 friend class PipelineState;
91
92 class Private; 78 class Private;
93 Private * const d; 79 Private * const d;
94}; 80};
95 81
96class AKONADI2COMMON_EXPORT PipelineState
97{
98public:
99 PipelineState();
100 PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, qint64 revision, const std::function<void()> &callback, const QByteArray &bufferType);
101 PipelineState(const PipelineState &other);
102 ~PipelineState();
103
104 PipelineState &operator=(const PipelineState &rhs);
105 bool operator==(const PipelineState &rhs);
106
107 bool isIdle() const;
108 QByteArray key() const;
109 Pipeline::Type type() const;
110 qint64 revision() const;
111 QByteArray bufferType() const;
112
113 void step();
114 void processingCompleted(Preprocessor *filter);
115
116 void callback();
117
118private:
119 class Private;
120 QExplicitlySharedDataPointer<Private> d;
121};
122
123class AKONADI2COMMON_EXPORT Preprocessor 82class AKONADI2COMMON_EXPORT Preprocessor
124{ 83{
125public: 84public:
126 Preprocessor(); 85 Preprocessor();
127 virtual ~Preprocessor(); 86 virtual ~Preprocessor();
128 87
129 virtual void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction); 88 virtual void startBatch();
130 //TODO to record progress 89 virtual void newEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) = 0;
131 virtual QString id() const; 90 virtual void modifiedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) = 0;
132 91 virtual void deletedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) = 0;
133protected: 92 virtual void finalize();
134 void processingCompleted(PipelineState state);
135 93
136private: 94private:
137 class Private; 95 class Private;
138 Private * const d; 96 Private * const d;
139}; 97};
140 98
141/**
142 * A simple processor that takes a single function
143 */
144class AKONADI2COMMON_EXPORT SimpleProcessor : public Akonadi2::Preprocessor
145{
146public:
147 SimpleProcessor(const QString &id, const std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e, Akonadi2::Storage::Transaction &transaction)> &f)
148 : Akonadi2::Preprocessor(),
149 mFunction(f),
150 mId(id)
151 {
152 }
153
154 void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE
155 {
156 transaction.openDatabase(state.bufferType() + ".main").scan(state.key(), [this, &state, &transaction](const QByteArray &key, const QByteArray &value) -> bool {
157 auto entity = Akonadi2::GetEntity(value);
158 mFunction(state, *entity, transaction);
159 processingCompleted(state);
160 return false;
161 }, [this, state](const Akonadi2::Storage::Error &error) {
162 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
163 processingCompleted(state);
164 });
165 }
166
167 QString id() const Q_DECL_OVERRIDE
168 {
169 return mId;
170 }
171
172protected:
173 std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e, Akonadi2::Storage::Transaction &transaction)> mFunction;
174 QString mId;
175};
176
177} // namespace Akonadi2 99} // namespace Akonadi2
178 100