summaryrefslogtreecommitdiffstats
path: root/common/facade.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-11-19 23:47:34 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-11-19 23:47:34 +0100
commitddb28417ccbcd22e771b7610c1727eac63471609 (patch)
tree445243cc7c7df45bfe7f31c4382d41cd5deb9465 /common/facade.cpp
parent94a2cd6ec21bf0466a9a50d6e4a0a956ed47bc82 (diff)
downloadsink-ddb28417ccbcd22e771b7610c1727eac63471609.tar.gz
sink-ddb28417ccbcd22e771b7610c1727eac63471609.zip
Moved facade implementation to cpp file
Diffstat (limited to 'common/facade.cpp')
-rw-r--r--common/facade.cpp352
1 files changed, 351 insertions, 1 deletions
diff --git a/common/facade.cpp b/common/facade.cpp
index e51b32a..b4931cf 100644
--- a/common/facade.cpp
+++ b/common/facade.cpp
@@ -1,5 +1,5 @@
1/* 1/*
2 * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm> 2 * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm>
3 * 3 *
4 * This program is free software; you can redistribute it and/or modify 4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by 5 * it under the terms of the GNU General Public License as published by
@@ -18,3 +18,353 @@
18 */ 18 */
19 19
20#include "facade.h" 20#include "facade.h"
21
22#include "commands.h"
23#include "domainadaptor.h"
24#include "log.h"
25#include "storage.h"
26#include "definitions.h"
27
28using namespace Akonadi2;
29
30/**
31 * A QueryRunner runs a query and updates the corresponding result set.
32 *
33 * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work),
34 * and by how long a result set must be updated. If the query is one off the runner dies after the execution,
35 * otherwise it lives on the react to changes and updates the corresponding result set.
36 *
37 * QueryRunner has to keep ResourceAccess alive in order to keep getting updates.
38 */
39class QueryRunner : public QObject
40{
41 Q_OBJECT
42public:
43 typedef std::function<KAsync::Job<void>()> QueryFunction;
44
45 QueryRunner(const Akonadi2::Query &query) {};
46 /**
47 * Starts query
48 */
49 KAsync::Job<void> run(qint64 newRevision = 0)
50 {
51 return queryFunction();
52 }
53
54 /**
55 * Set the query to run
56 */
57 void setQuery(const QueryFunction &query)
58 {
59 queryFunction = query;
60 }
61
62public slots:
63 /**
64 * Rerun query with new revision
65 */
66 void revisionChanged(qint64 newRevision)
67 {
68 Trace() << "New revision: " << newRevision;
69 run().exec();
70 }
71
72private:
73 QueryFunction queryFunction;
74};
75
76static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType)
77{
78 //TODO use a result set with an iterator, to read values on demand
79 QVector<QByteArray> keys;
80 transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool {
81 //Skip internals
82 if (Akonadi2::Storage::isInternalKey(key)) {
83 return true;
84 }
85 keys << Akonadi2::Storage::uidFromKey(key);
86 return true;
87 },
88 [](const Akonadi2::Storage::Error &error) {
89 qWarning() << "Error during query: " << error.message;
90 });
91
92 Trace() << "Full scan found " << keys.size() << " results";
93 return ResultSet(keys);
94}
95
96
97
98template<class DomainType>
99GenericFacade<DomainType>::GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory , const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess)
100 : Akonadi2::StoreFacade<DomainType>(),
101 mResourceAccess(resourceAccess),
102 mDomainTypeAdaptorFactory(adaptorFactory),
103 mResourceInstanceIdentifier(resourceIdentifier)
104{
105 if (!mResourceAccess) {
106 mResourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(resourceIdentifier);
107 }
108}
109
110template<class DomainType>
111GenericFacade<DomainType>::~GenericFacade()
112{
113}
114
115template<class DomainType>
116QByteArray GenericFacade<DomainType>::bufferTypeForDomainType()
117{
118 //We happen to have a one to one mapping
119 return Akonadi2::ApplicationDomain::getTypeName<DomainType>();
120}
121
122template<class DomainType>
123KAsync::Job<void> GenericFacade<DomainType>::create(const DomainType &domainObject)
124{
125 if (!mDomainTypeAdaptorFactory) {
126 Warning() << "No domain type adaptor factory available";
127 return KAsync::error<void>();
128 }
129 flatbuffers::FlatBufferBuilder entityFbb;
130 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb);
131 return mResourceAccess->sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize()));
132}
133
134template<class DomainType>
135KAsync::Job<void> GenericFacade<DomainType>::modify(const DomainType &domainObject)
136{
137 if (!mDomainTypeAdaptorFactory) {
138 Warning() << "No domain type adaptor factory available";
139 return KAsync::error<void>();
140 }
141 flatbuffers::FlatBufferBuilder entityFbb;
142 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb);
143 return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize()));
144}
145
146template<class DomainType>
147KAsync::Job<void> GenericFacade<DomainType>::remove(const DomainType &domainObject)
148{
149 return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType());
150}
151
152template<class DomainType>
153KAsync::Job<void> GenericFacade<DomainType>::load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
154{
155 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
156 resultProvider.setFetcher([this, query, &resultProvider](const typename DomainType::Ptr &parent) {
157 const qint64 newRevision = executeInitialQuery(query, parent, resultProvider);
158 mResourceAccess->sendRevisionReplayedCommand(newRevision);
159 });
160
161
162 //In case of a live query we keep the runner for as long alive as the result provider exists
163 if (query.liveQuery) {
164 auto runner = QSharedPointer<QueryRunner>::create(query);
165 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
166 runner->setQuery([this, query, &resultProvider] () -> KAsync::Job<void> {
167 return KAsync::start<void>([this, query, &resultProvider](KAsync::Future<void> &future) {
168 const qint64 newRevision = executeIncrementalQuery(query, resultProvider);
169 mResourceAccess->sendRevisionReplayedCommand(newRevision);
170 future.setFinished();
171 });
172 });
173 resultProvider.setQueryRunner(runner);
174 //Ensure the connection is open, if it wasn't already opened
175 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
176 mResourceAccess->open();
177 QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged);
178 }
179 return KAsync::null<void>();
180}
181
182 //TODO move into result provider?
183template<class DomainType>
184void GenericFacade<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
185{
186 while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool {
187 switch (operation) {
188 case Akonadi2::Operation_Creation:
189 Trace() << "Got creation";
190 resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
191 break;
192 case Akonadi2::Operation_Modification:
193 Trace() << "Got modification";
194 resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
195 break;
196 case Akonadi2::Operation_Removal:
197 Trace() << "Got removal";
198 resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
199 break;
200 }
201 return true;
202 })){};
203}
204
205template<class DomainType>
206void GenericFacade<DomainType>::readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback)
207{
208 const auto bufferType = bufferTypeForDomainType();
209 //This only works for a 1:1 mapping of resource to domain types.
210 //Not i.e. for tags that are stored as flags in each entity of an imap store.
211 //additional properties that don't have a 1:1 mapping (such as separately stored tags),
212 //could be added to the adaptor.
213 //
214 // Akonadi2::Storage::getLatest(transaction, bufferTye, key);
215 transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool {
216 Akonadi2::EntityBuffer buffer(value.data(), value.size());
217 const Akonadi2::Entity &entity = buffer.entity();
218 const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata());
219 Q_ASSERT(metadataBuffer);
220 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
221 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation());
222 return false;
223 },
224 [](const Akonadi2::Storage::Error &error) {
225 qWarning() << "Error during query: " << error.message;
226 });
227}
228
229template<class DomainType>
230ResultSet GenericFacade<DomainType>::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
231{
232 QSet<QByteArray> appliedFilters;
233 auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction);
234 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
235
236 //We do a full scan if there were no indexes available to create the initial set.
237 if (appliedFilters.isEmpty()) {
238 //TODO this should be replaced by an index lookup as well
239 resultSet = fullScan(transaction, bufferTypeForDomainType());
240 }
241 return resultSet;
242}
243
244template<class DomainType>
245ResultSet GenericFacade<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
246{
247 const auto bufferType = bufferTypeForDomainType();
248 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
249 remainingFilters = query.propertyFilter.keys().toSet();
250 return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray {
251 const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction);
252 //Spit out the revision keys one by one.
253 while (*revisionCounter <= topRevision) {
254 const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter);
255 const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter);
256 Trace() << "Revision" << *revisionCounter << type << uid;
257 if (type != bufferType) {
258 //Skip revision
259 *revisionCounter += 1;
260 continue;
261 }
262 const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter);
263 *revisionCounter += 1;
264 return key;
265 }
266 Trace() << "Finished reading incremental result set:" << *revisionCounter;
267 //We're done
268 return QByteArray();
269 });
270}
271
272template<class DomainType>
273ResultSet GenericFacade<DomainType>::filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery)
274{
275 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
276
277 //Read through the source values and return whatever matches the filter
278 std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool {
279 while (resultSetPtr->next()) {
280 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
281 readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) {
282 //Always remove removals, they probably don't match due to non-available properties
283 if (filter(domainObject) || operation == Akonadi2::Operation_Removal) {
284 if (initialQuery) {
285 //We're not interested in removals during the initial query
286 if (operation != Akonadi2::Operation_Removal) {
287 callback(domainObject, Akonadi2::Operation_Creation);
288 }
289 } else {
290 callback(domainObject, operation);
291 }
292 }
293 });
294 }
295 return false;
296 };
297 return ResultSet(generator);
298}
299
300
301template<class DomainType>
302std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> GenericFacade<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query)
303{
304 return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
305 for (const auto &filterProperty : remainingFilters) {
306 const auto property = domainObject->getProperty(filterProperty);
307 if (property.isValid()) {
308 //TODO implement other comparison operators than equality
309 if (property != query.propertyFilter.value(filterProperty)) {
310 Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty);
311 return false;
312 }
313 } else {
314 Warning() << "Ignored property filter because value is invalid: " << filterProperty;
315 }
316 }
317 return true;
318 };
319}
320
321template<class DomainType>
322qint64 GenericFacade<DomainType>::load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
323{
324 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
325 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
326 Warning() << "Error during query: " << error.store << error.message;
327 });
328 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly);
329
330 QSet<QByteArray> remainingFilters;
331 auto resultSet = baseSetRetriever(transaction, remainingFilters);
332 auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false);
333 replaySet(filteredSet, resultProvider);
334 resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction));
335 return Akonadi2::Storage::maxRevision(transaction);
336}
337
338
339template<class DomainType>
340qint64 GenericFacade<DomainType>::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
341{
342 const qint64 baseRevision = resultProvider.revision() + 1;
343 Trace() << "Running incremental query " << baseRevision;
344 return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
345 return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters);
346 }, resultProvider);
347}
348
349template<class DomainType>
350qint64 GenericFacade<DomainType>::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
351{
352 auto modifiedQuery = query;
353 if (parent) {
354 Trace() << "Running initial query for parent:" << parent->identifier();
355 modifiedQuery.propertyFilter.insert("parent", parent->identifier());
356 } else {
357 Trace() << "Running initial query for toplevel";
358 modifiedQuery.propertyFilter.insert("parent", QVariant());
359 }
360 return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
361 return loadInitialResultSet(modifiedQuery, transaction, remainingFilters);
362 }, resultProvider);
363}
364
365template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Folder>;
366template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Mail>;
367template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Event>;
368// template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::AkonadiResource>;
369
370#include "facade.moc"