summaryrefslogtreecommitdiffstats
path: root/common/facade.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/facade.cpp')
-rw-r--r--common/facade.cpp282
1 files changed, 5 insertions, 277 deletions
diff --git a/common/facade.cpp b/common/facade.cpp
index 92124fc..1d6b9a7 100644
--- a/common/facade.cpp
+++ b/common/facade.cpp
@@ -24,76 +24,10 @@
24#include "storage.h" 24#include "storage.h"
25#include "definitions.h" 25#include "definitions.h"
26#include "domainadaptor.h" 26#include "domainadaptor.h"
27#include "queryrunner.h"
27 28
28using namespace Akonadi2; 29using namespace Akonadi2;
29 30
30/**
31 * A QueryRunner runs a query and updates the corresponding result set.
32 *
33 * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work),
34 * and by how long a result set must be updated. If the query is one off the runner dies after the execution,
35 * otherwise it lives on the react to changes and updates the corresponding result set.
36 *
37 * QueryRunner has to keep ResourceAccess alive in order to keep getting updates.
38 */
39class QueryRunner : public QObject
40{
41 Q_OBJECT
42public:
43 typedef std::function<KAsync::Job<void>()> QueryFunction;
44
45 QueryRunner(const Akonadi2::Query &query) {};
46 /**
47 * Starts query
48 */
49 KAsync::Job<void> run(qint64 newRevision = 0)
50 {
51 return queryFunction();
52 }
53
54 /**
55 * Set the query to run
56 */
57 void setQuery(const QueryFunction &query)
58 {
59 queryFunction = query;
60 }
61
62public slots:
63 /**
64 * Rerun query with new revision
65 */
66 void revisionChanged(qint64 newRevision)
67 {
68 Trace() << "New revision: " << newRevision;
69 run().exec();
70 }
71
72private:
73 QueryFunction queryFunction;
74};
75
76static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType)
77{
78 //TODO use a result set with an iterator, to read values on demand
79 QVector<QByteArray> keys;
80 transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool {
81 //Skip internals
82 if (Akonadi2::Storage::isInternalKey(key)) {
83 return true;
84 }
85 keys << Akonadi2::Storage::uidFromKey(key);
86 return true;
87 },
88 [](const Akonadi2::Storage::Error &error) {
89 qWarning() << "Error during query: " << error.message;
90 });
91
92 Trace() << "Full scan found " << keys.size() << " results";
93 return ResultSet(keys);
94}
95
96
97 31
98template<class DomainType> 32template<class DomainType>
99GenericFacade<DomainType>::GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory , const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess) 33GenericFacade<DomainType>::GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory , const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess)
@@ -150,220 +84,14 @@ KAsync::Job<void> GenericFacade<DomainType>::remove(const DomainType &domainObje
150} 84}
151 85
152template<class DomainType> 86template<class DomainType>
153KAsync::Job<void> GenericFacade<DomainType>::load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 87QPair<KAsync::Job<void>, typename ResultEmitter<typename DomainType::Ptr>::Ptr> GenericFacade<DomainType>::load(const Akonadi2::Query &query)
154{
155 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
156 resultProvider.setFetcher([this, query, &resultProvider](const typename DomainType::Ptr &parent) {
157 const qint64 newRevision = executeInitialQuery(query, parent, resultProvider);
158 mResourceAccess->sendRevisionReplayedCommand(newRevision);
159 });
160
161
162 //In case of a live query we keep the runner for as long alive as the result provider exists
163 if (query.liveQuery) {
164 auto runner = QSharedPointer<QueryRunner>::create(query);
165 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
166 runner->setQuery([this, query, &resultProvider] () -> KAsync::Job<void> {
167 return KAsync::start<void>([this, query, &resultProvider](KAsync::Future<void> &future) {
168 const qint64 newRevision = executeIncrementalQuery(query, resultProvider);
169 mResourceAccess->sendRevisionReplayedCommand(newRevision);
170 future.setFinished();
171 });
172 });
173 resultProvider.setQueryRunner(runner);
174 //Ensure the connection is open, if it wasn't already opened
175 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
176 mResourceAccess->open();
177 QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged);
178 }
179 return KAsync::null<void>();
180}
181
182 //TODO move into result provider?
183template<class DomainType>
184void GenericFacade<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
185{
186 while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool {
187 switch (operation) {
188 case Akonadi2::Operation_Creation:
189 // Trace() << "Got creation";
190 resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
191 break;
192 case Akonadi2::Operation_Modification:
193 // Trace() << "Got modification";
194 resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
195 break;
196 case Akonadi2::Operation_Removal:
197 // Trace() << "Got removal";
198 resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
199 break;
200 }
201 return true;
202 })){};
203}
204
205template<class DomainType>
206void GenericFacade<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback)
207{
208 //This only works for a 1:1 mapping of resource to domain types.
209 //Not i.e. for tags that are stored as flags in each entity of an imap store.
210 //additional properties that don't have a 1:1 mapping (such as separately stored tags),
211 //could be added to the adaptor.
212 //
213 // Akonadi2::Storage::getLatest(transaction, bufferTye, key);
214 db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool {
215 Akonadi2::EntityBuffer buffer(value.data(), value.size());
216 const Akonadi2::Entity &entity = buffer.entity();
217 const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata());
218 Q_ASSERT(metadataBuffer);
219 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
220 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation());
221 return false;
222 },
223 [](const Akonadi2::Storage::Error &error) {
224 qWarning() << "Error during query: " << error.message;
225 });
226}
227
228template<class DomainType>
229ResultSet GenericFacade<DomainType>::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
230{ 88{
231 QSet<QByteArray> appliedFilters; 89 //The runner lives for the lifetime of the query
232 auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); 90 auto runner = new QueryRunner<DomainType>(query, mResourceAccess, mResourceInstanceIdentifier, mDomainTypeAdaptorFactory, bufferTypeForDomainType());
233 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; 91 return qMakePair(KAsync::null<void>(), runner->emitter());
234
235 //We do a full scan if there were no indexes available to create the initial set.
236 if (appliedFilters.isEmpty()) {
237 //TODO this should be replaced by an index lookup as well
238 resultSet = fullScan(transaction, bufferTypeForDomainType());
239 }
240 return resultSet;
241}
242
243template<class DomainType>
244ResultSet GenericFacade<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
245{
246 const auto bufferType = bufferTypeForDomainType();
247 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
248 remainingFilters = query.propertyFilter.keys().toSet();
249 return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray {
250 const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction);
251 //Spit out the revision keys one by one.
252 while (*revisionCounter <= topRevision) {
253 const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter);
254 const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter);
255 Trace() << "Revision" << *revisionCounter << type << uid;
256 if (type != bufferType) {
257 //Skip revision
258 *revisionCounter += 1;
259 continue;
260 }
261 const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter);
262 *revisionCounter += 1;
263 return key;
264 }
265 Trace() << "Finished reading incremental result set:" << *revisionCounter;
266 //We're done
267 return QByteArray();
268 });
269}
270
271template<class DomainType>
272ResultSet GenericFacade<DomainType>::filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery)
273{
274 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
275
276 //Read through the source values and return whatever matches the filter
277 std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &db, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool {
278 while (resultSetPtr->next()) {
279 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
280 readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) {
281 //Always remove removals, they probably don't match due to non-available properties
282 if (filter(domainObject) || operation == Akonadi2::Operation_Removal) {
283 if (initialQuery) {
284 //We're not interested in removals during the initial query
285 if (operation != Akonadi2::Operation_Removal) {
286 callback(domainObject, Akonadi2::Operation_Creation);
287 }
288 } else {
289 callback(domainObject, operation);
290 }
291 }
292 });
293 }
294 return false;
295 };
296 return ResultSet(generator);
297}
298
299
300template<class DomainType>
301std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> GenericFacade<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query)
302{
303 return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
304 for (const auto &filterProperty : remainingFilters) {
305 const auto property = domainObject->getProperty(filterProperty);
306 if (property.isValid()) {
307 //TODO implement other comparison operators than equality
308 if (property != query.propertyFilter.value(filterProperty)) {
309 Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty);
310 return false;
311 }
312 } else {
313 Warning() << "Ignored property filter because value is invalid: " << filterProperty;
314 }
315 }
316 return true;
317 };
318}
319
320template<class DomainType>
321qint64 GenericFacade<DomainType>::load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery)
322{
323 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
324 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
325 Warning() << "Error during query: " << error.store << error.message;
326 });
327 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly);
328 auto db = transaction.openDatabase(bufferTypeForDomainType() + ".main");
329
330 QSet<QByteArray> remainingFilters;
331 auto resultSet = baseSetRetriever(transaction, remainingFilters);
332 auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), db, initialQuery);
333 replaySet(filteredSet, resultProvider);
334 resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction));
335 return Akonadi2::Storage::maxRevision(transaction);
336} 92}
337 93
338 94
339template<class DomainType>
340qint64 GenericFacade<DomainType>::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
341{
342 const qint64 baseRevision = resultProvider.revision() + 1;
343 Trace() << "Running incremental query " << baseRevision;
344 return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
345 return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters);
346 }, resultProvider, false);
347}
348
349template<class DomainType>
350qint64 GenericFacade<DomainType>::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
351{
352 auto modifiedQuery = query;
353 if (!query.parentProperty.isEmpty()) {
354 if (parent) {
355 Trace() << "Running initial query for parent:" << parent->identifier();
356 modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier());
357 } else {
358 Trace() << "Running initial query for toplevel";
359 modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant());
360 }
361 }
362 return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
363 return loadInitialResultSet(modifiedQuery, transaction, remainingFilters);
364 }, resultProvider, true);
365}
366
367template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Folder>; 95template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Folder>;
368template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Mail>; 96template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Mail>;
369template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Event>; 97template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Event>;