diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-10-28 16:39:16 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-10-28 16:39:16 +0100 |
commit | 129333371d28c06d85f75ca579ce17798e615e84 (patch) | |
tree | 2ae01db9d26f6f72a74fa77e6937e03304e81a2c | |
parent | 20f049b65c4bd8c3d0c16bbf398641675648a93f (diff) | |
download | sink-129333371d28c06d85f75ca579ce17798e615e84.tar.gz sink-129333371d28c06d85f75ca579ce17798e615e84.zip |
Made pipeline preprocessing synchronous.
Instead of having the asynchronous preprocessor concept with different
pipelines for new/modify/delete we have a single pipeline with
synchronous preprocessors that act upon new/modify/delete.
This keeps the code simpler due to lack of asynchronity and keeps the
new/modify/delete operations together (which at least for the indexing
makes a lot of sense).
Not supporting asynchronity is ok because the tasks done in
preprocessing are not cpu intensive (if they were we had a problem
since they are directly involved in the round-trip time), and the main
cost comes from i/o, meaning we don't gain much by doing multithreading.
Costly tasks (such as full-text indexing) should rather be implemented
as post-processing, since that doesn't increase the round-trip time directly,
and eventually consistent is typically good enough for that.
-rw-r--r-- | common/domain/event.cpp | 6 | ||||
-rw-r--r-- | common/domain/event.h | 2 | ||||
-rw-r--r-- | common/domain/mail.cpp | 7 | ||||
-rw-r--r-- | common/domain/mail.h | 2 | ||||
-rw-r--r-- | common/genericresource.cpp | 10 | ||||
-rw-r--r-- | common/genericresource.h | 1 | ||||
-rw-r--r-- | common/pipeline.cpp | 286 | ||||
-rw-r--r-- | common/pipeline.h | 90 | ||||
-rw-r--r-- | examples/dummyresource/resourcefactory.cpp | 89 | ||||
-rw-r--r-- | tests/genericresourcebenchmark.cpp | 35 | ||||
-rw-r--r-- | tests/genericresourcetest.cpp | 4 | ||||
-rw-r--r-- | tests/pipelinetest.cpp | 75 |
12 files changed, 251 insertions, 356 deletions
diff --git a/common/domain/event.cpp b/common/domain/event.cpp index 9759fc3..83a6906 100644 --- a/common/domain/event.cpp +++ b/common/domain/event.cpp | |||
@@ -50,11 +50,11 @@ ResultSet TypeImplementation<Event>::queryIndexes(const Akonadi2::Query &query, | |||
50 | return ResultSet(keys); | 50 | return ResultSet(keys); |
51 | } | 51 | } |
52 | 52 | ||
53 | void TypeImplementation<Event>::index(const Event &type, Akonadi2::Storage::Transaction &transaction) | 53 | void TypeImplementation<Event>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction) |
54 | { | 54 | { |
55 | const auto uid = type.getProperty("uid"); | 55 | const auto uid = bufferAdaptor.getProperty("uid"); |
56 | if (uid.isValid()) { | 56 | if (uid.isValid()) { |
57 | Index("event.index.uid", transaction).add(uid.toByteArray(), type.identifier()); | 57 | Index("event.index.uid", transaction).add(uid.toByteArray(), identifier); |
58 | } | 58 | } |
59 | } | 59 | } |
60 | 60 | ||
diff --git a/common/domain/event.h b/common/domain/event.h index f21cd34..e9ba52a 100644 --- a/common/domain/event.h +++ b/common/domain/event.h | |||
@@ -56,7 +56,7 @@ public: | |||
56 | * An empty result set indicates that a full scan is required. | 56 | * An empty result set indicates that a full scan is required. |
57 | */ | 57 | */ |
58 | static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction); | 58 | static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction); |
59 | static void index(const Event &type, Akonadi2::Storage::Transaction &transaction); | 59 | static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction); |
60 | static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); | 60 | static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); |
61 | static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); | 61 | static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); |
62 | }; | 62 | }; |
diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp index d40dde9..ffe322e 100644 --- a/common/domain/mail.cpp +++ b/common/domain/mail.cpp | |||
@@ -50,12 +50,11 @@ ResultSet TypeImplementation<Mail>::queryIndexes(const Akonadi2::Query &query, c | |||
50 | return ResultSet(keys); | 50 | return ResultSet(keys); |
51 | } | 51 | } |
52 | 52 | ||
53 | void TypeImplementation<Mail>::index(const Mail &type, Akonadi2::Storage::Transaction &transaction) | 53 | void TypeImplementation<Mail>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction) |
54 | { | 54 | { |
55 | const auto uid = type.getProperty("uid"); | 55 | const auto uid = bufferAdaptor.getProperty("uid"); |
56 | if (uid.isValid()) { | 56 | if (uid.isValid()) { |
57 | Index uidIndex("mail.index.uid", transaction); | 57 | Index("mail.index.uid", transaction).add(uid.toByteArray(), identifier); |
58 | uidIndex.add(uid.toByteArray(), type.identifier()); | ||
59 | } | 58 | } |
60 | } | 59 | } |
61 | 60 | ||
diff --git a/common/domain/mail.h b/common/domain/mail.h index b58ce44..38f1d03 100644 --- a/common/domain/mail.h +++ b/common/domain/mail.h | |||
@@ -51,7 +51,7 @@ public: | |||
51 | * An empty result set indicates that a full scan is required. | 51 | * An empty result set indicates that a full scan is required. |
52 | */ | 52 | */ |
53 | static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction); | 53 | static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction); |
54 | static void index(const Mail &type, Akonadi2::Storage::Transaction &transaction); | 54 | static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction); |
55 | static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); | 55 | static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); |
56 | static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); | 56 | static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); |
57 | }; | 57 | }; |
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index dcae43d..ec68f33 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -285,10 +285,12 @@ GenericResource::~GenericResource() | |||
285 | delete mSourceChangeReplay; | 285 | delete mSourceChangeReplay; |
286 | } | 286 | } |
287 | 287 | ||
288 | // void GenericResource::revisionChanged() | 288 | void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) |
289 | // { | 289 | { |
290 | // //TODO replay revision | 290 | Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier, Akonadi2::Storage::ReadWrite).removeFromDisk(); |
291 | // } | 291 | Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".userqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); |
292 | Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".synchronizerqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); | ||
293 | } | ||
292 | 294 | ||
293 | void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) | 295 | void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) |
294 | { | 296 | { |
diff --git a/common/genericresource.h b/common/genericresource.h index cfc6653..33de0e7 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -48,6 +48,7 @@ public: | |||
48 | 48 | ||
49 | int error() const; | 49 | int error() const; |
50 | 50 | ||
51 | static void removeFromDisk(const QByteArray &instanceIdentifier); | ||
51 | private Q_SLOTS: | 52 | private Q_SLOTS: |
52 | void updateLowerBoundRevision(); | 53 | void updateLowerBoundRevision(); |
53 | 54 | ||
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 15d2401..de63288 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -1,5 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org> | 2 | * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org> |
3 | * Copyright (C) 2015 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | 4 | * |
4 | * This library is free software; you can redistribute it and/or | 5 | * This library is free software; you can redistribute it and/or |
5 | * modify it under the terms of the GNU Lesser General Public | 6 | * modify it under the terms of the GNU Lesser General Public |
@@ -41,19 +42,13 @@ class Pipeline::Private | |||
41 | { | 42 | { |
42 | public: | 43 | public: |
43 | Private(const QString &resourceName) | 44 | Private(const QString &resourceName) |
44 | : storage(Akonadi2::storageLocation(), resourceName, Storage::ReadWrite), | 45 | : storage(Akonadi2::storageLocation(), resourceName, Storage::ReadWrite) |
45 | stepScheduled(false) | ||
46 | { | 46 | { |
47 | } | 47 | } |
48 | 48 | ||
49 | Storage storage; | 49 | Storage storage; |
50 | Storage::Transaction transaction; | 50 | Storage::Transaction transaction; |
51 | QHash<QString, QVector<Preprocessor *> > nullPipeline; | 51 | QHash<QString, QVector<Preprocessor *> > processors; |
52 | QHash<QString, QVector<Preprocessor *> > newPipeline; | ||
53 | QHash<QString, QVector<Preprocessor *> > modifiedPipeline; | ||
54 | QHash<QString, QVector<Preprocessor *> > deletedPipeline; | ||
55 | QVector<PipelineState> activePipelines; | ||
56 | bool stepScheduled; | ||
57 | QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; | 52 | QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; |
58 | }; | 53 | }; |
59 | 54 | ||
@@ -68,21 +63,9 @@ Pipeline::~Pipeline() | |||
68 | delete d; | 63 | delete d; |
69 | } | 64 | } |
70 | 65 | ||
71 | void Pipeline::setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors) | 66 | void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) |
72 | { | 67 | { |
73 | switch (pipelineType) { | 68 | d->processors[entityType] = processors; |
74 | case NewPipeline: | ||
75 | d->newPipeline[entityType] = preprocessors; | ||
76 | break; | ||
77 | case ModifiedPipeline: | ||
78 | d->modifiedPipeline[entityType] = preprocessors; | ||
79 | break; | ||
80 | case DeletedPipeline: | ||
81 | d->deletedPipeline[entityType] = preprocessors; | ||
82 | break; | ||
83 | default: | ||
84 | break; | ||
85 | }; | ||
86 | } | 69 | } |
87 | 70 | ||
88 | void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) | 71 | void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) |
@@ -92,6 +75,11 @@ void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFac | |||
92 | 75 | ||
93 | void Pipeline::startTransaction() | 76 | void Pipeline::startTransaction() |
94 | { | 77 | { |
78 | //TODO call for all types | ||
79 | //But avoid doing it during cleanup | ||
80 | // for (auto processor : d->processors[bufferType]) { | ||
81 | // processor->startBatch(); | ||
82 | // } | ||
95 | if (d->transaction) { | 83 | if (d->transaction) { |
96 | return; | 84 | return; |
97 | } | 85 | } |
@@ -100,7 +88,13 @@ void Pipeline::startTransaction() | |||
100 | 88 | ||
101 | void Pipeline::commit() | 89 | void Pipeline::commit() |
102 | { | 90 | { |
91 | //TODO call for all types | ||
92 | //But avoid doing it during cleanup | ||
93 | // for (auto processor : d->processors[bufferType]) { | ||
94 | // processor->finalize(); | ||
95 | // } | ||
103 | const auto revision = Akonadi2::Storage::maxRevision(d->transaction); | 96 | const auto revision = Akonadi2::Storage::maxRevision(d->transaction); |
97 | Trace() << "Committing " << revision; | ||
104 | if (d->transaction) { | 98 | if (d->transaction) { |
105 | d->transaction.commit(); | 99 | d->transaction.commit(); |
106 | } | 100 | } |
@@ -118,14 +112,6 @@ Storage &Pipeline::storage() const | |||
118 | return d->storage; | 112 | return d->storage; |
119 | } | 113 | } |
120 | 114 | ||
121 | void Pipeline::null() | ||
122 | { | ||
123 | //TODO: is there really any need for the null pipeline? if so, it should be doing something ;) | ||
124 | // PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline); | ||
125 | // d->activePipelines << state; | ||
126 | // state.step(); | ||
127 | } | ||
128 | |||
129 | void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | 115 | void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) |
130 | { | 116 | { |
131 | d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(uid, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()), | 117 | d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(uid, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()), |
@@ -181,15 +167,25 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
181 | 167 | ||
182 | storeNewRevision(newRevision, fbb, bufferType, key); | 168 | storeNewRevision(newRevision, fbb, bufferType, key); |
183 | 169 | ||
184 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; | 170 | auto adaptorFactory = d->adaptorFactory.value(bufferType); |
171 | if (!adaptorFactory) { | ||
172 | Warning() << "no adaptor factory for type " << bufferType; | ||
173 | return KAsync::error<qint64>(0); | ||
174 | } | ||
185 | 175 | ||
186 | return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { | 176 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; |
187 | PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future, newRevision]() { | 177 | d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { |
188 | future.setValue(newRevision); | 178 | auto entity = Akonadi2::GetEntity(value); |
189 | future.setFinished(); | 179 | auto adaptor = adaptorFactory->createAdaptor(*entity); |
190 | }, bufferType); | 180 | for (auto processor : d->processors[bufferType]) { |
191 | d->activePipelines << state; | 181 | processor->newEntity(key, newRevision, *adaptor, d->transaction); |
192 | state.step(); | 182 | } |
183 | return false; | ||
184 | }, [this](const Akonadi2::Storage::Error &error) { | ||
185 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | ||
186 | }); | ||
187 | return KAsync::start<qint64>([newRevision](){ | ||
188 | return newRevision; | ||
193 | }); | 189 | }); |
194 | } | 190 | } |
195 | 191 | ||
@@ -287,14 +283,18 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
287 | 283 | ||
288 | storeNewRevision(newRevision, fbb, bufferType, key); | 284 | storeNewRevision(newRevision, fbb, bufferType, key); |
289 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; | 285 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; |
290 | 286 | d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &, const QByteArray &value) -> bool { | |
291 | return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { | 287 | auto entity = Akonadi2::GetEntity(value); |
292 | PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future, newRevision]() { | 288 | auto newEntity = adaptorFactory->createAdaptor(*entity); |
293 | future.setValue(newRevision); | 289 | for (auto processor : d->processors[bufferType]) { |
294 | future.setFinished(); | 290 | processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); |
295 | }, bufferType); | 291 | } |
296 | d->activePipelines << state; | 292 | return false; |
297 | state.step(); | 293 | }, [this](const Akonadi2::Storage::Error &error) { |
294 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | ||
295 | }); | ||
296 | return KAsync::start<qint64>([newRevision](){ | ||
297 | return newRevision; | ||
298 | }); | 298 | }); |
299 | } | 299 | } |
300 | 300 | ||
@@ -331,13 +331,24 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
331 | storeNewRevision(newRevision, fbb, bufferType, key); | 331 | storeNewRevision(newRevision, fbb, bufferType, key); |
332 | Log() << "Pipeline: deleted entity: "<< newRevision; | 332 | Log() << "Pipeline: deleted entity: "<< newRevision; |
333 | 333 | ||
334 | return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { | 334 | auto adaptorFactory = d->adaptorFactory.value(bufferType); |
335 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[bufferType], newRevision, [&future, newRevision](){ | 335 | if (!adaptorFactory) { |
336 | future.setValue(newRevision); | 336 | Warning() << "no adaptor factory for type " << bufferType; |
337 | future.setFinished(); | 337 | return KAsync::error<qint64>(0); |
338 | }, bufferType); | 338 | } |
339 | d->activePipelines << state; | 339 | |
340 | state.step(); | 340 | // d->transaction.openDatabase(bufferType + ".main").scan(key, [this, bufferType, newRevision, adaptorFactory](const QByteArray &, const QByteArray &value) -> bool { |
341 | // auto entity = Akonadi2::GetEntity(value); | ||
342 | // auto newEntity = adaptorFactory->createAdaptor(*entity); | ||
343 | for (auto processor : d->processors[bufferType]) { | ||
344 | processor->deletedEntity(key, newRevision, Akonadi2::ApplicationDomain::BufferAdaptor(), d->transaction); | ||
345 | } | ||
346 | // return false; | ||
347 | // }, [this](const Akonadi2::Storage::Error &error) { | ||
348 | // ErrorMsg() << "Failed to find value in pipeline: " << error.message; | ||
349 | // }); | ||
350 | return KAsync::start<qint64>([newRevision](){ | ||
351 | return newRevision; | ||
341 | }); | 352 | }); |
342 | } | 353 | } |
343 | 354 | ||
@@ -372,164 +383,6 @@ qint64 Pipeline::cleanedUpRevision() | |||
372 | return Akonadi2::Storage::cleanedUpRevision(d->transaction); | 383 | return Akonadi2::Storage::cleanedUpRevision(d->transaction); |
373 | } | 384 | } |
374 | 385 | ||
375 | void Pipeline::pipelineStepped(const PipelineState &state) | ||
376 | { | ||
377 | scheduleStep(); | ||
378 | } | ||
379 | |||
380 | void Pipeline::scheduleStep() | ||
381 | { | ||
382 | if (!d->stepScheduled) { | ||
383 | d->stepScheduled = true; | ||
384 | QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); | ||
385 | } | ||
386 | } | ||
387 | |||
388 | void Pipeline::stepPipelines() | ||
389 | { | ||
390 | d->stepScheduled = false; | ||
391 | for (PipelineState &state: d->activePipelines) { | ||
392 | if (state.isIdle()) { | ||
393 | state.step(); | ||
394 | } | ||
395 | } | ||
396 | } | ||
397 | |||
398 | void Pipeline::pipelineCompleted(PipelineState state) | ||
399 | { | ||
400 | //TODO finalize the datastore, inform clients of the new rev | ||
401 | const int index = d->activePipelines.indexOf(state); | ||
402 | if (index > -1) { | ||
403 | d->activePipelines.remove(index); | ||
404 | } | ||
405 | state.callback(); | ||
406 | |||
407 | scheduleStep(); | ||
408 | if (d->activePipelines.isEmpty()) { | ||
409 | emit pipelinesDrained(); | ||
410 | } | ||
411 | } | ||
412 | |||
413 | |||
414 | class PipelineState::Private : public QSharedData | ||
415 | { | ||
416 | public: | ||
417 | Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters, const std::function<void()> &c, qint64 r, const QByteArray &b) | ||
418 | : pipeline(p), | ||
419 | type(t), | ||
420 | key(k), | ||
421 | filterIt(filters), | ||
422 | idle(true), | ||
423 | callback(c), | ||
424 | revision(r), | ||
425 | bufferType(b) | ||
426 | {} | ||
427 | |||
428 | Private() | ||
429 | : pipeline(0), | ||
430 | filterIt(QVector<Preprocessor *>()), | ||
431 | idle(true), | ||
432 | revision(-1) | ||
433 | {} | ||
434 | |||
435 | Pipeline *pipeline; | ||
436 | Pipeline::Type type; | ||
437 | QByteArray key; | ||
438 | QVectorIterator<Preprocessor *> filterIt; | ||
439 | bool idle; | ||
440 | std::function<void()> callback; | ||
441 | qint64 revision; | ||
442 | QByteArray bufferType; | ||
443 | }; | ||
444 | |||
445 | PipelineState::PipelineState() | ||
446 | : d(new Private()) | ||
447 | { | ||
448 | |||
449 | } | ||
450 | |||
451 | PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, qint64 revision, const std::function<void()> &callback, const QByteArray &bufferType) | ||
452 | : d(new Private(pipeline, type, key, filters, callback, revision, bufferType)) | ||
453 | { | ||
454 | } | ||
455 | |||
456 | PipelineState::PipelineState(const PipelineState &other) | ||
457 | : d(other.d) | ||
458 | { | ||
459 | } | ||
460 | |||
461 | PipelineState::~PipelineState() | ||
462 | { | ||
463 | } | ||
464 | |||
465 | PipelineState &PipelineState::operator=(const PipelineState &rhs) | ||
466 | { | ||
467 | d = rhs.d; | ||
468 | return *this; | ||
469 | } | ||
470 | |||
471 | bool PipelineState::operator==(const PipelineState &rhs) | ||
472 | { | ||
473 | return d == rhs.d; | ||
474 | } | ||
475 | |||
476 | bool PipelineState::isIdle() const | ||
477 | { | ||
478 | return d->idle; | ||
479 | } | ||
480 | |||
481 | QByteArray PipelineState::key() const | ||
482 | { | ||
483 | return d->key; | ||
484 | } | ||
485 | |||
486 | Pipeline::Type PipelineState::type() const | ||
487 | { | ||
488 | return d->type; | ||
489 | } | ||
490 | |||
491 | qint64 PipelineState::revision() const | ||
492 | { | ||
493 | return d->revision; | ||
494 | } | ||
495 | |||
496 | QByteArray PipelineState::bufferType() const | ||
497 | { | ||
498 | return d->bufferType; | ||
499 | } | ||
500 | |||
501 | void PipelineState::step() | ||
502 | { | ||
503 | if (!d->pipeline) { | ||
504 | Q_ASSERT(false); | ||
505 | return; | ||
506 | } | ||
507 | |||
508 | d->idle = false; | ||
509 | if (d->filterIt.hasNext()) { | ||
510 | //TODO skip step if already processed | ||
511 | auto preprocessor = d->filterIt.next(); | ||
512 | preprocessor->process(*this, d->pipeline->transaction()); | ||
513 | } else { | ||
514 | //This object becomes invalid after this call | ||
515 | d->pipeline->pipelineCompleted(*this); | ||
516 | } | ||
517 | } | ||
518 | |||
519 | void PipelineState::processingCompleted(Preprocessor *filter) | ||
520 | { | ||
521 | if (d->pipeline && filter == d->filterIt.peekPrevious()) { | ||
522 | d->idle = true; | ||
523 | d->pipeline->pipelineStepped(*this); | ||
524 | } | ||
525 | } | ||
526 | |||
527 | void PipelineState::callback() | ||
528 | { | ||
529 | d->callback(); | ||
530 | } | ||
531 | |||
532 | |||
533 | Preprocessor::Preprocessor() | 386 | Preprocessor::Preprocessor() |
534 | : d(0) | 387 | : d(0) |
535 | { | 388 | { |
@@ -539,19 +392,12 @@ Preprocessor::~Preprocessor() | |||
539 | { | 392 | { |
540 | } | 393 | } |
541 | 394 | ||
542 | void Preprocessor::process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) | 395 | void Preprocessor::startBatch() |
543 | { | ||
544 | processingCompleted(state); | ||
545 | } | ||
546 | |||
547 | void Preprocessor::processingCompleted(PipelineState state) | ||
548 | { | 396 | { |
549 | state.processingCompleted(this); | ||
550 | } | 397 | } |
551 | 398 | ||
552 | QString Preprocessor::id() const | 399 | void Preprocessor::finalize() |
553 | { | 400 | { |
554 | return QLatin1String("unknown processor"); | ||
555 | } | 401 | } |
556 | 402 | ||
557 | } // namespace Akonadi2 | 403 | } // namespace Akonadi2 |
diff --git a/common/pipeline.h b/common/pipeline.h index c8d9ddc..f11d880 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -35,7 +35,6 @@ | |||
35 | namespace Akonadi2 | 35 | namespace Akonadi2 |
36 | { | 36 | { |
37 | 37 | ||
38 | class PipelineState; | ||
39 | class Preprocessor; | 38 | class Preprocessor; |
40 | 39 | ||
41 | class AKONADI2COMMON_EXPORT Pipeline : public QObject | 40 | class AKONADI2COMMON_EXPORT Pipeline : public QObject |
@@ -43,19 +42,16 @@ class AKONADI2COMMON_EXPORT Pipeline : public QObject | |||
43 | Q_OBJECT | 42 | Q_OBJECT |
44 | 43 | ||
45 | public: | 44 | public: |
46 | enum Type { NullPipeline, NewPipeline, ModifiedPipeline, DeletedPipeline }; | ||
47 | |||
48 | Pipeline(const QString &storagePath, QObject *parent = 0); | 45 | Pipeline(const QString &storagePath, QObject *parent = 0); |
49 | ~Pipeline(); | 46 | ~Pipeline(); |
50 | 47 | ||
51 | Storage &storage() const; | 48 | Storage &storage() const; |
52 | 49 | ||
53 | void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors); | 50 | void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors); |
54 | void startTransaction(); | 51 | void startTransaction(); |
55 | void commit(); | 52 | void commit(); |
56 | Storage::Transaction &transaction(); | 53 | Storage::Transaction &transaction(); |
57 | 54 | ||
58 | void null(); | ||
59 | void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); | 55 | void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); |
60 | 56 | ||
61 | KAsync::Job<qint64> newEntity(void const *command, size_t size); | 57 | KAsync::Job<qint64> newEntity(void const *command, size_t size); |
@@ -75,104 +71,30 @@ public: | |||
75 | 71 | ||
76 | Q_SIGNALS: | 72 | Q_SIGNALS: |
77 | void revisionUpdated(qint64); | 73 | void revisionUpdated(qint64); |
78 | void pipelinesDrained(); | ||
79 | |||
80 | private Q_SLOTS: | ||
81 | void stepPipelines(); | ||
82 | 74 | ||
83 | private: | 75 | private: |
84 | void pipelineStepped(const PipelineState &state); | ||
85 | //Don't use a reference here (it would invalidate itself) | ||
86 | void pipelineCompleted(PipelineState state); | ||
87 | void scheduleStep(); | ||
88 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); | 76 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); |
89 | 77 | ||
90 | friend class PipelineState; | ||
91 | |||
92 | class Private; | 78 | class Private; |
93 | Private * const d; | 79 | Private * const d; |
94 | }; | 80 | }; |
95 | 81 | ||
96 | class AKONADI2COMMON_EXPORT PipelineState | ||
97 | { | ||
98 | public: | ||
99 | PipelineState(); | ||
100 | PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, qint64 revision, const std::function<void()> &callback, const QByteArray &bufferType); | ||
101 | PipelineState(const PipelineState &other); | ||
102 | ~PipelineState(); | ||
103 | |||
104 | PipelineState &operator=(const PipelineState &rhs); | ||
105 | bool operator==(const PipelineState &rhs); | ||
106 | |||
107 | bool isIdle() const; | ||
108 | QByteArray key() const; | ||
109 | Pipeline::Type type() const; | ||
110 | qint64 revision() const; | ||
111 | QByteArray bufferType() const; | ||
112 | |||
113 | void step(); | ||
114 | void processingCompleted(Preprocessor *filter); | ||
115 | |||
116 | void callback(); | ||
117 | |||
118 | private: | ||
119 | class Private; | ||
120 | QExplicitlySharedDataPointer<Private> d; | ||
121 | }; | ||
122 | |||
123 | class AKONADI2COMMON_EXPORT Preprocessor | 82 | class AKONADI2COMMON_EXPORT Preprocessor |
124 | { | 83 | { |
125 | public: | 84 | public: |
126 | Preprocessor(); | 85 | Preprocessor(); |
127 | virtual ~Preprocessor(); | 86 | virtual ~Preprocessor(); |
128 | 87 | ||
129 | virtual void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction); | 88 | virtual void startBatch(); |
130 | //TODO to record progress | 89 | virtual void newEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) = 0; |
131 | virtual QString id() const; | 90 | virtual void modifiedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) = 0; |
132 | 91 | virtual void deletedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) = 0; | |
133 | protected: | 92 | virtual void finalize(); |
134 | void processingCompleted(PipelineState state); | ||
135 | 93 | ||
136 | private: | 94 | private: |
137 | class Private; | 95 | class Private; |
138 | Private * const d; | 96 | Private * const d; |
139 | }; | 97 | }; |
140 | 98 | ||
141 | /** | ||
142 | * A simple processor that takes a single function | ||
143 | */ | ||
144 | class AKONADI2COMMON_EXPORT SimpleProcessor : public Akonadi2::Preprocessor | ||
145 | { | ||
146 | public: | ||
147 | SimpleProcessor(const QString &id, const std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e, Akonadi2::Storage::Transaction &transaction)> &f) | ||
148 | : Akonadi2::Preprocessor(), | ||
149 | mFunction(f), | ||
150 | mId(id) | ||
151 | { | ||
152 | } | ||
153 | |||
154 | void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE | ||
155 | { | ||
156 | transaction.openDatabase(state.bufferType() + ".main").scan(state.key(), [this, &state, &transaction](const QByteArray &key, const QByteArray &value) -> bool { | ||
157 | auto entity = Akonadi2::GetEntity(value); | ||
158 | mFunction(state, *entity, transaction); | ||
159 | processingCompleted(state); | ||
160 | return false; | ||
161 | }, [this, state](const Akonadi2::Storage::Error &error) { | ||
162 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | ||
163 | processingCompleted(state); | ||
164 | }); | ||
165 | } | ||
166 | |||
167 | QString id() const Q_DECL_OVERRIDE | ||
168 | { | ||
169 | return mId; | ||
170 | } | ||
171 | |||
172 | protected: | ||
173 | std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e, Akonadi2::Storage::Transaction &transaction)> mFunction; | ||
174 | QString mId; | ||
175 | }; | ||
176 | |||
177 | } // namespace Akonadi2 | 99 | } // namespace Akonadi2 |
178 | 100 | ||
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp index 8e6bd42..0a2e90b 100644 --- a/examples/dummyresource/resourcefactory.cpp +++ b/examples/dummyresource/resourcefactory.cpp | |||
@@ -48,31 +48,76 @@ static void index(const QByteArray &index, const QVariant &value, const QByteArr | |||
48 | } | 48 | } |
49 | } | 49 | } |
50 | 50 | ||
51 | DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer<Akonadi2::Pipeline> &pipeline) | 51 | /** |
52 | : Akonadi2::GenericResource(instanceIdentifier, pipeline) | 52 | * Index types: |
53 | { | 53 | * * uid - property |
54 | auto eventFactory = QSharedPointer<DummyEventAdaptorFactory>::create(); | 54 | * |
55 | const auto resourceIdentifier = mResourceInstanceIdentifier; | 55 | * * Property can be: |
56 | * * fixed value like uid | ||
57 | * * fixed value where we want to do smaller/greater-than comparisons. (like start date) | ||
58 | * * range indexes like what date range an event affects. | ||
59 | * * group indexes like tree hierarchies as nested sets | ||
60 | */ | ||
61 | template<typename DomainType> | ||
62 | class IndexUpdater : public Akonadi2::Preprocessor { | ||
63 | public: | ||
64 | IndexUpdater(const QByteArray &index, const QByteArray &type) | ||
65 | :mIndexIdentifier(index), | ||
66 | mBufferType(type) | ||
67 | { | ||
56 | 68 | ||
57 | auto eventIndexer = new Akonadi2::SimpleProcessor("eventIndexer", [eventFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) { | 69 | } |
58 | Akonadi2::ApplicationDomain::Event event(resourceIdentifier, Akonadi2::Storage::uidFromKey(state.key()), -1, eventFactory->createAdaptor(entity)); | 70 | |
59 | Akonadi2::ApplicationDomain::TypeImplementation<Akonadi2::ApplicationDomain::Event>::index(event, transaction); | 71 | void newEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE |
60 | index("event.index.rid", event.getProperty("remoteId"), event.identifier(), transaction); | 72 | { |
61 | }); | 73 | Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::index(uid, newEntity, transaction); |
74 | add(newEntity.getProperty("remoteId"), uid, transaction); | ||
75 | } | ||
62 | 76 | ||
63 | mPipeline->setPreprocessors(ENTITY_TYPE_EVENT, Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer); | 77 | void modifiedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE |
64 | mPipeline->setAdaptorFactory(ENTITY_TYPE_EVENT, eventFactory); | 78 | { |
65 | //TODO cleanup indexes during removal | 79 | } |
66 | 80 | ||
81 | void deletedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE | ||
67 | { | 82 | { |
68 | auto mailFactory = QSharedPointer<DummyMailAdaptorFactory>::create(); | 83 | } |
69 | auto mailIndexer = new Akonadi2::SimpleProcessor("mailIndexer", [mailFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) { | 84 | private: |
70 | Akonadi2::ApplicationDomain::Mail mail(resourceIdentifier, Akonadi2::Storage::uidFromKey(state.key()), -1, mailFactory->createAdaptor(entity)); | 85 | void add(const QVariant &value, const QByteArray &uid, Akonadi2::Storage::Transaction &transaction) |
71 | Akonadi2::ApplicationDomain::TypeImplementation<Akonadi2::ApplicationDomain::Mail>::index(mail, transaction); | 86 | { |
72 | index("mail.index.rid", mail.getProperty("remoteId"), mail.identifier(), transaction); | 87 | if (value.isValid()) { |
73 | }); | 88 | Index(mIndexIdentifier, transaction).add(value.toByteArray(), uid); |
89 | } | ||
90 | } | ||
91 | |||
92 | void remove(const QByteArray &uid, Akonadi2::Storage::Transaction &transaction) | ||
93 | { | ||
94 | //Knowning the indexed value would probably help removing the uid efficiently. Otherwise we have to execute a full scan. | ||
95 | // Index(mIndexIdentifier, transaction).remove(uid); | ||
96 | } | ||
97 | |||
98 | void modify(Akonadi2::Storage::Transaction &transaction) | ||
99 | { | ||
100 | //Knowning the indexed value would probably help removing the uid efficiently. Otherwise we have to execute a full scan. | ||
101 | // Index(mIndexIdentifier, transaction).remove(uid); | ||
102 | } | ||
74 | 103 | ||
75 | mPipeline->setPreprocessors(ENTITY_TYPE_MAIL, Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << mailIndexer); | 104 | QByteArray mIndexIdentifier; |
105 | QByteArray mBufferType; | ||
106 | }; | ||
107 | |||
108 | DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer<Akonadi2::Pipeline> &pipeline) | ||
109 | : Akonadi2::GenericResource(instanceIdentifier, pipeline) | ||
110 | { | ||
111 | { | ||
112 | auto eventFactory = QSharedPointer<DummyEventAdaptorFactory>::create(); | ||
113 | auto eventIndexer = new IndexUpdater<Akonadi2::ApplicationDomain::Event>("event.index.rid", ENTITY_TYPE_EVENT); | ||
114 | mPipeline->setPreprocessors(ENTITY_TYPE_EVENT, QVector<Akonadi2::Preprocessor*>() << eventIndexer); | ||
115 | mPipeline->setAdaptorFactory(ENTITY_TYPE_EVENT, eventFactory); | ||
116 | } | ||
117 | { | ||
118 | auto mailFactory = QSharedPointer<DummyMailAdaptorFactory>::create(); | ||
119 | auto mailIndexer = new IndexUpdater<Akonadi2::ApplicationDomain::Mail>("mail.index.rid", ENTITY_TYPE_MAIL); | ||
120 | mPipeline->setPreprocessors(ENTITY_TYPE_MAIL, QVector<Akonadi2::Preprocessor*>() << mailIndexer); | ||
76 | mPipeline->setAdaptorFactory(ENTITY_TYPE_MAIL, mailFactory); | 121 | mPipeline->setAdaptorFactory(ENTITY_TYPE_MAIL, mailFactory); |
77 | } | 122 | } |
78 | } | 123 | } |
@@ -171,9 +216,7 @@ KAsync::Job<void> DummyResource::synchronizeWithSource() | |||
171 | 216 | ||
172 | void DummyResource::removeFromDisk(const QByteArray &instanceIdentifier) | 217 | void DummyResource::removeFromDisk(const QByteArray &instanceIdentifier) |
173 | { | 218 | { |
174 | Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier, Akonadi2::Storage::ReadWrite).removeFromDisk(); | 219 | GenericResource::removeFromDisk(instanceIdentifier); |
175 | Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".userqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); | ||
176 | Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".synchronizerqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); | ||
177 | Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".event.index.uid", Akonadi2::Storage::ReadWrite).removeFromDisk(); | 220 | Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".event.index.uid", Akonadi2::Storage::ReadWrite).removeFromDisk(); |
178 | } | 221 | } |
179 | 222 | ||
diff --git a/tests/genericresourcebenchmark.cpp b/tests/genericresourcebenchmark.cpp index b8635d7..fbe0d12 100644 --- a/tests/genericresourcebenchmark.cpp +++ b/tests/genericresourcebenchmark.cpp | |||
@@ -60,6 +60,25 @@ static QByteArray createEntityBuffer() | |||
60 | return QByteArray(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); | 60 | return QByteArray(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); |
61 | } | 61 | } |
62 | 62 | ||
63 | class IndexUpdater : public Akonadi2::Preprocessor { | ||
64 | public: | ||
65 | void newEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE | ||
66 | { | ||
67 | for (int i = 0; i < 10; i++) { | ||
68 | Index ridIndex(QString("index.index%1").arg(i).toLatin1(), transaction); | ||
69 | ridIndex.add("foo", uid); | ||
70 | } | ||
71 | } | ||
72 | |||
73 | void modifiedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE | ||
74 | { | ||
75 | } | ||
76 | |||
77 | void deletedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE | ||
78 | { | ||
79 | } | ||
80 | }; | ||
81 | |||
63 | /** | 82 | /** |
64 | * Benchmark write performance of generic resource implementation including queues and pipeline. | 83 | * Benchmark write performance of generic resource implementation including queues and pipeline. |
65 | */ | 84 | */ |
@@ -124,19 +143,9 @@ private Q_SLOTS: | |||
124 | 143 | ||
125 | auto eventFactory = QSharedPointer<TestEventAdaptorFactory>::create(); | 144 | auto eventFactory = QSharedPointer<TestEventAdaptorFactory>::create(); |
126 | const QByteArray resourceIdentifier = "org.kde.test.instance1"; | 145 | const QByteArray resourceIdentifier = "org.kde.test.instance1"; |
127 | auto eventIndexer = new Akonadi2::SimpleProcessor("eventIndexer", [eventFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) { | 146 | auto indexer = QSharedPointer<IndexUpdater>::create(); |
128 | auto adaptor = eventFactory->createAdaptor(entity); | 147 | |
129 | Akonadi2::ApplicationDomain::Event event(resourceIdentifier, state.key(), -1, adaptor); | 148 | pipeline->setPreprocessors("event", QVector<Akonadi2::Preprocessor*>() << indexer.data()); |
130 | Akonadi2::ApplicationDomain::TypeImplementation<Akonadi2::ApplicationDomain::Event>::index(event, transaction); | ||
131 | |||
132 | //Create a bunch of indexes | ||
133 | for (int i = 0; i < 10; i++) { | ||
134 | Index ridIndex(QString("index.index%1").arg(i).toLatin1(), transaction); | ||
135 | ridIndex.add("foo", event.identifier()); | ||
136 | } | ||
137 | }); | ||
138 | |||
139 | pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer); | ||
140 | pipeline->setAdaptorFactory("event", eventFactory); | 149 | pipeline->setAdaptorFactory("event", eventFactory); |
141 | 150 | ||
142 | TestResource resource("org.kde.test.instance1", pipeline); | 151 | TestResource resource("org.kde.test.instance1", pipeline); |
diff --git a/tests/genericresourcetest.cpp b/tests/genericresourcetest.cpp index 6dd4108..141a5f8 100644 --- a/tests/genericresourcetest.cpp +++ b/tests/genericresourcetest.cpp | |||
@@ -32,9 +32,7 @@ private Q_SLOTS: | |||
32 | 32 | ||
33 | void init() | 33 | void init() |
34 | { | 34 | { |
35 | removeFromDisk("org.kde.test.instance1"); | 35 | Akonadi2::GenericResource::removeFromDisk("org.kde.test.instance1"); |
36 | removeFromDisk("org.kde.test.instance1.userqueue"); | ||
37 | removeFromDisk("org.kde.test.instance1.synchronizerqueue"); | ||
38 | Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Trace); | 36 | Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Trace); |
39 | } | 37 | } |
40 | 38 | ||
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp index 7efba13..0b4c13e 100644 --- a/tests/pipelinetest.cpp +++ b/tests/pipelinetest.cpp | |||
@@ -139,6 +139,34 @@ QByteArray deleteEntityCommand(const QByteArray &uid, qint64 revision) | |||
139 | return command; | 139 | return command; |
140 | } | 140 | } |
141 | 141 | ||
142 | class TestProcessor : public Akonadi2::Preprocessor { | ||
143 | public: | ||
144 | void newEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE | ||
145 | { | ||
146 | newUids << uid; | ||
147 | newRevisions << revision; | ||
148 | } | ||
149 | |||
150 | void modifiedEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE | ||
151 | { | ||
152 | modifiedUids << uid; | ||
153 | modifiedRevisions << revision; | ||
154 | } | ||
155 | |||
156 | void deletedEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE | ||
157 | { | ||
158 | deletedUids << uid; | ||
159 | deletedRevisions << revision; | ||
160 | } | ||
161 | |||
162 | QList<QByteArray> newUids; | ||
163 | QList<qint64> newRevisions; | ||
164 | QList<QByteArray> modifiedUids; | ||
165 | QList<qint64> modifiedRevisions; | ||
166 | QList<QByteArray> deletedUids; | ||
167 | QList<qint64> deletedRevisions; | ||
168 | }; | ||
169 | |||
142 | /** | 170 | /** |
143 | * Test of the pipeline implementation to ensure new revisions are created correctly in the database. | 171 | * Test of the pipeline implementation to ensure new revisions are created correctly in the database. |
144 | */ | 172 | */ |
@@ -251,6 +279,53 @@ private Q_SLOTS: | |||
251 | //And all revisions are gone | 279 | //And all revisions are gone |
252 | QCOMPARE(getKeys("org.kde.pipelinetest.instance1", "event.main").size(), 0); | 280 | QCOMPARE(getKeys("org.kde.pipelinetest.instance1", "event.main").size(), 0); |
253 | } | 281 | } |
282 | |||
283 | void testPreprocessor() | ||
284 | { | ||
285 | flatbuffers::FlatBufferBuilder entityFbb; | ||
286 | |||
287 | TestProcessor testProcessor; | ||
288 | |||
289 | Akonadi2::Pipeline pipeline("org.kde.pipelinetest.instance1"); | ||
290 | pipeline.setPreprocessors("event", QVector<Akonadi2::Preprocessor*>() << &testProcessor); | ||
291 | pipeline.startTransaction(); | ||
292 | pipeline.setAdaptorFactory("event", QSharedPointer<TestEventAdaptorFactory>::create()); | ||
293 | |||
294 | //Actual test | ||
295 | { | ||
296 | auto command = createEntityCommand(createEvent(entityFbb)); | ||
297 | pipeline.newEntity(command.constData(), command.size()); | ||
298 | QCOMPARE(testProcessor.newUids.size(), 1); | ||
299 | QCOMPARE(testProcessor.newRevisions.size(), 1); | ||
300 | //Key doesn't contain revision and is just the uid | ||
301 | QCOMPARE(testProcessor.newUids.at(0), Akonadi2::Storage::uidFromKey(testProcessor.newUids.at(0))); | ||
302 | } | ||
303 | pipeline.commit(); | ||
304 | entityFbb.Clear(); | ||
305 | pipeline.startTransaction(); | ||
306 | auto keys = getKeys("org.kde.pipelinetest.instance1", "event.main"); | ||
307 | QCOMPARE(keys.size(), 1); | ||
308 | const auto uid = Akonadi2::Storage::uidFromKey(keys.first()); | ||
309 | { | ||
310 | auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summary2"), uid, 1); | ||
311 | pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()); | ||
312 | QCOMPARE(testProcessor.modifiedUids.size(), 1); | ||
313 | QCOMPARE(testProcessor.modifiedRevisions.size(), 1); | ||
314 | //Key doesn't contain revision and is just the uid | ||
315 | QCOMPARE(testProcessor.modifiedUids.at(0), Akonadi2::Storage::uidFromKey(testProcessor.modifiedUids.at(0))); | ||
316 | } | ||
317 | pipeline.commit(); | ||
318 | entityFbb.Clear(); | ||
319 | pipeline.startTransaction(); | ||
320 | { | ||
321 | auto deleteCommand = deleteEntityCommand(uid, 1); | ||
322 | pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size()); | ||
323 | QCOMPARE(testProcessor.deletedUids.size(), 1); | ||
324 | QCOMPARE(testProcessor.deletedUids.size(), 1); | ||
325 | //Key doesn't contain revision and is just the uid | ||
326 | QCOMPARE(testProcessor.deletedUids.at(0), Akonadi2::Storage::uidFromKey(testProcessor.deletedUids.at(0))); | ||
327 | } | ||
328 | } | ||
254 | }; | 329 | }; |
255 | 330 | ||
256 | QTEST_MAIN(PipelineTest) | 331 | QTEST_MAIN(PipelineTest) |