summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-11-30 18:49:04 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-11-30 18:49:04 +0100
commit412563b7ff18684f9786f4e40b1a4d538f2d5233 (patch)
tree3190a317306cfb71b0d5d9bc4c0f06b260a92ce6 /common/queryrunner.cpp
parent790991aa1007d3271d80bc7e77f5b4f86c9bcef0 (diff)
parent6ad307dd846d07f1b55a1679a8d2eb47525af57d (diff)
downloadsink-412563b7ff18684f9786f4e40b1a4d538f2d5233.tar.gz
sink-412563b7ff18684f9786f4e40b1a4d538f2d5233.zip
Merge branch 'feature/modelresult' into develop
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp312
1 files changed, 312 insertions, 0 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
new file mode 100644
index 0000000..e365cfc
--- /dev/null
+++ b/common/queryrunner.cpp
@@ -0,0 +1,312 @@
1/*
2 Copyright (c) 2015 Christian Mollekopf <mollekopf@kolabsys.com>
3
4 This library is free software; you can redistribute it and/or modify it
5 under the terms of the GNU Library General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or (at your
7 option) any later version.
8
9 This library is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
12 License for more details.
13
14 You should have received a copy of the GNU Library General Public License
15 along with this library; see the file COPYING.LIB. If not, write to the
16 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17 02110-1301, USA.
18*/
19#include "queryrunner.h"
20
21#include <QtConcurrent/QtConcurrentRun>
22#include <QFuture>
23#include <QFutureWatcher>
24#include <QTime>
25#include "commands.h"
26#include "log.h"
27#include "storage.h"
28#include "definitions.h"
29#include "domainadaptor.h"
30
31using namespace Akonadi2;
32
33static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType)
34{
35 //TODO use a result set with an iterator, to read values on demand
36 QVector<QByteArray> keys;
37 transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool {
38 //Skip internals
39 if (Akonadi2::Storage::isInternalKey(key)) {
40 return true;
41 }
42 keys << Akonadi2::Storage::uidFromKey(key);
43 return true;
44 },
45 [](const Akonadi2::Storage::Error &error) {
46 qWarning() << "Error during query: " << error.message;
47 });
48
49 Trace() << "Full scan on " << bufferType << " found " << keys.size() << " results";
50 return ResultSet(keys);
51}
52
53template<class DomainType>
54QueryRunner<DomainType>::QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType)
55 : QueryRunnerBase(),
56 mResourceAccess(resourceAccess),
57 mResultProvider(new ResultProvider<typename DomainType::Ptr>),
58 mDomainTypeAdaptorFactory(factory),
59 mQuery(query),
60 mResourceInstanceIdentifier(instanceIdentifier),
61 mBufferType(bufferType)
62{
63 Trace() << "Starting query";
64 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
65 mResultProvider->setFetcher([this, query](const typename DomainType::Ptr &parent) {
66 Trace() << "Running fetcher";
67
68 // auto watcher = new QFutureWatcher<qint64>;
69 // QObject::connect(watcher, &QFutureWatcher::finished, [](qint64 newRevision) {
70 // mResourceAccess->sendRevisionReplayedCommand(newRevision);
71 // });
72 // auto future = QtConcurrent::run([&resultProvider]() -> qint64 {
73 // const qint64 newRevision = executeInitialQuery(query, parent, resultProvider);
74 // return newRevision;
75 // });
76 // watcher->setFuture(future);
77 const qint64 newRevision = executeInitialQuery(query, parent, *mResultProvider);
78 mResourceAccess->sendRevisionReplayedCommand(newRevision);
79 });
80
81
82 //In case of a live query we keep the runner for as long alive as the result provider exists
83 if (query.liveQuery) {
84 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
85 setQuery([this, query] () -> KAsync::Job<void> {
86 return KAsync::start<void>([this, query](KAsync::Future<void> &future) {
87 //TODO execute in thread
88 const qint64 newRevision = executeIncrementalQuery(query, *mResultProvider);
89 mResourceAccess->sendRevisionReplayedCommand(newRevision);
90 future.setFinished();
91 });
92 });
93 //Ensure the connection is open, if it wasn't already opened
94 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
95 mResourceAccess->open();
96 QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, this, &QueryRunner::revisionChanged);
97 }
98}
99
100template<class DomainType>
101QueryRunner<DomainType>::~QueryRunner()
102{
103 Trace() << "Stopped query";
104}
105
106template<class DomainType>
107typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainType>::emitter()
108{
109 return mResultProvider->emitter();
110}
111
112//TODO move into result provider?
113template<class DomainType>
114void QueryRunner<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
115{
116 // Trace() << "Replay set";
117 int counter = 0;
118 while (resultSet.next([&resultProvider, &counter](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool {
119 counter++;
120 switch (operation) {
121 case Akonadi2::Operation_Creation:
122 // Trace() << "Got creation";
123 resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
124 break;
125 case Akonadi2::Operation_Modification:
126 // Trace() << "Got modification";
127 resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
128 break;
129 case Akonadi2::Operation_Removal:
130 // Trace() << "Got removal";
131 resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
132 break;
133 }
134 return true;
135 })){};
136 Trace() << "Replayed " << counter << " results";
137}
138
139template<class DomainType>
140void QueryRunner<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback)
141{
142 //This only works for a 1:1 mapping of resource to domain types.
143 //Not i.e. for tags that are stored as flags in each entity of an imap store.
144 //additional properties that don't have a 1:1 mapping (such as separately stored tags),
145 //could be added to the adaptor.
146 db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool {
147 Akonadi2::EntityBuffer buffer(value.data(), value.size());
148 const Akonadi2::Entity &entity = buffer.entity();
149 const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata());
150 Q_ASSERT(metadataBuffer);
151 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
152 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation());
153 return false;
154 },
155 [](const Akonadi2::Storage::Error &error) {
156 qWarning() << "Error during query: " << error.message;
157 });
158}
159
160template<class DomainType>
161ResultSet QueryRunner<DomainType>::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
162{
163 QSet<QByteArray> appliedFilters;
164 auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction);
165 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
166
167 //We do a full scan if there were no indexes available to create the initial set.
168 if (appliedFilters.isEmpty()) {
169 //TODO this should be replaced by an index lookup as well
170 resultSet = fullScan(transaction, mBufferType);
171 }
172 return resultSet;
173}
174
175template<class DomainType>
176ResultSet QueryRunner<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
177{
178 const auto bufferType = mBufferType;
179 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
180 remainingFilters = query.propertyFilter.keys().toSet();
181 return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray {
182 const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction);
183 //Spit out the revision keys one by one.
184 while (*revisionCounter <= topRevision) {
185 const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter);
186 const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter);
187 // Trace() << "Revision" << *revisionCounter << type << uid;
188 if (type != bufferType) {
189 //Skip revision
190 *revisionCounter += 1;
191 continue;
192 }
193 const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter);
194 *revisionCounter += 1;
195 return key;
196 }
197 Trace() << "Finished reading incremental result set:" << *revisionCounter;
198 //We're done
199 return QByteArray();
200 });
201}
202
203template<class DomainType>
204ResultSet QueryRunner<DomainType>::filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery)
205{
206 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
207
208 //Read through the source values and return whatever matches the filter
209 std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &db, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool {
210 while (resultSetPtr->next()) {
211 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
212 readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) {
213 //Always remove removals, they probably don't match due to non-available properties
214 if (filter(domainObject) || operation == Akonadi2::Operation_Removal) {
215 if (initialQuery) {
216 //We're not interested in removals during the initial query
217 if (operation != Akonadi2::Operation_Removal) {
218 callback(domainObject, Akonadi2::Operation_Creation);
219 }
220 } else {
221 callback(domainObject, operation);
222 }
223 }
224 });
225 }
226 return false;
227 };
228 return ResultSet(generator);
229}
230
231
232template<class DomainType>
233std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> QueryRunner<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query)
234{
235 return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
236 for (const auto &filterProperty : remainingFilters) {
237 const auto property = domainObject->getProperty(filterProperty);
238 if (property.isValid()) {
239 //TODO implement other comparison operators than equality
240 if (property != query.propertyFilter.value(filterProperty)) {
241 Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty);
242 return false;
243 }
244 } else {
245 Warning() << "Ignored property filter because value is invalid: " << filterProperty;
246 }
247 }
248 return true;
249 };
250}
251
252template<class DomainType>
253qint64 QueryRunner<DomainType>::load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery)
254{
255 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
256 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
257 Warning() << "Error during query: " << error.store << error.message;
258 });
259 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly);
260 auto db = transaction.openDatabase(mBufferType + ".main");
261
262 QSet<QByteArray> remainingFilters;
263 auto resultSet = baseSetRetriever(transaction, remainingFilters);
264 auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), db, initialQuery);
265 replaySet(filteredSet, resultProvider);
266 resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction));
267 return Akonadi2::Storage::maxRevision(transaction);
268}
269
270
271template<class DomainType>
272qint64 QueryRunner<DomainType>::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
273{
274 QTime time;
275 time.start();
276
277 const qint64 baseRevision = resultProvider.revision() + 1;
278 Trace() << "Running incremental query " << baseRevision;
279 auto revision = load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
280 return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters);
281 }, resultProvider, false);
282 Trace() << "Incremental query took: " << time.elapsed() << " ms";
283 return revision;
284}
285
286template<class DomainType>
287qint64 QueryRunner<DomainType>::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
288{
289 QTime time;
290 time.start();
291
292 auto modifiedQuery = query;
293 if (!query.parentProperty.isEmpty()) {
294 if (parent) {
295 Trace() << "Running initial query for parent:" << parent->identifier();
296 modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier());
297 } else {
298 Trace() << "Running initial query for toplevel";
299 modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant());
300 }
301 }
302 auto revision = load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
303 return loadInitialResultSet(modifiedQuery, transaction, remainingFilters);
304 }, resultProvider, true);
305 Trace() << "Initial query took: " << time.elapsed() << " ms";
306 resultProvider.initialResultSetComplete(parent);
307 return revision;
308}
309
310template class QueryRunner<Akonadi2::ApplicationDomain::Folder>;
311template class QueryRunner<Akonadi2::ApplicationDomain::Mail>;
312template class QueryRunner<Akonadi2::ApplicationDomain::Event>;