diff options
-rw-r--r-- | common/entitystorage.cpp | 30 | ||||
-rw-r--r-- | common/entitystorage.h | 16 |
2 files changed, 22 insertions, 24 deletions
diff --git a/common/entitystorage.cpp b/common/entitystorage.cpp index f84e9f5..420c3b0 100644 --- a/common/entitystorage.cpp +++ b/common/entitystorage.cpp | |||
@@ -19,16 +19,16 @@ | |||
19 | 19 | ||
20 | #include "entitystorage.h" | 20 | #include "entitystorage.h" |
21 | 21 | ||
22 | static void scan(const QSharedPointer<Akonadi2::Storage> &storage, const QByteArray &key, std::function<bool(const QByteArray &key, const Akonadi2::Entity &entity)> callback) | 22 | static void scan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, std::function<bool(const QByteArray &key, const Akonadi2::Entity &entity)> callback) |
23 | { | 23 | { |
24 | storage->scan(key, [=](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { | 24 | transaction.scan(key, [=](const QByteArray &key, const QByteArray &value) -> bool { |
25 | //Skip internals | 25 | //Skip internals |
26 | if (Akonadi2::Storage::isInternalKey(keyValue, keySize)) { | 26 | if (Akonadi2::Storage::isInternalKey(key)) { |
27 | return true; | 27 | return true; |
28 | } | 28 | } |
29 | 29 | ||
30 | //Extract buffers | 30 | //Extract buffers |
31 | Akonadi2::EntityBuffer buffer(dataValue, dataSize); | 31 | Akonadi2::EntityBuffer buffer(value.data(), value.size()); |
32 | 32 | ||
33 | //FIXME implement buffer.isValid() | 33 | //FIXME implement buffer.isValid() |
34 | // const auto resourceBuffer = Akonadi2::EntityBuffer::readBuffer<DummyEvent>(buffer.entity().resource()); | 34 | // const auto resourceBuffer = Akonadi2::EntityBuffer::readBuffer<DummyEvent>(buffer.entity().resource()); |
@@ -39,16 +39,16 @@ static void scan(const QSharedPointer<Akonadi2::Storage> &storage, const QByteAr | |||
39 | // qWarning() << "invalid buffer " << QByteArray::fromRawData(static_cast<char*>(keyValue), keySize); | 39 | // qWarning() << "invalid buffer " << QByteArray::fromRawData(static_cast<char*>(keyValue), keySize); |
40 | // return true; | 40 | // return true; |
41 | // } | 41 | // } |
42 | return callback(QByteArray::fromRawData(static_cast<char*>(keyValue), keySize), buffer.entity()); | 42 | return callback(key, buffer.entity()); |
43 | }, | 43 | }, |
44 | [](const Akonadi2::Storage::Error &error) { | 44 | [](const Akonadi2::Storage::Error &error) { |
45 | qWarning() << "Error during query: " << error.message; | 45 | qWarning() << "Error during query: " << error.message; |
46 | }); | 46 | }); |
47 | } | 47 | } |
48 | 48 | ||
49 | void EntityStorageBase::readValue(const QSharedPointer<Akonadi2::Storage> &storage, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &)> &resultCallback) | 49 | void EntityStorageBase::readValue(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &)> &resultCallback) |
50 | { | 50 | { |
51 | scan(storage, key, [=](const QByteArray &key, const Akonadi2::Entity &entity) { | 51 | scan(transaction, key, [=](const QByteArray &key, const Akonadi2::Entity &entity) { |
52 | const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata()); | 52 | const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata()); |
53 | qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | 53 | qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; |
54 | //This only works for a 1:1 mapping of resource to domain types. | 54 | //This only works for a 1:1 mapping of resource to domain types. |
@@ -61,11 +61,11 @@ void EntityStorageBase::readValue(const QSharedPointer<Akonadi2::Storage> &stora | |||
61 | }); | 61 | }); |
62 | } | 62 | } |
63 | 63 | ||
64 | static ResultSet fullScan(const QSharedPointer<Akonadi2::Storage> &storage) | 64 | static ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction) |
65 | { | 65 | { |
66 | //TODO use a result set with an iterator, to read values on demand | 66 | //TODO use a result set with an iterator, to read values on demand |
67 | QVector<QByteArray> keys; | 67 | QVector<QByteArray> keys; |
68 | scan(storage, QByteArray(), [=, &keys](const QByteArray &key, const Akonadi2::Entity &) { | 68 | scan(transaction, QByteArray(), [=, &keys](const QByteArray &key, const Akonadi2::Entity &) { |
69 | keys << key; | 69 | keys << key; |
70 | return true; | 70 | return true; |
71 | }); | 71 | }); |
@@ -73,14 +73,14 @@ static ResultSet fullScan(const QSharedPointer<Akonadi2::Storage> &storage) | |||
73 | return ResultSet(keys); | 73 | return ResultSet(keys); |
74 | } | 74 | } |
75 | 75 | ||
76 | ResultSet EntityStorageBase::filteredSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const QSharedPointer<Akonadi2::Storage> &storage, qint64 baseRevision, qint64 topRevision) | 76 | ResultSet EntityStorageBase::filteredSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision) |
77 | { | 77 | { |
78 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | 78 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); |
79 | 79 | ||
80 | //Read through the source values and return whatever matches the filter | 80 | //Read through the source values and return whatever matches the filter |
81 | std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &)>)> generator = [this, resultSetPtr, storage, filter](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &)> callback) -> bool { | 81 | std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &)>)> generator = [this, resultSetPtr, &transaction, filter](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &)> callback) -> bool { |
82 | while (resultSetPtr->next()) { | 82 | while (resultSetPtr->next()) { |
83 | readValue(storage, resultSetPtr->id(), [this, filter, callback](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) { | 83 | readValue(transaction, resultSetPtr->id(), [this, filter, callback](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) { |
84 | if (filter(domainObject)) { | 84 | if (filter(domainObject)) { |
85 | callback(domainObject); | 85 | callback(domainObject); |
86 | } | 86 | } |
@@ -91,7 +91,7 @@ ResultSet EntityStorageBase::filteredSet(const ResultSet &resultSet, const std:: | |||
91 | return ResultSet(generator); | 91 | return ResultSet(generator); |
92 | } | 92 | } |
93 | 93 | ||
94 | ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::Storage> &storage, qint64 baseRevision, qint64 topRevision) | 94 | ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, const Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision) |
95 | { | 95 | { |
96 | QSet<QByteArray> appliedFilters; | 96 | QSet<QByteArray> appliedFilters; |
97 | ResultSet resultSet = queryIndexes(query, mResourceInstanceIdentifier, appliedFilters); | 97 | ResultSet resultSet = queryIndexes(query, mResourceInstanceIdentifier, appliedFilters); |
@@ -99,7 +99,7 @@ ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, const QS | |||
99 | 99 | ||
100 | //We do a full scan if there were no indexes available to create the initial set. | 100 | //We do a full scan if there were no indexes available to create the initial set. |
101 | if (appliedFilters.isEmpty()) { | 101 | if (appliedFilters.isEmpty()) { |
102 | resultSet = fullScan(storage); | 102 | resultSet = fullScan(transaction); |
103 | } | 103 | } |
104 | 104 | ||
105 | auto filter = [remainingFilters, query, baseRevision, topRevision](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { | 105 | auto filter = [remainingFilters, query, baseRevision, topRevision](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { |
@@ -118,5 +118,5 @@ ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, const QS | |||
118 | return true; | 118 | return true; |
119 | }; | 119 | }; |
120 | 120 | ||
121 | return filteredSet(resultSet, filter, storage, baseRevision, topRevision); | 121 | return filteredSet(resultSet, filter, transaction, baseRevision, topRevision); |
122 | } | 122 | } |
diff --git a/common/entitystorage.h b/common/entitystorage.h index a62d474..516a889 100644 --- a/common/entitystorage.h +++ b/common/entitystorage.h | |||
@@ -45,9 +45,9 @@ protected: | |||
45 | virtual Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr copy(const Akonadi2::ApplicationDomain::ApplicationDomainType &) = 0; | 45 | virtual Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr copy(const Akonadi2::ApplicationDomain::ApplicationDomainType &) = 0; |
46 | virtual ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters) = 0; | 46 | virtual ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters) = 0; |
47 | 47 | ||
48 | void readValue(const QSharedPointer<Akonadi2::Storage> &storage, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &)> &resultCallback); | 48 | void readValue(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &)> &resultCallback); |
49 | ResultSet filteredSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const QSharedPointer<Akonadi2::Storage> &storage, qint64 baseRevision, qint64 topRevision); | 49 | ResultSet filteredSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision); |
50 | ResultSet getResultSet(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::Storage> &storage, qint64 baseRevision, qint64 topRevision); | 50 | ResultSet getResultSet(const Akonadi2::Query &query, const Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision); |
51 | 51 | ||
52 | protected: | 52 | protected: |
53 | QByteArray mResourceInstanceIdentifier; | 53 | QByteArray mResourceInstanceIdentifier; |
@@ -85,23 +85,21 @@ public: | |||
85 | 85 | ||
86 | virtual void read(const Akonadi2::Query &query, const QPair<qint64, qint64> &revisionRange, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider) | 86 | virtual void read(const Akonadi2::Query &query, const QPair<qint64, qint64> &revisionRange, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider) |
87 | { | 87 | { |
88 | auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), mResourceInstanceIdentifier); | 88 | Akonadi2::Storage storage(Akonadi2::Store::storageLocation(), mResourceInstanceIdentifier); |
89 | storage->setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { | 89 | storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { |
90 | Warning() << "Error during query: " << error.store << error.message; | 90 | Warning() << "Error during query: " << error.store << error.message; |
91 | }); | 91 | }); |
92 | 92 | ||
93 | storage->startTransaction(Akonadi2::Storage::ReadOnly); | 93 | auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); |
94 | //TODO start transaction on indexes as well | ||
95 | 94 | ||
96 | Log() << "Querying" << revisionRange.first << revisionRange.second; | 95 | Log() << "Querying" << revisionRange.first << revisionRange.second; |
97 | auto resultSet = getResultSet(query, storage, revisionRange.first, revisionRange.second); | 96 | auto resultSet = getResultSet(query, transaction, revisionRange.first, revisionRange.second); |
98 | while(resultSet.next([this, resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value) -> bool { | 97 | while(resultSet.next([this, resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value) -> bool { |
99 | auto cloned = copy(*value); | 98 | auto cloned = copy(*value); |
100 | resultProvider->add(cloned.template staticCast<DomainType>()); | 99 | resultProvider->add(cloned.template staticCast<DomainType>()); |
101 | return true; | 100 | return true; |
102 | })){}; | 101 | })){}; |
103 | //TODO replay removals and modifications | 102 | //TODO replay removals and modifications |
104 | storage->abortTransaction(); | ||
105 | } | 103 | } |
106 | 104 | ||
107 | }; | 105 | }; |