diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-10-06 16:19:51 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-10-10 10:40:01 +0200 |
commit | f689ad1021a7805f6f8b6a81f534b4cb9ca91f51 (patch) | |
tree | c18d746b775279f143c8d8052924bb4d83fbb91f /common/entitystorage.cpp | |
parent | c3f6e72c2d46906a4699127b558ca248729ce577 (diff) | |
download | sink-f689ad1021a7805f6f8b6a81f534b4cb9ca91f51.tar.gz sink-f689ad1021a7805f6f8b6a81f534b4cb9ca91f51.zip |
Change replay
So far only includes modifications and additions,
removals are not yet stored as separate revisions.
Diffstat (limited to 'common/entitystorage.cpp')
-rw-r--r-- | common/entitystorage.cpp | 75 |
1 files changed, 60 insertions, 15 deletions
diff --git a/common/entitystorage.cpp b/common/entitystorage.cpp index 0eb2763..60d58ad 100644 --- a/common/entitystorage.cpp +++ b/common/entitystorage.cpp | |||
@@ -44,18 +44,21 @@ static void scan(const Akonadi2::Storage::Transaction &transaction, const QByteA | |||
44 | }); | 44 | }); |
45 | } | 45 | } |
46 | 46 | ||
47 | void EntityStorageBase::readValue(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &)> &resultCallback) | 47 | void EntityStorageBase::readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback) |
48 | { | 48 | { |
49 | //This only works for a 1:1 mapping of resource to domain types. | ||
50 | //Not i.e. for tags that are stored as flags in each entity of an imap store. | ||
51 | //additional properties that don't have a 1:1 mapping (such as separately stored tags), | ||
52 | //could be added to the adaptor. | ||
53 | //TODO: resource implementations should be able to customize the retrieval function for non 1:1 entity-buffer mapping cases | ||
49 | scan(transaction, key, [=](const QByteArray &key, const Akonadi2::Entity &entity) { | 54 | scan(transaction, key, [=](const QByteArray &key, const Akonadi2::Entity &entity) { |
50 | const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata()); | 55 | const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata()); |
56 | Q_ASSERT(metadataBuffer); | ||
51 | qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | 57 | qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; |
52 | //This only works for a 1:1 mapping of resource to domain types. | 58 | auto operation = metadataBuffer->operation(); |
53 | //Not i.e. for tags that are stored as flags in each entity of an imap store. | ||
54 | //additional properties that don't have a 1:1 mapping (such as separately stored tags), | ||
55 | //could be added to the adaptor. | ||
56 | 59 | ||
57 | auto domainObject = create(key, revision, mDomainTypeAdaptorFactory->createAdaptor(entity)); | 60 | auto domainObject = create(key, revision, mDomainTypeAdaptorFactory->createAdaptor(entity)); |
58 | resultCallback(domainObject); | 61 | resultCallback(domainObject, operation); |
59 | return false; | 62 | return false; |
60 | }, mBufferType); | 63 | }, mBufferType); |
61 | } | 64 | } |
@@ -80,16 +83,22 @@ static ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, con | |||
80 | return ResultSet(keys); | 83 | return ResultSet(keys); |
81 | } | 84 | } |
82 | 85 | ||
83 | ResultSet EntityStorageBase::filteredSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision) | 86 | ResultSet EntityStorageBase::filteredSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery) |
84 | { | 87 | { |
85 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | 88 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); |
86 | 89 | ||
87 | //Read through the source values and return whatever matches the filter | 90 | //Read through the source values and return whatever matches the filter |
88 | std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &)>)> generator = [this, resultSetPtr, &transaction, filter](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &)> callback) -> bool { | 91 | std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool { |
89 | while (resultSetPtr->next()) { | 92 | while (resultSetPtr->next()) { |
90 | readValue(transaction, resultSetPtr->id(), [this, filter, callback](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) { | 93 | //TODO. every read value is actually a revision that contains one of three operations. Reflect that so the result set can be updated appropriately. |
94 | //TODO while getting the initial set everything is adding | ||
95 | readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { | ||
91 | if (filter(domainObject)) { | 96 | if (filter(domainObject)) { |
92 | callback(domainObject); | 97 | if (initialQuery) { |
98 | callback(domainObject, Akonadi2::Operation_Creation); | ||
99 | } else { | ||
100 | callback(domainObject, operation); | ||
101 | } | ||
93 | } | 102 | } |
94 | }); | 103 | }); |
95 | } | 104 | } |
@@ -98,15 +107,51 @@ ResultSet EntityStorageBase::filteredSet(const ResultSet &resultSet, const std:: | |||
98 | return ResultSet(generator); | 107 | return ResultSet(generator); |
99 | } | 108 | } |
100 | 109 | ||
101 | ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision) | 110 | ResultSet EntityStorageBase::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) |
102 | { | 111 | { |
103 | QSet<QByteArray> appliedFilters; | 112 | QSet<QByteArray> appliedFilters; |
104 | ResultSet resultSet = queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); | 113 | auto resultSet = queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); |
105 | const auto remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; | 114 | remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; |
106 | 115 | ||
107 | //We do a full scan if there were no indexes available to create the initial set. | 116 | //We do a full scan if there were no indexes available to create the initial set. |
108 | if (appliedFilters.isEmpty()) { | 117 | if (appliedFilters.isEmpty()) { |
109 | resultSet = fullScan(transaction, mBufferType); | 118 | //TODO this should be replaced by an index lookup as well |
119 | return fullScan(transaction, mBufferType); | ||
120 | } | ||
121 | return resultSet; | ||
122 | } | ||
123 | |||
124 | ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision) | ||
125 | { | ||
126 | QSet<QByteArray> remainingFilters = query.propertyFilter.keys().toSet(); | ||
127 | ResultSet resultSet; | ||
128 | const bool initialQuery = (baseRevision == 0); | ||
129 | if (initialQuery) { | ||
130 | Trace() << "Initial result set update"; | ||
131 | resultSet = loadInitialResultSet(query, transaction, remainingFilters); | ||
132 | } else { | ||
133 | //TODO fallback in case the old revision is no longer available to clear + redo complete initial scan | ||
134 | Trace() << "Incremental result set update" << baseRevision << topRevision; | ||
135 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | ||
136 | resultSet = ResultSet([revisionCounter, topRevision, &transaction, this]() -> QByteArray { | ||
137 | //Spit out the revision keys one by one. | ||
138 | while (*revisionCounter <= topRevision) { | ||
139 | const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); | ||
140 | const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); | ||
141 | Trace() << "Revision" << *revisionCounter << type << uid; | ||
142 | if (type != mBufferType) { | ||
143 | //Skip revision | ||
144 | *revisionCounter += 1; | ||
145 | continue; | ||
146 | } | ||
147 | const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); | ||
148 | *revisionCounter += 1; | ||
149 | return key; | ||
150 | } | ||
151 | //We're done | ||
152 | //FIXME make sure result set understands that this means we're done | ||
153 | return QByteArray(); | ||
154 | }); | ||
110 | } | 155 | } |
111 | 156 | ||
112 | auto filter = [remainingFilters, query, baseRevision, topRevision](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { | 157 | auto filter = [remainingFilters, query, baseRevision, topRevision](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { |
@@ -125,5 +170,5 @@ ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, Akonadi2 | |||
125 | return true; | 170 | return true; |
126 | }; | 171 | }; |
127 | 172 | ||
128 | return filteredSet(resultSet, filter, transaction, baseRevision, topRevision); | 173 | return filteredSet(resultSet, filter, transaction, initialQuery); |
129 | } | 174 | } |