summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-08-23 18:56:43 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-08-23 18:56:43 +0200
commit1acf9f3c486813df807ff6931e56cc13eb26eeaf (patch)
tree559ead2e95986515b4a5f93b6f143b8f1d429bd3 /common
parent62e7084dcd6f53275fcb21ba17e880e41b40094d (diff)
downloadsink-1acf9f3c486813df807ff6931e56cc13eb26eeaf.tar.gz
sink-1acf9f3c486813df807ff6931e56cc13eb26eeaf.zip
Store indexes as named databases in the same db.
Because we also keep using the same transactions this finally makes the resource somewhat performant. On my system genericresourcebenchmark now processes ~4200 messages per second instead of ~280.
Diffstat (limited to 'common')
-rw-r--r--common/domain/event.cpp8
-rw-r--r--common/domain/event.h6
-rw-r--r--common/entitystorage.cpp4
-rw-r--r--common/entitystorage.h8
-rw-r--r--common/index.cpp17
-rw-r--r--common/index.h4
-rw-r--r--common/pipeline.cpp14
-rw-r--r--common/pipeline.h19
8 files changed, 42 insertions, 38 deletions
diff --git a/common/domain/event.cpp b/common/domain/event.cpp
index 15f5d11..e107441 100644
--- a/common/domain/event.cpp
+++ b/common/domain/event.cpp
@@ -34,11 +34,11 @@
34 34
35using namespace Akonadi2::ApplicationDomain; 35using namespace Akonadi2::ApplicationDomain;
36 36
37ResultSet TypeImplementation<Event>::queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters) 37ResultSet TypeImplementation<Event>::queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction)
38{ 38{
39 QVector<QByteArray> keys; 39 QVector<QByteArray> keys;
40 if (query.propertyFilter.contains("uid")) { 40 if (query.propertyFilter.contains("uid")) {
41 Index uidIndex(Akonadi2::storageLocation(), resourceInstanceIdentifier + ".index.uid", Akonadi2::Storage::ReadOnly); 41 Index uidIndex("index.uid", transaction);
42 uidIndex.lookup(query.propertyFilter.value("uid").toByteArray(), [&](const QByteArray &value) { 42 uidIndex.lookup(query.propertyFilter.value("uid").toByteArray(), [&](const QByteArray &value) {
43 keys << value; 43 keys << value;
44 }, 44 },
@@ -50,11 +50,11 @@ ResultSet TypeImplementation<Event>::queryIndexes(const Akonadi2::Query &query,
50 return ResultSet(keys); 50 return ResultSet(keys);
51} 51}
52 52
53void TypeImplementation<Event>::index(const Event &type) 53void TypeImplementation<Event>::index(const Event &type, Akonadi2::Storage::Transaction &transaction)
54{ 54{
55 Index uidIndex(Akonadi2::storageLocation(), type.resourceInstanceIdentifier() + ".index.uid", Akonadi2::Storage::ReadWrite);
56 const auto uid = type.getProperty("uid"); 55 const auto uid = type.getProperty("uid");
57 if (uid.isValid()) { 56 if (uid.isValid()) {
57 Index uidIndex("index.uid", transaction);
58 uidIndex.add(uid.toByteArray(), type.identifier()); 58 uidIndex.add(uid.toByteArray(), type.identifier());
59 } 59 }
60} 60}
diff --git a/common/domain/event.h b/common/domain/event.h
index 13cfc6e..f21cd34 100644
--- a/common/domain/event.h
+++ b/common/domain/event.h
@@ -20,6 +20,8 @@
20 20
21#include "applicationdomaintype.h" 21#include "applicationdomaintype.h"
22 22
23#include "storage.h"
24
23class ResultSet; 25class ResultSet;
24class QByteArray; 26class QByteArray;
25 27
@@ -53,8 +55,8 @@ public:
53 * 55 *
54 * An empty result set indicates that a full scan is required. 56 * An empty result set indicates that a full scan is required.
55 */ 57 */
56 static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters); 58 static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction);
57 static void index(const Event &type); 59 static void index(const Event &type, Akonadi2::Storage::Transaction &transaction);
58 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); 60 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper();
59 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); 61 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper();
60}; 62};
diff --git a/common/entitystorage.cpp b/common/entitystorage.cpp
index 420c3b0..22fd9e6 100644
--- a/common/entitystorage.cpp
+++ b/common/entitystorage.cpp
@@ -91,10 +91,10 @@ ResultSet EntityStorageBase::filteredSet(const ResultSet &resultSet, const std::
91 return ResultSet(generator); 91 return ResultSet(generator);
92} 92}
93 93
94ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, const Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision) 94ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision)
95{ 95{
96 QSet<QByteArray> appliedFilters; 96 QSet<QByteArray> appliedFilters;
97 ResultSet resultSet = queryIndexes(query, mResourceInstanceIdentifier, appliedFilters); 97 ResultSet resultSet = queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction);
98 const auto remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; 98 const auto remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
99 99
100 //We do a full scan if there were no indexes available to create the initial set. 100 //We do a full scan if there were no indexes available to create the initial set.
diff --git a/common/entitystorage.h b/common/entitystorage.h
index 2fce880..8256938 100644
--- a/common/entitystorage.h
+++ b/common/entitystorage.h
@@ -44,11 +44,11 @@ protected:
44 44
45 virtual Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr create(const QByteArray &key, qint64 revision, const QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> &adaptor) = 0; 45 virtual Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr create(const QByteArray &key, qint64 revision, const QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> &adaptor) = 0;
46 virtual Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr copy(const Akonadi2::ApplicationDomain::ApplicationDomainType &) = 0; 46 virtual Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr copy(const Akonadi2::ApplicationDomain::ApplicationDomainType &) = 0;
47 virtual ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters) = 0; 47 virtual ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction) = 0;
48 48
49 void readValue(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &)> &resultCallback); 49 void readValue(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &)> &resultCallback);
50 ResultSet filteredSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision); 50 ResultSet filteredSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision);
51 ResultSet getResultSet(const Akonadi2::Query &query, const Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision); 51 ResultSet getResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision);
52 52
53protected: 53protected:
54 QByteArray mResourceInstanceIdentifier; 54 QByteArray mResourceInstanceIdentifier;
@@ -77,9 +77,9 @@ protected:
77 return Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(object); 77 return Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(object);
78 } 78 }
79 79
80 ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters) Q_DECL_OVERRIDE 80 ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE
81 { 81 {
82 return Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, resourceInstanceIdentifier, appliedFilters); 82 return Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, resourceInstanceIdentifier, appliedFilters, transaction);
83 } 83 }
84 84
85public: 85public:
diff --git a/common/index.cpp b/common/index.cpp
index 7e3c09e..75ffe3f 100644
--- a/common/index.cpp
+++ b/common/index.cpp
@@ -2,24 +2,27 @@
2#include <QDebug> 2#include <QDebug>
3 3
4Index::Index(const QString &storageRoot, const QString &name, Akonadi2::Storage::AccessMode mode) 4Index::Index(const QString &storageRoot, const QString &name, Akonadi2::Storage::AccessMode mode)
5 : mStorage(storageRoot, name, mode, true) 5 : mTransaction(Akonadi2::Storage(storageRoot, name, mode, true).createTransaction(mode)),
6 mDb(mTransaction.openDatabase(name.toLatin1(), std::function<void(const Akonadi2::Storage::Error &)>(), true))
7{
8
9}
10
11Index::Index(const QByteArray &name, Akonadi2::Storage::Transaction &transaction)
12 : mDb(transaction.openDatabase(name, std::function<void(const Akonadi2::Storage::Error &)>(), true))
6{ 13{
7 14
8} 15}
9 16
10void Index::add(const QByteArray &key, const QByteArray &value) 17void Index::add(const QByteArray &key, const QByteArray &value)
11{ 18{
12 mStorage.createTransaction(Akonadi2::Storage::ReadWrite).write(key, value); 19 mDb.write(key, value);
13} 20}
14 21
15void Index::lookup(const QByteArray &key, const std::function<void(const QByteArray &value)> &resultHandler, 22void Index::lookup(const QByteArray &key, const std::function<void(const QByteArray &value)> &resultHandler,
16 const std::function<void(const Error &error)> &errorHandler) 23 const std::function<void(const Error &error)> &errorHandler)
17{ 24{
18 if (!mStorage.exists()) { 25 mDb.scan(key, [this, resultHandler](const QByteArray &key, const QByteArray &value) -> bool {
19 errorHandler(Error("index", IndexNotAvailable, "Index not existing"));
20 return;
21 }
22 mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan(key, [this, resultHandler](const QByteArray &key, const QByteArray &value) -> bool {
23 resultHandler(value); 26 resultHandler(value);
24 return true; 27 return true;
25 }, 28 },
diff --git a/common/index.h b/common/index.h
index 08b499f..e1b7e3a 100644
--- a/common/index.h
+++ b/common/index.h
@@ -26,6 +26,7 @@ public:
26 }; 26 };
27 27
28 Index(const QString &storageRoot, const QString &name, Akonadi2::Storage::AccessMode mode = Akonadi2::Storage::ReadOnly); 28 Index(const QString &storageRoot, const QString &name, Akonadi2::Storage::AccessMode mode = Akonadi2::Storage::ReadOnly);
29 Index(const QByteArray &name, Akonadi2::Storage::Transaction &);
29 30
30 void add(const QByteArray &key, const QByteArray &value); 31 void add(const QByteArray &key, const QByteArray &value);
31 // void remove(const QByteArray &key, const QByteArray &value); 32 // void remove(const QByteArray &key, const QByteArray &value);
@@ -35,5 +36,6 @@ public:
35 36
36private: 37private:
37 Q_DISABLE_COPY(Index); 38 Q_DISABLE_COPY(Index);
38 Akonadi2::Storage mStorage; 39 Akonadi2::Storage::Transaction mTransaction;
40 Akonadi2::Storage::NamedDatabase mDb;
39}; 41};
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index ce4ad41..c5e36ee 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -441,18 +441,8 @@ void PipelineState::step()
441 d->idle = false; 441 d->idle = false;
442 if (d->filterIt.hasNext()) { 442 if (d->filterIt.hasNext()) {
443 //TODO skip step if already processed 443 //TODO skip step if already processed
444 //FIXME error handling if no result is found
445 auto preprocessor = d->filterIt.next(); 444 auto preprocessor = d->filterIt.next();
446 //FIXME this read should not be necessary 445 preprocessor->process(*this, d->pipeline->transaction());
447 //Perhaps simply use entity that is initially stored and synchronously process all filters. (Making the first filter somewhat redundant)
448 d->pipeline->transaction().scan(d->key, [this, preprocessor](const QByteArray &key, const QByteArray &value) -> bool {
449 auto entity = Akonadi2::GetEntity(value);
450 preprocessor->process(*this, *entity);
451 return false;
452 }, [this](const Akonadi2::Storage::Error &error) {
453 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
454 d->pipeline->pipelineCompleted(*this);
455 });
456 } else { 446 } else {
457 //This object becomes invalid after this call 447 //This object becomes invalid after this call
458 d->pipeline->pipelineCompleted(*this); 448 d->pipeline->pipelineCompleted(*this);
@@ -483,7 +473,7 @@ Preprocessor::~Preprocessor()
483{ 473{
484} 474}
485 475
486void Preprocessor::process(const PipelineState &state, const Akonadi2::Entity &) 476void Preprocessor::process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction)
487{ 477{
488 processingCompleted(state); 478 processingCompleted(state);
489} 479}
diff --git a/common/pipeline.h b/common/pipeline.h
index 496e037..fee6a5e 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -115,7 +115,7 @@ public:
115 virtual ~Preprocessor(); 115 virtual ~Preprocessor();
116 116
117 //TODO pass actual command as well, for changerecording 117 //TODO pass actual command as well, for changerecording
118 virtual void process(const PipelineState &state, const Akonadi2::Entity &); 118 virtual void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction);
119 //TODO to record progress 119 //TODO to record progress
120 virtual QString id() const; 120 virtual QString id() const;
121 121
@@ -133,17 +133,24 @@ private:
133class AKONADI2COMMON_EXPORT SimpleProcessor : public Akonadi2::Preprocessor 133class AKONADI2COMMON_EXPORT SimpleProcessor : public Akonadi2::Preprocessor
134{ 134{
135public: 135public:
136 SimpleProcessor(const QString &id, const std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e)> &f) 136 SimpleProcessor(const QString &id, const std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e, Akonadi2::Storage::Transaction &transaction)> &f)
137 : Akonadi2::Preprocessor(), 137 : Akonadi2::Preprocessor(),
138 mFunction(f), 138 mFunction(f),
139 mId(id) 139 mId(id)
140 { 140 {
141 } 141 }
142 142
143 void process(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e) Q_DECL_OVERRIDE 143 void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE
144 { 144 {
145 mFunction(state, e); 145 transaction.scan(state.key(), [this, &state, &transaction](const QByteArray &key, const QByteArray &value) -> bool {
146 processingCompleted(state); 146 auto entity = Akonadi2::GetEntity(value);
147 mFunction(state, *entity, transaction);
148 processingCompleted(state);
149 return false;
150 }, [this, state](const Akonadi2::Storage::Error &error) {
151 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
152 processingCompleted(state);
153 });
147 } 154 }
148 155
149 QString id() const 156 QString id() const
@@ -152,7 +159,7 @@ public:
152 } 159 }
153 160
154protected: 161protected:
155 std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e)> mFunction; 162 std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e, Akonadi2::Storage::Transaction &transaction)> mFunction;
156 QString mId; 163 QString mId;
157}; 164};
158 165