summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-08-23 18:56:43 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-08-23 18:56:43 +0200
commit1acf9f3c486813df807ff6931e56cc13eb26eeaf (patch)
tree559ead2e95986515b4a5f93b6f143b8f1d429bd3
parent62e7084dcd6f53275fcb21ba17e880e41b40094d (diff)
downloadsink-1acf9f3c486813df807ff6931e56cc13eb26eeaf.tar.gz
sink-1acf9f3c486813df807ff6931e56cc13eb26eeaf.zip
Store indexes as named databases in the same db.
Because we also keep using the same transactions this finally makes the resource somewhat performant. On my system genericresourcebenchmark now processes ~4200 messages per second instead of ~280.
-rw-r--r--common/domain/event.cpp8
-rw-r--r--common/domain/event.h6
-rw-r--r--common/entitystorage.cpp4
-rw-r--r--common/entitystorage.h8
-rw-r--r--common/index.cpp17
-rw-r--r--common/index.h4
-rw-r--r--common/pipeline.cpp14
-rw-r--r--common/pipeline.h19
-rw-r--r--examples/dummyresource/resourcefactory.cpp10
-rw-r--r--tests/genericresourcebenchmark.cpp56
-rw-r--r--tests/storagetest.cpp4
11 files changed, 100 insertions, 50 deletions
diff --git a/common/domain/event.cpp b/common/domain/event.cpp
index 15f5d11..e107441 100644
--- a/common/domain/event.cpp
+++ b/common/domain/event.cpp
@@ -34,11 +34,11 @@
34 34
35using namespace Akonadi2::ApplicationDomain; 35using namespace Akonadi2::ApplicationDomain;
36 36
37ResultSet TypeImplementation<Event>::queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters) 37ResultSet TypeImplementation<Event>::queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction)
38{ 38{
39 QVector<QByteArray> keys; 39 QVector<QByteArray> keys;
40 if (query.propertyFilter.contains("uid")) { 40 if (query.propertyFilter.contains("uid")) {
41 Index uidIndex(Akonadi2::storageLocation(), resourceInstanceIdentifier + ".index.uid", Akonadi2::Storage::ReadOnly); 41 Index uidIndex("index.uid", transaction);
42 uidIndex.lookup(query.propertyFilter.value("uid").toByteArray(), [&](const QByteArray &value) { 42 uidIndex.lookup(query.propertyFilter.value("uid").toByteArray(), [&](const QByteArray &value) {
43 keys << value; 43 keys << value;
44 }, 44 },
@@ -50,11 +50,11 @@ ResultSet TypeImplementation<Event>::queryIndexes(const Akonadi2::Query &query,
50 return ResultSet(keys); 50 return ResultSet(keys);
51} 51}
52 52
53void TypeImplementation<Event>::index(const Event &type) 53void TypeImplementation<Event>::index(const Event &type, Akonadi2::Storage::Transaction &transaction)
54{ 54{
55 Index uidIndex(Akonadi2::storageLocation(), type.resourceInstanceIdentifier() + ".index.uid", Akonadi2::Storage::ReadWrite);
56 const auto uid = type.getProperty("uid"); 55 const auto uid = type.getProperty("uid");
57 if (uid.isValid()) { 56 if (uid.isValid()) {
57 Index uidIndex("index.uid", transaction);
58 uidIndex.add(uid.toByteArray(), type.identifier()); 58 uidIndex.add(uid.toByteArray(), type.identifier());
59 } 59 }
60} 60}
diff --git a/common/domain/event.h b/common/domain/event.h
index 13cfc6e..f21cd34 100644
--- a/common/domain/event.h
+++ b/common/domain/event.h
@@ -20,6 +20,8 @@
20 20
21#include "applicationdomaintype.h" 21#include "applicationdomaintype.h"
22 22
23#include "storage.h"
24
23class ResultSet; 25class ResultSet;
24class QByteArray; 26class QByteArray;
25 27
@@ -53,8 +55,8 @@ public:
53 * 55 *
54 * An empty result set indicates that a full scan is required. 56 * An empty result set indicates that a full scan is required.
55 */ 57 */
56 static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters); 58 static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction);
57 static void index(const Event &type); 59 static void index(const Event &type, Akonadi2::Storage::Transaction &transaction);
58 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); 60 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper();
59 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); 61 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper();
60}; 62};
diff --git a/common/entitystorage.cpp b/common/entitystorage.cpp
index 420c3b0..22fd9e6 100644
--- a/common/entitystorage.cpp
+++ b/common/entitystorage.cpp
@@ -91,10 +91,10 @@ ResultSet EntityStorageBase::filteredSet(const ResultSet &resultSet, const std::
91 return ResultSet(generator); 91 return ResultSet(generator);
92} 92}
93 93
94ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, const Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision) 94ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision)
95{ 95{
96 QSet<QByteArray> appliedFilters; 96 QSet<QByteArray> appliedFilters;
97 ResultSet resultSet = queryIndexes(query, mResourceInstanceIdentifier, appliedFilters); 97 ResultSet resultSet = queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction);
98 const auto remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; 98 const auto remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
99 99
100 //We do a full scan if there were no indexes available to create the initial set. 100 //We do a full scan if there were no indexes available to create the initial set.
diff --git a/common/entitystorage.h b/common/entitystorage.h
index 2fce880..8256938 100644
--- a/common/entitystorage.h
+++ b/common/entitystorage.h
@@ -44,11 +44,11 @@ protected:
44 44
45 virtual Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr create(const QByteArray &key, qint64 revision, const QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> &adaptor) = 0; 45 virtual Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr create(const QByteArray &key, qint64 revision, const QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> &adaptor) = 0;
46 virtual Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr copy(const Akonadi2::ApplicationDomain::ApplicationDomainType &) = 0; 46 virtual Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr copy(const Akonadi2::ApplicationDomain::ApplicationDomainType &) = 0;
47 virtual ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters) = 0; 47 virtual ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction) = 0;
48 48
49 void readValue(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &)> &resultCallback); 49 void readValue(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &)> &resultCallback);
50 ResultSet filteredSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision); 50 ResultSet filteredSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision);
51 ResultSet getResultSet(const Akonadi2::Query &query, const Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision); 51 ResultSet getResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision);
52 52
53protected: 53protected:
54 QByteArray mResourceInstanceIdentifier; 54 QByteArray mResourceInstanceIdentifier;
@@ -77,9 +77,9 @@ protected:
77 return Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(object); 77 return Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(object);
78 } 78 }
79 79
80 ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters) Q_DECL_OVERRIDE 80 ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE
81 { 81 {
82 return Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, resourceInstanceIdentifier, appliedFilters); 82 return Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, resourceInstanceIdentifier, appliedFilters, transaction);
83 } 83 }
84 84
85public: 85public:
diff --git a/common/index.cpp b/common/index.cpp
index 7e3c09e..75ffe3f 100644
--- a/common/index.cpp
+++ b/common/index.cpp
@@ -2,24 +2,27 @@
2#include <QDebug> 2#include <QDebug>
3 3
4Index::Index(const QString &storageRoot, const QString &name, Akonadi2::Storage::AccessMode mode) 4Index::Index(const QString &storageRoot, const QString &name, Akonadi2::Storage::AccessMode mode)
5 : mStorage(storageRoot, name, mode, true) 5 : mTransaction(Akonadi2::Storage(storageRoot, name, mode, true).createTransaction(mode)),
6 mDb(mTransaction.openDatabase(name.toLatin1(), std::function<void(const Akonadi2::Storage::Error &)>(), true))
7{
8
9}
10
11Index::Index(const QByteArray &name, Akonadi2::Storage::Transaction &transaction)
12 : mDb(transaction.openDatabase(name, std::function<void(const Akonadi2::Storage::Error &)>(), true))
6{ 13{
7 14
8} 15}
9 16
10void Index::add(const QByteArray &key, const QByteArray &value) 17void Index::add(const QByteArray &key, const QByteArray &value)
11{ 18{
12 mStorage.createTransaction(Akonadi2::Storage::ReadWrite).write(key, value); 19 mDb.write(key, value);
13} 20}
14 21
15void Index::lookup(const QByteArray &key, const std::function<void(const QByteArray &value)> &resultHandler, 22void Index::lookup(const QByteArray &key, const std::function<void(const QByteArray &value)> &resultHandler,
16 const std::function<void(const Error &error)> &errorHandler) 23 const std::function<void(const Error &error)> &errorHandler)
17{ 24{
18 if (!mStorage.exists()) { 25 mDb.scan(key, [this, resultHandler](const QByteArray &key, const QByteArray &value) -> bool {
19 errorHandler(Error("index", IndexNotAvailable, "Index not existing"));
20 return;
21 }
22 mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan(key, [this, resultHandler](const QByteArray &key, const QByteArray &value) -> bool {
23 resultHandler(value); 26 resultHandler(value);
24 return true; 27 return true;
25 }, 28 },
diff --git a/common/index.h b/common/index.h
index 08b499f..e1b7e3a 100644
--- a/common/index.h
+++ b/common/index.h
@@ -26,6 +26,7 @@ public:
26 }; 26 };
27 27
28 Index(const QString &storageRoot, const QString &name, Akonadi2::Storage::AccessMode mode = Akonadi2::Storage::ReadOnly); 28 Index(const QString &storageRoot, const QString &name, Akonadi2::Storage::AccessMode mode = Akonadi2::Storage::ReadOnly);
29 Index(const QByteArray &name, Akonadi2::Storage::Transaction &);
29 30
30 void add(const QByteArray &key, const QByteArray &value); 31 void add(const QByteArray &key, const QByteArray &value);
31 // void remove(const QByteArray &key, const QByteArray &value); 32 // void remove(const QByteArray &key, const QByteArray &value);
@@ -35,5 +36,6 @@ public:
35 36
36private: 37private:
37 Q_DISABLE_COPY(Index); 38 Q_DISABLE_COPY(Index);
38 Akonadi2::Storage mStorage; 39 Akonadi2::Storage::Transaction mTransaction;
40 Akonadi2::Storage::NamedDatabase mDb;
39}; 41};
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index ce4ad41..c5e36ee 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -441,18 +441,8 @@ void PipelineState::step()
441 d->idle = false; 441 d->idle = false;
442 if (d->filterIt.hasNext()) { 442 if (d->filterIt.hasNext()) {
443 //TODO skip step if already processed 443 //TODO skip step if already processed
444 //FIXME error handling if no result is found
445 auto preprocessor = d->filterIt.next(); 444 auto preprocessor = d->filterIt.next();
446 //FIXME this read should not be necessary 445 preprocessor->process(*this, d->pipeline->transaction());
447 //Perhaps simply use entity that is initially stored and synchronously process all filters. (Making the first filter somewhat redundant)
448 d->pipeline->transaction().scan(d->key, [this, preprocessor](const QByteArray &key, const QByteArray &value) -> bool {
449 auto entity = Akonadi2::GetEntity(value);
450 preprocessor->process(*this, *entity);
451 return false;
452 }, [this](const Akonadi2::Storage::Error &error) {
453 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
454 d->pipeline->pipelineCompleted(*this);
455 });
456 } else { 446 } else {
457 //This object becomes invalid after this call 447 //This object becomes invalid after this call
458 d->pipeline->pipelineCompleted(*this); 448 d->pipeline->pipelineCompleted(*this);
@@ -483,7 +473,7 @@ Preprocessor::~Preprocessor()
483{ 473{
484} 474}
485 475
486void Preprocessor::process(const PipelineState &state, const Akonadi2::Entity &) 476void Preprocessor::process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction)
487{ 477{
488 processingCompleted(state); 478 processingCompleted(state);
489} 479}
diff --git a/common/pipeline.h b/common/pipeline.h
index 496e037..fee6a5e 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -115,7 +115,7 @@ public:
115 virtual ~Preprocessor(); 115 virtual ~Preprocessor();
116 116
117 //TODO pass actual command as well, for changerecording 117 //TODO pass actual command as well, for changerecording
118 virtual void process(const PipelineState &state, const Akonadi2::Entity &); 118 virtual void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction);
119 //TODO to record progress 119 //TODO to record progress
120 virtual QString id() const; 120 virtual QString id() const;
121 121
@@ -133,17 +133,24 @@ private:
133class AKONADI2COMMON_EXPORT SimpleProcessor : public Akonadi2::Preprocessor 133class AKONADI2COMMON_EXPORT SimpleProcessor : public Akonadi2::Preprocessor
134{ 134{
135public: 135public:
136 SimpleProcessor(const QString &id, const std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e)> &f) 136 SimpleProcessor(const QString &id, const std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e, Akonadi2::Storage::Transaction &transaction)> &f)
137 : Akonadi2::Preprocessor(), 137 : Akonadi2::Preprocessor(),
138 mFunction(f), 138 mFunction(f),
139 mId(id) 139 mId(id)
140 { 140 {
141 } 141 }
142 142
143 void process(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e) Q_DECL_OVERRIDE 143 void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE
144 { 144 {
145 mFunction(state, e); 145 transaction.scan(state.key(), [this, &state, &transaction](const QByteArray &key, const QByteArray &value) -> bool {
146 processingCompleted(state); 146 auto entity = Akonadi2::GetEntity(value);
147 mFunction(state, *entity, transaction);
148 processingCompleted(state);
149 return false;
150 }, [this, state](const Akonadi2::Storage::Error &error) {
151 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
152 processingCompleted(state);
153 });
147 } 154 }
148 155
149 QString id() const 156 QString id() const
@@ -152,7 +159,7 @@ public:
152 } 159 }
153 160
154protected: 161protected:
155 std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e)> mFunction; 162 std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e, Akonadi2::Storage::Transaction &transaction)> mFunction;
156 QString mId; 163 QString mId;
157}; 164};
158 165
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp
index e029308..0e18282 100644
--- a/examples/dummyresource/resourcefactory.cpp
+++ b/examples/dummyresource/resourcefactory.cpp
@@ -42,13 +42,13 @@ DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QShared
42{ 42{
43 auto eventFactory = QSharedPointer<DummyEventAdaptorFactory>::create(); 43 auto eventFactory = QSharedPointer<DummyEventAdaptorFactory>::create();
44 const auto resourceIdentifier = mResourceInstanceIdentifier; 44 const auto resourceIdentifier = mResourceInstanceIdentifier;
45 auto eventIndexer = new Akonadi2::SimpleProcessor("eventIndexer", [eventFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { 45 auto eventIndexer = new Akonadi2::SimpleProcessor("eventIndexer", [eventFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) {
46 auto adaptor = eventFactory->createAdaptor(entity); 46 auto adaptor = eventFactory->createAdaptor(entity);
47 //FIXME set revision? 47 //FIXME set revision?
48 Akonadi2::ApplicationDomain::Event event(resourceIdentifier, state.key(), -1, adaptor); 48 Akonadi2::ApplicationDomain::Event event(resourceIdentifier, state.key(), -1, adaptor);
49 Akonadi2::ApplicationDomain::TypeImplementation<Akonadi2::ApplicationDomain::Event>::index(event); 49 Akonadi2::ApplicationDomain::TypeImplementation<Akonadi2::ApplicationDomain::Event>::index(event, transaction);
50 50
51 Index ridIndex(Akonadi2::storageLocation(), resourceIdentifier + ".index.rid", Akonadi2::Storage::ReadWrite); 51 Index ridIndex("index.rid", transaction);
52 const auto rid = event.getProperty("remoteId"); 52 const auto rid = event.getProperty("remoteId");
53 if (rid.isValid()) { 53 if (rid.isValid()) {
54 ridIndex.add(rid.toByteArray(), event.identifier()); 54 ridIndex.add(rid.toByteArray(), event.identifier());
@@ -63,8 +63,8 @@ DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QShared
63KAsync::Job<void> DummyResource::synchronizeWithSource() 63KAsync::Job<void> DummyResource::synchronizeWithSource()
64{ 64{
65 return KAsync::start<void>([this](KAsync::Future<void> &f) { 65 return KAsync::start<void>([this](KAsync::Future<void> &f) {
66 //TODO start transaction on index 66 auto transaction = Akonadi2::Storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier + ".index.uid", Akonadi2::Storage::ReadOnly).createTransaction(Akonadi2::Storage::ReadOnly);
67 Index uidIndex(Akonadi2::storageLocation(), mResourceInstanceIdentifier + ".index.uid", Akonadi2::Storage::ReadOnly); 67 Index uidIndex("index.uid", transaction);
68 68
69 const auto data = DummyStore::instance().data(); 69 const auto data = DummyStore::instance().data();
70 for (auto it = data.constBegin(); it != data.constEnd(); it++) { 70 for (auto it = data.constBegin(); it != data.constEnd(); it++) {
diff --git a/tests/genericresourcebenchmark.cpp b/tests/genericresourcebenchmark.cpp
index 01dc95d..27678b0 100644
--- a/tests/genericresourcebenchmark.cpp
+++ b/tests/genericresourcebenchmark.cpp
@@ -12,6 +12,7 @@
12#include "genericresource.h" 12#include "genericresource.h"
13#include "definitions.h" 13#include "definitions.h"
14#include "domainadaptor.h" 14#include "domainadaptor.h"
15#include "index.h"
15#include <iostream> 16#include <iostream>
16 17
17class TestResource : public Akonadi2::GenericResource 18class TestResource : public Akonadi2::GenericResource
@@ -89,19 +90,20 @@ private Q_SLOTS:
89 90
90 void init() 91 void init()
91 { 92 {
93 Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Warning);
94 }
95
96 void initTestCase()
97 {
92 removeFromDisk("org.kde.test.instance1"); 98 removeFromDisk("org.kde.test.instance1");
93 removeFromDisk("org.kde.test.instance1.userqueue"); 99 removeFromDisk("org.kde.test.instance1.userqueue");
94 removeFromDisk("org.kde.test.instance1.synchronizerqueue"); 100 removeFromDisk("org.kde.test.instance1.synchronizerqueue");
95 Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Warning);
96 qDebug();
97 qDebug() << "-----------------------------------------";
98 qDebug();
99 } 101 }
100 102
101 103
102 void testWriteInProcess() 104 void testWriteInProcess()
103 { 105 {
104 int num = 50000; 106 int num = 10000;
105 107
106 auto pipeline = QSharedPointer<Akonadi2::Pipeline>::create("org.kde.test.instance1"); 108 auto pipeline = QSharedPointer<Akonadi2::Pipeline>::create("org.kde.test.instance1");
107 TestResource resource("org.kde.test.instance1", pipeline); 109 TestResource resource("org.kde.test.instance1", pipeline);
@@ -125,6 +127,50 @@ private Q_SLOTS:
125 std::cout << "All processed: " << allProcessedTime << " /sec " << num*1000/allProcessedTime << std::endl; 127 std::cout << "All processed: " << allProcessedTime << " /sec " << num*1000/allProcessedTime << std::endl;
126 } 128 }
127 129
130 void testWriteInProcessWithIndex()
131 {
132 int num = 10000;
133
134 auto pipeline = QSharedPointer<Akonadi2::Pipeline>::create("org.kde.test.instance1");
135
136 auto eventFactory = QSharedPointer<TestEventAdaptorFactory>::create();
137 const QByteArray resourceIdentifier = "org.kde.test.instance1";
138 auto eventIndexer = new Akonadi2::SimpleProcessor("eventIndexer", [eventFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) {
139 auto adaptor = eventFactory->createAdaptor(entity);
140 Akonadi2::ApplicationDomain::Event event(resourceIdentifier, state.key(), -1, adaptor);
141 Akonadi2::ApplicationDomain::TypeImplementation<Akonadi2::ApplicationDomain::Event>::index(event, transaction);
142
143 //Create a bunch of indexes
144 for (int i = 0; i < 10; i++) {
145 Index ridIndex(QString("index.index%1").arg(i).toLatin1(), transaction);
146 ridIndex.add("foo", event.identifier());
147 }
148 });
149
150 pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer);
151 pipeline->setAdaptorFactory("event", eventFactory);
152
153 TestResource resource("org.kde.test.instance1", pipeline);
154
155 auto command = createEntityBuffer();
156
157 QTime time;
158 time.start();
159
160 for (int i = 0; i < num; i++) {
161 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command);
162 }
163 auto appendTime = time.elapsed();
164
165 //Wait until all messages have been processed
166 resource.processAllMessages().exec().waitForFinished();
167
168 auto allProcessedTime = time.elapsed();
169
170 std::cout << "Append to messagequeue " << appendTime << " /sec " << num*1000/appendTime << std::endl;
171 std::cout << "All processed: " << allProcessedTime << " /sec " << num*1000/allProcessedTime << std::endl;
172 }
173
128 void testCreateCommand() 174 void testCreateCommand()
129 { 175 {
130 Akonadi2::ApplicationDomain::Event event; 176 Akonadi2::ApplicationDomain::Event event;
diff --git a/tests/storagetest.cpp b/tests/storagetest.cpp
index e872c44..6ba4bcd 100644
--- a/tests/storagetest.cpp
+++ b/tests/storagetest.cpp
@@ -223,7 +223,7 @@ private Q_SLOTS:
223 bool gotError = false; 223 bool gotError = false;
224 Akonadi2::Storage store(testDataPath, dbName, Akonadi2::Storage::ReadWrite, false); 224 Akonadi2::Storage store(testDataPath, dbName, Akonadi2::Storage::ReadWrite, false);
225 auto transaction = store.createTransaction(Akonadi2::Storage::ReadWrite); 225 auto transaction = store.createTransaction(Akonadi2::Storage::ReadWrite);
226 auto db = transaction.openDatabase(); 226 auto db = transaction.openDatabase("default", nullptr, false);
227 db.write("key","value"); 227 db.write("key","value");
228 db.write("key","value"); 228 db.write("key","value");
229 229
@@ -246,7 +246,7 @@ private Q_SLOTS:
246 bool gotError = false; 246 bool gotError = false;
247 Akonadi2::Storage store(testDataPath, dbName, Akonadi2::Storage::ReadWrite, true); 247 Akonadi2::Storage store(testDataPath, dbName, Akonadi2::Storage::ReadWrite, true);
248 auto transaction = store.createTransaction(Akonadi2::Storage::ReadWrite); 248 auto transaction = store.createTransaction(Akonadi2::Storage::ReadWrite);
249 auto db = transaction.openDatabase(); 249 auto db = transaction.openDatabase("default", nullptr, true);
250 db.write("key","value1"); 250 db.write("key","value1");
251 db.write("key","value2"); 251 db.write("key","value2");
252 int numValues = db.scan("key", [&](const QByteArray &key, const QByteArray &value) -> bool { 252 int numValues = db.scan("key", [&](const QByteArray &key, const QByteArray &value) -> bool {