diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-10-28 16:39:16 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-10-28 16:39:16 +0100 |
commit | 129333371d28c06d85f75ca579ce17798e615e84 (patch) | |
tree | 2ae01db9d26f6f72a74fa77e6937e03304e81a2c /tests | |
parent | 20f049b65c4bd8c3d0c16bbf398641675648a93f (diff) | |
download | sink-129333371d28c06d85f75ca579ce17798e615e84.tar.gz sink-129333371d28c06d85f75ca579ce17798e615e84.zip |
Made pipeline preprocessing synchronous.
Instead of having the asynchronous preprocessor concept with different
pipelines for new/modify/delete we have a single pipeline with
synchronous preprocessors that act upon new/modify/delete.
This keeps the code simpler due to lack of asynchronity and keeps the
new/modify/delete operations together (which at least for the indexing
makes a lot of sense).
Not supporting asynchronity is ok because the tasks done in
preprocessing are not cpu intensive (if they were we had a problem
since they are directly involved in the round-trip time), and the main
cost comes from i/o, meaning we don't gain much by doing multithreading.
Costly tasks (such as full-text indexing) should rather be implemented
as post-processing, since that doesn't increase the round-trip time directly,
and eventually consistent is typically good enough for that.
Diffstat (limited to 'tests')
-rw-r--r-- | tests/genericresourcebenchmark.cpp | 35 | ||||
-rw-r--r-- | tests/genericresourcetest.cpp | 4 | ||||
-rw-r--r-- | tests/pipelinetest.cpp | 75 |
3 files changed, 98 insertions, 16 deletions
diff --git a/tests/genericresourcebenchmark.cpp b/tests/genericresourcebenchmark.cpp index b8635d7..fbe0d12 100644 --- a/tests/genericresourcebenchmark.cpp +++ b/tests/genericresourcebenchmark.cpp | |||
@@ -60,6 +60,25 @@ static QByteArray createEntityBuffer() | |||
60 | return QByteArray(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); | 60 | return QByteArray(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); |
61 | } | 61 | } |
62 | 62 | ||
63 | class IndexUpdater : public Akonadi2::Preprocessor { | ||
64 | public: | ||
65 | void newEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE | ||
66 | { | ||
67 | for (int i = 0; i < 10; i++) { | ||
68 | Index ridIndex(QString("index.index%1").arg(i).toLatin1(), transaction); | ||
69 | ridIndex.add("foo", uid); | ||
70 | } | ||
71 | } | ||
72 | |||
73 | void modifiedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE | ||
74 | { | ||
75 | } | ||
76 | |||
77 | void deletedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE | ||
78 | { | ||
79 | } | ||
80 | }; | ||
81 | |||
63 | /** | 82 | /** |
64 | * Benchmark write performance of generic resource implementation including queues and pipeline. | 83 | * Benchmark write performance of generic resource implementation including queues and pipeline. |
65 | */ | 84 | */ |
@@ -124,19 +143,9 @@ private Q_SLOTS: | |||
124 | 143 | ||
125 | auto eventFactory = QSharedPointer<TestEventAdaptorFactory>::create(); | 144 | auto eventFactory = QSharedPointer<TestEventAdaptorFactory>::create(); |
126 | const QByteArray resourceIdentifier = "org.kde.test.instance1"; | 145 | const QByteArray resourceIdentifier = "org.kde.test.instance1"; |
127 | auto eventIndexer = new Akonadi2::SimpleProcessor("eventIndexer", [eventFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) { | 146 | auto indexer = QSharedPointer<IndexUpdater>::create(); |
128 | auto adaptor = eventFactory->createAdaptor(entity); | 147 | |
129 | Akonadi2::ApplicationDomain::Event event(resourceIdentifier, state.key(), -1, adaptor); | 148 | pipeline->setPreprocessors("event", QVector<Akonadi2::Preprocessor*>() << indexer.data()); |
130 | Akonadi2::ApplicationDomain::TypeImplementation<Akonadi2::ApplicationDomain::Event>::index(event, transaction); | ||
131 | |||
132 | //Create a bunch of indexes | ||
133 | for (int i = 0; i < 10; i++) { | ||
134 | Index ridIndex(QString("index.index%1").arg(i).toLatin1(), transaction); | ||
135 | ridIndex.add("foo", event.identifier()); | ||
136 | } | ||
137 | }); | ||
138 | |||
139 | pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer); | ||
140 | pipeline->setAdaptorFactory("event", eventFactory); | 149 | pipeline->setAdaptorFactory("event", eventFactory); |
141 | 150 | ||
142 | TestResource resource("org.kde.test.instance1", pipeline); | 151 | TestResource resource("org.kde.test.instance1", pipeline); |
diff --git a/tests/genericresourcetest.cpp b/tests/genericresourcetest.cpp index 6dd4108..141a5f8 100644 --- a/tests/genericresourcetest.cpp +++ b/tests/genericresourcetest.cpp | |||
@@ -32,9 +32,7 @@ private Q_SLOTS: | |||
32 | 32 | ||
33 | void init() | 33 | void init() |
34 | { | 34 | { |
35 | removeFromDisk("org.kde.test.instance1"); | 35 | Akonadi2::GenericResource::removeFromDisk("org.kde.test.instance1"); |
36 | removeFromDisk("org.kde.test.instance1.userqueue"); | ||
37 | removeFromDisk("org.kde.test.instance1.synchronizerqueue"); | ||
38 | Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Trace); | 36 | Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Trace); |
39 | } | 37 | } |
40 | 38 | ||
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp index 7efba13..0b4c13e 100644 --- a/tests/pipelinetest.cpp +++ b/tests/pipelinetest.cpp | |||
@@ -139,6 +139,34 @@ QByteArray deleteEntityCommand(const QByteArray &uid, qint64 revision) | |||
139 | return command; | 139 | return command; |
140 | } | 140 | } |
141 | 141 | ||
142 | class TestProcessor : public Akonadi2::Preprocessor { | ||
143 | public: | ||
144 | void newEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE | ||
145 | { | ||
146 | newUids << uid; | ||
147 | newRevisions << revision; | ||
148 | } | ||
149 | |||
150 | void modifiedEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE | ||
151 | { | ||
152 | modifiedUids << uid; | ||
153 | modifiedRevisions << revision; | ||
154 | } | ||
155 | |||
156 | void deletedEntity(const QByteArray &uid, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE | ||
157 | { | ||
158 | deletedUids << uid; | ||
159 | deletedRevisions << revision; | ||
160 | } | ||
161 | |||
162 | QList<QByteArray> newUids; | ||
163 | QList<qint64> newRevisions; | ||
164 | QList<QByteArray> modifiedUids; | ||
165 | QList<qint64> modifiedRevisions; | ||
166 | QList<QByteArray> deletedUids; | ||
167 | QList<qint64> deletedRevisions; | ||
168 | }; | ||
169 | |||
142 | /** | 170 | /** |
143 | * Test of the pipeline implementation to ensure new revisions are created correctly in the database. | 171 | * Test of the pipeline implementation to ensure new revisions are created correctly in the database. |
144 | */ | 172 | */ |
@@ -251,6 +279,53 @@ private Q_SLOTS: | |||
251 | //And all revisions are gone | 279 | //And all revisions are gone |
252 | QCOMPARE(getKeys("org.kde.pipelinetest.instance1", "event.main").size(), 0); | 280 | QCOMPARE(getKeys("org.kde.pipelinetest.instance1", "event.main").size(), 0); |
253 | } | 281 | } |
282 | |||
283 | void testPreprocessor() | ||
284 | { | ||
285 | flatbuffers::FlatBufferBuilder entityFbb; | ||
286 | |||
287 | TestProcessor testProcessor; | ||
288 | |||
289 | Akonadi2::Pipeline pipeline("org.kde.pipelinetest.instance1"); | ||
290 | pipeline.setPreprocessors("event", QVector<Akonadi2::Preprocessor*>() << &testProcessor); | ||
291 | pipeline.startTransaction(); | ||
292 | pipeline.setAdaptorFactory("event", QSharedPointer<TestEventAdaptorFactory>::create()); | ||
293 | |||
294 | //Actual test | ||
295 | { | ||
296 | auto command = createEntityCommand(createEvent(entityFbb)); | ||
297 | pipeline.newEntity(command.constData(), command.size()); | ||
298 | QCOMPARE(testProcessor.newUids.size(), 1); | ||
299 | QCOMPARE(testProcessor.newRevisions.size(), 1); | ||
300 | //Key doesn't contain revision and is just the uid | ||
301 | QCOMPARE(testProcessor.newUids.at(0), Akonadi2::Storage::uidFromKey(testProcessor.newUids.at(0))); | ||
302 | } | ||
303 | pipeline.commit(); | ||
304 | entityFbb.Clear(); | ||
305 | pipeline.startTransaction(); | ||
306 | auto keys = getKeys("org.kde.pipelinetest.instance1", "event.main"); | ||
307 | QCOMPARE(keys.size(), 1); | ||
308 | const auto uid = Akonadi2::Storage::uidFromKey(keys.first()); | ||
309 | { | ||
310 | auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summary2"), uid, 1); | ||
311 | pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()); | ||
312 | QCOMPARE(testProcessor.modifiedUids.size(), 1); | ||
313 | QCOMPARE(testProcessor.modifiedRevisions.size(), 1); | ||
314 | //Key doesn't contain revision and is just the uid | ||
315 | QCOMPARE(testProcessor.modifiedUids.at(0), Akonadi2::Storage::uidFromKey(testProcessor.modifiedUids.at(0))); | ||
316 | } | ||
317 | pipeline.commit(); | ||
318 | entityFbb.Clear(); | ||
319 | pipeline.startTransaction(); | ||
320 | { | ||
321 | auto deleteCommand = deleteEntityCommand(uid, 1); | ||
322 | pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size()); | ||
323 | QCOMPARE(testProcessor.deletedUids.size(), 1); | ||
324 | QCOMPARE(testProcessor.deletedUids.size(), 1); | ||
325 | //Key doesn't contain revision and is just the uid | ||
326 | QCOMPARE(testProcessor.deletedUids.at(0), Akonadi2::Storage::uidFromKey(testProcessor.deletedUids.at(0))); | ||
327 | } | ||
328 | } | ||
254 | }; | 329 | }; |
255 | 330 | ||
256 | QTEST_MAIN(PipelineTest) | 331 | QTEST_MAIN(PipelineTest) |