diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-11-27 17:30:04 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-11-27 17:30:04 +0100 |
commit | 5b41b26a349967acf2197f9f9228526193fd826e (patch) | |
tree | 166452bcc0757564deefe233bf031d2ccb0564d2 /common | |
parent | 13af56e436f49df32d3b2f6f223cf1dec2eabaac (diff) | |
download | sink-5b41b26a349967acf2197f9f9228526193fd826e.tar.gz sink-5b41b26a349967acf2197f9f9228526193fd826e.zip |
Introduced a QueryRunner object
The QueryRunner object lives for the duration of the query (so just
for the initial query for non-live queries, and for the lifetime of the
result model for live queries).
It's supposed to handle all the threading internally and decouple the
lifetime of the facade.
Diffstat (limited to 'common')
-rw-r--r-- | common/CMakeLists.txt | 1 | ||||
-rw-r--r-- | common/clientapi.cpp | 62 | ||||
-rw-r--r-- | common/facade.cpp | 282 | ||||
-rw-r--r-- | common/facade.h | 17 | ||||
-rw-r--r-- | common/facadeinterface.h | 29 | ||||
-rw-r--r-- | common/modelresult.cpp | 22 | ||||
-rw-r--r-- | common/modelresult.h | 6 | ||||
-rw-r--r-- | common/queryrunner.cpp | 292 | ||||
-rw-r--r-- | common/queryrunner.h | 107 | ||||
-rw-r--r-- | common/resourceaccess.h | 2 | ||||
-rw-r--r-- | common/resourcefacade.cpp | 17 | ||||
-rw-r--r-- | common/resourcefacade.h | 3 | ||||
-rw-r--r-- | common/resultprovider.h | 150 | ||||
-rw-r--r-- | common/threadboundary.cpp | 5 |
14 files changed, 522 insertions, 473 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 01056d0..be312b9 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -26,6 +26,7 @@ set(command_SRCS | |||
26 | resource.cpp | 26 | resource.cpp |
27 | genericresource.cpp | 27 | genericresource.cpp |
28 | resourceaccess.cpp | 28 | resourceaccess.cpp |
29 | queryrunner.cpp | ||
29 | listener.cpp | 30 | listener.cpp |
30 | storage_common.cpp | 31 | storage_common.cpp |
31 | threadboundary.cpp | 32 | threadboundary.cpp |
diff --git a/common/clientapi.cpp b/common/clientapi.cpp index 02f8ce6..b24dfa8 100644 --- a/common/clientapi.cpp +++ b/common/clientapi.cpp | |||
@@ -34,6 +34,7 @@ | |||
34 | #include "definitions.h" | 34 | #include "definitions.h" |
35 | #include "resourceconfig.h" | 35 | #include "resourceconfig.h" |
36 | #include "facadefactory.h" | 36 | #include "facadefactory.h" |
37 | #include "modelresult.h" | ||
37 | #include "log.h" | 38 | #include "log.h" |
38 | 39 | ||
39 | #define ASYNCINTHREAD | 40 | #define ASYNCINTHREAD |
@@ -100,38 +101,8 @@ template <class DomainType> | |||
100 | QSharedPointer<ResultEmitter<typename DomainType::Ptr> > Store::load(Query query) | 101 | QSharedPointer<ResultEmitter<typename DomainType::Ptr> > Store::load(Query query) |
101 | { | 102 | { |
102 | auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create(); | 103 | auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create(); |
103 | 104 | qWarning() << "Main thread " << QThread::currentThreadId(); | |
104 | //Execute the search in a thread. | 105 | //FIXME remove |
105 | //We must guarantee that the emitter is returned before the first result is emitted. | ||
106 | //The result provider must be threadsafe. | ||
107 | async::run([query, resultSet](){ | ||
108 | QEventLoop eventLoop; | ||
109 | resultSet->onDone([&eventLoop](){ | ||
110 | eventLoop.quit(); | ||
111 | }); | ||
112 | // Query all resources and aggregate results | ||
113 | KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName<DomainType>())) | ||
114 | .template each<void, QByteArray>([query, resultSet](const QByteArray &resource, KAsync::Future<void> &future) { | ||
115 | if (auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceName(resource), resource)) { | ||
116 | facade->load(query, *resultSet).template then<void>([&future](){future.setFinished();}).exec(); | ||
117 | //Keep the facade alive for the lifetime of the resultSet. | ||
118 | resultSet->setFacade(facade); | ||
119 | } else { | ||
120 | //Ignore the error and carry on | ||
121 | future.setFinished(); | ||
122 | } | ||
123 | }).template then<void>([query, resultSet]() { | ||
124 | resultSet->initialResultSetComplete(); | ||
125 | if (!query.liveQuery) { | ||
126 | resultSet->complete(); | ||
127 | } | ||
128 | }).exec(); | ||
129 | |||
130 | //Keep the thread alive until the result is ready | ||
131 | if (!resultSet->isDone()) { | ||
132 | eventLoop.exec(); | ||
133 | } | ||
134 | }); | ||
135 | return resultSet->emitter(); | 106 | return resultSet->emitter(); |
136 | } | 107 | } |
137 | 108 | ||
@@ -139,28 +110,29 @@ template <class DomainType> | |||
139 | QSharedPointer<QAbstractItemModel> Store::loadModel(Query query) | 110 | QSharedPointer<QAbstractItemModel> Store::loadModel(Query query) |
140 | { | 111 | { |
141 | auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr> >::create(query, query.requestedProperties.toList()); | 112 | auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr> >::create(query, query.requestedProperties.toList()); |
142 | auto resultProvider = std::make_shared<ModelResultProvider<DomainType, typename DomainType::Ptr> >(model); | 113 | |
143 | //Keep the resultprovider alive for as long as the model lives | 114 | //* Client defines lifetime of model |
144 | model->setProperty("resultProvider", QVariant::fromValue(std::shared_ptr<void>(resultProvider))); | 115 | //* The model lifetime defines the duration of live-queries |
116 | //* The facade needs to life for the duration of any calls being made (assuming we get rid of any internal callbacks | ||
117 | //* The emitter needs to live or the duration of query (respectively, the model) | ||
118 | //* The result provider needs to live for as long as results are provided (until the last thread exits). | ||
145 | 119 | ||
146 | // Query all resources and aggregate results | 120 | // Query all resources and aggregate results |
147 | KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName<DomainType>())) | 121 | KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName<DomainType>())) |
148 | .template each<void, QByteArray>([query, resultProvider](const QByteArray &resource, KAsync::Future<void> &future) { | 122 | .template each<void, QByteArray>([query, model](const QByteArray &resource, KAsync::Future<void> &future) { |
149 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceName(resource), resource); | 123 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceName(resource), resource); |
150 | if (facade) { | 124 | if (facade) { |
151 | facade->load(query, *resultProvider).template then<void>([&future](){future.setFinished();}).exec(); | 125 | Trace() << "Trying to fetch from resource"; |
152 | //Keep the facade alive for the lifetime of the resultSet. | 126 | auto result = facade->load(query); |
153 | //FIXME this would have to become a list | 127 | auto emitter = result.second; |
154 | resultProvider->setFacade(facade); | 128 | //TODO use aggregating emitter instead |
129 | model->setEmitter(emitter); | ||
130 | model->fetchMore(QModelIndex()); | ||
131 | result.first.template then<void>([&future](){future.setFinished();}).exec(); | ||
155 | } else { | 132 | } else { |
156 | //Ignore the error and carry on | 133 | //Ignore the error and carry on |
157 | future.setFinished(); | 134 | future.setFinished(); |
158 | } | 135 | } |
159 | }).template then<void>([query, resultProvider]() { | ||
160 | resultProvider->initialResultSetComplete(); | ||
161 | if (!query.liveQuery) { | ||
162 | resultProvider->complete(); | ||
163 | } | ||
164 | }).exec(); | 136 | }).exec(); |
165 | 137 | ||
166 | return model; | 138 | return model; |
diff --git a/common/facade.cpp b/common/facade.cpp index 92124fc..1d6b9a7 100644 --- a/common/facade.cpp +++ b/common/facade.cpp | |||
@@ -24,76 +24,10 @@ | |||
24 | #include "storage.h" | 24 | #include "storage.h" |
25 | #include "definitions.h" | 25 | #include "definitions.h" |
26 | #include "domainadaptor.h" | 26 | #include "domainadaptor.h" |
27 | #include "queryrunner.h" | ||
27 | 28 | ||
28 | using namespace Akonadi2; | 29 | using namespace Akonadi2; |
29 | 30 | ||
30 | /** | ||
31 | * A QueryRunner runs a query and updates the corresponding result set. | ||
32 | * | ||
33 | * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), | ||
34 | * and by how long a result set must be updated. If the query is one off the runner dies after the execution, | ||
35 | * otherwise it lives on the react to changes and updates the corresponding result set. | ||
36 | * | ||
37 | * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. | ||
38 | */ | ||
39 | class QueryRunner : public QObject | ||
40 | { | ||
41 | Q_OBJECT | ||
42 | public: | ||
43 | typedef std::function<KAsync::Job<void>()> QueryFunction; | ||
44 | |||
45 | QueryRunner(const Akonadi2::Query &query) {}; | ||
46 | /** | ||
47 | * Starts query | ||
48 | */ | ||
49 | KAsync::Job<void> run(qint64 newRevision = 0) | ||
50 | { | ||
51 | return queryFunction(); | ||
52 | } | ||
53 | |||
54 | /** | ||
55 | * Set the query to run | ||
56 | */ | ||
57 | void setQuery(const QueryFunction &query) | ||
58 | { | ||
59 | queryFunction = query; | ||
60 | } | ||
61 | |||
62 | public slots: | ||
63 | /** | ||
64 | * Rerun query with new revision | ||
65 | */ | ||
66 | void revisionChanged(qint64 newRevision) | ||
67 | { | ||
68 | Trace() << "New revision: " << newRevision; | ||
69 | run().exec(); | ||
70 | } | ||
71 | |||
72 | private: | ||
73 | QueryFunction queryFunction; | ||
74 | }; | ||
75 | |||
76 | static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) | ||
77 | { | ||
78 | //TODO use a result set with an iterator, to read values on demand | ||
79 | QVector<QByteArray> keys; | ||
80 | transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { | ||
81 | //Skip internals | ||
82 | if (Akonadi2::Storage::isInternalKey(key)) { | ||
83 | return true; | ||
84 | } | ||
85 | keys << Akonadi2::Storage::uidFromKey(key); | ||
86 | return true; | ||
87 | }, | ||
88 | [](const Akonadi2::Storage::Error &error) { | ||
89 | qWarning() << "Error during query: " << error.message; | ||
90 | }); | ||
91 | |||
92 | Trace() << "Full scan found " << keys.size() << " results"; | ||
93 | return ResultSet(keys); | ||
94 | } | ||
95 | |||
96 | |||
97 | 31 | ||
98 | template<class DomainType> | 32 | template<class DomainType> |
99 | GenericFacade<DomainType>::GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory , const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess) | 33 | GenericFacade<DomainType>::GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory , const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess) |
@@ -150,220 +84,14 @@ KAsync::Job<void> GenericFacade<DomainType>::remove(const DomainType &domainObje | |||
150 | } | 84 | } |
151 | 85 | ||
152 | template<class DomainType> | 86 | template<class DomainType> |
153 | KAsync::Job<void> GenericFacade<DomainType>::load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | 87 | QPair<KAsync::Job<void>, typename ResultEmitter<typename DomainType::Ptr>::Ptr> GenericFacade<DomainType>::load(const Akonadi2::Query &query) |
154 | { | ||
155 | //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. | ||
156 | resultProvider.setFetcher([this, query, &resultProvider](const typename DomainType::Ptr &parent) { | ||
157 | const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); | ||
158 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | ||
159 | }); | ||
160 | |||
161 | |||
162 | //In case of a live query we keep the runner for as long alive as the result provider exists | ||
163 | if (query.liveQuery) { | ||
164 | auto runner = QSharedPointer<QueryRunner>::create(query); | ||
165 | //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | ||
166 | runner->setQuery([this, query, &resultProvider] () -> KAsync::Job<void> { | ||
167 | return KAsync::start<void>([this, query, &resultProvider](KAsync::Future<void> &future) { | ||
168 | const qint64 newRevision = executeIncrementalQuery(query, resultProvider); | ||
169 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | ||
170 | future.setFinished(); | ||
171 | }); | ||
172 | }); | ||
173 | resultProvider.setQueryRunner(runner); | ||
174 | //Ensure the connection is open, if it wasn't already opened | ||
175 | //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates | ||
176 | mResourceAccess->open(); | ||
177 | QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); | ||
178 | } | ||
179 | return KAsync::null<void>(); | ||
180 | } | ||
181 | |||
182 | //TODO move into result provider? | ||
183 | template<class DomainType> | ||
184 | void GenericFacade<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | ||
185 | { | ||
186 | while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { | ||
187 | switch (operation) { | ||
188 | case Akonadi2::Operation_Creation: | ||
189 | // Trace() << "Got creation"; | ||
190 | resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); | ||
191 | break; | ||
192 | case Akonadi2::Operation_Modification: | ||
193 | // Trace() << "Got modification"; | ||
194 | resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); | ||
195 | break; | ||
196 | case Akonadi2::Operation_Removal: | ||
197 | // Trace() << "Got removal"; | ||
198 | resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); | ||
199 | break; | ||
200 | } | ||
201 | return true; | ||
202 | })){}; | ||
203 | } | ||
204 | |||
205 | template<class DomainType> | ||
206 | void GenericFacade<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback) | ||
207 | { | ||
208 | //This only works for a 1:1 mapping of resource to domain types. | ||
209 | //Not i.e. for tags that are stored as flags in each entity of an imap store. | ||
210 | //additional properties that don't have a 1:1 mapping (such as separately stored tags), | ||
211 | //could be added to the adaptor. | ||
212 | // | ||
213 | // Akonadi2::Storage::getLatest(transaction, bufferTye, key); | ||
214 | db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { | ||
215 | Akonadi2::EntityBuffer buffer(value.data(), value.size()); | ||
216 | const Akonadi2::Entity &entity = buffer.entity(); | ||
217 | const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata()); | ||
218 | Q_ASSERT(metadataBuffer); | ||
219 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | ||
220 | resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); | ||
221 | return false; | ||
222 | }, | ||
223 | [](const Akonadi2::Storage::Error &error) { | ||
224 | qWarning() << "Error during query: " << error.message; | ||
225 | }); | ||
226 | } | ||
227 | |||
228 | template<class DomainType> | ||
229 | ResultSet GenericFacade<DomainType>::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) | ||
230 | { | 88 | { |
231 | QSet<QByteArray> appliedFilters; | 89 | //The runner lives for the lifetime of the query |
232 | auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); | 90 | auto runner = new QueryRunner<DomainType>(query, mResourceAccess, mResourceInstanceIdentifier, mDomainTypeAdaptorFactory, bufferTypeForDomainType()); |
233 | remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; | 91 | return qMakePair(KAsync::null<void>(), runner->emitter()); |
234 | |||
235 | //We do a full scan if there were no indexes available to create the initial set. | ||
236 | if (appliedFilters.isEmpty()) { | ||
237 | //TODO this should be replaced by an index lookup as well | ||
238 | resultSet = fullScan(transaction, bufferTypeForDomainType()); | ||
239 | } | ||
240 | return resultSet; | ||
241 | } | ||
242 | |||
243 | template<class DomainType> | ||
244 | ResultSet GenericFacade<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) | ||
245 | { | ||
246 | const auto bufferType = bufferTypeForDomainType(); | ||
247 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | ||
248 | remainingFilters = query.propertyFilter.keys().toSet(); | ||
249 | return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { | ||
250 | const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); | ||
251 | //Spit out the revision keys one by one. | ||
252 | while (*revisionCounter <= topRevision) { | ||
253 | const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); | ||
254 | const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); | ||
255 | Trace() << "Revision" << *revisionCounter << type << uid; | ||
256 | if (type != bufferType) { | ||
257 | //Skip revision | ||
258 | *revisionCounter += 1; | ||
259 | continue; | ||
260 | } | ||
261 | const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); | ||
262 | *revisionCounter += 1; | ||
263 | return key; | ||
264 | } | ||
265 | Trace() << "Finished reading incremental result set:" << *revisionCounter; | ||
266 | //We're done | ||
267 | return QByteArray(); | ||
268 | }); | ||
269 | } | ||
270 | |||
271 | template<class DomainType> | ||
272 | ResultSet GenericFacade<DomainType>::filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery) | ||
273 | { | ||
274 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | ||
275 | |||
276 | //Read through the source values and return whatever matches the filter | ||
277 | std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &db, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool { | ||
278 | while (resultSetPtr->next()) { | ||
279 | //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | ||
280 | readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { | ||
281 | //Always remove removals, they probably don't match due to non-available properties | ||
282 | if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { | ||
283 | if (initialQuery) { | ||
284 | //We're not interested in removals during the initial query | ||
285 | if (operation != Akonadi2::Operation_Removal) { | ||
286 | callback(domainObject, Akonadi2::Operation_Creation); | ||
287 | } | ||
288 | } else { | ||
289 | callback(domainObject, operation); | ||
290 | } | ||
291 | } | ||
292 | }); | ||
293 | } | ||
294 | return false; | ||
295 | }; | ||
296 | return ResultSet(generator); | ||
297 | } | ||
298 | |||
299 | |||
300 | template<class DomainType> | ||
301 | std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> GenericFacade<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query) | ||
302 | { | ||
303 | return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { | ||
304 | for (const auto &filterProperty : remainingFilters) { | ||
305 | const auto property = domainObject->getProperty(filterProperty); | ||
306 | if (property.isValid()) { | ||
307 | //TODO implement other comparison operators than equality | ||
308 | if (property != query.propertyFilter.value(filterProperty)) { | ||
309 | Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty); | ||
310 | return false; | ||
311 | } | ||
312 | } else { | ||
313 | Warning() << "Ignored property filter because value is invalid: " << filterProperty; | ||
314 | } | ||
315 | } | ||
316 | return true; | ||
317 | }; | ||
318 | } | ||
319 | |||
320 | template<class DomainType> | ||
321 | qint64 GenericFacade<DomainType>::load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery) | ||
322 | { | ||
323 | Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); | ||
324 | storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { | ||
325 | Warning() << "Error during query: " << error.store << error.message; | ||
326 | }); | ||
327 | auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); | ||
328 | auto db = transaction.openDatabase(bufferTypeForDomainType() + ".main"); | ||
329 | |||
330 | QSet<QByteArray> remainingFilters; | ||
331 | auto resultSet = baseSetRetriever(transaction, remainingFilters); | ||
332 | auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), db, initialQuery); | ||
333 | replaySet(filteredSet, resultProvider); | ||
334 | resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); | ||
335 | return Akonadi2::Storage::maxRevision(transaction); | ||
336 | } | 92 | } |
337 | 93 | ||
338 | 94 | ||
339 | template<class DomainType> | ||
340 | qint64 GenericFacade<DomainType>::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | ||
341 | { | ||
342 | const qint64 baseRevision = resultProvider.revision() + 1; | ||
343 | Trace() << "Running incremental query " << baseRevision; | ||
344 | return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet { | ||
345 | return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); | ||
346 | }, resultProvider, false); | ||
347 | } | ||
348 | |||
349 | template<class DomainType> | ||
350 | qint64 GenericFacade<DomainType>::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | ||
351 | { | ||
352 | auto modifiedQuery = query; | ||
353 | if (!query.parentProperty.isEmpty()) { | ||
354 | if (parent) { | ||
355 | Trace() << "Running initial query for parent:" << parent->identifier(); | ||
356 | modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier()); | ||
357 | } else { | ||
358 | Trace() << "Running initial query for toplevel"; | ||
359 | modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant()); | ||
360 | } | ||
361 | } | ||
362 | return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet { | ||
363 | return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); | ||
364 | }, resultProvider, true); | ||
365 | } | ||
366 | |||
367 | template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Folder>; | 95 | template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Folder>; |
368 | template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Mail>; | 96 | template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Mail>; |
369 | template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Event>; | 97 | template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Event>; |
diff --git a/common/facade.h b/common/facade.h index d8b878b..de67e05 100644 --- a/common/facade.h +++ b/common/facade.h | |||
@@ -59,22 +59,7 @@ public: | |||
59 | KAsync::Job<void> create(const DomainType &domainObject) Q_DECL_OVERRIDE; | 59 | KAsync::Job<void> create(const DomainType &domainObject) Q_DECL_OVERRIDE; |
60 | KAsync::Job<void> modify(const DomainType &domainObject) Q_DECL_OVERRIDE; | 60 | KAsync::Job<void> modify(const DomainType &domainObject) Q_DECL_OVERRIDE; |
61 | KAsync::Job<void> remove(const DomainType &domainObject) Q_DECL_OVERRIDE; | 61 | KAsync::Job<void> remove(const DomainType &domainObject) Q_DECL_OVERRIDE; |
62 | KAsync::Job<void> load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) Q_DECL_OVERRIDE; | 62 | QPair<KAsync::Job<void>, typename ResultEmitter<typename DomainType::Ptr>::Ptr> load(const Akonadi2::Query &query) Q_DECL_OVERRIDE; |
63 | |||
64 | private: | ||
65 | //TODO move into result provider? | ||
66 | static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | ||
67 | |||
68 | void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback); | ||
69 | |||
70 | ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters); | ||
71 | ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters); | ||
72 | |||
73 | ResultSet filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery); | ||
74 | std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query); | ||
75 | qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery); | ||
76 | qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | ||
77 | qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | ||
78 | 63 | ||
79 | protected: | 64 | protected: |
80 | //TODO use one resource access instance per application & per resource | 65 | //TODO use one resource access instance per application & per resource |
diff --git a/common/facadeinterface.h b/common/facadeinterface.h index 7ec21bc..318abf3 100644 --- a/common/facadeinterface.h +++ b/common/facadeinterface.h | |||
@@ -23,6 +23,7 @@ | |||
23 | #include <Async/Async> | 23 | #include <Async/Async> |
24 | #include <QByteArray> | 24 | #include <QByteArray> |
25 | #include <QSharedPointer> | 25 | #include <QSharedPointer> |
26 | #include <QPair> | ||
26 | #include "applicationdomaintype.h" | 27 | #include "applicationdomaintype.h" |
27 | #include "resultprovider.h" | 28 | #include "resultprovider.h" |
28 | 29 | ||
@@ -42,10 +43,32 @@ class StoreFacade { | |||
42 | public: | 43 | public: |
43 | virtual ~StoreFacade(){}; | 44 | virtual ~StoreFacade(){}; |
44 | QByteArray type() const { return ApplicationDomain::getTypeName<DomainType>(); } | 45 | QByteArray type() const { return ApplicationDomain::getTypeName<DomainType>(); } |
46 | |||
47 | /** | ||
48 | * Create an entity in the store. | ||
49 | * | ||
50 | * The job returns succefully once the task has been successfully placed in the queue | ||
51 | */ | ||
45 | virtual KAsync::Job<void> create(const DomainType &domainObject) = 0; | 52 | virtual KAsync::Job<void> create(const DomainType &domainObject) = 0; |
53 | |||
54 | /** | ||
55 | * Modify an entity in the store. | ||
56 | * | ||
57 | * The job returns succefully once the task has been successfully placed in the queue | ||
58 | */ | ||
46 | virtual KAsync::Job<void> modify(const DomainType &domainObject) = 0; | 59 | virtual KAsync::Job<void> modify(const DomainType &domainObject) = 0; |
60 | |||
61 | /** | ||
62 | * Remove an entity from the store. | ||
63 | * | ||
64 | * The job returns succefully once the task has been successfully placed in the queue | ||
65 | */ | ||
47 | virtual KAsync::Job<void> remove(const DomainType &domainObject) = 0; | 66 | virtual KAsync::Job<void> remove(const DomainType &domainObject) = 0; |
48 | virtual KAsync::Job<void> load(const Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) = 0; | 67 | |
68 | /** | ||
69 | * Load entities from the store. | ||
70 | */ | ||
71 | virtual QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr > load(const Query &query) = 0; | ||
49 | }; | 72 | }; |
50 | 73 | ||
51 | template<class DomainType> | 74 | template<class DomainType> |
@@ -67,9 +90,9 @@ public: | |||
67 | return KAsync::error<void>(-1, "Failed to create a facade"); | 90 | return KAsync::error<void>(-1, "Failed to create a facade"); |
68 | } | 91 | } |
69 | 92 | ||
70 | KAsync::Job<void> load(const Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | 93 | QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr > load(const Query &query) |
71 | { | 94 | { |
72 | return KAsync::error<void>(-1, "Failed to create a facade"); | 95 | return qMakePair(KAsync::null<void>(), typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr()); |
73 | } | 96 | } |
74 | }; | 97 | }; |
75 | 98 | ||
diff --git a/common/modelresult.cpp b/common/modelresult.cpp index 935e2e8..65eaba9 100644 --- a/common/modelresult.cpp +++ b/common/modelresult.cpp | |||
@@ -183,6 +183,28 @@ void ModelResult<T, Ptr>::setFetcher(const std::function<void(const Ptr &parent) | |||
183 | } | 183 | } |
184 | 184 | ||
185 | template<class T, class Ptr> | 185 | template<class T, class Ptr> |
186 | void ModelResult<T, Ptr>::setEmitter(const typename Akonadi2::ResultEmitter<Ptr>::Ptr &emitter) | ||
187 | { | ||
188 | setFetcher(emitter->mFetcher); | ||
189 | emitter->onAdded([this](const Ptr &value) { | ||
190 | this->add(value); | ||
191 | }); | ||
192 | emitter->onModified([this](const Ptr &value) { | ||
193 | this->modify(value); | ||
194 | }); | ||
195 | emitter->onRemoved([this](const Ptr &value) { | ||
196 | this->remove(value); | ||
197 | }); | ||
198 | emitter->onInitialResultSetComplete([this]() { | ||
199 | }); | ||
200 | emitter->onComplete([this]() { | ||
201 | }); | ||
202 | emitter->onClear([this]() { | ||
203 | }); | ||
204 | mEmitter = emitter; | ||
205 | } | ||
206 | |||
207 | template<class T, class Ptr> | ||
186 | void ModelResult<T, Ptr>::modify(const Ptr &value) | 208 | void ModelResult<T, Ptr>::modify(const Ptr &value) |
187 | { | 209 | { |
188 | auto childId = qHash(value->identifier()); | 210 | auto childId = qHash(value->identifier()); |
diff --git a/common/modelresult.h b/common/modelresult.h index 66dfce5..eb6c86b 100644 --- a/common/modelresult.h +++ b/common/modelresult.h | |||
@@ -23,20 +23,23 @@ | |||
23 | #include <QAbstractItemModel> | 23 | #include <QAbstractItemModel> |
24 | #include <QModelIndex> | 24 | #include <QModelIndex> |
25 | #include <QDebug> | 25 | #include <QDebug> |
26 | #include <QSharedPointer> | ||
26 | #include <functional> | 27 | #include <functional> |
27 | #include "query.h" | 28 | #include "query.h" |
29 | #include "resultprovider.h" | ||
28 | 30 | ||
29 | template<class T, class Ptr> | 31 | template<class T, class Ptr> |
30 | class ModelResult : public QAbstractItemModel | 32 | class ModelResult : public QAbstractItemModel |
31 | { | 33 | { |
32 | public: | 34 | public: |
33 | |||
34 | enum Roles { | 35 | enum Roles { |
35 | DomainObjectRole = Qt::UserRole + 1 | 36 | DomainObjectRole = Qt::UserRole + 1 |
36 | }; | 37 | }; |
37 | 38 | ||
38 | ModelResult(const Akonadi2::Query &query, const QList<QByteArray> &propertyColumns); | 39 | ModelResult(const Akonadi2::Query &query, const QList<QByteArray> &propertyColumns); |
39 | 40 | ||
41 | void setEmitter(const typename Akonadi2::ResultEmitter<Ptr>::Ptr &); | ||
42 | |||
40 | int rowCount(const QModelIndex &parent = QModelIndex()) const; | 43 | int rowCount(const QModelIndex &parent = QModelIndex()) const; |
41 | int columnCount(const QModelIndex &parent = QModelIndex()) const; | 44 | int columnCount(const QModelIndex &parent = QModelIndex()) const; |
42 | QVariant data(const QModelIndex &index, int role = Qt::DisplayRole) const; | 45 | QVariant data(const QModelIndex &index, int role = Qt::DisplayRole) const; |
@@ -65,5 +68,6 @@ private: | |||
65 | QList<QByteArray> mPropertyColumns; | 68 | QList<QByteArray> mPropertyColumns; |
66 | Akonadi2::Query mQuery; | 69 | Akonadi2::Query mQuery; |
67 | std::function<void(const Ptr &)> loadEntities; | 70 | std::function<void(const Ptr &)> loadEntities; |
71 | typename Akonadi2::ResultEmitter<Ptr>::Ptr mEmitter; | ||
68 | }; | 72 | }; |
69 | 73 | ||
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp new file mode 100644 index 0000000..4159112 --- /dev/null +++ b/common/queryrunner.cpp | |||
@@ -0,0 +1,292 @@ | |||
1 | /* | ||
2 | Copyright (c) 2015 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | |||
4 | This library is free software; you can redistribute it and/or modify it | ||
5 | under the terms of the GNU Library General Public License as published by | ||
6 | the Free Software Foundation; either version 2 of the License, or (at your | ||
7 | option) any later version. | ||
8 | |||
9 | This library is distributed in the hope that it will be useful, but WITHOUT | ||
10 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | ||
11 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public | ||
12 | License for more details. | ||
13 | |||
14 | You should have received a copy of the GNU Library General Public License | ||
15 | along with this library; see the file COPYING.LIB. If not, write to the | ||
16 | Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA | ||
17 | 02110-1301, USA. | ||
18 | */ | ||
19 | #include "queryrunner.h" | ||
20 | |||
21 | #include <QtConcurrent/QtConcurrentRun> | ||
22 | #include <QFuture> | ||
23 | #include <QFutureWatcher> | ||
24 | #include "commands.h" | ||
25 | #include "log.h" | ||
26 | #include "storage.h" | ||
27 | #include "definitions.h" | ||
28 | #include "domainadaptor.h" | ||
29 | |||
30 | using namespace Akonadi2; | ||
31 | |||
32 | static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) | ||
33 | { | ||
34 | //TODO use a result set with an iterator, to read values on demand | ||
35 | QVector<QByteArray> keys; | ||
36 | transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { | ||
37 | //Skip internals | ||
38 | if (Akonadi2::Storage::isInternalKey(key)) { | ||
39 | return true; | ||
40 | } | ||
41 | keys << Akonadi2::Storage::uidFromKey(key); | ||
42 | return true; | ||
43 | }, | ||
44 | [](const Akonadi2::Storage::Error &error) { | ||
45 | qWarning() << "Error during query: " << error.message; | ||
46 | }); | ||
47 | |||
48 | Trace() << "Full scan found " << keys.size() << " results"; | ||
49 | return ResultSet(keys); | ||
50 | } | ||
51 | |||
52 | template<class DomainType> | ||
53 | QueryRunner<DomainType>::QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) | ||
54 | : QueryRunnerBase(), | ||
55 | mResourceAccess(resourceAccess), | ||
56 | mResultProvider(new ResultProvider<typename DomainType::Ptr>), | ||
57 | mDomainTypeAdaptorFactory(factory), | ||
58 | mQuery(query), | ||
59 | mResourceInstanceIdentifier(instanceIdentifier), | ||
60 | mBufferType(bufferType) | ||
61 | { | ||
62 | Trace() << "Starting query"; | ||
63 | //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. | ||
64 | mResultProvider->setFetcher([this, query](const typename DomainType::Ptr &parent) { | ||
65 | Trace() << "Running fetcher"; | ||
66 | |||
67 | // auto watcher = new QFutureWatcher<qint64>; | ||
68 | // QObject::connect(watcher, &QFutureWatcher::finished, [](qint64 newRevision) { | ||
69 | // mResourceAccess->sendRevisionReplayedCommand(newRevision); | ||
70 | // }); | ||
71 | // auto future = QtConcurrent::run([&resultProvider]() -> qint64 { | ||
72 | // const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); | ||
73 | // return newRevision; | ||
74 | // }); | ||
75 | // watcher->setFuture(future); | ||
76 | const qint64 newRevision = executeInitialQuery(query, parent, *mResultProvider); | ||
77 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | ||
78 | }); | ||
79 | |||
80 | |||
81 | //In case of a live query we keep the runner for as long alive as the result provider exists | ||
82 | if (query.liveQuery) { | ||
83 | //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | ||
84 | setQuery([this, query] () -> KAsync::Job<void> { | ||
85 | return KAsync::start<void>([this, query](KAsync::Future<void> &future) { | ||
86 | //TODO execute in thread | ||
87 | const qint64 newRevision = executeIncrementalQuery(query, *mResultProvider); | ||
88 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | ||
89 | future.setFinished(); | ||
90 | }); | ||
91 | }); | ||
92 | //Ensure the connection is open, if it wasn't already opened | ||
93 | //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates | ||
94 | mResourceAccess->open(); | ||
95 | QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, this, &QueryRunner::revisionChanged); | ||
96 | } | ||
97 | } | ||
98 | |||
99 | template<class DomainType> | ||
100 | typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainType>::emitter() | ||
101 | { | ||
102 | return mResultProvider->emitter(); | ||
103 | } | ||
104 | |||
105 | //TODO move into result provider? | ||
106 | template<class DomainType> | ||
107 | void QueryRunner<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | ||
108 | { | ||
109 | // Trace() << "Replay set"; | ||
110 | while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { | ||
111 | switch (operation) { | ||
112 | case Akonadi2::Operation_Creation: | ||
113 | // Trace() << "Got creation"; | ||
114 | resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); | ||
115 | break; | ||
116 | case Akonadi2::Operation_Modification: | ||
117 | // Trace() << "Got modification"; | ||
118 | resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); | ||
119 | break; | ||
120 | case Akonadi2::Operation_Removal: | ||
121 | // Trace() << "Got removal"; | ||
122 | resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); | ||
123 | break; | ||
124 | } | ||
125 | return true; | ||
126 | })){}; | ||
127 | } | ||
128 | |||
129 | template<class DomainType> | ||
130 | void QueryRunner<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback) | ||
131 | { | ||
132 | //This only works for a 1:1 mapping of resource to domain types. | ||
133 | //Not i.e. for tags that are stored as flags in each entity of an imap store. | ||
134 | //additional properties that don't have a 1:1 mapping (such as separately stored tags), | ||
135 | //could be added to the adaptor. | ||
136 | db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { | ||
137 | Akonadi2::EntityBuffer buffer(value.data(), value.size()); | ||
138 | const Akonadi2::Entity &entity = buffer.entity(); | ||
139 | const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata()); | ||
140 | Q_ASSERT(metadataBuffer); | ||
141 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | ||
142 | resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); | ||
143 | return false; | ||
144 | }, | ||
145 | [](const Akonadi2::Storage::Error &error) { | ||
146 | qWarning() << "Error during query: " << error.message; | ||
147 | }); | ||
148 | } | ||
149 | |||
150 | template<class DomainType> | ||
151 | ResultSet QueryRunner<DomainType>::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) | ||
152 | { | ||
153 | QSet<QByteArray> appliedFilters; | ||
154 | auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); | ||
155 | remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; | ||
156 | |||
157 | //We do a full scan if there were no indexes available to create the initial set. | ||
158 | if (appliedFilters.isEmpty()) { | ||
159 | //TODO this should be replaced by an index lookup as well | ||
160 | resultSet = fullScan(transaction, mBufferType); | ||
161 | } | ||
162 | return resultSet; | ||
163 | } | ||
164 | |||
165 | template<class DomainType> | ||
166 | ResultSet QueryRunner<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) | ||
167 | { | ||
168 | const auto bufferType = mBufferType; | ||
169 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | ||
170 | remainingFilters = query.propertyFilter.keys().toSet(); | ||
171 | return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { | ||
172 | const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); | ||
173 | //Spit out the revision keys one by one. | ||
174 | while (*revisionCounter <= topRevision) { | ||
175 | const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); | ||
176 | const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); | ||
177 | // Trace() << "Revision" << *revisionCounter << type << uid; | ||
178 | if (type != bufferType) { | ||
179 | //Skip revision | ||
180 | *revisionCounter += 1; | ||
181 | continue; | ||
182 | } | ||
183 | const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); | ||
184 | *revisionCounter += 1; | ||
185 | return key; | ||
186 | } | ||
187 | Trace() << "Finished reading incremental result set:" << *revisionCounter; | ||
188 | //We're done | ||
189 | return QByteArray(); | ||
190 | }); | ||
191 | } | ||
192 | |||
193 | template<class DomainType> | ||
194 | ResultSet QueryRunner<DomainType>::filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery) | ||
195 | { | ||
196 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | ||
197 | |||
198 | //Read through the source values and return whatever matches the filter | ||
199 | std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &db, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool { | ||
200 | while (resultSetPtr->next()) { | ||
201 | //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | ||
202 | readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { | ||
203 | //Always remove removals, they probably don't match due to non-available properties | ||
204 | if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { | ||
205 | Trace() << "entity is not filtered" << initialQuery; | ||
206 | if (initialQuery) { | ||
207 | //We're not interested in removals during the initial query | ||
208 | if (operation != Akonadi2::Operation_Removal) { | ||
209 | callback(domainObject, Akonadi2::Operation_Creation); | ||
210 | } | ||
211 | } else { | ||
212 | callback(domainObject, operation); | ||
213 | } | ||
214 | } | ||
215 | }); | ||
216 | } | ||
217 | return false; | ||
218 | }; | ||
219 | return ResultSet(generator); | ||
220 | } | ||
221 | |||
222 | |||
223 | template<class DomainType> | ||
224 | std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> QueryRunner<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query) | ||
225 | { | ||
226 | return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { | ||
227 | for (const auto &filterProperty : remainingFilters) { | ||
228 | const auto property = domainObject->getProperty(filterProperty); | ||
229 | if (property.isValid()) { | ||
230 | //TODO implement other comparison operators than equality | ||
231 | if (property != query.propertyFilter.value(filterProperty)) { | ||
232 | Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty); | ||
233 | return false; | ||
234 | } | ||
235 | } else { | ||
236 | Warning() << "Ignored property filter because value is invalid: " << filterProperty; | ||
237 | } | ||
238 | } | ||
239 | return true; | ||
240 | }; | ||
241 | } | ||
242 | |||
243 | template<class DomainType> | ||
244 | qint64 QueryRunner<DomainType>::load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery) | ||
245 | { | ||
246 | Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); | ||
247 | storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { | ||
248 | Warning() << "Error during query: " << error.store << error.message; | ||
249 | }); | ||
250 | auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); | ||
251 | auto db = transaction.openDatabase(mBufferType + ".main"); | ||
252 | |||
253 | QSet<QByteArray> remainingFilters; | ||
254 | auto resultSet = baseSetRetriever(transaction, remainingFilters); | ||
255 | auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), db, initialQuery); | ||
256 | replaySet(filteredSet, resultProvider); | ||
257 | resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); | ||
258 | return Akonadi2::Storage::maxRevision(transaction); | ||
259 | } | ||
260 | |||
261 | |||
262 | template<class DomainType> | ||
263 | qint64 QueryRunner<DomainType>::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | ||
264 | { | ||
265 | const qint64 baseRevision = resultProvider.revision() + 1; | ||
266 | Trace() << "Running incremental query " << baseRevision; | ||
267 | return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet { | ||
268 | return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); | ||
269 | }, resultProvider, false); | ||
270 | } | ||
271 | |||
272 | template<class DomainType> | ||
273 | qint64 QueryRunner<DomainType>::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | ||
274 | { | ||
275 | auto modifiedQuery = query; | ||
276 | if (!query.parentProperty.isEmpty()) { | ||
277 | if (parent) { | ||
278 | Trace() << "Running initial query for parent:" << parent->identifier(); | ||
279 | modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier()); | ||
280 | } else { | ||
281 | Trace() << "Running initial query for toplevel"; | ||
282 | modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant()); | ||
283 | } | ||
284 | } | ||
285 | return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet { | ||
286 | return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); | ||
287 | }, resultProvider, true); | ||
288 | } | ||
289 | |||
290 | template class QueryRunner<Akonadi2::ApplicationDomain::Folder>; | ||
291 | template class QueryRunner<Akonadi2::ApplicationDomain::Mail>; | ||
292 | template class QueryRunner<Akonadi2::ApplicationDomain::Event>; | ||
diff --git a/common/queryrunner.h b/common/queryrunner.h new file mode 100644 index 0000000..e2af9de --- /dev/null +++ b/common/queryrunner.h | |||
@@ -0,0 +1,107 @@ | |||
1 | /* | ||
2 | Copyright (c) 2015 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | |||
4 | This library is free software; you can redistribute it and/or modify it | ||
5 | under the terms of the GNU Library General Public License as published by | ||
6 | the Free Software Foundation; either version 2 of the License, or (at your | ||
7 | option) any later version. | ||
8 | |||
9 | This library is distributed in the hope that it will be useful, but WITHOUT | ||
10 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | ||
11 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public | ||
12 | License for more details. | ||
13 | |||
14 | You should have received a copy of the GNU Library General Public License | ||
15 | along with this library; see the file COPYING.LIB. If not, write to the | ||
16 | Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA | ||
17 | 02110-1301, USA. | ||
18 | */ | ||
19 | |||
20 | #pragma once | ||
21 | |||
22 | #include <QObject> | ||
23 | #include "facadeinterface.h" | ||
24 | #include "resourceaccess.h" | ||
25 | #include "resultprovider.h" | ||
26 | #include "domaintypeadaptorfactoryinterface.h" | ||
27 | #include "storage.h" | ||
28 | #include "query.h" | ||
29 | |||
30 | /** | ||
31 | * A QueryRunner runs a query and updates the corresponding result set. | ||
32 | * | ||
33 | * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), | ||
34 | * and by how long a result set must be updated. If the query is one off the runner dies after the execution, | ||
35 | * otherwise it lives on the react to changes and updates the corresponding result set. | ||
36 | * | ||
37 | * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. | ||
38 | */ | ||
39 | |||
40 | class QueryRunnerBase : public QObject | ||
41 | { | ||
42 | Q_OBJECT | ||
43 | protected: | ||
44 | typedef std::function<KAsync::Job<void>()> QueryFunction; | ||
45 | |||
46 | /** | ||
47 | * Set the query to run | ||
48 | */ | ||
49 | void setQuery(const QueryFunction &query) | ||
50 | { | ||
51 | queryFunction = query; | ||
52 | } | ||
53 | |||
54 | |||
55 | protected slots: | ||
56 | /** | ||
57 | * Rerun query with new revision | ||
58 | */ | ||
59 | void revisionChanged(qint64 newRevision) | ||
60 | { | ||
61 | Trace() << "New revision: " << newRevision; | ||
62 | run().exec(); | ||
63 | } | ||
64 | |||
65 | private: | ||
66 | /** | ||
67 | * Starts query | ||
68 | */ | ||
69 | KAsync::Job<void> run(qint64 newRevision = 0) | ||
70 | { | ||
71 | return queryFunction(); | ||
72 | } | ||
73 | |||
74 | QueryFunction queryFunction; | ||
75 | }; | ||
76 | |||
77 | template<typename DomainType> | ||
78 | class QueryRunner : public QueryRunnerBase | ||
79 | { | ||
80 | public: | ||
81 | QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType); | ||
82 | |||
83 | typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr emitter(); | ||
84 | |||
85 | private: | ||
86 | static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | ||
87 | |||
88 | void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback); | ||
89 | |||
90 | ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters); | ||
91 | ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters); | ||
92 | |||
93 | ResultSet filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery); | ||
94 | std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query); | ||
95 | qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery); | ||
96 | qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | ||
97 | qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | ||
98 | |||
99 | private: | ||
100 | QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > mResultProvider; | ||
101 | QSharedPointer<Akonadi2::ResourceAccessInterface> mResourceAccess; | ||
102 | DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; | ||
103 | QByteArray mResourceInstanceIdentifier; | ||
104 | QByteArray mBufferType; | ||
105 | Akonadi2::Query mQuery; | ||
106 | }; | ||
107 | |||
diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 8e27054..e87a1f7 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h | |||
@@ -37,6 +37,8 @@ class ResourceAccessInterface : public QObject | |||
37 | { | 37 | { |
38 | Q_OBJECT | 38 | Q_OBJECT |
39 | public: | 39 | public: |
40 | typedef QSharedPointer<ResourceAccessInterface> Ptr; | ||
41 | |||
40 | ResourceAccessInterface() {} | 42 | ResourceAccessInterface() {} |
41 | virtual ~ResourceAccessInterface() {} | 43 | virtual ~ResourceAccessInterface() {} |
42 | virtual KAsync::Job<void> sendCommand(int commandId) = 0; | 44 | virtual KAsync::Job<void> sendCommand(int commandId) = 0; |
diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp index 1796271..3d207e4 100644 --- a/common/resourcefacade.cpp +++ b/common/resourcefacade.cpp | |||
@@ -54,9 +54,15 @@ KAsync::Job<void> ResourceFacade::remove(const Akonadi2::ApplicationDomain::Akon | |||
54 | }); | 54 | }); |
55 | } | 55 | } |
56 | 56 | ||
57 | KAsync::Job<void> ResourceFacade::load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename Akonadi2::ApplicationDomain::AkonadiResource::Ptr> &resultProvider) | 57 | QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<Akonadi2::ApplicationDomain::AkonadiResource::Ptr>::Ptr > ResourceFacade::load(const Akonadi2::Query &query) |
58 | { | 58 | { |
59 | return KAsync::start<void>([query, &resultProvider]() { | 59 | auto resultProvider = new Akonadi2::ResultProvider<typename Akonadi2::ApplicationDomain::AkonadiResource::Ptr>(); |
60 | auto emitter = resultProvider->emitter(); | ||
61 | resultProvider->setFetcher([](const QSharedPointer<Akonadi2::ApplicationDomain::AkonadiResource> &) {}); | ||
62 | resultProvider->onDone([resultProvider]() { | ||
63 | delete resultProvider; | ||
64 | }); | ||
65 | auto job = KAsync::start<void>([query, resultProvider]() { | ||
60 | const auto configuredResources = ResourceConfig::getResources(); | 66 | const auto configuredResources = ResourceConfig::getResources(); |
61 | for (const auto &res : configuredResources.keys()) { | 67 | for (const auto &res : configuredResources.keys()) { |
62 | const auto type = configuredResources.value(res); | 68 | const auto type = configuredResources.value(res); |
@@ -64,12 +70,13 @@ KAsync::Job<void> ResourceFacade::load(const Akonadi2::Query &query, Akonadi2::R | |||
64 | auto resource = Akonadi2::ApplicationDomain::AkonadiResource::Ptr::create(); | 70 | auto resource = Akonadi2::ApplicationDomain::AkonadiResource::Ptr::create(); |
65 | resource->setProperty("identifier", res); | 71 | resource->setProperty("identifier", res); |
66 | resource->setProperty("type", type); | 72 | resource->setProperty("type", type); |
67 | resultProvider.add(resource); | 73 | resultProvider->add(resource); |
68 | } | 74 | } |
69 | } | 75 | } |
70 | //TODO initialResultSetComplete should be implicit | 76 | //TODO initialResultSetComplete should be implicit |
71 | resultProvider.initialResultSetComplete(); | 77 | resultProvider->initialResultSetComplete(); |
72 | resultProvider.complete(); | 78 | resultProvider->complete(); |
73 | }); | 79 | }); |
80 | return qMakePair(job, emitter); | ||
74 | } | 81 | } |
75 | 82 | ||
diff --git a/common/resourcefacade.h b/common/resourcefacade.h index 123b481..38e0c0e 100644 --- a/common/resourcefacade.h +++ b/common/resourcefacade.h | |||
@@ -37,5 +37,6 @@ public: | |||
37 | KAsync::Job<void> create(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; | 37 | KAsync::Job<void> create(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; |
38 | KAsync::Job<void> modify(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; | 38 | KAsync::Job<void> modify(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; |
39 | KAsync::Job<void> remove(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; | 39 | KAsync::Job<void> remove(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; |
40 | KAsync::Job<void> load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename Akonadi2::ApplicationDomain::AkonadiResource::Ptr> &resultProvider) Q_DECL_OVERRIDE; | 40 | QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<Akonadi2::ApplicationDomain::AkonadiResource::Ptr>::Ptr > load(const Akonadi2::Query &query) Q_DECL_OVERRIDE; |
41 | }; | 41 | }; |
42 | |||
diff --git a/common/resultprovider.h b/common/resultprovider.h index 921cd6b..86382ef 100644 --- a/common/resultprovider.h +++ b/common/resultprovider.h | |||
@@ -20,12 +20,12 @@ | |||
20 | 20 | ||
21 | #pragma once | 21 | #pragma once |
22 | 22 | ||
23 | #include <QThread> | ||
23 | #include <functional> | 24 | #include <functional> |
24 | #include <memory> | 25 | #include <memory> |
25 | #include "threadboundary.h" | 26 | #include "threadboundary.h" |
26 | #include "resultset.h" | 27 | #include "resultset.h" |
27 | #include "log.h" | 28 | #include "log.h" |
28 | #include "modelresult.h" | ||
29 | 29 | ||
30 | using namespace async; | 30 | using namespace async; |
31 | 31 | ||
@@ -53,12 +53,7 @@ public: | |||
53 | virtual void initialResultSetComplete() = 0; | 53 | virtual void initialResultSetComplete() = 0; |
54 | virtual void complete() = 0; | 54 | virtual void complete() = 0; |
55 | virtual void clear() = 0; | 55 | virtual void clear() = 0; |
56 | virtual void setFetcher(const std::function<void(const T &parent)> &fetcher) | 56 | virtual void setFetcher(const std::function<void(const T &parent)> &fetcher) = 0; |
57 | { | ||
58 | } | ||
59 | |||
60 | virtual void setFacade(const std::shared_ptr<void> &facade) = 0; | ||
61 | virtual void setQueryRunner(const QSharedPointer<QObject> &runner) = 0; | ||
62 | 57 | ||
63 | void setRevision(qint64 revision) | 58 | void setRevision(qint64 revision) |
64 | { | 59 | { |
@@ -74,101 +69,6 @@ private: | |||
74 | qint64 mRevision; | 69 | qint64 mRevision; |
75 | }; | 70 | }; |
76 | 71 | ||
77 | template<class T, class Ptr> | ||
78 | class ModelResultProvider : public ResultProviderInterface<Ptr> { | ||
79 | public: | ||
80 | ModelResultProvider(QWeakPointer<ModelResult<T, Ptr> > model) | ||
81 | : ResultProviderInterface<Ptr>(), | ||
82 | mModel(model) | ||
83 | { | ||
84 | |||
85 | } | ||
86 | |||
87 | void add(const Ptr &value) | ||
88 | { | ||
89 | if (auto model = mModel.toStrongRef()) { | ||
90 | model->add(value); | ||
91 | } | ||
92 | } | ||
93 | |||
94 | void modify(const Ptr &value) | ||
95 | { | ||
96 | if (auto model = mModel.toStrongRef()) { | ||
97 | model->modify(value); | ||
98 | } | ||
99 | } | ||
100 | |||
101 | void remove(const Ptr &value) | ||
102 | { | ||
103 | if (auto model = mModel.toStrongRef()) { | ||
104 | model->remove(value); | ||
105 | } | ||
106 | } | ||
107 | |||
108 | void initialResultSetComplete() | ||
109 | { | ||
110 | // mResultEmitter->initialResultSetComplete(); | ||
111 | } | ||
112 | |||
113 | void complete() | ||
114 | { | ||
115 | // mResultEmitter->complete(); | ||
116 | } | ||
117 | |||
118 | void clear() | ||
119 | { | ||
120 | // mResultEmitter->clear(); | ||
121 | } | ||
122 | |||
123 | /** | ||
124 | * For lifetimemanagement only. | ||
125 | * We keep the runner alive as long as the result provider exists. | ||
126 | */ | ||
127 | void setFacade(const std::shared_ptr<void> &facade) | ||
128 | { | ||
129 | mFacade = facade; | ||
130 | } | ||
131 | |||
132 | void onDone(const std::function<void()> &callback) | ||
133 | { | ||
134 | mOnDoneCallback = callback; | ||
135 | } | ||
136 | |||
137 | bool isDone() const | ||
138 | { | ||
139 | //The existance of the emitter currently defines wether we're done or not. | ||
140 | // return mResultEmitter.toStrongRef().isNull(); | ||
141 | return true; | ||
142 | } | ||
143 | |||
144 | void setFetcher(const std::function<void(const Ptr &parent)> &fetcher) | ||
145 | { | ||
146 | if (auto model = mModel.toStrongRef()) { | ||
147 | model->setFetcher(fetcher); | ||
148 | } | ||
149 | } | ||
150 | |||
151 | void setQueryRunner(const QSharedPointer<QObject> &runner) | ||
152 | { | ||
153 | mQueryRunner = runner; | ||
154 | } | ||
155 | |||
156 | private: | ||
157 | void done() | ||
158 | { | ||
159 | qWarning() << "done"; | ||
160 | if (mOnDoneCallback) { | ||
161 | mOnDoneCallback(); | ||
162 | mOnDoneCallback = std::function<void()>(); | ||
163 | } | ||
164 | } | ||
165 | |||
166 | QWeakPointer<ModelResult<T, Ptr> > mModel; | ||
167 | QSharedPointer<QObject> mQueryRunner; | ||
168 | std::shared_ptr<void> mFacade; | ||
169 | std::function<void()> mOnDoneCallback; | ||
170 | }; | ||
171 | |||
172 | /* | 72 | /* |
173 | * The promise side for the result emitter | 73 | * The promise side for the result emitter |
174 | */ | 74 | */ |
@@ -204,6 +104,12 @@ private: | |||
204 | } | 104 | } |
205 | 105 | ||
206 | public: | 106 | public: |
107 | typedef QSharedPointer<ResultProvider<T> > Ptr; | ||
108 | |||
109 | ~ResultProvider() | ||
110 | { | ||
111 | } | ||
112 | |||
207 | //Called from worker thread | 113 | //Called from worker thread |
208 | void add(const T &value) | 114 | void add(const T &value) |
209 | { | 115 | { |
@@ -261,30 +167,16 @@ public: | |||
261 | //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again | 167 | //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again |
262 | auto sharedPtr = QSharedPointer<ResultEmitter<T> >(new ResultEmitter<T>, [this](ResultEmitter<T> *emitter){ mThreadBoundary->callInMainThread([this]() {done();}); delete emitter; }); | 168 | auto sharedPtr = QSharedPointer<ResultEmitter<T> >(new ResultEmitter<T>, [this](ResultEmitter<T> *emitter){ mThreadBoundary->callInMainThread([this]() {done();}); delete emitter; }); |
263 | mResultEmitter = sharedPtr; | 169 | mResultEmitter = sharedPtr; |
170 | sharedPtr->setFetcher([this](const T &parent) { | ||
171 | Q_ASSERT(mFetcher); | ||
172 | mFetcher(parent); | ||
173 | }); | ||
264 | return sharedPtr; | 174 | return sharedPtr; |
265 | } | 175 | } |
266 | 176 | ||
267 | return mResultEmitter.toStrongRef(); | 177 | return mResultEmitter.toStrongRef(); |
268 | } | 178 | } |
269 | 179 | ||
270 | /** | ||
271 | * For lifetimemanagement only. | ||
272 | * We keep the runner alive as long as the result provider exists. | ||
273 | */ | ||
274 | void setQueryRunner(const QSharedPointer<QObject> &runner) | ||
275 | { | ||
276 | mQueryRunner = runner; | ||
277 | } | ||
278 | |||
279 | /** | ||
280 | * For lifetimemanagement only. | ||
281 | * We keep the runner alive as long as the result provider exists. | ||
282 | */ | ||
283 | void setFacade(const std::shared_ptr<void> &facade) | ||
284 | { | ||
285 | mFacade = facade; | ||
286 | } | ||
287 | |||
288 | void onDone(const std::function<void()> &callback) | 180 | void onDone(const std::function<void()> &callback) |
289 | { | 181 | { |
290 | mThreadBoundary = QSharedPointer<ThreadBoundary>::create(); | 182 | mThreadBoundary = QSharedPointer<ThreadBoundary>::create(); |
@@ -299,7 +191,7 @@ public: | |||
299 | 191 | ||
300 | void setFetcher(const std::function<void(const T &parent)> &fetcher) | 192 | void setFetcher(const std::function<void(const T &parent)> &fetcher) |
301 | { | 193 | { |
302 | fetcher(T()); | 194 | mFetcher = fetcher; |
303 | } | 195 | } |
304 | 196 | ||
305 | private: | 197 | private: |
@@ -307,16 +199,17 @@ private: | |||
307 | { | 199 | { |
308 | qWarning() << "done"; | 200 | qWarning() << "done"; |
309 | if (mOnDoneCallback) { | 201 | if (mOnDoneCallback) { |
310 | mOnDoneCallback(); | 202 | auto callback = mOnDoneCallback; |
311 | mOnDoneCallback = std::function<void()>(); | 203 | mOnDoneCallback = std::function<void()>(); |
204 | //This may delete this object | ||
205 | callback(); | ||
312 | } | 206 | } |
313 | } | 207 | } |
314 | 208 | ||
315 | QWeakPointer<ResultEmitter<T> > mResultEmitter; | 209 | QWeakPointer<ResultEmitter<T> > mResultEmitter; |
316 | QSharedPointer<QObject> mQueryRunner; | ||
317 | std::shared_ptr<void> mFacade; | ||
318 | std::function<void()> mOnDoneCallback; | 210 | std::function<void()> mOnDoneCallback; |
319 | QSharedPointer<ThreadBoundary> mThreadBoundary; | 211 | QSharedPointer<ThreadBoundary> mThreadBoundary; |
212 | std::function<void(const T &parent)> mFetcher; | ||
320 | }; | 213 | }; |
321 | 214 | ||
322 | /* | 215 | /* |
@@ -334,6 +227,8 @@ private: | |||
334 | template<class DomainType> | 227 | template<class DomainType> |
335 | class ResultEmitter { | 228 | class ResultEmitter { |
336 | public: | 229 | public: |
230 | typedef QSharedPointer<ResultEmitter<DomainType> > Ptr; | ||
231 | |||
337 | void onAdded(const std::function<void(const DomainType&)> &handler) | 232 | void onAdded(const std::function<void(const DomainType&)> &handler) |
338 | { | 233 | { |
339 | addHandler = handler; | 234 | addHandler = handler; |
@@ -394,6 +289,13 @@ public: | |||
394 | clearHandler(); | 289 | clearHandler(); |
395 | } | 290 | } |
396 | 291 | ||
292 | void setFetcher(const std::function<void(const DomainType &parent)> &fetcher) | ||
293 | { | ||
294 | mFetcher = fetcher; | ||
295 | } | ||
296 | |||
297 | std::function<void(const DomainType &parent)> mFetcher; | ||
298 | |||
397 | private: | 299 | private: |
398 | friend class ResultProvider<DomainType>; | 300 | friend class ResultProvider<DomainType>; |
399 | 301 | ||
diff --git a/common/threadboundary.cpp b/common/threadboundary.cpp index 47ec508..48fd11a 100644 --- a/common/threadboundary.cpp +++ b/common/threadboundary.cpp | |||
@@ -24,6 +24,9 @@ Q_DECLARE_METATYPE(std::function<void()>); | |||
24 | 24 | ||
25 | namespace async { | 25 | namespace async { |
26 | ThreadBoundary::ThreadBoundary(): QObject() { qRegisterMetaType<std::function<void()> >("std::function<void()>"); } | 26 | ThreadBoundary::ThreadBoundary(): QObject() { qRegisterMetaType<std::function<void()> >("std::function<void()>"); } |
27 | ThreadBoundary:: ~ThreadBoundary() {} | 27 | ThreadBoundary:: ~ThreadBoundary() |
28 | { | ||
29 | } | ||
30 | |||
28 | } | 31 | } |
29 | 32 | ||