summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp169
1 files changed, 121 insertions, 48 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 25c9d5b..af232c3 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -30,57 +30,89 @@
30 30
31using namespace Akonadi2; 31using namespace Akonadi2;
32 32
33static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) 33/*
34 * This class wraps the actual query implementation.
35 *
36 * This is a worker object that can be moved to a thread to execute the query.
37 * The only interaction point is the ResultProvider, which handles the threadsafe reporting of the result.
38 */
39template<typename DomainType>
40class QueryWorker : public QObject
34{ 41{
35 //TODO use a result set with an iterator, to read values on demand 42public:
36 QVector<QByteArray> keys; 43 QueryWorker(const Akonadi2::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType);
37 transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { 44 virtual ~QueryWorker();
38 //Skip internals 45
39 if (Akonadi2::Storage::isInternalKey(key)) { 46 qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
40 return true; 47 qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
41 } 48
42 keys << Akonadi2::Storage::uidFromKey(key); 49private:
43 return true; 50 static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties);
44 }, 51
45 [](const Akonadi2::Storage::Error &error) { 52 void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback);
46 qWarning() << "Error during query: " << error.message; 53
47 }); 54 ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters);
55 ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters);
56
57 ResultSet filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery);
58 std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query);
59 qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery);
60
61private:
62 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory;
63 QByteArray mResourceInstanceIdentifier;
64 QByteArray mBufferType;
65 Akonadi2::Query mQuery;
66};
48 67
49 Trace() << "Full scan on " << bufferType << " found " << keys.size() << " results";
50 return ResultSet(keys);
51}
52 68
53template<class DomainType> 69template<class DomainType>
54QueryRunner<DomainType>::QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) 70QueryRunner<DomainType>::QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType)
55 : QueryRunnerBase(), 71 : QueryRunnerBase(),
56 mResultProvider(new ResultProvider<typename DomainType::Ptr>),
57 mResourceAccess(resourceAccess), 72 mResourceAccess(resourceAccess),
58 mDomainTypeAdaptorFactory(factory), 73 mResultProvider(new ResultProvider<typename DomainType::Ptr>)
59 mResourceInstanceIdentifier(instanceIdentifier),
60 mBufferType(bufferType),
61 mQuery(query)
62{ 74{
63 Trace() << "Starting query"; 75 Trace() << "Starting query";
64 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. 76 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
65 mResultProvider->setFetcher([this, query](const typename DomainType::Ptr &parent) { 77 mResultProvider->setFetcher([this, query, instanceIdentifier, factory, bufferType](const typename DomainType::Ptr &parent) {
66 Trace() << "Running fetcher"; 78 Trace() << "Running fetcher";
67 const qint64 newRevision = executeInitialQuery(query, parent, *mResultProvider); 79 auto resultProvider = mResultProvider;
68 //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 80 auto result = QtConcurrent::run([query, instanceIdentifier, factory, bufferType, parent, resultProvider]() -> qint64 {
69 if (query.liveQuery) { 81 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType);
70 mResourceAccess->sendRevisionReplayedCommand(newRevision); 82 const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider);
71 } 83 return newRevision;
84 });
85 //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
86 if (query.liveQuery) {
87 auto watcher = new QFutureWatcher<qint64>;
88 watcher->setFuture(result);
89 QObject::connect(watcher, &QFutureWatcher<qint64>::finished, watcher, [this, watcher]() {
90 const auto newRevision = watcher->future().result();
91 mResourceAccess->sendRevisionReplayedCommand(newRevision);
92 delete watcher;
93 });
94 }
72 }); 95 });
73 96
74 97 // In case of a live query we keep the runner for as long alive as the result provider exists
75 //In case of a live query we keep the runner for as long alive as the result provider exists
76 if (query.liveQuery) { 98 if (query.liveQuery) {
77 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 99 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
78 setQuery([this, query] () -> KAsync::Job<void> { 100 setQuery([this, query, instanceIdentifier, factory, bufferType] () -> KAsync::Job<void> {
79 return KAsync::start<void>([this, query](KAsync::Future<void> &future) { 101 return KAsync::start<void>([this, query, instanceIdentifier, factory, bufferType](KAsync::Future<void> &future) {
80 //TODO execute in thread 102 auto resultProvider = mResultProvider;
81 const qint64 newRevision = executeIncrementalQuery(query, *mResultProvider); 103 auto result = QtConcurrent::run([query, instanceIdentifier, factory, bufferType, resultProvider]() -> qint64 {
82 mResourceAccess->sendRevisionReplayedCommand(newRevision); 104 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType);
83 future.setFinished(); 105 const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider);
106 return newRevision;
107 });
108 auto watcher = new QFutureWatcher<qint64>;
109 watcher->setFuture(result);
110 QObject::connect(watcher, &QFutureWatcher<qint64>::finished, watcher, [this, &future, watcher]() {
111 const auto newRevision = watcher->future().result();
112 mResourceAccess->sendRevisionReplayedCommand(newRevision);
113 future.setFinished();
114 delete watcher;
115 });
84 }); 116 });
85 }); 117 });
86 //Ensure the connection is open, if it wasn't already opened 118 //Ensure the connection is open, if it wasn't already opened
@@ -102,8 +134,48 @@ typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<Doma
102 return mResultProvider->emitter(); 134 return mResultProvider->emitter();
103} 135}
104 136
137
138
139static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType)
140{
141 //TODO use a result set with an iterator, to read values on demand
142 QVector<QByteArray> keys;
143 transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool {
144 //Skip internals
145 if (Akonadi2::Storage::isInternalKey(key)) {
146 return true;
147 }
148 keys << Akonadi2::Storage::uidFromKey(key);
149 return true;
150 },
151 [](const Akonadi2::Storage::Error &error) {
152 qWarning() << "Error during query: " << error.message;
153 });
154
155 Trace() << "Full scan on " << bufferType << " found " << keys.size() << " results";
156 return ResultSet(keys);
157}
158
159
105template<class DomainType> 160template<class DomainType>
106void QueryRunner<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties) 161QueryWorker<DomainType>::QueryWorker(const Akonadi2::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType)
162 : QObject(),
163 mDomainTypeAdaptorFactory(factory),
164 mResourceInstanceIdentifier(instanceIdentifier),
165 mBufferType(bufferType),
166 mQuery(query)
167{
168 Trace() << "Starting query worker";
169}
170
171template<class DomainType>
172QueryWorker<DomainType>::~QueryWorker()
173{
174 Trace() << "Stopped query worker";
175}
176
177template<class DomainType>
178void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties)
107{ 179{
108 int counter = 0; 180 int counter = 0;
109 while (resultSet.next([&resultProvider, &counter, &properties](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { 181 while (resultSet.next([&resultProvider, &counter, &properties](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool {
@@ -128,7 +200,7 @@ void QueryRunner<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultPr
128} 200}
129 201
130template<class DomainType> 202template<class DomainType>
131void QueryRunner<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback) 203void QueryWorker<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback)
132{ 204{
133 //This only works for a 1:1 mapping of resource to domain types. 205 //This only works for a 1:1 mapping of resource to domain types.
134 //Not i.e. for tags that are stored as flags in each entity of an imap store. 206 //Not i.e. for tags that are stored as flags in each entity of an imap store.
@@ -150,7 +222,7 @@ void QueryRunner<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase
150} 222}
151 223
152template<class DomainType> 224template<class DomainType>
153ResultSet QueryRunner<DomainType>::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) 225ResultSet QueryWorker<DomainType>::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
154{ 226{
155 if (!query.ids.isEmpty()) { 227 if (!query.ids.isEmpty()) {
156 return ResultSet(query.ids.toVector()); 228 return ResultSet(query.ids.toVector());
@@ -168,7 +240,7 @@ ResultSet QueryRunner<DomainType>::loadInitialResultSet(const Akonadi2::Query &q
168} 240}
169 241
170template<class DomainType> 242template<class DomainType>
171ResultSet QueryRunner<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) 243ResultSet QueryWorker<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
172{ 244{
173 const auto bufferType = mBufferType; 245 const auto bufferType = mBufferType;
174 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); 246 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
@@ -196,7 +268,7 @@ ResultSet QueryRunner<DomainType>::loadIncrementalResultSet(qint64 baseRevision,
196} 268}
197 269
198template<class DomainType> 270template<class DomainType>
199ResultSet QueryRunner<DomainType>::filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery) 271ResultSet QueryWorker<DomainType>::filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery)
200{ 272{
201 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); 273 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
202 274
@@ -225,9 +297,8 @@ ResultSet QueryRunner<DomainType>::filterSet(const ResultSet &resultSet, const s
225 return ResultSet(generator); 297 return ResultSet(generator);
226} 298}
227 299
228
229template<class DomainType> 300template<class DomainType>
230std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> QueryRunner<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query) 301std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> QueryWorker<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query)
231{ 302{
232 return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { 303 return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
233 if (!query.ids.isEmpty()) { 304 if (!query.ids.isEmpty()) {
@@ -252,7 +323,7 @@ std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr
252} 323}
253 324
254template<class DomainType> 325template<class DomainType>
255qint64 QueryRunner<DomainType>::load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery) 326qint64 QueryWorker<DomainType>::load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery)
256{ 327{
257 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); 328 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
258 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { 329 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
@@ -269,9 +340,8 @@ qint64 QueryRunner<DomainType>::load(const Akonadi2::Query &query, const std::fu
269 return Akonadi2::Storage::maxRevision(transaction); 340 return Akonadi2::Storage::maxRevision(transaction);
270} 341}
271 342
272
273template<class DomainType> 343template<class DomainType>
274qint64 QueryRunner<DomainType>::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 344qint64 QueryWorker<DomainType>::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
275{ 345{
276 QTime time; 346 QTime time;
277 time.start(); 347 time.start();
@@ -286,7 +356,7 @@ qint64 QueryRunner<DomainType>::executeIncrementalQuery(const Akonadi2::Query &q
286} 356}
287 357
288template<class DomainType> 358template<class DomainType>
289qint64 QueryRunner<DomainType>::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 359qint64 QueryWorker<DomainType>::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
290{ 360{
291 QTime time; 361 QTime time;
292 time.start(); 362 time.start();
@@ -312,3 +382,6 @@ qint64 QueryRunner<DomainType>::executeInitialQuery(const Akonadi2::Query &query
312template class QueryRunner<Akonadi2::ApplicationDomain::Folder>; 382template class QueryRunner<Akonadi2::ApplicationDomain::Folder>;
313template class QueryRunner<Akonadi2::ApplicationDomain::Mail>; 383template class QueryRunner<Akonadi2::ApplicationDomain::Mail>;
314template class QueryRunner<Akonadi2::ApplicationDomain::Event>; 384template class QueryRunner<Akonadi2::ApplicationDomain::Event>;
385template class QueryWorker<Akonadi2::ApplicationDomain::Folder>;
386template class QueryWorker<Akonadi2::ApplicationDomain::Mail>;
387template class QueryWorker<Akonadi2::ApplicationDomain::Event>;