summaryrefslogtreecommitdiffstats
path: root/common/facade.h
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-11-19 23:47:34 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-11-19 23:47:34 +0100
commitddb28417ccbcd22e771b7610c1727eac63471609 (patch)
tree445243cc7c7df45bfe7f31c4382d41cd5deb9465 /common/facade.h
parent94a2cd6ec21bf0466a9a50d6e4a0a956ed47bc82 (diff)
downloadsink-ddb28417ccbcd22e771b7610c1727eac63471609.tar.gz
sink-ddb28417ccbcd22e771b7610c1727eac63471609.zip
Moved facade implementation to cpp file
Diffstat (limited to 'common/facade.h')
-rw-r--r--common/facade.h335
1 files changed, 18 insertions, 317 deletions
diff --git a/common/facade.h b/common/facade.h
index 8b8a2a8..aa50941 100644
--- a/common/facade.h
+++ b/common/facade.h
@@ -1,5 +1,5 @@
1/* 1/*
2 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm> 2 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm>
3 * 3 *
4 * This program is free software; you can redistribute it and/or modify 4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by 5 * it under the terms of the GNU General Public License as published by
@@ -25,79 +25,8 @@
25#include <Async/Async> 25#include <Async/Async>
26 26
27#include "resourceaccess.h" 27#include "resourceaccess.h"
28#include "commands.h"
29#include "domainadaptor.h"
30#include "log.h"
31#include "resultset.h" 28#include "resultset.h"
32#include "storage.h" 29#include "domainadaptor.h"
33#include "definitions.h"
34
35/**
36 * A QueryRunner runs a query and updates the corresponding result set.
37 *
38 * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work),
39 * and by how long a result set must be updated. If the query is one off the runner dies after the execution,
40 * otherwise it lives on the react to changes and updates the corresponding result set.
41 *
42 * QueryRunner has to keep ResourceAccess alive in order to keep getting updates.
43 */
44class QueryRunner : public QObject
45{
46 Q_OBJECT
47public:
48 typedef std::function<KAsync::Job<void>()> QueryFunction;
49
50 QueryRunner(const Akonadi2::Query &query) {};
51 /**
52 * Starts query
53 */
54 KAsync::Job<void> run(qint64 newRevision = 0)
55 {
56 return queryFunction();
57 }
58
59 /**
60 * Set the query to run
61 */
62 void setQuery(const QueryFunction &query)
63 {
64 queryFunction = query;
65 }
66
67public slots:
68 /**
69 * Rerun query with new revision
70 */
71 void revisionChanged(qint64 newRevision)
72 {
73 Trace() << "New revision: " << newRevision;
74 run().exec();
75 }
76
77private:
78 QueryFunction queryFunction;
79};
80
81static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType)
82{
83 //TODO use a result set with an iterator, to read values on demand
84 QVector<QByteArray> keys;
85 transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool {
86 //Skip internals
87 if (Akonadi2::Storage::isInternalKey(key)) {
88 return true;
89 }
90 keys << Akonadi2::Storage::uidFromKey(key);
91 return true;
92 },
93 [](const Akonadi2::Storage::Error &error) {
94 qWarning() << "Error during query: " << error.message;
95 });
96
97 Trace() << "Full scan found " << keys.size() << " results";
98 return ResultSet(keys);
99}
100
101 30
102namespace Akonadi2 { 31namespace Akonadi2 {
103/** 32/**
@@ -121,257 +50,29 @@ public:
121 * @param resourceIdentifier is the identifier of the resource instance 50 * @param resourceIdentifier is the identifier of the resource instance
122 * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa 51 * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa
123 */ 52 */
124 GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess = QSharedPointer<Akonadi2::ResourceAccessInterface>()) 53 GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess = QSharedPointer<Akonadi2::ResourceAccessInterface>());
125 : Akonadi2::StoreFacade<DomainType>(), 54 ~GenericFacade();
126 mResourceAccess(resourceAccess),
127 mDomainTypeAdaptorFactory(adaptorFactory),
128 mResourceInstanceIdentifier(resourceIdentifier)
129 {
130 if (!mResourceAccess) {
131 mResourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(resourceIdentifier);
132 }
133 }
134
135 ~GenericFacade()
136 {
137 }
138
139 static QByteArray bufferTypeForDomainType()
140 {
141 //We happen to have a one to one mapping
142 return Akonadi2::ApplicationDomain::getTypeName<DomainType>();
143 }
144
145 KAsync::Job<void> create(const DomainType &domainObject) Q_DECL_OVERRIDE
146 {
147 if (!mDomainTypeAdaptorFactory) {
148 Warning() << "No domain type adaptor factory available";
149 return KAsync::error<void>();
150 }
151 flatbuffers::FlatBufferBuilder entityFbb;
152 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb);
153 return mResourceAccess->sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize()));
154 }
155
156 KAsync::Job<void> modify(const DomainType &domainObject) Q_DECL_OVERRIDE
157 {
158 if (!mDomainTypeAdaptorFactory) {
159 Warning() << "No domain type adaptor factory available";
160 return KAsync::error<void>();
161 }
162 flatbuffers::FlatBufferBuilder entityFbb;
163 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb);
164 return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize()));
165 }
166
167 KAsync::Job<void> remove(const DomainType &domainObject) Q_DECL_OVERRIDE
168 {
169 return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType());
170 }
171
172 KAsync::Job<void> load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) Q_DECL_OVERRIDE
173 {
174 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
175 resultProvider.setFetcher([this, query, &resultProvider](const typename DomainType::Ptr &parent) {
176 const qint64 newRevision = executeInitialQuery(query, parent, resultProvider);
177 mResourceAccess->sendRevisionReplayedCommand(newRevision);
178 });
179
180 55
181 //In case of a live query we keep the runner for as long alive as the result provider exists 56 static QByteArray bufferTypeForDomainType();
182 if (query.liveQuery) { 57 KAsync::Job<void> create(const DomainType &domainObject) Q_DECL_OVERRIDE;
183 auto runner = QSharedPointer<QueryRunner>::create(query); 58 KAsync::Job<void> modify(const DomainType &domainObject) Q_DECL_OVERRIDE;
184 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 59 KAsync::Job<void> remove(const DomainType &domainObject) Q_DECL_OVERRIDE;
185 runner->setQuery([this, query, &resultProvider] () -> KAsync::Job<void> { 60 KAsync::Job<void> load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) Q_DECL_OVERRIDE;
186 return KAsync::start<void>([this, query, &resultProvider](KAsync::Future<void> &future) {
187 const qint64 newRevision = executeIncrementalQuery(query, resultProvider);
188 mResourceAccess->sendRevisionReplayedCommand(newRevision);
189 future.setFinished();
190 });
191 });
192 resultProvider.setQueryRunner(runner);
193 //Ensure the connection is open, if it wasn't already opened
194 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
195 mResourceAccess->open();
196 QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged);
197 }
198 return KAsync::null<void>();
199 }
200 61
201private: 62private:
202
203 //TODO move into result provider? 63 //TODO move into result provider?
204 static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 64 static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
205 {
206 while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool {
207 switch (operation) {
208 case Akonadi2::Operation_Creation:
209 Trace() << "Got creation";
210 resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
211 break;
212 case Akonadi2::Operation_Modification:
213 Trace() << "Got modification";
214 resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
215 break;
216 case Akonadi2::Operation_Removal:
217 Trace() << "Got removal";
218 resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
219 break;
220 }
221 return true;
222 })){};
223 }
224
225 void readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback)
226 {
227 const auto bufferType = bufferTypeForDomainType();
228 //This only works for a 1:1 mapping of resource to domain types.
229 //Not i.e. for tags that are stored as flags in each entity of an imap store.
230 //additional properties that don't have a 1:1 mapping (such as separately stored tags),
231 //could be added to the adaptor.
232 //
233 // Akonadi2::Storage::getLatest(transaction, bufferTye, key);
234 transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool {
235 Akonadi2::EntityBuffer buffer(value.data(), value.size());
236 const Akonadi2::Entity &entity = buffer.entity();
237 const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata());
238 Q_ASSERT(metadataBuffer);
239 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
240 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation());
241 return false;
242 },
243 [](const Akonadi2::Storage::Error &error) {
244 qWarning() << "Error during query: " << error.message;
245 });
246 }
247
248 ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
249 {
250 QSet<QByteArray> appliedFilters;
251 auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction);
252 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
253
254 //We do a full scan if there were no indexes available to create the initial set.
255 if (appliedFilters.isEmpty()) {
256 //TODO this should be replaced by an index lookup as well
257 resultSet = fullScan(transaction, bufferTypeForDomainType());
258 }
259 return resultSet;
260 }
261
262 ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
263 {
264 const auto bufferType = bufferTypeForDomainType();
265 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
266 remainingFilters = query.propertyFilter.keys().toSet();
267 return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray {
268 const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction);
269 //Spit out the revision keys one by one.
270 while (*revisionCounter <= topRevision) {
271 const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter);
272 const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter);
273 Trace() << "Revision" << *revisionCounter << type << uid;
274 if (type != bufferType) {
275 //Skip revision
276 *revisionCounter += 1;
277 continue;
278 }
279 const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter);
280 *revisionCounter += 1;
281 return key;
282 }
283 //We're done
284 return QByteArray();
285 });
286 }
287
288 ResultSet filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery)
289 {
290 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
291
292 //Read through the source values and return whatever matches the filter
293 std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool {
294 while (resultSetPtr->next()) {
295 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
296 readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) {
297 //Always remove removals, they probably don't match due to non-available properties
298 if (filter(domainObject) || operation == Akonadi2::Operation_Removal) {
299 if (initialQuery) {
300 //We're not interested in removals during the initial query
301 if (operation != Akonadi2::Operation_Removal) {
302 callback(domainObject, Akonadi2::Operation_Creation);
303 }
304 } else {
305 callback(domainObject, operation);
306 }
307 }
308 });
309 }
310 return false;
311 };
312 return ResultSet(generator);
313 }
314
315
316 std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query)
317 {
318 return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
319 for (const auto &filterProperty : remainingFilters) {
320 const auto property = domainObject->getProperty(filterProperty);
321 if (property.isValid()) {
322 //TODO implement other comparison operators than equality
323 if (property != query.propertyFilter.value(filterProperty)) {
324 Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty);
325 return false;
326 }
327 } else {
328 Warning() << "Ignored property filter because value is invalid: " << filterProperty;
329 }
330 }
331 return true;
332 };
333 }
334
335 qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
336 {
337 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
338 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
339 Warning() << "Error during query: " << error.store << error.message;
340 });
341 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly);
342
343 QSet<QByteArray> remainingFilters;
344 auto resultSet = baseSetRetriever(transaction, remainingFilters);
345 auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false);
346 replaySet(filteredSet, resultProvider);
347 resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction));
348 return Akonadi2::Storage::maxRevision(transaction);
349 }
350 65
66 void readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback);
351 67
352 qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 68 ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters);
353 { 69 ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters);
354 const qint64 baseRevision = resultProvider.revision() + 1;
355 Trace() << "Running incremental query " << baseRevision;
356 return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
357 return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters);
358 }, resultProvider);
359 }
360 70
361 qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 71 ResultSet filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery);
362 { 72 std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query);
363 auto modifiedQuery = query; 73 qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
364 if (parent) { 74 qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
365 Trace() << "Running initial query for parent:" << parent->identifier(); 75 qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
366 modifiedQuery.propertyFilter.insert("parent", parent->identifier());
367 } else {
368 Trace() << "Running initial query for toplevel";
369 modifiedQuery.propertyFilter.insert("parent", QVariant());
370 }
371 return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
372 return loadInitialResultSet(modifiedQuery, transaction, remainingFilters);
373 }, resultProvider);
374 }
375 76
376protected: 77protected:
377 //TODO use one resource access instance per application & per resource 78 //TODO use one resource access instance per application & per resource