summaryrefslogtreecommitdiffstats
path: root/common/facade.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-11-27 17:30:04 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-11-27 17:30:04 +0100
commit5b41b26a349967acf2197f9f9228526193fd826e (patch)
tree166452bcc0757564deefe233bf031d2ccb0564d2 /common/facade.cpp
parent13af56e436f49df32d3b2f6f223cf1dec2eabaac (diff)
downloadsink-5b41b26a349967acf2197f9f9228526193fd826e.tar.gz
sink-5b41b26a349967acf2197f9f9228526193fd826e.zip
Introduced a QueryRunner object
The QueryRunner object lives for the duration of the query (so just for the initial query for non-live queries, and for the lifetime of the result model for live queries). It's supposed to handle all the threading internally and decouple the lifetime of the facade.
Diffstat (limited to 'common/facade.cpp')
-rw-r--r--common/facade.cpp282
1 files changed, 5 insertions, 277 deletions
diff --git a/common/facade.cpp b/common/facade.cpp
index 92124fc..1d6b9a7 100644
--- a/common/facade.cpp
+++ b/common/facade.cpp
@@ -24,76 +24,10 @@
24#include "storage.h" 24#include "storage.h"
25#include "definitions.h" 25#include "definitions.h"
26#include "domainadaptor.h" 26#include "domainadaptor.h"
27#include "queryrunner.h"
27 28
28using namespace Akonadi2; 29using namespace Akonadi2;
29 30
30/**
31 * A QueryRunner runs a query and updates the corresponding result set.
32 *
33 * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work),
34 * and by how long a result set must be updated. If the query is one off the runner dies after the execution,
35 * otherwise it lives on the react to changes and updates the corresponding result set.
36 *
37 * QueryRunner has to keep ResourceAccess alive in order to keep getting updates.
38 */
39class QueryRunner : public QObject
40{
41 Q_OBJECT
42public:
43 typedef std::function<KAsync::Job<void>()> QueryFunction;
44
45 QueryRunner(const Akonadi2::Query &query) {};
46 /**
47 * Starts query
48 */
49 KAsync::Job<void> run(qint64 newRevision = 0)
50 {
51 return queryFunction();
52 }
53
54 /**
55 * Set the query to run
56 */
57 void setQuery(const QueryFunction &query)
58 {
59 queryFunction = query;
60 }
61
62public slots:
63 /**
64 * Rerun query with new revision
65 */
66 void revisionChanged(qint64 newRevision)
67 {
68 Trace() << "New revision: " << newRevision;
69 run().exec();
70 }
71
72private:
73 QueryFunction queryFunction;
74};
75
76static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType)
77{
78 //TODO use a result set with an iterator, to read values on demand
79 QVector<QByteArray> keys;
80 transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool {
81 //Skip internals
82 if (Akonadi2::Storage::isInternalKey(key)) {
83 return true;
84 }
85 keys << Akonadi2::Storage::uidFromKey(key);
86 return true;
87 },
88 [](const Akonadi2::Storage::Error &error) {
89 qWarning() << "Error during query: " << error.message;
90 });
91
92 Trace() << "Full scan found " << keys.size() << " results";
93 return ResultSet(keys);
94}
95
96
97 31
98template<class DomainType> 32template<class DomainType>
99GenericFacade<DomainType>::GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory , const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess) 33GenericFacade<DomainType>::GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory , const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess)
@@ -150,220 +84,14 @@ KAsync::Job<void> GenericFacade<DomainType>::remove(const DomainType &domainObje
150} 84}
151 85
152template<class DomainType> 86template<class DomainType>
153KAsync::Job<void> GenericFacade<DomainType>::load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 87QPair<KAsync::Job<void>, typename ResultEmitter<typename DomainType::Ptr>::Ptr> GenericFacade<DomainType>::load(const Akonadi2::Query &query)
154{
155 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
156 resultProvider.setFetcher([this, query, &resultProvider](const typename DomainType::Ptr &parent) {
157 const qint64 newRevision = executeInitialQuery(query, parent, resultProvider);
158 mResourceAccess->sendRevisionReplayedCommand(newRevision);
159 });
160
161
162 //In case of a live query we keep the runner for as long alive as the result provider exists
163 if (query.liveQuery) {
164 auto runner = QSharedPointer<QueryRunner>::create(query);
165 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
166 runner->setQuery([this, query, &resultProvider] () -> KAsync::Job<void> {
167 return KAsync::start<void>([this, query, &resultProvider](KAsync::Future<void> &future) {
168 const qint64 newRevision = executeIncrementalQuery(query, resultProvider);
169 mResourceAccess->sendRevisionReplayedCommand(newRevision);
170 future.setFinished();
171 });
172 });
173 resultProvider.setQueryRunner(runner);
174 //Ensure the connection is open, if it wasn't already opened
175 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
176 mResourceAccess->open();
177 QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged);
178 }
179 return KAsync::null<void>();
180}
181
182 //TODO move into result provider?
183template<class DomainType>
184void GenericFacade<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
185{
186 while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool {
187 switch (operation) {
188 case Akonadi2::Operation_Creation:
189 // Trace() << "Got creation";
190 resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
191 break;
192 case Akonadi2::Operation_Modification:
193 // Trace() << "Got modification";
194 resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
195 break;
196 case Akonadi2::Operation_Removal:
197 // Trace() << "Got removal";
198 resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
199 break;
200 }
201 return true;
202 })){};
203}
204
205template<class DomainType>
206void GenericFacade<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback)
207{
208 //This only works for a 1:1 mapping of resource to domain types.
209 //Not i.e. for tags that are stored as flags in each entity of an imap store.
210 //additional properties that don't have a 1:1 mapping (such as separately stored tags),
211 //could be added to the adaptor.
212 //
213 // Akonadi2::Storage::getLatest(transaction, bufferTye, key);
214 db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool {
215 Akonadi2::EntityBuffer buffer(value.data(), value.size());
216 const Akonadi2::Entity &entity = buffer.entity();
217 const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata());
218 Q_ASSERT(metadataBuffer);
219 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
220 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation());
221 return false;
222 },
223 [](const Akonadi2::Storage::Error &error) {
224 qWarning() << "Error during query: " << error.message;
225 });
226}
227
228template<class DomainType>
229ResultSet GenericFacade<DomainType>::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
230{ 88{
231 QSet<QByteArray> appliedFilters; 89 //The runner lives for the lifetime of the query
232 auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); 90 auto runner = new QueryRunner<DomainType>(query, mResourceAccess, mResourceInstanceIdentifier, mDomainTypeAdaptorFactory, bufferTypeForDomainType());
233 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; 91 return qMakePair(KAsync::null<void>(), runner->emitter());
234
235 //We do a full scan if there were no indexes available to create the initial set.
236 if (appliedFilters.isEmpty()) {
237 //TODO this should be replaced by an index lookup as well
238 resultSet = fullScan(transaction, bufferTypeForDomainType());
239 }
240 return resultSet;
241}
242
243template<class DomainType>
244ResultSet GenericFacade<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
245{
246 const auto bufferType = bufferTypeForDomainType();
247 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
248 remainingFilters = query.propertyFilter.keys().toSet();
249 return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray {
250 const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction);
251 //Spit out the revision keys one by one.
252 while (*revisionCounter <= topRevision) {
253 const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter);
254 const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter);
255 Trace() << "Revision" << *revisionCounter << type << uid;
256 if (type != bufferType) {
257 //Skip revision
258 *revisionCounter += 1;
259 continue;
260 }
261 const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter);
262 *revisionCounter += 1;
263 return key;
264 }
265 Trace() << "Finished reading incremental result set:" << *revisionCounter;
266 //We're done
267 return QByteArray();
268 });
269}
270
271template<class DomainType>
272ResultSet GenericFacade<DomainType>::filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery)
273{
274 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
275
276 //Read through the source values and return whatever matches the filter
277 std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &db, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool {
278 while (resultSetPtr->next()) {
279 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
280 readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) {
281 //Always remove removals, they probably don't match due to non-available properties
282 if (filter(domainObject) || operation == Akonadi2::Operation_Removal) {
283 if (initialQuery) {
284 //We're not interested in removals during the initial query
285 if (operation != Akonadi2::Operation_Removal) {
286 callback(domainObject, Akonadi2::Operation_Creation);
287 }
288 } else {
289 callback(domainObject, operation);
290 }
291 }
292 });
293 }
294 return false;
295 };
296 return ResultSet(generator);
297}
298
299
300template<class DomainType>
301std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> GenericFacade<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query)
302{
303 return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
304 for (const auto &filterProperty : remainingFilters) {
305 const auto property = domainObject->getProperty(filterProperty);
306 if (property.isValid()) {
307 //TODO implement other comparison operators than equality
308 if (property != query.propertyFilter.value(filterProperty)) {
309 Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty);
310 return false;
311 }
312 } else {
313 Warning() << "Ignored property filter because value is invalid: " << filterProperty;
314 }
315 }
316 return true;
317 };
318}
319
320template<class DomainType>
321qint64 GenericFacade<DomainType>::load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery)
322{
323 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
324 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
325 Warning() << "Error during query: " << error.store << error.message;
326 });
327 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly);
328 auto db = transaction.openDatabase(bufferTypeForDomainType() + ".main");
329
330 QSet<QByteArray> remainingFilters;
331 auto resultSet = baseSetRetriever(transaction, remainingFilters);
332 auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), db, initialQuery);
333 replaySet(filteredSet, resultProvider);
334 resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction));
335 return Akonadi2::Storage::maxRevision(transaction);
336} 92}
337 93
338 94
339template<class DomainType>
340qint64 GenericFacade<DomainType>::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
341{
342 const qint64 baseRevision = resultProvider.revision() + 1;
343 Trace() << "Running incremental query " << baseRevision;
344 return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
345 return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters);
346 }, resultProvider, false);
347}
348
349template<class DomainType>
350qint64 GenericFacade<DomainType>::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
351{
352 auto modifiedQuery = query;
353 if (!query.parentProperty.isEmpty()) {
354 if (parent) {
355 Trace() << "Running initial query for parent:" << parent->identifier();
356 modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier());
357 } else {
358 Trace() << "Running initial query for toplevel";
359 modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant());
360 }
361 }
362 return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
363 return loadInitialResultSet(modifiedQuery, transaction, remainingFilters);
364 }, resultProvider, true);
365}
366
367template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Folder>; 95template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Folder>;
368template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Mail>; 96template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Mail>;
369template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Event>; 97template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Event>;