summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/facade.cpp352
-rw-r--r--common/facade.h335
3 files changed, 370 insertions, 318 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index bdb9eac..01056d0 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -12,6 +12,7 @@ else (STORAGE_unqlite)
12endif (STORAGE_unqlite) 12endif (STORAGE_unqlite)
13 13
14set(command_SRCS 14set(command_SRCS
15 modelresult.cpp
15 definitions.cpp 16 definitions.cpp
16 log.cpp 17 log.cpp
17 entitybuffer.cpp 18 entitybuffer.cpp
diff --git a/common/facade.cpp b/common/facade.cpp
index e51b32a..b4931cf 100644
--- a/common/facade.cpp
+++ b/common/facade.cpp
@@ -1,5 +1,5 @@
1/* 1/*
2 * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm> 2 * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm>
3 * 3 *
4 * This program is free software; you can redistribute it and/or modify 4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by 5 * it under the terms of the GNU General Public License as published by
@@ -18,3 +18,353 @@
18 */ 18 */
19 19
20#include "facade.h" 20#include "facade.h"
21
22#include "commands.h"
23#include "domainadaptor.h"
24#include "log.h"
25#include "storage.h"
26#include "definitions.h"
27
28using namespace Akonadi2;
29
30/**
31 * A QueryRunner runs a query and updates the corresponding result set.
32 *
33 * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work),
34 * and by how long a result set must be updated. If the query is one off the runner dies after the execution,
35 * otherwise it lives on the react to changes and updates the corresponding result set.
36 *
37 * QueryRunner has to keep ResourceAccess alive in order to keep getting updates.
38 */
39class QueryRunner : public QObject
40{
41 Q_OBJECT
42public:
43 typedef std::function<KAsync::Job<void>()> QueryFunction;
44
45 QueryRunner(const Akonadi2::Query &query) {};
46 /**
47 * Starts query
48 */
49 KAsync::Job<void> run(qint64 newRevision = 0)
50 {
51 return queryFunction();
52 }
53
54 /**
55 * Set the query to run
56 */
57 void setQuery(const QueryFunction &query)
58 {
59 queryFunction = query;
60 }
61
62public slots:
63 /**
64 * Rerun query with new revision
65 */
66 void revisionChanged(qint64 newRevision)
67 {
68 Trace() << "New revision: " << newRevision;
69 run().exec();
70 }
71
72private:
73 QueryFunction queryFunction;
74};
75
76static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType)
77{
78 //TODO use a result set with an iterator, to read values on demand
79 QVector<QByteArray> keys;
80 transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool {
81 //Skip internals
82 if (Akonadi2::Storage::isInternalKey(key)) {
83 return true;
84 }
85 keys << Akonadi2::Storage::uidFromKey(key);
86 return true;
87 },
88 [](const Akonadi2::Storage::Error &error) {
89 qWarning() << "Error during query: " << error.message;
90 });
91
92 Trace() << "Full scan found " << keys.size() << " results";
93 return ResultSet(keys);
94}
95
96
97
98template<class DomainType>
99GenericFacade<DomainType>::GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory , const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess)
100 : Akonadi2::StoreFacade<DomainType>(),
101 mResourceAccess(resourceAccess),
102 mDomainTypeAdaptorFactory(adaptorFactory),
103 mResourceInstanceIdentifier(resourceIdentifier)
104{
105 if (!mResourceAccess) {
106 mResourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(resourceIdentifier);
107 }
108}
109
110template<class DomainType>
111GenericFacade<DomainType>::~GenericFacade()
112{
113}
114
115template<class DomainType>
116QByteArray GenericFacade<DomainType>::bufferTypeForDomainType()
117{
118 //We happen to have a one to one mapping
119 return Akonadi2::ApplicationDomain::getTypeName<DomainType>();
120}
121
122template<class DomainType>
123KAsync::Job<void> GenericFacade<DomainType>::create(const DomainType &domainObject)
124{
125 if (!mDomainTypeAdaptorFactory) {
126 Warning() << "No domain type adaptor factory available";
127 return KAsync::error<void>();
128 }
129 flatbuffers::FlatBufferBuilder entityFbb;
130 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb);
131 return mResourceAccess->sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize()));
132}
133
134template<class DomainType>
135KAsync::Job<void> GenericFacade<DomainType>::modify(const DomainType &domainObject)
136{
137 if (!mDomainTypeAdaptorFactory) {
138 Warning() << "No domain type adaptor factory available";
139 return KAsync::error<void>();
140 }
141 flatbuffers::FlatBufferBuilder entityFbb;
142 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb);
143 return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize()));
144}
145
146template<class DomainType>
147KAsync::Job<void> GenericFacade<DomainType>::remove(const DomainType &domainObject)
148{
149 return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType());
150}
151
152template<class DomainType>
153KAsync::Job<void> GenericFacade<DomainType>::load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
154{
155 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
156 resultProvider.setFetcher([this, query, &resultProvider](const typename DomainType::Ptr &parent) {
157 const qint64 newRevision = executeInitialQuery(query, parent, resultProvider);
158 mResourceAccess->sendRevisionReplayedCommand(newRevision);
159 });
160
161
162 //In case of a live query we keep the runner for as long alive as the result provider exists
163 if (query.liveQuery) {
164 auto runner = QSharedPointer<QueryRunner>::create(query);
165 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
166 runner->setQuery([this, query, &resultProvider] () -> KAsync::Job<void> {
167 return KAsync::start<void>([this, query, &resultProvider](KAsync::Future<void> &future) {
168 const qint64 newRevision = executeIncrementalQuery(query, resultProvider);
169 mResourceAccess->sendRevisionReplayedCommand(newRevision);
170 future.setFinished();
171 });
172 });
173 resultProvider.setQueryRunner(runner);
174 //Ensure the connection is open, if it wasn't already opened
175 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
176 mResourceAccess->open();
177 QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged);
178 }
179 return KAsync::null<void>();
180}
181
182 //TODO move into result provider?
183template<class DomainType>
184void GenericFacade<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
185{
186 while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool {
187 switch (operation) {
188 case Akonadi2::Operation_Creation:
189 Trace() << "Got creation";
190 resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
191 break;
192 case Akonadi2::Operation_Modification:
193 Trace() << "Got modification";
194 resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
195 break;
196 case Akonadi2::Operation_Removal:
197 Trace() << "Got removal";
198 resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
199 break;
200 }
201 return true;
202 })){};
203}
204
205template<class DomainType>
206void GenericFacade<DomainType>::readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback)
207{
208 const auto bufferType = bufferTypeForDomainType();
209 //This only works for a 1:1 mapping of resource to domain types.
210 //Not i.e. for tags that are stored as flags in each entity of an imap store.
211 //additional properties that don't have a 1:1 mapping (such as separately stored tags),
212 //could be added to the adaptor.
213 //
214 // Akonadi2::Storage::getLatest(transaction, bufferTye, key);
215 transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool {
216 Akonadi2::EntityBuffer buffer(value.data(), value.size());
217 const Akonadi2::Entity &entity = buffer.entity();
218 const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata());
219 Q_ASSERT(metadataBuffer);
220 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
221 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation());
222 return false;
223 },
224 [](const Akonadi2::Storage::Error &error) {
225 qWarning() << "Error during query: " << error.message;
226 });
227}
228
229template<class DomainType>
230ResultSet GenericFacade<DomainType>::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
231{
232 QSet<QByteArray> appliedFilters;
233 auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction);
234 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
235
236 //We do a full scan if there were no indexes available to create the initial set.
237 if (appliedFilters.isEmpty()) {
238 //TODO this should be replaced by an index lookup as well
239 resultSet = fullScan(transaction, bufferTypeForDomainType());
240 }
241 return resultSet;
242}
243
244template<class DomainType>
245ResultSet GenericFacade<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
246{
247 const auto bufferType = bufferTypeForDomainType();
248 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
249 remainingFilters = query.propertyFilter.keys().toSet();
250 return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray {
251 const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction);
252 //Spit out the revision keys one by one.
253 while (*revisionCounter <= topRevision) {
254 const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter);
255 const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter);
256 Trace() << "Revision" << *revisionCounter << type << uid;
257 if (type != bufferType) {
258 //Skip revision
259 *revisionCounter += 1;
260 continue;
261 }
262 const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter);
263 *revisionCounter += 1;
264 return key;
265 }
266 Trace() << "Finished reading incremental result set:" << *revisionCounter;
267 //We're done
268 return QByteArray();
269 });
270}
271
272template<class DomainType>
273ResultSet GenericFacade<DomainType>::filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery)
274{
275 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
276
277 //Read through the source values and return whatever matches the filter
278 std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool {
279 while (resultSetPtr->next()) {
280 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
281 readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) {
282 //Always remove removals, they probably don't match due to non-available properties
283 if (filter(domainObject) || operation == Akonadi2::Operation_Removal) {
284 if (initialQuery) {
285 //We're not interested in removals during the initial query
286 if (operation != Akonadi2::Operation_Removal) {
287 callback(domainObject, Akonadi2::Operation_Creation);
288 }
289 } else {
290 callback(domainObject, operation);
291 }
292 }
293 });
294 }
295 return false;
296 };
297 return ResultSet(generator);
298}
299
300
301template<class DomainType>
302std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> GenericFacade<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query)
303{
304 return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
305 for (const auto &filterProperty : remainingFilters) {
306 const auto property = domainObject->getProperty(filterProperty);
307 if (property.isValid()) {
308 //TODO implement other comparison operators than equality
309 if (property != query.propertyFilter.value(filterProperty)) {
310 Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty);
311 return false;
312 }
313 } else {
314 Warning() << "Ignored property filter because value is invalid: " << filterProperty;
315 }
316 }
317 return true;
318 };
319}
320
321template<class DomainType>
322qint64 GenericFacade<DomainType>::load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
323{
324 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
325 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
326 Warning() << "Error during query: " << error.store << error.message;
327 });
328 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly);
329
330 QSet<QByteArray> remainingFilters;
331 auto resultSet = baseSetRetriever(transaction, remainingFilters);
332 auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false);
333 replaySet(filteredSet, resultProvider);
334 resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction));
335 return Akonadi2::Storage::maxRevision(transaction);
336}
337
338
339template<class DomainType>
340qint64 GenericFacade<DomainType>::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
341{
342 const qint64 baseRevision = resultProvider.revision() + 1;
343 Trace() << "Running incremental query " << baseRevision;
344 return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
345 return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters);
346 }, resultProvider);
347}
348
349template<class DomainType>
350qint64 GenericFacade<DomainType>::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
351{
352 auto modifiedQuery = query;
353 if (parent) {
354 Trace() << "Running initial query for parent:" << parent->identifier();
355 modifiedQuery.propertyFilter.insert("parent", parent->identifier());
356 } else {
357 Trace() << "Running initial query for toplevel";
358 modifiedQuery.propertyFilter.insert("parent", QVariant());
359 }
360 return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
361 return loadInitialResultSet(modifiedQuery, transaction, remainingFilters);
362 }, resultProvider);
363}
364
365template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Folder>;
366template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Mail>;
367template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Event>;
368// template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::AkonadiResource>;
369
370#include "facade.moc"
diff --git a/common/facade.h b/common/facade.h
index 8b8a2a8..aa50941 100644
--- a/common/facade.h
+++ b/common/facade.h
@@ -1,5 +1,5 @@
1/* 1/*
2 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm> 2 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm>
3 * 3 *
4 * This program is free software; you can redistribute it and/or modify 4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by 5 * it under the terms of the GNU General Public License as published by
@@ -25,79 +25,8 @@
25#include <Async/Async> 25#include <Async/Async>
26 26
27#include "resourceaccess.h" 27#include "resourceaccess.h"
28#include "commands.h"
29#include "domainadaptor.h"
30#include "log.h"
31#include "resultset.h" 28#include "resultset.h"
32#include "storage.h" 29#include "domainadaptor.h"
33#include "definitions.h"
34
35/**
36 * A QueryRunner runs a query and updates the corresponding result set.
37 *
38 * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work),
39 * and by how long a result set must be updated. If the query is one off the runner dies after the execution,
40 * otherwise it lives on the react to changes and updates the corresponding result set.
41 *
42 * QueryRunner has to keep ResourceAccess alive in order to keep getting updates.
43 */
44class QueryRunner : public QObject
45{
46 Q_OBJECT
47public:
48 typedef std::function<KAsync::Job<void>()> QueryFunction;
49
50 QueryRunner(const Akonadi2::Query &query) {};
51 /**
52 * Starts query
53 */
54 KAsync::Job<void> run(qint64 newRevision = 0)
55 {
56 return queryFunction();
57 }
58
59 /**
60 * Set the query to run
61 */
62 void setQuery(const QueryFunction &query)
63 {
64 queryFunction = query;
65 }
66
67public slots:
68 /**
69 * Rerun query with new revision
70 */
71 void revisionChanged(qint64 newRevision)
72 {
73 Trace() << "New revision: " << newRevision;
74 run().exec();
75 }
76
77private:
78 QueryFunction queryFunction;
79};
80
81static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType)
82{
83 //TODO use a result set with an iterator, to read values on demand
84 QVector<QByteArray> keys;
85 transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool {
86 //Skip internals
87 if (Akonadi2::Storage::isInternalKey(key)) {
88 return true;
89 }
90 keys << Akonadi2::Storage::uidFromKey(key);
91 return true;
92 },
93 [](const Akonadi2::Storage::Error &error) {
94 qWarning() << "Error during query: " << error.message;
95 });
96
97 Trace() << "Full scan found " << keys.size() << " results";
98 return ResultSet(keys);
99}
100
101 30
102namespace Akonadi2 { 31namespace Akonadi2 {
103/** 32/**
@@ -121,257 +50,29 @@ public:
121 * @param resourceIdentifier is the identifier of the resource instance 50 * @param resourceIdentifier is the identifier of the resource instance
122 * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa 51 * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa
123 */ 52 */
124 GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess = QSharedPointer<Akonadi2::ResourceAccessInterface>()) 53 GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess = QSharedPointer<Akonadi2::ResourceAccessInterface>());
125 : Akonadi2::StoreFacade<DomainType>(), 54 ~GenericFacade();
126 mResourceAccess(resourceAccess),
127 mDomainTypeAdaptorFactory(adaptorFactory),
128 mResourceInstanceIdentifier(resourceIdentifier)
129 {
130 if (!mResourceAccess) {
131 mResourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(resourceIdentifier);
132 }
133 }
134
135 ~GenericFacade()
136 {
137 }
138
139 static QByteArray bufferTypeForDomainType()
140 {
141 //We happen to have a one to one mapping
142 return Akonadi2::ApplicationDomain::getTypeName<DomainType>();
143 }
144
145 KAsync::Job<void> create(const DomainType &domainObject) Q_DECL_OVERRIDE
146 {
147 if (!mDomainTypeAdaptorFactory) {
148 Warning() << "No domain type adaptor factory available";
149 return KAsync::error<void>();
150 }
151 flatbuffers::FlatBufferBuilder entityFbb;
152 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb);
153 return mResourceAccess->sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize()));
154 }
155
156 KAsync::Job<void> modify(const DomainType &domainObject) Q_DECL_OVERRIDE
157 {
158 if (!mDomainTypeAdaptorFactory) {
159 Warning() << "No domain type adaptor factory available";
160 return KAsync::error<void>();
161 }
162 flatbuffers::FlatBufferBuilder entityFbb;
163 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb);
164 return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize()));
165 }
166
167 KAsync::Job<void> remove(const DomainType &domainObject) Q_DECL_OVERRIDE
168 {
169 return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType());
170 }
171
172 KAsync::Job<void> load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) Q_DECL_OVERRIDE
173 {
174 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
175 resultProvider.setFetcher([this, query, &resultProvider](const typename DomainType::Ptr &parent) {
176 const qint64 newRevision = executeInitialQuery(query, parent, resultProvider);
177 mResourceAccess->sendRevisionReplayedCommand(newRevision);
178 });
179
180 55
181 //In case of a live query we keep the runner for as long alive as the result provider exists 56 static QByteArray bufferTypeForDomainType();
182 if (query.liveQuery) { 57 KAsync::Job<void> create(const DomainType &domainObject) Q_DECL_OVERRIDE;
183 auto runner = QSharedPointer<QueryRunner>::create(query); 58 KAsync::Job<void> modify(const DomainType &domainObject) Q_DECL_OVERRIDE;
184 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 59 KAsync::Job<void> remove(const DomainType &domainObject) Q_DECL_OVERRIDE;
185 runner->setQuery([this, query, &resultProvider] () -> KAsync::Job<void> { 60 KAsync::Job<void> load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) Q_DECL_OVERRIDE;
186 return KAsync::start<void>([this, query, &resultProvider](KAsync::Future<void> &future) {
187 const qint64 newRevision = executeIncrementalQuery(query, resultProvider);
188 mResourceAccess->sendRevisionReplayedCommand(newRevision);
189 future.setFinished();
190 });
191 });
192 resultProvider.setQueryRunner(runner);
193 //Ensure the connection is open, if it wasn't already opened
194 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
195 mResourceAccess->open();
196 QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged);
197 }
198 return KAsync::null<void>();
199 }
200 61
201private: 62private:
202
203 //TODO move into result provider? 63 //TODO move into result provider?
204 static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 64 static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
205 {
206 while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool {
207 switch (operation) {
208 case Akonadi2::Operation_Creation:
209 Trace() << "Got creation";
210 resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
211 break;
212 case Akonadi2::Operation_Modification:
213 Trace() << "Got modification";
214 resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
215 break;
216 case Akonadi2::Operation_Removal:
217 Trace() << "Got removal";
218 resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
219 break;
220 }
221 return true;
222 })){};
223 }
224
225 void readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback)
226 {
227 const auto bufferType = bufferTypeForDomainType();
228 //This only works for a 1:1 mapping of resource to domain types.
229 //Not i.e. for tags that are stored as flags in each entity of an imap store.
230 //additional properties that don't have a 1:1 mapping (such as separately stored tags),
231 //could be added to the adaptor.
232 //
233 // Akonadi2::Storage::getLatest(transaction, bufferTye, key);
234 transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool {
235 Akonadi2::EntityBuffer buffer(value.data(), value.size());
236 const Akonadi2::Entity &entity = buffer.entity();
237 const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata());
238 Q_ASSERT(metadataBuffer);
239 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
240 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation());
241 return false;
242 },
243 [](const Akonadi2::Storage::Error &error) {
244 qWarning() << "Error during query: " << error.message;
245 });
246 }
247
248 ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
249 {
250 QSet<QByteArray> appliedFilters;
251 auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction);
252 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
253
254 //We do a full scan if there were no indexes available to create the initial set.
255 if (appliedFilters.isEmpty()) {
256 //TODO this should be replaced by an index lookup as well
257 resultSet = fullScan(transaction, bufferTypeForDomainType());
258 }
259 return resultSet;
260 }
261
262 ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
263 {
264 const auto bufferType = bufferTypeForDomainType();
265 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
266 remainingFilters = query.propertyFilter.keys().toSet();
267 return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray {
268 const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction);
269 //Spit out the revision keys one by one.
270 while (*revisionCounter <= topRevision) {
271 const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter);
272 const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter);
273 Trace() << "Revision" << *revisionCounter << type << uid;
274 if (type != bufferType) {
275 //Skip revision
276 *revisionCounter += 1;
277 continue;
278 }
279 const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter);
280 *revisionCounter += 1;
281 return key;
282 }
283 //We're done
284 return QByteArray();
285 });
286 }
287
288 ResultSet filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery)
289 {
290 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
291
292 //Read through the source values and return whatever matches the filter
293 std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool {
294 while (resultSetPtr->next()) {
295 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
296 readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) {
297 //Always remove removals, they probably don't match due to non-available properties
298 if (filter(domainObject) || operation == Akonadi2::Operation_Removal) {
299 if (initialQuery) {
300 //We're not interested in removals during the initial query
301 if (operation != Akonadi2::Operation_Removal) {
302 callback(domainObject, Akonadi2::Operation_Creation);
303 }
304 } else {
305 callback(domainObject, operation);
306 }
307 }
308 });
309 }
310 return false;
311 };
312 return ResultSet(generator);
313 }
314
315
316 std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query)
317 {
318 return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
319 for (const auto &filterProperty : remainingFilters) {
320 const auto property = domainObject->getProperty(filterProperty);
321 if (property.isValid()) {
322 //TODO implement other comparison operators than equality
323 if (property != query.propertyFilter.value(filterProperty)) {
324 Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty);
325 return false;
326 }
327 } else {
328 Warning() << "Ignored property filter because value is invalid: " << filterProperty;
329 }
330 }
331 return true;
332 };
333 }
334
335 qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
336 {
337 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
338 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
339 Warning() << "Error during query: " << error.store << error.message;
340 });
341 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly);
342
343 QSet<QByteArray> remainingFilters;
344 auto resultSet = baseSetRetriever(transaction, remainingFilters);
345 auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false);
346 replaySet(filteredSet, resultProvider);
347 resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction));
348 return Akonadi2::Storage::maxRevision(transaction);
349 }
350 65
66 void readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback);
351 67
352 qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 68 ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters);
353 { 69 ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters);
354 const qint64 baseRevision = resultProvider.revision() + 1;
355 Trace() << "Running incremental query " << baseRevision;
356 return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
357 return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters);
358 }, resultProvider);
359 }
360 70
361 qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 71 ResultSet filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery);
362 { 72 std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query);
363 auto modifiedQuery = query; 73 qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
364 if (parent) { 74 qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
365 Trace() << "Running initial query for parent:" << parent->identifier(); 75 qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
366 modifiedQuery.propertyFilter.insert("parent", parent->identifier());
367 } else {
368 Trace() << "Running initial query for toplevel";
369 modifiedQuery.propertyFilter.insert("parent", QVariant());
370 }
371 return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
372 return loadInitialResultSet(modifiedQuery, transaction, remainingFilters);
373 }, resultProvider);
374 }
375 76
376protected: 77protected:
377 //TODO use one resource access instance per application & per resource 78 //TODO use one resource access instance per application & per resource