summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-11-27 17:30:04 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-11-27 17:30:04 +0100
commit5b41b26a349967acf2197f9f9228526193fd826e (patch)
tree166452bcc0757564deefe233bf031d2ccb0564d2 /common
parent13af56e436f49df32d3b2f6f223cf1dec2eabaac (diff)
downloadsink-5b41b26a349967acf2197f9f9228526193fd826e.tar.gz
sink-5b41b26a349967acf2197f9f9228526193fd826e.zip
Introduced a QueryRunner object
The QueryRunner object lives for the duration of the query (so just for the initial query for non-live queries, and for the lifetime of the result model for live queries). It's supposed to handle all the threading internally and decouple the lifetime of the facade.
Diffstat (limited to 'common')
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/clientapi.cpp62
-rw-r--r--common/facade.cpp282
-rw-r--r--common/facade.h17
-rw-r--r--common/facadeinterface.h29
-rw-r--r--common/modelresult.cpp22
-rw-r--r--common/modelresult.h6
-rw-r--r--common/queryrunner.cpp292
-rw-r--r--common/queryrunner.h107
-rw-r--r--common/resourceaccess.h2
-rw-r--r--common/resourcefacade.cpp17
-rw-r--r--common/resourcefacade.h3
-rw-r--r--common/resultprovider.h150
-rw-r--r--common/threadboundary.cpp5
14 files changed, 522 insertions, 473 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 01056d0..be312b9 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -26,6 +26,7 @@ set(command_SRCS
26 resource.cpp 26 resource.cpp
27 genericresource.cpp 27 genericresource.cpp
28 resourceaccess.cpp 28 resourceaccess.cpp
29 queryrunner.cpp
29 listener.cpp 30 listener.cpp
30 storage_common.cpp 31 storage_common.cpp
31 threadboundary.cpp 32 threadboundary.cpp
diff --git a/common/clientapi.cpp b/common/clientapi.cpp
index 02f8ce6..b24dfa8 100644
--- a/common/clientapi.cpp
+++ b/common/clientapi.cpp
@@ -34,6 +34,7 @@
34#include "definitions.h" 34#include "definitions.h"
35#include "resourceconfig.h" 35#include "resourceconfig.h"
36#include "facadefactory.h" 36#include "facadefactory.h"
37#include "modelresult.h"
37#include "log.h" 38#include "log.h"
38 39
39#define ASYNCINTHREAD 40#define ASYNCINTHREAD
@@ -100,38 +101,8 @@ template <class DomainType>
100QSharedPointer<ResultEmitter<typename DomainType::Ptr> > Store::load(Query query) 101QSharedPointer<ResultEmitter<typename DomainType::Ptr> > Store::load(Query query)
101{ 102{
102 auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create(); 103 auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create();
103 104 qWarning() << "Main thread " << QThread::currentThreadId();
104 //Execute the search in a thread. 105 //FIXME remove
105 //We must guarantee that the emitter is returned before the first result is emitted.
106 //The result provider must be threadsafe.
107 async::run([query, resultSet](){
108 QEventLoop eventLoop;
109 resultSet->onDone([&eventLoop](){
110 eventLoop.quit();
111 });
112 // Query all resources and aggregate results
113 KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName<DomainType>()))
114 .template each<void, QByteArray>([query, resultSet](const QByteArray &resource, KAsync::Future<void> &future) {
115 if (auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceName(resource), resource)) {
116 facade->load(query, *resultSet).template then<void>([&future](){future.setFinished();}).exec();
117 //Keep the facade alive for the lifetime of the resultSet.
118 resultSet->setFacade(facade);
119 } else {
120 //Ignore the error and carry on
121 future.setFinished();
122 }
123 }).template then<void>([query, resultSet]() {
124 resultSet->initialResultSetComplete();
125 if (!query.liveQuery) {
126 resultSet->complete();
127 }
128 }).exec();
129
130 //Keep the thread alive until the result is ready
131 if (!resultSet->isDone()) {
132 eventLoop.exec();
133 }
134 });
135 return resultSet->emitter(); 106 return resultSet->emitter();
136} 107}
137 108
@@ -139,28 +110,29 @@ template <class DomainType>
139QSharedPointer<QAbstractItemModel> Store::loadModel(Query query) 110QSharedPointer<QAbstractItemModel> Store::loadModel(Query query)
140{ 111{
141 auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr> >::create(query, query.requestedProperties.toList()); 112 auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr> >::create(query, query.requestedProperties.toList());
142 auto resultProvider = std::make_shared<ModelResultProvider<DomainType, typename DomainType::Ptr> >(model); 113
143 //Keep the resultprovider alive for as long as the model lives 114 //* Client defines lifetime of model
144 model->setProperty("resultProvider", QVariant::fromValue(std::shared_ptr<void>(resultProvider))); 115 //* The model lifetime defines the duration of live-queries
116 //* The facade needs to life for the duration of any calls being made (assuming we get rid of any internal callbacks
117 //* The emitter needs to live or the duration of query (respectively, the model)
118 //* The result provider needs to live for as long as results are provided (until the last thread exits).
145 119
146 // Query all resources and aggregate results 120 // Query all resources and aggregate results
147 KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName<DomainType>())) 121 KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName<DomainType>()))
148 .template each<void, QByteArray>([query, resultProvider](const QByteArray &resource, KAsync::Future<void> &future) { 122 .template each<void, QByteArray>([query, model](const QByteArray &resource, KAsync::Future<void> &future) {
149 auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceName(resource), resource); 123 auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceName(resource), resource);
150 if (facade) { 124 if (facade) {
151 facade->load(query, *resultProvider).template then<void>([&future](){future.setFinished();}).exec(); 125 Trace() << "Trying to fetch from resource";
152 //Keep the facade alive for the lifetime of the resultSet. 126 auto result = facade->load(query);
153 //FIXME this would have to become a list 127 auto emitter = result.second;
154 resultProvider->setFacade(facade); 128 //TODO use aggregating emitter instead
129 model->setEmitter(emitter);
130 model->fetchMore(QModelIndex());
131 result.first.template then<void>([&future](){future.setFinished();}).exec();
155 } else { 132 } else {
156 //Ignore the error and carry on 133 //Ignore the error and carry on
157 future.setFinished(); 134 future.setFinished();
158 } 135 }
159 }).template then<void>([query, resultProvider]() {
160 resultProvider->initialResultSetComplete();
161 if (!query.liveQuery) {
162 resultProvider->complete();
163 }
164 }).exec(); 136 }).exec();
165 137
166 return model; 138 return model;
diff --git a/common/facade.cpp b/common/facade.cpp
index 92124fc..1d6b9a7 100644
--- a/common/facade.cpp
+++ b/common/facade.cpp
@@ -24,76 +24,10 @@
24#include "storage.h" 24#include "storage.h"
25#include "definitions.h" 25#include "definitions.h"
26#include "domainadaptor.h" 26#include "domainadaptor.h"
27#include "queryrunner.h"
27 28
28using namespace Akonadi2; 29using namespace Akonadi2;
29 30
30/**
31 * A QueryRunner runs a query and updates the corresponding result set.
32 *
33 * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work),
34 * and by how long a result set must be updated. If the query is one off the runner dies after the execution,
35 * otherwise it lives on the react to changes and updates the corresponding result set.
36 *
37 * QueryRunner has to keep ResourceAccess alive in order to keep getting updates.
38 */
39class QueryRunner : public QObject
40{
41 Q_OBJECT
42public:
43 typedef std::function<KAsync::Job<void>()> QueryFunction;
44
45 QueryRunner(const Akonadi2::Query &query) {};
46 /**
47 * Starts query
48 */
49 KAsync::Job<void> run(qint64 newRevision = 0)
50 {
51 return queryFunction();
52 }
53
54 /**
55 * Set the query to run
56 */
57 void setQuery(const QueryFunction &query)
58 {
59 queryFunction = query;
60 }
61
62public slots:
63 /**
64 * Rerun query with new revision
65 */
66 void revisionChanged(qint64 newRevision)
67 {
68 Trace() << "New revision: " << newRevision;
69 run().exec();
70 }
71
72private:
73 QueryFunction queryFunction;
74};
75
76static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType)
77{
78 //TODO use a result set with an iterator, to read values on demand
79 QVector<QByteArray> keys;
80 transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool {
81 //Skip internals
82 if (Akonadi2::Storage::isInternalKey(key)) {
83 return true;
84 }
85 keys << Akonadi2::Storage::uidFromKey(key);
86 return true;
87 },
88 [](const Akonadi2::Storage::Error &error) {
89 qWarning() << "Error during query: " << error.message;
90 });
91
92 Trace() << "Full scan found " << keys.size() << " results";
93 return ResultSet(keys);
94}
95
96
97 31
98template<class DomainType> 32template<class DomainType>
99GenericFacade<DomainType>::GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory , const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess) 33GenericFacade<DomainType>::GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory , const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess)
@@ -150,220 +84,14 @@ KAsync::Job<void> GenericFacade<DomainType>::remove(const DomainType &domainObje
150} 84}
151 85
152template<class DomainType> 86template<class DomainType>
153KAsync::Job<void> GenericFacade<DomainType>::load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 87QPair<KAsync::Job<void>, typename ResultEmitter<typename DomainType::Ptr>::Ptr> GenericFacade<DomainType>::load(const Akonadi2::Query &query)
154{
155 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
156 resultProvider.setFetcher([this, query, &resultProvider](const typename DomainType::Ptr &parent) {
157 const qint64 newRevision = executeInitialQuery(query, parent, resultProvider);
158 mResourceAccess->sendRevisionReplayedCommand(newRevision);
159 });
160
161
162 //In case of a live query we keep the runner for as long alive as the result provider exists
163 if (query.liveQuery) {
164 auto runner = QSharedPointer<QueryRunner>::create(query);
165 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
166 runner->setQuery([this, query, &resultProvider] () -> KAsync::Job<void> {
167 return KAsync::start<void>([this, query, &resultProvider](KAsync::Future<void> &future) {
168 const qint64 newRevision = executeIncrementalQuery(query, resultProvider);
169 mResourceAccess->sendRevisionReplayedCommand(newRevision);
170 future.setFinished();
171 });
172 });
173 resultProvider.setQueryRunner(runner);
174 //Ensure the connection is open, if it wasn't already opened
175 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
176 mResourceAccess->open();
177 QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged);
178 }
179 return KAsync::null<void>();
180}
181
182 //TODO move into result provider?
183template<class DomainType>
184void GenericFacade<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
185{
186 while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool {
187 switch (operation) {
188 case Akonadi2::Operation_Creation:
189 // Trace() << "Got creation";
190 resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
191 break;
192 case Akonadi2::Operation_Modification:
193 // Trace() << "Got modification";
194 resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
195 break;
196 case Akonadi2::Operation_Removal:
197 // Trace() << "Got removal";
198 resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
199 break;
200 }
201 return true;
202 })){};
203}
204
205template<class DomainType>
206void GenericFacade<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback)
207{
208 //This only works for a 1:1 mapping of resource to domain types.
209 //Not i.e. for tags that are stored as flags in each entity of an imap store.
210 //additional properties that don't have a 1:1 mapping (such as separately stored tags),
211 //could be added to the adaptor.
212 //
213 // Akonadi2::Storage::getLatest(transaction, bufferTye, key);
214 db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool {
215 Akonadi2::EntityBuffer buffer(value.data(), value.size());
216 const Akonadi2::Entity &entity = buffer.entity();
217 const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata());
218 Q_ASSERT(metadataBuffer);
219 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
220 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation());
221 return false;
222 },
223 [](const Akonadi2::Storage::Error &error) {
224 qWarning() << "Error during query: " << error.message;
225 });
226}
227
228template<class DomainType>
229ResultSet GenericFacade<DomainType>::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
230{ 88{
231 QSet<QByteArray> appliedFilters; 89 //The runner lives for the lifetime of the query
232 auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); 90 auto runner = new QueryRunner<DomainType>(query, mResourceAccess, mResourceInstanceIdentifier, mDomainTypeAdaptorFactory, bufferTypeForDomainType());
233 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; 91 return qMakePair(KAsync::null<void>(), runner->emitter());
234
235 //We do a full scan if there were no indexes available to create the initial set.
236 if (appliedFilters.isEmpty()) {
237 //TODO this should be replaced by an index lookup as well
238 resultSet = fullScan(transaction, bufferTypeForDomainType());
239 }
240 return resultSet;
241}
242
243template<class DomainType>
244ResultSet GenericFacade<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
245{
246 const auto bufferType = bufferTypeForDomainType();
247 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
248 remainingFilters = query.propertyFilter.keys().toSet();
249 return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray {
250 const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction);
251 //Spit out the revision keys one by one.
252 while (*revisionCounter <= topRevision) {
253 const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter);
254 const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter);
255 Trace() << "Revision" << *revisionCounter << type << uid;
256 if (type != bufferType) {
257 //Skip revision
258 *revisionCounter += 1;
259 continue;
260 }
261 const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter);
262 *revisionCounter += 1;
263 return key;
264 }
265 Trace() << "Finished reading incremental result set:" << *revisionCounter;
266 //We're done
267 return QByteArray();
268 });
269}
270
271template<class DomainType>
272ResultSet GenericFacade<DomainType>::filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery)
273{
274 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
275
276 //Read through the source values and return whatever matches the filter
277 std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &db, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool {
278 while (resultSetPtr->next()) {
279 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
280 readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) {
281 //Always remove removals, they probably don't match due to non-available properties
282 if (filter(domainObject) || operation == Akonadi2::Operation_Removal) {
283 if (initialQuery) {
284 //We're not interested in removals during the initial query
285 if (operation != Akonadi2::Operation_Removal) {
286 callback(domainObject, Akonadi2::Operation_Creation);
287 }
288 } else {
289 callback(domainObject, operation);
290 }
291 }
292 });
293 }
294 return false;
295 };
296 return ResultSet(generator);
297}
298
299
300template<class DomainType>
301std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> GenericFacade<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query)
302{
303 return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
304 for (const auto &filterProperty : remainingFilters) {
305 const auto property = domainObject->getProperty(filterProperty);
306 if (property.isValid()) {
307 //TODO implement other comparison operators than equality
308 if (property != query.propertyFilter.value(filterProperty)) {
309 Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty);
310 return false;
311 }
312 } else {
313 Warning() << "Ignored property filter because value is invalid: " << filterProperty;
314 }
315 }
316 return true;
317 };
318}
319
320template<class DomainType>
321qint64 GenericFacade<DomainType>::load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery)
322{
323 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
324 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
325 Warning() << "Error during query: " << error.store << error.message;
326 });
327 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly);
328 auto db = transaction.openDatabase(bufferTypeForDomainType() + ".main");
329
330 QSet<QByteArray> remainingFilters;
331 auto resultSet = baseSetRetriever(transaction, remainingFilters);
332 auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), db, initialQuery);
333 replaySet(filteredSet, resultProvider);
334 resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction));
335 return Akonadi2::Storage::maxRevision(transaction);
336} 92}
337 93
338 94
339template<class DomainType>
340qint64 GenericFacade<DomainType>::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
341{
342 const qint64 baseRevision = resultProvider.revision() + 1;
343 Trace() << "Running incremental query " << baseRevision;
344 return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
345 return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters);
346 }, resultProvider, false);
347}
348
349template<class DomainType>
350qint64 GenericFacade<DomainType>::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
351{
352 auto modifiedQuery = query;
353 if (!query.parentProperty.isEmpty()) {
354 if (parent) {
355 Trace() << "Running initial query for parent:" << parent->identifier();
356 modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier());
357 } else {
358 Trace() << "Running initial query for toplevel";
359 modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant());
360 }
361 }
362 return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
363 return loadInitialResultSet(modifiedQuery, transaction, remainingFilters);
364 }, resultProvider, true);
365}
366
367template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Folder>; 95template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Folder>;
368template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Mail>; 96template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Mail>;
369template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Event>; 97template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Event>;
diff --git a/common/facade.h b/common/facade.h
index d8b878b..de67e05 100644
--- a/common/facade.h
+++ b/common/facade.h
@@ -59,22 +59,7 @@ public:
59 KAsync::Job<void> create(const DomainType &domainObject) Q_DECL_OVERRIDE; 59 KAsync::Job<void> create(const DomainType &domainObject) Q_DECL_OVERRIDE;
60 KAsync::Job<void> modify(const DomainType &domainObject) Q_DECL_OVERRIDE; 60 KAsync::Job<void> modify(const DomainType &domainObject) Q_DECL_OVERRIDE;
61 KAsync::Job<void> remove(const DomainType &domainObject) Q_DECL_OVERRIDE; 61 KAsync::Job<void> remove(const DomainType &domainObject) Q_DECL_OVERRIDE;
62 KAsync::Job<void> load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) Q_DECL_OVERRIDE; 62 QPair<KAsync::Job<void>, typename ResultEmitter<typename DomainType::Ptr>::Ptr> load(const Akonadi2::Query &query) Q_DECL_OVERRIDE;
63
64private:
65 //TODO move into result provider?
66 static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
67
68 void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback);
69
70 ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters);
71 ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters);
72
73 ResultSet filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery);
74 std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query);
75 qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery);
76 qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
77 qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
78 63
79protected: 64protected:
80 //TODO use one resource access instance per application & per resource 65 //TODO use one resource access instance per application & per resource
diff --git a/common/facadeinterface.h b/common/facadeinterface.h
index 7ec21bc..318abf3 100644
--- a/common/facadeinterface.h
+++ b/common/facadeinterface.h
@@ -23,6 +23,7 @@
23#include <Async/Async> 23#include <Async/Async>
24#include <QByteArray> 24#include <QByteArray>
25#include <QSharedPointer> 25#include <QSharedPointer>
26#include <QPair>
26#include "applicationdomaintype.h" 27#include "applicationdomaintype.h"
27#include "resultprovider.h" 28#include "resultprovider.h"
28 29
@@ -42,10 +43,32 @@ class StoreFacade {
42public: 43public:
43 virtual ~StoreFacade(){}; 44 virtual ~StoreFacade(){};
44 QByteArray type() const { return ApplicationDomain::getTypeName<DomainType>(); } 45 QByteArray type() const { return ApplicationDomain::getTypeName<DomainType>(); }
46
47 /**
48 * Create an entity in the store.
49 *
50 * The job returns succefully once the task has been successfully placed in the queue
51 */
45 virtual KAsync::Job<void> create(const DomainType &domainObject) = 0; 52 virtual KAsync::Job<void> create(const DomainType &domainObject) = 0;
53
54 /**
55 * Modify an entity in the store.
56 *
57 * The job returns succefully once the task has been successfully placed in the queue
58 */
46 virtual KAsync::Job<void> modify(const DomainType &domainObject) = 0; 59 virtual KAsync::Job<void> modify(const DomainType &domainObject) = 0;
60
61 /**
62 * Remove an entity from the store.
63 *
64 * The job returns succefully once the task has been successfully placed in the queue
65 */
47 virtual KAsync::Job<void> remove(const DomainType &domainObject) = 0; 66 virtual KAsync::Job<void> remove(const DomainType &domainObject) = 0;
48 virtual KAsync::Job<void> load(const Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) = 0; 67
68 /**
69 * Load entities from the store.
70 */
71 virtual QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr > load(const Query &query) = 0;
49}; 72};
50 73
51template<class DomainType> 74template<class DomainType>
@@ -67,9 +90,9 @@ public:
67 return KAsync::error<void>(-1, "Failed to create a facade"); 90 return KAsync::error<void>(-1, "Failed to create a facade");
68 } 91 }
69 92
70 KAsync::Job<void> load(const Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 93 QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr > load(const Query &query)
71 { 94 {
72 return KAsync::error<void>(-1, "Failed to create a facade"); 95 return qMakePair(KAsync::null<void>(), typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr());
73 } 96 }
74}; 97};
75 98
diff --git a/common/modelresult.cpp b/common/modelresult.cpp
index 935e2e8..65eaba9 100644
--- a/common/modelresult.cpp
+++ b/common/modelresult.cpp
@@ -183,6 +183,28 @@ void ModelResult<T, Ptr>::setFetcher(const std::function<void(const Ptr &parent)
183} 183}
184 184
185template<class T, class Ptr> 185template<class T, class Ptr>
186void ModelResult<T, Ptr>::setEmitter(const typename Akonadi2::ResultEmitter<Ptr>::Ptr &emitter)
187{
188 setFetcher(emitter->mFetcher);
189 emitter->onAdded([this](const Ptr &value) {
190 this->add(value);
191 });
192 emitter->onModified([this](const Ptr &value) {
193 this->modify(value);
194 });
195 emitter->onRemoved([this](const Ptr &value) {
196 this->remove(value);
197 });
198 emitter->onInitialResultSetComplete([this]() {
199 });
200 emitter->onComplete([this]() {
201 });
202 emitter->onClear([this]() {
203 });
204 mEmitter = emitter;
205}
206
207template<class T, class Ptr>
186void ModelResult<T, Ptr>::modify(const Ptr &value) 208void ModelResult<T, Ptr>::modify(const Ptr &value)
187{ 209{
188 auto childId = qHash(value->identifier()); 210 auto childId = qHash(value->identifier());
diff --git a/common/modelresult.h b/common/modelresult.h
index 66dfce5..eb6c86b 100644
--- a/common/modelresult.h
+++ b/common/modelresult.h
@@ -23,20 +23,23 @@
23#include <QAbstractItemModel> 23#include <QAbstractItemModel>
24#include <QModelIndex> 24#include <QModelIndex>
25#include <QDebug> 25#include <QDebug>
26#include <QSharedPointer>
26#include <functional> 27#include <functional>
27#include "query.h" 28#include "query.h"
29#include "resultprovider.h"
28 30
29template<class T, class Ptr> 31template<class T, class Ptr>
30class ModelResult : public QAbstractItemModel 32class ModelResult : public QAbstractItemModel
31{ 33{
32public: 34public:
33
34 enum Roles { 35 enum Roles {
35 DomainObjectRole = Qt::UserRole + 1 36 DomainObjectRole = Qt::UserRole + 1
36 }; 37 };
37 38
38 ModelResult(const Akonadi2::Query &query, const QList<QByteArray> &propertyColumns); 39 ModelResult(const Akonadi2::Query &query, const QList<QByteArray> &propertyColumns);
39 40
41 void setEmitter(const typename Akonadi2::ResultEmitter<Ptr>::Ptr &);
42
40 int rowCount(const QModelIndex &parent = QModelIndex()) const; 43 int rowCount(const QModelIndex &parent = QModelIndex()) const;
41 int columnCount(const QModelIndex &parent = QModelIndex()) const; 44 int columnCount(const QModelIndex &parent = QModelIndex()) const;
42 QVariant data(const QModelIndex &index, int role = Qt::DisplayRole) const; 45 QVariant data(const QModelIndex &index, int role = Qt::DisplayRole) const;
@@ -65,5 +68,6 @@ private:
65 QList<QByteArray> mPropertyColumns; 68 QList<QByteArray> mPropertyColumns;
66 Akonadi2::Query mQuery; 69 Akonadi2::Query mQuery;
67 std::function<void(const Ptr &)> loadEntities; 70 std::function<void(const Ptr &)> loadEntities;
71 typename Akonadi2::ResultEmitter<Ptr>::Ptr mEmitter;
68}; 72};
69 73
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
new file mode 100644
index 0000000..4159112
--- /dev/null
+++ b/common/queryrunner.cpp
@@ -0,0 +1,292 @@
1/*
2 Copyright (c) 2015 Christian Mollekopf <mollekopf@kolabsys.com>
3
4 This library is free software; you can redistribute it and/or modify it
5 under the terms of the GNU Library General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or (at your
7 option) any later version.
8
9 This library is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
12 License for more details.
13
14 You should have received a copy of the GNU Library General Public License
15 along with this library; see the file COPYING.LIB. If not, write to the
16 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17 02110-1301, USA.
18*/
19#include "queryrunner.h"
20
21#include <QtConcurrent/QtConcurrentRun>
22#include <QFuture>
23#include <QFutureWatcher>
24#include "commands.h"
25#include "log.h"
26#include "storage.h"
27#include "definitions.h"
28#include "domainadaptor.h"
29
30using namespace Akonadi2;
31
32static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType)
33{
34 //TODO use a result set with an iterator, to read values on demand
35 QVector<QByteArray> keys;
36 transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool {
37 //Skip internals
38 if (Akonadi2::Storage::isInternalKey(key)) {
39 return true;
40 }
41 keys << Akonadi2::Storage::uidFromKey(key);
42 return true;
43 },
44 [](const Akonadi2::Storage::Error &error) {
45 qWarning() << "Error during query: " << error.message;
46 });
47
48 Trace() << "Full scan found " << keys.size() << " results";
49 return ResultSet(keys);
50}
51
52template<class DomainType>
53QueryRunner<DomainType>::QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType)
54 : QueryRunnerBase(),
55 mResourceAccess(resourceAccess),
56 mResultProvider(new ResultProvider<typename DomainType::Ptr>),
57 mDomainTypeAdaptorFactory(factory),
58 mQuery(query),
59 mResourceInstanceIdentifier(instanceIdentifier),
60 mBufferType(bufferType)
61{
62 Trace() << "Starting query";
63 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
64 mResultProvider->setFetcher([this, query](const typename DomainType::Ptr &parent) {
65 Trace() << "Running fetcher";
66
67 // auto watcher = new QFutureWatcher<qint64>;
68 // QObject::connect(watcher, &QFutureWatcher::finished, [](qint64 newRevision) {
69 // mResourceAccess->sendRevisionReplayedCommand(newRevision);
70 // });
71 // auto future = QtConcurrent::run([&resultProvider]() -> qint64 {
72 // const qint64 newRevision = executeInitialQuery(query, parent, resultProvider);
73 // return newRevision;
74 // });
75 // watcher->setFuture(future);
76 const qint64 newRevision = executeInitialQuery(query, parent, *mResultProvider);
77 mResourceAccess->sendRevisionReplayedCommand(newRevision);
78 });
79
80
81 //In case of a live query we keep the runner for as long alive as the result provider exists
82 if (query.liveQuery) {
83 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
84 setQuery([this, query] () -> KAsync::Job<void> {
85 return KAsync::start<void>([this, query](KAsync::Future<void> &future) {
86 //TODO execute in thread
87 const qint64 newRevision = executeIncrementalQuery(query, *mResultProvider);
88 mResourceAccess->sendRevisionReplayedCommand(newRevision);
89 future.setFinished();
90 });
91 });
92 //Ensure the connection is open, if it wasn't already opened
93 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
94 mResourceAccess->open();
95 QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, this, &QueryRunner::revisionChanged);
96 }
97}
98
99template<class DomainType>
100typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainType>::emitter()
101{
102 return mResultProvider->emitter();
103}
104
105//TODO move into result provider?
106template<class DomainType>
107void QueryRunner<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
108{
109 // Trace() << "Replay set";
110 while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool {
111 switch (operation) {
112 case Akonadi2::Operation_Creation:
113 // Trace() << "Got creation";
114 resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
115 break;
116 case Akonadi2::Operation_Modification:
117 // Trace() << "Got modification";
118 resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
119 break;
120 case Akonadi2::Operation_Removal:
121 // Trace() << "Got removal";
122 resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
123 break;
124 }
125 return true;
126 })){};
127}
128
129template<class DomainType>
130void QueryRunner<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback)
131{
132 //This only works for a 1:1 mapping of resource to domain types.
133 //Not i.e. for tags that are stored as flags in each entity of an imap store.
134 //additional properties that don't have a 1:1 mapping (such as separately stored tags),
135 //could be added to the adaptor.
136 db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool {
137 Akonadi2::EntityBuffer buffer(value.data(), value.size());
138 const Akonadi2::Entity &entity = buffer.entity();
139 const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata());
140 Q_ASSERT(metadataBuffer);
141 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
142 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation());
143 return false;
144 },
145 [](const Akonadi2::Storage::Error &error) {
146 qWarning() << "Error during query: " << error.message;
147 });
148}
149
150template<class DomainType>
151ResultSet QueryRunner<DomainType>::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
152{
153 QSet<QByteArray> appliedFilters;
154 auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction);
155 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
156
157 //We do a full scan if there were no indexes available to create the initial set.
158 if (appliedFilters.isEmpty()) {
159 //TODO this should be replaced by an index lookup as well
160 resultSet = fullScan(transaction, mBufferType);
161 }
162 return resultSet;
163}
164
165template<class DomainType>
166ResultSet QueryRunner<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
167{
168 const auto bufferType = mBufferType;
169 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
170 remainingFilters = query.propertyFilter.keys().toSet();
171 return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray {
172 const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction);
173 //Spit out the revision keys one by one.
174 while (*revisionCounter <= topRevision) {
175 const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter);
176 const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter);
177 // Trace() << "Revision" << *revisionCounter << type << uid;
178 if (type != bufferType) {
179 //Skip revision
180 *revisionCounter += 1;
181 continue;
182 }
183 const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter);
184 *revisionCounter += 1;
185 return key;
186 }
187 Trace() << "Finished reading incremental result set:" << *revisionCounter;
188 //We're done
189 return QByteArray();
190 });
191}
192
193template<class DomainType>
194ResultSet QueryRunner<DomainType>::filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery)
195{
196 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
197
198 //Read through the source values and return whatever matches the filter
199 std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &db, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool {
200 while (resultSetPtr->next()) {
201 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
202 readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) {
203 //Always remove removals, they probably don't match due to non-available properties
204 if (filter(domainObject) || operation == Akonadi2::Operation_Removal) {
205 Trace() << "entity is not filtered" << initialQuery;
206 if (initialQuery) {
207 //We're not interested in removals during the initial query
208 if (operation != Akonadi2::Operation_Removal) {
209 callback(domainObject, Akonadi2::Operation_Creation);
210 }
211 } else {
212 callback(domainObject, operation);
213 }
214 }
215 });
216 }
217 return false;
218 };
219 return ResultSet(generator);
220}
221
222
223template<class DomainType>
224std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> QueryRunner<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query)
225{
226 return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
227 for (const auto &filterProperty : remainingFilters) {
228 const auto property = domainObject->getProperty(filterProperty);
229 if (property.isValid()) {
230 //TODO implement other comparison operators than equality
231 if (property != query.propertyFilter.value(filterProperty)) {
232 Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty);
233 return false;
234 }
235 } else {
236 Warning() << "Ignored property filter because value is invalid: " << filterProperty;
237 }
238 }
239 return true;
240 };
241}
242
243template<class DomainType>
244qint64 QueryRunner<DomainType>::load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery)
245{
246 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
247 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
248 Warning() << "Error during query: " << error.store << error.message;
249 });
250 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly);
251 auto db = transaction.openDatabase(mBufferType + ".main");
252
253 QSet<QByteArray> remainingFilters;
254 auto resultSet = baseSetRetriever(transaction, remainingFilters);
255 auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), db, initialQuery);
256 replaySet(filteredSet, resultProvider);
257 resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction));
258 return Akonadi2::Storage::maxRevision(transaction);
259}
260
261
262template<class DomainType>
263qint64 QueryRunner<DomainType>::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
264{
265 const qint64 baseRevision = resultProvider.revision() + 1;
266 Trace() << "Running incremental query " << baseRevision;
267 return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
268 return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters);
269 }, resultProvider, false);
270}
271
272template<class DomainType>
273qint64 QueryRunner<DomainType>::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
274{
275 auto modifiedQuery = query;
276 if (!query.parentProperty.isEmpty()) {
277 if (parent) {
278 Trace() << "Running initial query for parent:" << parent->identifier();
279 modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier());
280 } else {
281 Trace() << "Running initial query for toplevel";
282 modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant());
283 }
284 }
285 return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
286 return loadInitialResultSet(modifiedQuery, transaction, remainingFilters);
287 }, resultProvider, true);
288}
289
290template class QueryRunner<Akonadi2::ApplicationDomain::Folder>;
291template class QueryRunner<Akonadi2::ApplicationDomain::Mail>;
292template class QueryRunner<Akonadi2::ApplicationDomain::Event>;
diff --git a/common/queryrunner.h b/common/queryrunner.h
new file mode 100644
index 0000000..e2af9de
--- /dev/null
+++ b/common/queryrunner.h
@@ -0,0 +1,107 @@
1/*
2 Copyright (c) 2015 Christian Mollekopf <mollekopf@kolabsys.com>
3
4 This library is free software; you can redistribute it and/or modify it
5 under the terms of the GNU Library General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or (at your
7 option) any later version.
8
9 This library is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
12 License for more details.
13
14 You should have received a copy of the GNU Library General Public License
15 along with this library; see the file COPYING.LIB. If not, write to the
16 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17 02110-1301, USA.
18*/
19
20#pragma once
21
22#include <QObject>
23#include "facadeinterface.h"
24#include "resourceaccess.h"
25#include "resultprovider.h"
26#include "domaintypeadaptorfactoryinterface.h"
27#include "storage.h"
28#include "query.h"
29
30/**
31 * A QueryRunner runs a query and updates the corresponding result set.
32 *
33 * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work),
34 * and by how long a result set must be updated. If the query is one off the runner dies after the execution,
35 * otherwise it lives on the react to changes and updates the corresponding result set.
36 *
37 * QueryRunner has to keep ResourceAccess alive in order to keep getting updates.
38 */
39
40class QueryRunnerBase : public QObject
41{
42 Q_OBJECT
43protected:
44 typedef std::function<KAsync::Job<void>()> QueryFunction;
45
46 /**
47 * Set the query to run
48 */
49 void setQuery(const QueryFunction &query)
50 {
51 queryFunction = query;
52 }
53
54
55protected slots:
56 /**
57 * Rerun query with new revision
58 */
59 void revisionChanged(qint64 newRevision)
60 {
61 Trace() << "New revision: " << newRevision;
62 run().exec();
63 }
64
65private:
66 /**
67 * Starts query
68 */
69 KAsync::Job<void> run(qint64 newRevision = 0)
70 {
71 return queryFunction();
72 }
73
74 QueryFunction queryFunction;
75};
76
77template<typename DomainType>
78class QueryRunner : public QueryRunnerBase
79{
80public:
81 QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType);
82
83 typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr emitter();
84
85private:
86 static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
87
88 void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback);
89
90 ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters);
91 ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters);
92
93 ResultSet filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery);
94 std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query);
95 qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery);
96 qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
97 qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
98
99private:
100 QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > mResultProvider;
101 QSharedPointer<Akonadi2::ResourceAccessInterface> mResourceAccess;
102 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory;
103 QByteArray mResourceInstanceIdentifier;
104 QByteArray mBufferType;
105 Akonadi2::Query mQuery;
106};
107
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index 8e27054..e87a1f7 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -37,6 +37,8 @@ class ResourceAccessInterface : public QObject
37{ 37{
38 Q_OBJECT 38 Q_OBJECT
39public: 39public:
40 typedef QSharedPointer<ResourceAccessInterface> Ptr;
41
40 ResourceAccessInterface() {} 42 ResourceAccessInterface() {}
41 virtual ~ResourceAccessInterface() {} 43 virtual ~ResourceAccessInterface() {}
42 virtual KAsync::Job<void> sendCommand(int commandId) = 0; 44 virtual KAsync::Job<void> sendCommand(int commandId) = 0;
diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp
index 1796271..3d207e4 100644
--- a/common/resourcefacade.cpp
+++ b/common/resourcefacade.cpp
@@ -54,9 +54,15 @@ KAsync::Job<void> ResourceFacade::remove(const Akonadi2::ApplicationDomain::Akon
54 }); 54 });
55} 55}
56 56
57KAsync::Job<void> ResourceFacade::load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename Akonadi2::ApplicationDomain::AkonadiResource::Ptr> &resultProvider) 57QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<Akonadi2::ApplicationDomain::AkonadiResource::Ptr>::Ptr > ResourceFacade::load(const Akonadi2::Query &query)
58{ 58{
59 return KAsync::start<void>([query, &resultProvider]() { 59 auto resultProvider = new Akonadi2::ResultProvider<typename Akonadi2::ApplicationDomain::AkonadiResource::Ptr>();
60 auto emitter = resultProvider->emitter();
61 resultProvider->setFetcher([](const QSharedPointer<Akonadi2::ApplicationDomain::AkonadiResource> &) {});
62 resultProvider->onDone([resultProvider]() {
63 delete resultProvider;
64 });
65 auto job = KAsync::start<void>([query, resultProvider]() {
60 const auto configuredResources = ResourceConfig::getResources(); 66 const auto configuredResources = ResourceConfig::getResources();
61 for (const auto &res : configuredResources.keys()) { 67 for (const auto &res : configuredResources.keys()) {
62 const auto type = configuredResources.value(res); 68 const auto type = configuredResources.value(res);
@@ -64,12 +70,13 @@ KAsync::Job<void> ResourceFacade::load(const Akonadi2::Query &query, Akonadi2::R
64 auto resource = Akonadi2::ApplicationDomain::AkonadiResource::Ptr::create(); 70 auto resource = Akonadi2::ApplicationDomain::AkonadiResource::Ptr::create();
65 resource->setProperty("identifier", res); 71 resource->setProperty("identifier", res);
66 resource->setProperty("type", type); 72 resource->setProperty("type", type);
67 resultProvider.add(resource); 73 resultProvider->add(resource);
68 } 74 }
69 } 75 }
70 //TODO initialResultSetComplete should be implicit 76 //TODO initialResultSetComplete should be implicit
71 resultProvider.initialResultSetComplete(); 77 resultProvider->initialResultSetComplete();
72 resultProvider.complete(); 78 resultProvider->complete();
73 }); 79 });
80 return qMakePair(job, emitter);
74} 81}
75 82
diff --git a/common/resourcefacade.h b/common/resourcefacade.h
index 123b481..38e0c0e 100644
--- a/common/resourcefacade.h
+++ b/common/resourcefacade.h
@@ -37,5 +37,6 @@ public:
37 KAsync::Job<void> create(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; 37 KAsync::Job<void> create(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE;
38 KAsync::Job<void> modify(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; 38 KAsync::Job<void> modify(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE;
39 KAsync::Job<void> remove(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; 39 KAsync::Job<void> remove(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE;
40 KAsync::Job<void> load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename Akonadi2::ApplicationDomain::AkonadiResource::Ptr> &resultProvider) Q_DECL_OVERRIDE; 40 QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<Akonadi2::ApplicationDomain::AkonadiResource::Ptr>::Ptr > load(const Akonadi2::Query &query) Q_DECL_OVERRIDE;
41}; 41};
42
diff --git a/common/resultprovider.h b/common/resultprovider.h
index 921cd6b..86382ef 100644
--- a/common/resultprovider.h
+++ b/common/resultprovider.h
@@ -20,12 +20,12 @@
20 20
21#pragma once 21#pragma once
22 22
23#include <QThread>
23#include <functional> 24#include <functional>
24#include <memory> 25#include <memory>
25#include "threadboundary.h" 26#include "threadboundary.h"
26#include "resultset.h" 27#include "resultset.h"
27#include "log.h" 28#include "log.h"
28#include "modelresult.h"
29 29
30using namespace async; 30using namespace async;
31 31
@@ -53,12 +53,7 @@ public:
53 virtual void initialResultSetComplete() = 0; 53 virtual void initialResultSetComplete() = 0;
54 virtual void complete() = 0; 54 virtual void complete() = 0;
55 virtual void clear() = 0; 55 virtual void clear() = 0;
56 virtual void setFetcher(const std::function<void(const T &parent)> &fetcher) 56 virtual void setFetcher(const std::function<void(const T &parent)> &fetcher) = 0;
57 {
58 }
59
60 virtual void setFacade(const std::shared_ptr<void> &facade) = 0;
61 virtual void setQueryRunner(const QSharedPointer<QObject> &runner) = 0;
62 57
63 void setRevision(qint64 revision) 58 void setRevision(qint64 revision)
64 { 59 {
@@ -74,101 +69,6 @@ private:
74 qint64 mRevision; 69 qint64 mRevision;
75}; 70};
76 71
77template<class T, class Ptr>
78class ModelResultProvider : public ResultProviderInterface<Ptr> {
79public:
80 ModelResultProvider(QWeakPointer<ModelResult<T, Ptr> > model)
81 : ResultProviderInterface<Ptr>(),
82 mModel(model)
83 {
84
85 }
86
87 void add(const Ptr &value)
88 {
89 if (auto model = mModel.toStrongRef()) {
90 model->add(value);
91 }
92 }
93
94 void modify(const Ptr &value)
95 {
96 if (auto model = mModel.toStrongRef()) {
97 model->modify(value);
98 }
99 }
100
101 void remove(const Ptr &value)
102 {
103 if (auto model = mModel.toStrongRef()) {
104 model->remove(value);
105 }
106 }
107
108 void initialResultSetComplete()
109 {
110 // mResultEmitter->initialResultSetComplete();
111 }
112
113 void complete()
114 {
115 // mResultEmitter->complete();
116 }
117
118 void clear()
119 {
120 // mResultEmitter->clear();
121 }
122
123 /**
124 * For lifetimemanagement only.
125 * We keep the runner alive as long as the result provider exists.
126 */
127 void setFacade(const std::shared_ptr<void> &facade)
128 {
129 mFacade = facade;
130 }
131
132 void onDone(const std::function<void()> &callback)
133 {
134 mOnDoneCallback = callback;
135 }
136
137 bool isDone() const
138 {
139 //The existance of the emitter currently defines wether we're done or not.
140 // return mResultEmitter.toStrongRef().isNull();
141 return true;
142 }
143
144 void setFetcher(const std::function<void(const Ptr &parent)> &fetcher)
145 {
146 if (auto model = mModel.toStrongRef()) {
147 model->setFetcher(fetcher);
148 }
149 }
150
151 void setQueryRunner(const QSharedPointer<QObject> &runner)
152 {
153 mQueryRunner = runner;
154 }
155
156private:
157 void done()
158 {
159 qWarning() << "done";
160 if (mOnDoneCallback) {
161 mOnDoneCallback();
162 mOnDoneCallback = std::function<void()>();
163 }
164 }
165
166 QWeakPointer<ModelResult<T, Ptr> > mModel;
167 QSharedPointer<QObject> mQueryRunner;
168 std::shared_ptr<void> mFacade;
169 std::function<void()> mOnDoneCallback;
170};
171
172/* 72/*
173* The promise side for the result emitter 73* The promise side for the result emitter
174*/ 74*/
@@ -204,6 +104,12 @@ private:
204 } 104 }
205 105
206public: 106public:
107 typedef QSharedPointer<ResultProvider<T> > Ptr;
108
109 ~ResultProvider()
110 {
111 }
112
207 //Called from worker thread 113 //Called from worker thread
208 void add(const T &value) 114 void add(const T &value)
209 { 115 {
@@ -261,30 +167,16 @@ public:
261 //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again 167 //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again
262 auto sharedPtr = QSharedPointer<ResultEmitter<T> >(new ResultEmitter<T>, [this](ResultEmitter<T> *emitter){ mThreadBoundary->callInMainThread([this]() {done();}); delete emitter; }); 168 auto sharedPtr = QSharedPointer<ResultEmitter<T> >(new ResultEmitter<T>, [this](ResultEmitter<T> *emitter){ mThreadBoundary->callInMainThread([this]() {done();}); delete emitter; });
263 mResultEmitter = sharedPtr; 169 mResultEmitter = sharedPtr;
170 sharedPtr->setFetcher([this](const T &parent) {
171 Q_ASSERT(mFetcher);
172 mFetcher(parent);
173 });
264 return sharedPtr; 174 return sharedPtr;
265 } 175 }
266 176
267 return mResultEmitter.toStrongRef(); 177 return mResultEmitter.toStrongRef();
268 } 178 }
269 179
270 /**
271 * For lifetimemanagement only.
272 * We keep the runner alive as long as the result provider exists.
273 */
274 void setQueryRunner(const QSharedPointer<QObject> &runner)
275 {
276 mQueryRunner = runner;
277 }
278
279 /**
280 * For lifetimemanagement only.
281 * We keep the runner alive as long as the result provider exists.
282 */
283 void setFacade(const std::shared_ptr<void> &facade)
284 {
285 mFacade = facade;
286 }
287
288 void onDone(const std::function<void()> &callback) 180 void onDone(const std::function<void()> &callback)
289 { 181 {
290 mThreadBoundary = QSharedPointer<ThreadBoundary>::create(); 182 mThreadBoundary = QSharedPointer<ThreadBoundary>::create();
@@ -299,7 +191,7 @@ public:
299 191
300 void setFetcher(const std::function<void(const T &parent)> &fetcher) 192 void setFetcher(const std::function<void(const T &parent)> &fetcher)
301 { 193 {
302 fetcher(T()); 194 mFetcher = fetcher;
303 } 195 }
304 196
305private: 197private:
@@ -307,16 +199,17 @@ private:
307 { 199 {
308 qWarning() << "done"; 200 qWarning() << "done";
309 if (mOnDoneCallback) { 201 if (mOnDoneCallback) {
310 mOnDoneCallback(); 202 auto callback = mOnDoneCallback;
311 mOnDoneCallback = std::function<void()>(); 203 mOnDoneCallback = std::function<void()>();
204 //This may delete this object
205 callback();
312 } 206 }
313 } 207 }
314 208
315 QWeakPointer<ResultEmitter<T> > mResultEmitter; 209 QWeakPointer<ResultEmitter<T> > mResultEmitter;
316 QSharedPointer<QObject> mQueryRunner;
317 std::shared_ptr<void> mFacade;
318 std::function<void()> mOnDoneCallback; 210 std::function<void()> mOnDoneCallback;
319 QSharedPointer<ThreadBoundary> mThreadBoundary; 211 QSharedPointer<ThreadBoundary> mThreadBoundary;
212 std::function<void(const T &parent)> mFetcher;
320}; 213};
321 214
322/* 215/*
@@ -334,6 +227,8 @@ private:
334template<class DomainType> 227template<class DomainType>
335class ResultEmitter { 228class ResultEmitter {
336public: 229public:
230 typedef QSharedPointer<ResultEmitter<DomainType> > Ptr;
231
337 void onAdded(const std::function<void(const DomainType&)> &handler) 232 void onAdded(const std::function<void(const DomainType&)> &handler)
338 { 233 {
339 addHandler = handler; 234 addHandler = handler;
@@ -394,6 +289,13 @@ public:
394 clearHandler(); 289 clearHandler();
395 } 290 }
396 291
292 void setFetcher(const std::function<void(const DomainType &parent)> &fetcher)
293 {
294 mFetcher = fetcher;
295 }
296
297 std::function<void(const DomainType &parent)> mFetcher;
298
397private: 299private:
398 friend class ResultProvider<DomainType>; 300 friend class ResultProvider<DomainType>;
399 301
diff --git a/common/threadboundary.cpp b/common/threadboundary.cpp
index 47ec508..48fd11a 100644
--- a/common/threadboundary.cpp
+++ b/common/threadboundary.cpp
@@ -24,6 +24,9 @@ Q_DECLARE_METATYPE(std::function<void()>);
24 24
25namespace async { 25namespace async {
26ThreadBoundary::ThreadBoundary(): QObject() { qRegisterMetaType<std::function<void()> >("std::function<void()>"); } 26ThreadBoundary::ThreadBoundary(): QObject() { qRegisterMetaType<std::function<void()> >("std::function<void()>"); }
27ThreadBoundary:: ~ThreadBoundary() {} 27ThreadBoundary:: ~ThreadBoundary()
28{
29}
30
28} 31}
29 32