diff options
Diffstat (limited to 'common')
-rw-r--r-- | common/CMakeLists.txt | 1 | ||||
-rw-r--r-- | common/clientapi.cpp | 62 | ||||
-rw-r--r-- | common/facade.cpp | 282 | ||||
-rw-r--r-- | common/facade.h | 17 | ||||
-rw-r--r-- | common/facadeinterface.h | 29 | ||||
-rw-r--r-- | common/modelresult.cpp | 22 | ||||
-rw-r--r-- | common/modelresult.h | 6 | ||||
-rw-r--r-- | common/queryrunner.cpp | 292 | ||||
-rw-r--r-- | common/queryrunner.h | 107 | ||||
-rw-r--r-- | common/resourceaccess.h | 2 | ||||
-rw-r--r-- | common/resourcefacade.cpp | 17 | ||||
-rw-r--r-- | common/resourcefacade.h | 3 | ||||
-rw-r--r-- | common/resultprovider.h | 150 | ||||
-rw-r--r-- | common/threadboundary.cpp | 5 |
14 files changed, 522 insertions, 473 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 01056d0..be312b9 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -26,6 +26,7 @@ set(command_SRCS | |||
26 | resource.cpp | 26 | resource.cpp |
27 | genericresource.cpp | 27 | genericresource.cpp |
28 | resourceaccess.cpp | 28 | resourceaccess.cpp |
29 | queryrunner.cpp | ||
29 | listener.cpp | 30 | listener.cpp |
30 | storage_common.cpp | 31 | storage_common.cpp |
31 | threadboundary.cpp | 32 | threadboundary.cpp |
diff --git a/common/clientapi.cpp b/common/clientapi.cpp index 02f8ce6..b24dfa8 100644 --- a/common/clientapi.cpp +++ b/common/clientapi.cpp | |||
@@ -34,6 +34,7 @@ | |||
34 | #include "definitions.h" | 34 | #include "definitions.h" |
35 | #include "resourceconfig.h" | 35 | #include "resourceconfig.h" |
36 | #include "facadefactory.h" | 36 | #include "facadefactory.h" |
37 | #include "modelresult.h" | ||
37 | #include "log.h" | 38 | #include "log.h" |
38 | 39 | ||
39 | #define ASYNCINTHREAD | 40 | #define ASYNCINTHREAD |
@@ -100,38 +101,8 @@ template <class DomainType> | |||
100 | QSharedPointer<ResultEmitter<typename DomainType::Ptr> > Store::load(Query query) | 101 | QSharedPointer<ResultEmitter<typename DomainType::Ptr> > Store::load(Query query) |
101 | { | 102 | { |
102 | auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create(); | 103 | auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create(); |
103 | 104 | qWarning() << "Main thread " << QThread::currentThreadId(); | |
104 | //Execute the search in a thread. | 105 | //FIXME remove |
105 | //We must guarantee that the emitter is returned before the first result is emitted. | ||
106 | //The result provider must be threadsafe. | ||
107 | async::run([query, resultSet](){ | ||
108 | QEventLoop eventLoop; | ||
109 | resultSet->onDone([&eventLoop](){ | ||
110 | eventLoop.quit(); | ||
111 | }); | ||
112 | // Query all resources and aggregate results | ||
113 | KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName<DomainType>())) | ||
114 | .template each<void, QByteArray>([query, resultSet](const QByteArray &resource, KAsync::Future<void> &future) { | ||
115 | if (auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceName(resource), resource)) { | ||
116 | facade->load(query, *resultSet).template then<void>([&future](){future.setFinished();}).exec(); | ||
117 | //Keep the facade alive for the lifetime of the resultSet. | ||
118 | resultSet->setFacade(facade); | ||
119 | } else { | ||
120 | //Ignore the error and carry on | ||
121 | future.setFinished(); | ||
122 | } | ||
123 | }).template then<void>([query, resultSet]() { | ||
124 | resultSet->initialResultSetComplete(); | ||
125 | if (!query.liveQuery) { | ||
126 | resultSet->complete(); | ||
127 | } | ||
128 | }).exec(); | ||
129 | |||
130 | //Keep the thread alive until the result is ready | ||
131 | if (!resultSet->isDone()) { | ||
132 | eventLoop.exec(); | ||
133 | } | ||
134 | }); | ||
135 | return resultSet->emitter(); | 106 | return resultSet->emitter(); |
136 | } | 107 | } |
137 | 108 | ||
@@ -139,28 +110,29 @@ template <class DomainType> | |||
139 | QSharedPointer<QAbstractItemModel> Store::loadModel(Query query) | 110 | QSharedPointer<QAbstractItemModel> Store::loadModel(Query query) |
140 | { | 111 | { |
141 | auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr> >::create(query, query.requestedProperties.toList()); | 112 | auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr> >::create(query, query.requestedProperties.toList()); |
142 | auto resultProvider = std::make_shared<ModelResultProvider<DomainType, typename DomainType::Ptr> >(model); | 113 | |
143 | //Keep the resultprovider alive for as long as the model lives | 114 | //* Client defines lifetime of model |
144 | model->setProperty("resultProvider", QVariant::fromValue(std::shared_ptr<void>(resultProvider))); | 115 | //* The model lifetime defines the duration of live-queries |
116 | //* The facade needs to life for the duration of any calls being made (assuming we get rid of any internal callbacks | ||
117 | //* The emitter needs to live or the duration of query (respectively, the model) | ||
118 | //* The result provider needs to live for as long as results are provided (until the last thread exits). | ||
145 | 119 | ||
146 | // Query all resources and aggregate results | 120 | // Query all resources and aggregate results |
147 | KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName<DomainType>())) | 121 | KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName<DomainType>())) |
148 | .template each<void, QByteArray>([query, resultProvider](const QByteArray &resource, KAsync::Future<void> &future) { | 122 | .template each<void, QByteArray>([query, model](const QByteArray &resource, KAsync::Future<void> &future) { |
149 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceName(resource), resource); | 123 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceName(resource), resource); |
150 | if (facade) { | 124 | if (facade) { |
151 | facade->load(query, *resultProvider).template then<void>([&future](){future.setFinished();}).exec(); | 125 | Trace() << "Trying to fetch from resource"; |
152 | //Keep the facade alive for the lifetime of the resultSet. | 126 | auto result = facade->load(query); |
153 | //FIXME this would have to become a list | 127 | auto emitter = result.second; |
154 | resultProvider->setFacade(facade); | 128 | //TODO use aggregating emitter instead |
129 | model->setEmitter(emitter); | ||
130 | model->fetchMore(QModelIndex()); | ||
131 | result.first.template then<void>([&future](){future.setFinished();}).exec(); | ||
155 | } else { | 132 | } else { |
156 | //Ignore the error and carry on | 133 | //Ignore the error and carry on |
157 | future.setFinished(); | 134 | future.setFinished(); |
158 | } | 135 | } |
159 | }).template then<void>([query, resultProvider]() { | ||
160 | resultProvider->initialResultSetComplete(); | ||
161 | if (!query.liveQuery) { | ||
162 | resultProvider->complete(); | ||
163 | } | ||
164 | }).exec(); | 136 | }).exec(); |
165 | 137 | ||
166 | return model; | 138 | return model; |
diff --git a/common/facade.cpp b/common/facade.cpp index 92124fc..1d6b9a7 100644 --- a/common/facade.cpp +++ b/common/facade.cpp | |||
@@ -24,76 +24,10 @@ | |||
24 | #include "storage.h" | 24 | #include "storage.h" |
25 | #include "definitions.h" | 25 | #include "definitions.h" |
26 | #include "domainadaptor.h" | 26 | #include "domainadaptor.h" |
27 | #include "queryrunner.h" | ||
27 | 28 | ||
28 | using namespace Akonadi2; | 29 | using namespace Akonadi2; |
29 | 30 | ||
30 | /** | ||
31 | * A QueryRunner runs a query and updates the corresponding result set. | ||
32 | * | ||
33 | * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), | ||
34 | * and by how long a result set must be updated. If the query is one off the runner dies after the execution, | ||
35 | * otherwise it lives on the react to changes and updates the corresponding result set. | ||
36 | * | ||
37 | * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. | ||
38 | */ | ||
39 | class QueryRunner : public QObject | ||
40 | { | ||
41 | Q_OBJECT | ||
42 | public: | ||
43 | typedef std::function<KAsync::Job<void>()> QueryFunction; | ||
44 | |||
45 | QueryRunner(const Akonadi2::Query &query) {}; | ||
46 | /** | ||
47 | * Starts query | ||
48 | */ | ||
49 | KAsync::Job<void> run(qint64 newRevision = 0) | ||
50 | { | ||
51 | return queryFunction(); | ||
52 | } | ||
53 | |||
54 | /** | ||
55 | * Set the query to run | ||
56 | */ | ||
57 | void setQuery(const QueryFunction &query) | ||
58 | { | ||
59 | queryFunction = query; | ||
60 | } | ||
61 | |||
62 | public slots: | ||
63 | /** | ||
64 | * Rerun query with new revision | ||
65 | */ | ||
66 | void revisionChanged(qint64 newRevision) | ||
67 | { | ||
68 | Trace() << "New revision: " << newRevision; | ||
69 | run().exec(); | ||
70 | } | ||
71 | |||
72 | private: | ||
73 | QueryFunction queryFunction; | ||
74 | }; | ||
75 | |||
76 | static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) | ||
77 | { | ||
78 | //TODO use a result set with an iterator, to read values on demand | ||
79 | QVector<QByteArray> keys; | ||
80 | transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { | ||
81 | //Skip internals | ||
82 | if (Akonadi2::Storage::isInternalKey(key)) { | ||
83 | return true; | ||
84 | } | ||
85 | keys << Akonadi2::Storage::uidFromKey(key); | ||
86 | return true; | ||
87 | }, | ||
88 | [](const Akonadi2::Storage::Error &error) { | ||
89 | qWarning() << "Error during query: " << error.message; | ||
90 | }); | ||
91 | |||
92 | Trace() << "Full scan found " << keys.size() << " results"; | ||
93 | return ResultSet(keys); | ||
94 | } | ||
95 | |||
96 | |||
97 | 31 | ||
98 | template<class DomainType> | 32 | template<class DomainType> |
99 | GenericFacade<DomainType>::GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory , const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess) | 33 | GenericFacade<DomainType>::GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory , const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess) |
@@ -150,220 +84,14 @@ KAsync::Job<void> GenericFacade<DomainType>::remove(const DomainType &domainObje | |||
150 | } | 84 | } |
151 | 85 | ||
152 | template<class DomainType> | 86 | template<class DomainType> |
153 | KAsync::Job<void> GenericFacade<DomainType>::load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | 87 | QPair<KAsync::Job<void>, typename ResultEmitter<typename DomainType::Ptr>::Ptr> GenericFacade<DomainType>::load(const Akonadi2::Query &query) |
154 | { | ||
155 | //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. | ||
156 | resultProvider.setFetcher([this, query, &resultProvider](const typename DomainType::Ptr &parent) { | ||
157 | const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); | ||
158 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | ||
159 | }); | ||
160 | |||
161 | |||
162 | //In case of a live query we keep the runner for as long alive as the result provider exists | ||
163 | if (query.liveQuery) { | ||
164 | auto runner = QSharedPointer<QueryRunner>::create(query); | ||
165 | //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | ||
166 | runner->setQuery([this, query, &resultProvider] () -> KAsync::Job<void> { | ||
167 | return KAsync::start<void>([this, query, &resultProvider](KAsync::Future<void> &future) { | ||
168 | const qint64 newRevision = executeIncrementalQuery(query, resultProvider); | ||
169 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | ||
170 | future.setFinished(); | ||
171 | }); | ||
172 | }); | ||
173 | resultProvider.setQueryRunner(runner); | ||
174 | //Ensure the connection is open, if it wasn't already opened | ||
175 | //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates | ||
176 | mResourceAccess->open(); | ||
177 | QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); | ||
178 | } | ||
179 | return KAsync::null<void>(); | ||
180 | } | ||
181 | |||
182 | //TODO move into result provider? | ||
183 | template<class DomainType> | ||
184 | void GenericFacade<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | ||
185 | { | ||
186 | while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { | ||
187 | switch (operation) { | ||
188 | case Akonadi2::Operation_Creation: | ||
189 | // Trace() << "Got creation"; | ||
190 | resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); | ||
191 | break; | ||
192 | case Akonadi2::Operation_Modification: | ||
193 | // Trace() << "Got modification"; | ||
194 | resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); | ||
195 | break; | ||
196 | case Akonadi2::Operation_Removal: | ||
197 | // Trace() << "Got removal"; | ||
198 | resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); | ||
199 | break; | ||
200 | } | ||
201 | return true; | ||
202 | })){}; | ||
203 | } | ||
204 | |||
205 | template<class DomainType> | ||
206 | void GenericFacade<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback) | ||
207 | { | ||
208 | //This only works for a 1:1 mapping of resource to domain types. | ||
209 | //Not i.e. for tags that are stored as flags in each entity of an imap store. | ||
210 | //additional properties that don't have a 1:1 mapping (such as separately stored tags), | ||
211 | //could be added to the adaptor. | ||
212 | // | ||
213 | // Akonadi2::Storage::getLatest(transaction, bufferTye, key); | ||
214 | db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { | ||
215 | Akonadi2::EntityBuffer buffer(value.data(), value.size()); | ||
216 | const Akonadi2::Entity &entity = buffer.entity(); | ||
217 | const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata()); | ||
218 | Q_ASSERT(metadataBuffer); | ||
219 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | ||
220 | resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); | ||
221 | return false; | ||
222 | }, | ||
223 | [](const Akonadi2::Storage::Error &error) { | ||
224 | qWarning() << "Error during query: " << error.message; | ||
225 | }); | ||
226 | } | ||
227 | |||
228 | template<class DomainType> | ||
229 | ResultSet GenericFacade<DomainType>::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) | ||
230 | { | 88 | { |
231 | QSet<QByteArray> appliedFilters; | 89 | //The runner lives for the lifetime of the query |
232 | auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); | 90 | auto runner = new QueryRunner<DomainType>(query, mResourceAccess, mResourceInstanceIdentifier, mDomainTypeAdaptorFactory, bufferTypeForDomainType()); |
233 | remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; | 91 | return qMakePair(KAsync::null<void>(), runner->emitter()); |
234 | |||
235 | //We do a full scan if there were no indexes available to create the initial set. | ||
236 | if (appliedFilters.isEmpty()) { | ||
237 | //TODO this should be replaced by an index lookup as well | ||
238 | resultSet = fullScan(transaction, bufferTypeForDomainType()); | ||
239 | } | ||
240 | return resultSet; | ||
241 | } | ||
242 | |||
243 | template<class DomainType> | ||
244 | ResultSet GenericFacade<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) | ||
245 | { | ||
246 | const auto bufferType = bufferTypeForDomainType(); | ||
247 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | ||
248 | remainingFilters = query.propertyFilter.keys().toSet(); | ||
249 | return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { | ||
250 | const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); | ||
251 | //Spit out the revision keys one by one. | ||
252 | while (*revisionCounter <= topRevision) { | ||
253 | const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); | ||
254 | const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); | ||
255 | Trace() << "Revision" << *revisionCounter << type << uid; | ||
256 | if (type != bufferType) { | ||
257 | //Skip revision | ||
258 | *revisionCounter += 1; | ||
259 | continue; | ||
260 | } | ||
261 | const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); | ||
262 | *revisionCounter += 1; | ||
263 | return key; | ||
264 | } | ||
265 | Trace() << "Finished reading incremental result set:" << *revisionCounter; | ||
266 | //We're done | ||
267 | return QByteArray(); | ||
268 | }); | ||
269 | } | ||
270 | |||
271 | template<class DomainType> | ||
272 | ResultSet GenericFacade<DomainType>::filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery) | ||
273 | { | ||
274 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | ||
275 | |||
276 | //Read through the source values and return whatever matches the filter | ||
277 | std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &db, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool { | ||
278 | while (resultSetPtr->next()) { | ||
279 | //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | ||
280 | readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { | ||
281 | //Always remove removals, they probably don't match due to non-available properties | ||
282 | if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { | ||
283 | if (initialQuery) { | ||
284 | //We're not interested in removals during the initial query | ||
285 | if (operation != Akonadi2::Operation_Removal) { | ||
286 | callback(domainObject, Akonadi2::Operation_Creation); | ||
287 | } | ||
288 | } else { | ||
289 | callback(domainObject, operation); | ||
290 | } | ||
291 | } | ||
292 | }); | ||
293 | } | ||
294 | return false; | ||
295 | }; | ||
296 | return ResultSet(generator); | ||
297 | } | ||
298 | |||
299 | |||
300 | template<class DomainType> | ||
301 | std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> GenericFacade<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query) | ||
302 | { | ||
303 | return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { | ||
304 | for (const auto &filterProperty : remainingFilters) { | ||
305 | const auto property = domainObject->getProperty(filterProperty); | ||
306 | if (property.isValid()) { | ||
307 | //TODO implement other comparison operators than equality | ||
308 | if (property != query.propertyFilter.value(filterProperty)) { | ||
309 | Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty); | ||
310 | return false; | ||
311 | } | ||
312 | } else { | ||
313 | Warning() << "Ignored property filter because value is invalid: " << filterProperty; | ||
314 | } | ||
315 | } | ||
316 | return true; | ||
317 | }; | ||
318 | } | ||
319 | |||
320 | template<class DomainType> | ||
321 | qint64 GenericFacade<DomainType>::load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery) | ||
322 | { | ||
323 | Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); | ||
324 | storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { | ||
325 | Warning() << "Error during query: " << error.store << error.message; | ||
326 | }); | ||
327 | auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); | ||
328 | auto db = transaction.openDatabase(bufferTypeForDomainType() + ".main"); | ||
329 | |||
330 | QSet<QByteArray> remainingFilters; | ||
331 | auto resultSet = baseSetRetriever(transaction, remainingFilters); | ||
332 | auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), db, initialQuery); | ||
333 | replaySet(filteredSet, resultProvider); | ||
334 | resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); | ||
335 | return Akonadi2::Storage::maxRevision(transaction); | ||
336 | } | 92 | } |
337 | 93 | ||
338 | 94 | ||
339 | template<class DomainType> | ||
340 | qint64 GenericFacade<DomainType>::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | ||
341 | { | ||
342 | const qint64 baseRevision = resultProvider.revision() + 1; | ||
343 | Trace() << "Running incremental query " << baseRevision; | ||
344 | return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet { | ||
345 | return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); | ||
346 | }, resultProvider, false); | ||
347 | } | ||
348 | |||
349 | template<class DomainType> | ||
350 | qint64 GenericFacade<DomainType>::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | ||
351 | { | ||
352 | auto modifiedQuery = query; | ||
353 | if (!query.parentProperty.isEmpty()) { | ||
354 | if (parent) { | ||
355 | Trace() << "Running initial query for parent:" << parent->identifier(); | ||
356 | modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier()); | ||
357 | } else { | ||
358 | Trace() << "Running initial query for toplevel"; | ||
359 | modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant()); | ||
360 | } | ||
361 | } | ||
362 | return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet { | ||
363 | return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); | ||
364 | }, resultProvider, true); | ||
365 | } | ||
366 | |||
367 | template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Folder>; | 95 | template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Folder>; |
368 | template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Mail>; | 96 | template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Mail>; |
369 | template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Event>; | 97 | template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Event>; |
diff --git a/common/facade.h b/common/facade.h index d8b878b..de67e05 100644 --- a/common/facade.h +++ b/common/facade.h | |||
@@ -59,22 +59,7 @@ public: | |||
59 | KAsync::Job<void> create(const DomainType &domainObject) Q_DECL_OVERRIDE; | 59 | KAsync::Job<void> create(const DomainType &domainObject) Q_DECL_OVERRIDE; |
60 | KAsync::Job<void> modify(const DomainType &domainObject) Q_DECL_OVERRIDE; | 60 | KAsync::Job<void> modify(const DomainType &domainObject) Q_DECL_OVERRIDE; |
61 | KAsync::Job<void> remove(const DomainType &domainObject) Q_DECL_OVERRIDE; | 61 | KAsync::Job<void> remove(const DomainType &domainObject) Q_DECL_OVERRIDE; |
62 | KAsync::Job<void> load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) Q_DECL_OVERRIDE; | 62 | QPair<KAsync::Job<void>, typename ResultEmitter<typename DomainType::Ptr>::Ptr> load(const Akonadi2::Query &query) Q_DECL_OVERRIDE; |
63 | |||
64 | private: | ||
65 | //TODO move into result provider? | ||
66 | static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | ||
67 | |||
68 | void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback); | ||
69 | |||
70 | ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters); | ||
71 | ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters); | ||
72 | |||
73 | ResultSet filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery); | ||
74 | std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query); | ||
75 | qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery); | ||
76 | qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | ||
77 | qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | ||
78 | 63 | ||
79 | protected: | 64 | protected: |
80 | //TODO use one resource access instance per application & per resource | 65 | //TODO use one resource access instance per application & per resource |
diff --git a/common/facadeinterface.h b/common/facadeinterface.h index 7ec21bc..318abf3 100644 --- a/common/facadeinterface.h +++ b/common/facadeinterface.h | |||
@@ -23,6 +23,7 @@ | |||
23 | #include <Async/Async> | 23 | #include <Async/Async> |
24 | #include <QByteArray> | 24 | #include <QByteArray> |
25 | #include <QSharedPointer> | 25 | #include <QSharedPointer> |
26 | #include <QPair> | ||
26 | #include "applicationdomaintype.h" | 27 | #include "applicationdomaintype.h" |
27 | #include "resultprovider.h" | 28 | #include "resultprovider.h" |
28 | 29 | ||
@@ -42,10 +43,32 @@ class StoreFacade { | |||
42 | public: | 43 | public: |
43 | virtual ~StoreFacade(){}; | 44 | virtual ~StoreFacade(){}; |
44 | QByteArray type() const { return ApplicationDomain::getTypeName<DomainType>(); } | 45 | QByteArray type() const { return ApplicationDomain::getTypeName<DomainType>(); } |
46 | |||
47 | /** | ||
48 | * Create an entity in the store. | ||
49 | * | ||
50 | * The job returns succefully once the task has been successfully placed in the queue | ||
51 | */ | ||
45 | virtual KAsync::Job<void> create(const DomainType &domainObject) = 0; | 52 | virtual KAsync::Job<void> create(const DomainType &domainObject) = 0; |
53 | |||
54 | /** | ||
55 | * Modify an entity in the store. | ||
56 | * | ||
57 | * The job returns succefully once the task has been successfully placed in the queue | ||
58 | */ | ||
46 | virtual KAsync::Job<void> modify(const DomainType &domainObject) = 0; | 59 | virtual KAsync::Job<void> modify(const DomainType &domainObject) = 0; |
60 | |||
61 | /** | ||
62 | * Remove an entity from the store. | ||
63 | * | ||
64 | * The job returns succefully once the task has been successfully placed in the queue | ||
65 | */ | ||
47 | virtual KAsync::Job<void> remove(const DomainType &domainObject) = 0; | 66 | virtual KAsync::Job<void> remove(const DomainType &domainObject) = 0; |
48 | virtual KAsync::Job<void> load(const Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) = 0; | 67 | |
68 | /** | ||
69 | * Load entities from the store. | ||
70 | */ | ||
71 | virtual QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr > load(const Query &query) = 0; | ||
49 | }; | 72 | }; |
50 | 73 | ||
51 | template<class DomainType> | 74 | template<class DomainType> |
@@ -67,9 +90,9 @@ public: | |||
67 | return KAsync::error<void>(-1, "Failed to create a facade"); | 90 | return KAsync::error<void>(-1, "Failed to create a facade"); |
68 | } | 91 | } |
69 | 92 | ||
70 | KAsync::Job<void> load(const Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | 93 | QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr > load(const Query &query) |
71 | { | 94 | { |
72 | return KAsync::error<void>(-1, "Failed to create a facade"); | 95 | return qMakePair(KAsync::null<void>(), typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr()); |
73 | } | 96 | } |
74 | }; | 97 | }; |
75 | 98 | ||
diff --git a/common/modelresult.cpp b/common/modelresult.cpp index 935e2e8..65eaba9 100644 --- a/common/modelresult.cpp +++ b/common/modelresult.cpp | |||
@@ -183,6 +183,28 @@ void ModelResult<T, Ptr>::setFetcher(const std::function<void(const Ptr &parent) | |||
183 | } | 183 | } |
184 | 184 | ||
185 | template<class T, class Ptr> | 185 | template<class T, class Ptr> |
186 | void ModelResult<T, Ptr>::setEmitter(const typename Akonadi2::ResultEmitter<Ptr>::Ptr &emitter) | ||
187 | { | ||
188 | setFetcher(emitter->mFetcher); | ||
189 | emitter->onAdded([this](const Ptr &value) { | ||
190 | this->add(value); | ||
191 | }); | ||
192 | emitter->onModified([this](const Ptr &value) { | ||
193 | this->modify(value); | ||
194 | }); | ||
195 | emitter->onRemoved([this](const Ptr &value) { | ||
196 | this->remove(value); | ||
197 | }); | ||
198 | emitter->onInitialResultSetComplete([this]() { | ||
199 | }); | ||
200 | emitter->onComplete([this]() { | ||
201 | }); | ||
202 | emitter->onClear([this]() { | ||
203 | }); | ||
204 | mEmitter = emitter; | ||
205 | } | ||
206 | |||
207 | template<class T, class Ptr> | ||
186 | void ModelResult<T, Ptr>::modify(const Ptr &value) | 208 | void ModelResult<T, Ptr>::modify(const Ptr &value) |
187 | { | 209 | { |
188 | auto childId = qHash(value->identifier()); | 210 | auto childId = qHash(value->identifier()); |
diff --git a/common/modelresult.h b/common/modelresult.h index 66dfce5..eb6c86b 100644 --- a/common/modelresult.h +++ b/common/modelresult.h | |||
@@ -23,20 +23,23 @@ | |||
23 | #include <QAbstractItemModel> | 23 | #include <QAbstractItemModel> |
24 | #include <QModelIndex> | 24 | #include <QModelIndex> |
25 | #include <QDebug> | 25 | #include <QDebug> |
26 | #include <QSharedPointer> | ||
26 | #include <functional> | 27 | #include <functional> |
27 | #include "query.h" | 28 | #include "query.h" |
29 | #include "resultprovider.h" | ||
28 | 30 | ||
29 | template<class T, class Ptr> | 31 | template<class T, class Ptr> |
30 | class ModelResult : public QAbstractItemModel | 32 | class ModelResult : public QAbstractItemModel |
31 | { | 33 | { |
32 | public: | 34 | public: |
33 | |||
34 | enum Roles { | 35 | enum Roles { |
35 | DomainObjectRole = Qt::UserRole + 1 | 36 | DomainObjectRole = Qt::UserRole + 1 |
36 | }; | 37 | }; |
37 | 38 | ||
38 | ModelResult(const Akonadi2::Query &query, const QList<QByteArray> &propertyColumns); | 39 | ModelResult(const Akonadi2::Query &query, const QList<QByteArray> &propertyColumns); |
39 | 40 | ||
41 | void setEmitter(const typename Akonadi2::ResultEmitter<Ptr>::Ptr &); | ||
42 | |||
40 | int rowCount(const QModelIndex &parent = QModelIndex()) const; | 43 | int rowCount(const QModelIndex &parent = QModelIndex()) const; |
41 | int columnCount(const QModelIndex &parent = QModelIndex()) const; | 44 | int columnCount(const QModelIndex &parent = QModelIndex()) const; |
42 | QVariant data(const QModelIndex &index, int role = Qt::DisplayRole) const; | 45 | QVariant data(const QModelIndex &index, int role = Qt::DisplayRole) const; |
@@ -65,5 +68,6 @@ private: | |||
65 | QList<QByteArray> mPropertyColumns; | 68 | QList<QByteArray> mPropertyColumns; |
66 | Akonadi2::Query mQuery; | 69 | Akonadi2::Query mQuery; |
67 | std::function<void(const Ptr &)> loadEntities; | 70 | std::function<void(const Ptr &)> loadEntities; |
71 | typename Akonadi2::ResultEmitter<Ptr>::Ptr mEmitter; | ||
68 | }; | 72 | }; |
69 | 73 | ||
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp new file mode 100644 index 0000000..4159112 --- /dev/null +++ b/common/queryrunner.cpp | |||
@@ -0,0 +1,292 @@ | |||
1 | /* | ||
2 | Copyright (c) 2015 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | |||
4 | This library is free software; you can redistribute it and/or modify it | ||
5 | under the terms of the GNU Library General Public License as published by | ||
6 | the Free Software Foundation; either version 2 of the License, or (at your | ||
7 | option) any later version. | ||
8 | |||
9 | This library is distributed in the hope that it will be useful, but WITHOUT | ||
10 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | ||
11 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public | ||
12 | License for more details. | ||
13 | |||
14 | You should have received a copy of the GNU Library General Public License | ||
15 | along with this library; see the file COPYING.LIB. If not, write to the | ||
16 | Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA | ||
17 | 02110-1301, USA. | ||
18 | */ | ||
19 | #include "queryrunner.h" | ||
20 | |||
21 | #include <QtConcurrent/QtConcurrentRun> | ||
22 | #include <QFuture> | ||
23 | #include <QFutureWatcher> | ||
24 | #include "commands.h" | ||
25 | #include "log.h" | ||
26 | #include "storage.h" | ||
27 | #include "definitions.h" | ||
28 | #include "domainadaptor.h" | ||
29 | |||
30 | using namespace Akonadi2; | ||
31 | |||
32 | static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) | ||
33 | { | ||
34 | //TODO use a result set with an iterator, to read values on demand | ||
35 | QVector<QByteArray> keys; | ||
36 | transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { | ||
37 | //Skip internals | ||
38 | if (Akonadi2::Storage::isInternalKey(key)) { | ||
39 | return true; | ||
40 | } | ||
41 | keys << Akonadi2::Storage::uidFromKey(key); | ||
42 | return true; | ||
43 | }, | ||
44 | [](const Akonadi2::Storage::Error &error) { | ||
45 | qWarning() << "Error during query: " << error.message; | ||
46 | }); | ||
47 | |||
48 | Trace() << "Full scan found " << keys.size() << " results"; | ||
49 | return ResultSet(keys); | ||
50 | } | ||
51 | |||
52 | template<class DomainType> | ||
53 | QueryRunner<DomainType>::QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) | ||
54 | : QueryRunnerBase(), | ||
55 | mResourceAccess(resourceAccess), | ||
56 | mResultProvider(new ResultProvider<typename DomainType::Ptr>), | ||
57 | mDomainTypeAdaptorFactory(factory), | ||
58 | mQuery(query), | ||
59 | mResourceInstanceIdentifier(instanceIdentifier), | ||
60 | mBufferType(bufferType) | ||
61 | { | ||
62 | Trace() << "Starting query"; | ||
63 | //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. | ||
64 | mResultProvider->setFetcher([this, query](const typename DomainType::Ptr &parent) { | ||
65 | Trace() << "Running fetcher"; | ||
66 | |||
67 | // auto watcher = new QFutureWatcher<qint64>; | ||
68 | // QObject::connect(watcher, &QFutureWatcher::finished, [](qint64 newRevision) { | ||
69 | // mResourceAccess->sendRevisionReplayedCommand(newRevision); | ||
70 | // }); | ||
71 | // auto future = QtConcurrent::run([&resultProvider]() -> qint64 { | ||
72 | // const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); | ||
73 | // return newRevision; | ||
74 | // }); | ||
75 | // watcher->setFuture(future); | ||
76 | const qint64 newRevision = executeInitialQuery(query, parent, *mResultProvider); | ||
77 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | ||
78 | }); | ||
79 | |||
80 | |||
81 | //In case of a live query we keep the runner for as long alive as the result provider exists | ||
82 | if (query.liveQuery) { | ||
83 | //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | ||
84 | setQuery([this, query] () -> KAsync::Job<void> { | ||
85 | return KAsync::start<void>([this, query](KAsync::Future<void> &future) { | ||
86 | //TODO execute in thread | ||
87 | const qint64 newRevision = executeIncrementalQuery(query, *mResultProvider); | ||
88 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | ||
89 | future.setFinished(); | ||
90 | }); | ||
91 | }); | ||
92 | //Ensure the connection is open, if it wasn't already opened | ||
93 | //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates | ||
94 | mResourceAccess->open(); | ||
95 | QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, this, &QueryRunner::revisionChanged); | ||
96 | } | ||
97 | } | ||
98 | |||
99 | template<class DomainType> | ||
100 | typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainType>::emitter() | ||
101 | { | ||
102 | return mResultProvider->emitter(); | ||
103 | } | ||
104 | |||
105 | //TODO move into result provider? | ||
106 | template<class DomainType> | ||
107 | void QueryRunner<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | ||
108 | { | ||
109 | // Trace() << "Replay set"; | ||
110 | while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { | ||
111 | switch (operation) { | ||
112 | case Akonadi2::Operation_Creation: | ||
113 | // Trace() << "Got creation"; | ||
114 | resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); | ||
115 | break; | ||
116 | case Akonadi2::Operation_Modification: | ||
117 | // Trace() << "Got modification"; | ||
118 | resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); | ||
119 | break; | ||
120 | case Akonadi2::Operation_Removal: | ||
121 | // Trace() << "Got removal"; | ||
122 | resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); | ||
123 | break; | ||
124 | } | ||
125 | return true; | ||
126 | })){}; | ||
127 | } | ||
128 | |||
129 | template<class DomainType> | ||
130 | void QueryRunner<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback) | ||
131 | { | ||
132 | //This only works for a 1:1 mapping of resource to domain types. | ||
133 | //Not i.e. for tags that are stored as flags in each entity of an imap store. | ||
134 | //additional properties that don't have a 1:1 mapping (such as separately stored tags), | ||
135 | //could be added to the adaptor. | ||
136 | db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { | ||
137 | Akonadi2::EntityBuffer buffer(value.data(), value.size()); | ||
138 | const Akonadi2::Entity &entity = buffer.entity(); | ||
139 | const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata()); | ||
140 | Q_ASSERT(metadataBuffer); | ||
141 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | ||
142 | resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); | ||
143 | return false; | ||
144 | }, | ||
145 | [](const Akonadi2::Storage::Error &error) { | ||
146 | qWarning() << "Error during query: " << error.message; | ||
147 | }); | ||
148 | } | ||
149 | |||
150 | template<class DomainType> | ||
151 | ResultSet QueryRunner<DomainType>::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) | ||
152 | { | ||
153 | QSet<QByteArray> appliedFilters; | ||
154 | auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); | ||
155 | remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; | ||
156 | |||
157 | //We do a full scan if there were no indexes available to create the initial set. | ||
158 | if (appliedFilters.isEmpty()) { | ||
159 | //TODO this should be replaced by an index lookup as well | ||
160 | resultSet = fullScan(transaction, mBufferType); | ||
161 | } | ||
162 | return resultSet; | ||
163 | } | ||
164 | |||
165 | template<class DomainType> | ||
166 | ResultSet QueryRunner<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) | ||
167 | { | ||
168 | const auto bufferType = mBufferType; | ||
169 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | ||
170 | remainingFilters = query.propertyFilter.keys().toSet(); | ||
171 | return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { | ||
172 | const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); | ||
173 | //Spit out the revision keys one by one. | ||
174 | while (*revisionCounter <= topRevision) { | ||
175 | const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); | ||
176 | const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); | ||
177 | // Trace() << "Revision" << *revisionCounter << type << uid; | ||
178 | if (type != bufferType) { | ||
179 | //Skip revision | ||
180 | *revisionCounter += 1; | ||
181 | continue; | ||
182 | } | ||
183 | const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); | ||
184 | *revisionCounter += 1; | ||
185 | return key; | ||
186 | } | ||
187 | Trace() << "Finished reading incremental result set:" << *revisionCounter; | ||
188 | //We're done | ||
189 | return QByteArray(); | ||
190 | }); | ||
191 | } | ||
192 | |||
193 | template<class DomainType> | ||
194 | ResultSet QueryRunner<DomainType>::filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery) | ||
195 | { | ||
196 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | ||
197 | |||
198 | //Read through the source values and return whatever matches the filter | ||
199 | std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &db, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool { | ||
200 | while (resultSetPtr->next()) { | ||
201 | //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | ||
202 | readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { | ||
203 | //Always remove removals, they probably don't match due to non-available properties | ||
204 | if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { | ||
205 | Trace() << "entity is not filtered" << initialQuery; | ||
206 | if (initialQuery) { | ||
207 | //We're not interested in removals during the initial query | ||
208 | if (operation != Akonadi2::Operation_Removal) { | ||
209 | callback(domainObject, Akonadi2::Operation_Creation); | ||
210 | } | ||
211 | } else { | ||
212 | callback(domainObject, operation); | ||
213 | } | ||
214 | } | ||
215 | }); | ||
216 | } | ||
217 | return false; | ||
218 | }; | ||
219 | return ResultSet(generator); | ||
220 | } | ||
221 | |||
222 | |||
223 | template<class DomainType> | ||
224 | std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> QueryRunner<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query) | ||
225 | { | ||
226 | return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { | ||
227 | for (const auto &filterProperty : remainingFilters) { | ||
228 | const auto property = domainObject->getProperty(filterProperty); | ||
229 | if (property.isValid()) { | ||
230 | //TODO implement other comparison operators than equality | ||
231 | if (property != query.propertyFilter.value(filterProperty)) { | ||
232 | Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty); | ||
233 | return false; | ||
234 | } | ||
235 | } else { | ||
236 | Warning() << "Ignored property filter because value is invalid: " << filterProperty; | ||
237 | } | ||
238 | } | ||
239 | return true; | ||
240 | }; | ||
241 | } | ||
242 | |||
243 | template<class DomainType> | ||
244 | qint64 QueryRunner<DomainType>::load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery) | ||
245 | { | ||
246 | Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); | ||
247 | storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { | ||
248 | Warning() << "Error during query: " << error.store << error.message; | ||
249 | }); | ||
250 | auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); | ||
251 | auto db = transaction.openDatabase(mBufferType + ".main"); | ||
252 | |||
253 | QSet<QByteArray> remainingFilters; | ||
254 | auto resultSet = baseSetRetriever(transaction, remainingFilters); | ||
255 | auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), db, initialQuery); | ||
256 | replaySet(filteredSet, resultProvider); | ||
257 | resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); | ||
258 | return Akonadi2::Storage::maxRevision(transaction); | ||
259 | } | ||
260 | |||
261 | |||
262 | template<class DomainType> | ||
263 | qint64 QueryRunner<DomainType>::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | ||
264 | { | ||
265 | const qint64 baseRevision = resultProvider.revision() + 1; | ||
266 | Trace() << "Running incremental query " << baseRevision; | ||
267 | return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet { | ||
268 | return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); | ||
269 | }, resultProvider, false); | ||
270 | } | ||
271 | |||
272 | template<class DomainType> | ||
273 | qint64 QueryRunner<DomainType>::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | ||
274 | { | ||
275 | auto modifiedQuery = query; | ||
276 | if (!query.parentProperty.isEmpty()) { | ||
277 | if (parent) { | ||
278 | Trace() << "Running initial query for parent:" << parent->identifier(); | ||
279 | modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier()); | ||
280 | } else { | ||
281 | Trace() << "Running initial query for toplevel"; | ||
282 | modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant()); | ||
283 | } | ||
284 | } | ||
285 | return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet { | ||
286 | return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); | ||
287 | }, resultProvider, true); | ||
288 | } | ||
289 | |||
290 | template class QueryRunner<Akonadi2::ApplicationDomain::Folder>; | ||
291 | template class QueryRunner<Akonadi2::ApplicationDomain::Mail>; | ||
292 | template class QueryRunner<Akonadi2::ApplicationDomain::Event>; | ||
diff --git a/common/queryrunner.h b/common/queryrunner.h new file mode 100644 index 0000000..e2af9de --- /dev/null +++ b/common/queryrunner.h | |||
@@ -0,0 +1,107 @@ | |||
1 | /* | ||
2 | Copyright (c) 2015 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | |||
4 | This library is free software; you can redistribute it and/or modify it | ||
5 | under the terms of the GNU Library General Public License as published by | ||
6 | the Free Software Foundation; either version 2 of the License, or (at your | ||
7 | option) any later version. | ||
8 | |||
9 | This library is distributed in the hope that it will be useful, but WITHOUT | ||
10 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | ||
11 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public | ||
12 | License for more details. | ||
13 | |||
14 | You should have received a copy of the GNU Library General Public License | ||
15 | along with this library; see the file COPYING.LIB. If not, write to the | ||
16 | Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA | ||
17 | 02110-1301, USA. | ||
18 | */ | ||
19 | |||
20 | #pragma once | ||
21 | |||
22 | #include <QObject> | ||
23 | #include "facadeinterface.h" | ||
24 | #include "resourceaccess.h" | ||
25 | #include "resultprovider.h" | ||
26 | #include "domaintypeadaptorfactoryinterface.h" | ||
27 | #include "storage.h" | ||
28 | #include "query.h" | ||
29 | |||
30 | /** | ||
31 | * A QueryRunner runs a query and updates the corresponding result set. | ||
32 | * | ||
33 | * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), | ||
34 | * and by how long a result set must be updated. If the query is one off the runner dies after the execution, | ||
35 | * otherwise it lives on the react to changes and updates the corresponding result set. | ||
36 | * | ||
37 | * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. | ||
38 | */ | ||
39 | |||
40 | class QueryRunnerBase : public QObject | ||
41 | { | ||
42 | Q_OBJECT | ||
43 | protected: | ||
44 | typedef std::function<KAsync::Job<void>()> QueryFunction; | ||
45 | |||
46 | /** | ||
47 | * Set the query to run | ||
48 | */ | ||
49 | void setQuery(const QueryFunction &query) | ||
50 | { | ||
51 | queryFunction = query; | ||
52 | } | ||
53 | |||
54 | |||
55 | protected slots: | ||
56 | /** | ||
57 | * Rerun query with new revision | ||
58 | */ | ||
59 | void revisionChanged(qint64 newRevision) | ||
60 | { | ||
61 | Trace() << "New revision: " << newRevision; | ||
62 | run().exec(); | ||
63 | } | ||
64 | |||
65 | private: | ||
66 | /** | ||
67 | * Starts query | ||
68 | */ | ||
69 | KAsync::Job<void> run(qint64 newRevision = 0) | ||
70 | { | ||
71 | return queryFunction(); | ||
72 | } | ||
73 | |||
74 | QueryFunction queryFunction; | ||
75 | }; | ||
76 | |||
77 | template<typename DomainType> | ||
78 | class QueryRunner : public QueryRunnerBase | ||
79 | { | ||
80 | public: | ||
81 | QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType); | ||
82 | |||
83 | typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr emitter(); | ||
84 | |||
85 | private: | ||
86 | static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | ||
87 | |||
88 | void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback); | ||
89 | |||
90 | ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters); | ||
91 | ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters); | ||
92 | |||
93 | ResultSet filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery); | ||
94 | std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query); | ||
95 | qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery); | ||
96 | qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | ||
97 | qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | ||
98 | |||
99 | private: | ||
100 | QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > mResultProvider; | ||
101 | QSharedPointer<Akonadi2::ResourceAccessInterface> mResourceAccess; | ||
102 | DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; | ||
103 | QByteArray mResourceInstanceIdentifier; | ||
104 | QByteArray mBufferType; | ||
105 | Akonadi2::Query mQuery; | ||
106 | }; | ||
107 | |||
diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 8e27054..e87a1f7 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h | |||
@@ -37,6 +37,8 @@ class ResourceAccessInterface : public QObject | |||
37 | { | 37 | { |
38 | Q_OBJECT | 38 | Q_OBJECT |
39 | public: | 39 | public: |
40 | typedef QSharedPointer<ResourceAccessInterface> Ptr; | ||
41 | |||
40 | ResourceAccessInterface() {} | 42 | ResourceAccessInterface() {} |
41 | virtual ~ResourceAccessInterface() {} | 43 | virtual ~ResourceAccessInterface() {} |
42 | virtual KAsync::Job<void> sendCommand(int commandId) = 0; | 44 | virtual KAsync::Job<void> sendCommand(int commandId) = 0; |
diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp index 1796271..3d207e4 100644 --- a/common/resourcefacade.cpp +++ b/common/resourcefacade.cpp | |||
@@ -54,9 +54,15 @@ KAsync::Job<void> ResourceFacade::remove(const Akonadi2::ApplicationDomain::Akon | |||
54 | }); | 54 | }); |
55 | } | 55 | } |
56 | 56 | ||
57 | KAsync::Job<void> ResourceFacade::load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename Akonadi2::ApplicationDomain::AkonadiResource::Ptr> &resultProvider) | 57 | QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<Akonadi2::ApplicationDomain::AkonadiResource::Ptr>::Ptr > ResourceFacade::load(const Akonadi2::Query &query) |
58 | { | 58 | { |
59 | return KAsync::start<void>([query, &resultProvider]() { | 59 | auto resultProvider = new Akonadi2::ResultProvider<typename Akonadi2::ApplicationDomain::AkonadiResource::Ptr>(); |
60 | auto emitter = resultProvider->emitter(); | ||
61 | resultProvider->setFetcher([](const QSharedPointer<Akonadi2::ApplicationDomain::AkonadiResource> &) {}); | ||
62 | resultProvider->onDone([resultProvider]() { | ||
63 | delete resultProvider; | ||
64 | }); | ||
65 | auto job = KAsync::start<void>([query, resultProvider]() { | ||
60 | const auto configuredResources = ResourceConfig::getResources(); | 66 | const auto configuredResources = ResourceConfig::getResources(); |
61 | for (const auto &res : configuredResources.keys()) { | 67 | for (const auto &res : configuredResources.keys()) { |
62 | const auto type = configuredResources.value(res); | 68 | const auto type = configuredResources.value(res); |
@@ -64,12 +70,13 @@ KAsync::Job<void> ResourceFacade::load(const Akonadi2::Query &query, Akonadi2::R | |||
64 | auto resource = Akonadi2::ApplicationDomain::AkonadiResource::Ptr::create(); | 70 | auto resource = Akonadi2::ApplicationDomain::AkonadiResource::Ptr::create(); |
65 | resource->setProperty("identifier", res); | 71 | resource->setProperty("identifier", res); |
66 | resource->setProperty("type", type); | 72 | resource->setProperty("type", type); |
67 | resultProvider.add(resource); | 73 | resultProvider->add(resource); |
68 | } | 74 | } |
69 | } | 75 | } |
70 | //TODO initialResultSetComplete should be implicit | 76 | //TODO initialResultSetComplete should be implicit |
71 | resultProvider.initialResultSetComplete(); | 77 | resultProvider->initialResultSetComplete(); |
72 | resultProvider.complete(); | 78 | resultProvider->complete(); |
73 | }); | 79 | }); |
80 | return qMakePair(job, emitter); | ||
74 | } | 81 | } |
75 | 82 | ||
diff --git a/common/resourcefacade.h b/common/resourcefacade.h index 123b481..38e0c0e 100644 --- a/common/resourcefacade.h +++ b/common/resourcefacade.h | |||
@@ -37,5 +37,6 @@ public: | |||
37 | KAsync::Job<void> create(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; | 37 | KAsync::Job<void> create(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; |
38 | KAsync::Job<void> modify(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; | 38 | KAsync::Job<void> modify(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; |
39 | KAsync::Job<void> remove(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; | 39 | KAsync::Job<void> remove(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; |
40 | KAsync::Job<void> load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename Akonadi2::ApplicationDomain::AkonadiResource::Ptr> &resultProvider) Q_DECL_OVERRIDE; | 40 | QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<Akonadi2::ApplicationDomain::AkonadiResource::Ptr>::Ptr > load(const Akonadi2::Query &query) Q_DECL_OVERRIDE; |
41 | }; | 41 | }; |
42 | |||
diff --git a/common/resultprovider.h b/common/resultprovider.h index 921cd6b..86382ef 100644 --- a/common/resultprovider.h +++ b/common/resultprovider.h | |||
@@ -20,12 +20,12 @@ | |||
20 | 20 | ||
21 | #pragma once | 21 | #pragma once |
22 | 22 | ||
23 | #include <QThread> | ||
23 | #include <functional> | 24 | #include <functional> |
24 | #include <memory> | 25 | #include <memory> |
25 | #include "threadboundary.h" | 26 | #include "threadboundary.h" |
26 | #include "resultset.h" | 27 | #include "resultset.h" |
27 | #include "log.h" | 28 | #include "log.h" |
28 | #include "modelresult.h" | ||
29 | 29 | ||
30 | using namespace async; | 30 | using namespace async; |
31 | 31 | ||
@@ -53,12 +53,7 @@ public: | |||
53 | virtual void initialResultSetComplete() = 0; | 53 | virtual void initialResultSetComplete() = 0; |
54 | virtual void complete() = 0; | 54 | virtual void complete() = 0; |
55 | virtual void clear() = 0; | 55 | virtual void clear() = 0; |
56 | virtual void setFetcher(const std::function<void(const T &parent)> &fetcher) | 56 | virtual void setFetcher(const std::function<void(const T &parent)> &fetcher) = 0; |
57 | { | ||
58 | } | ||
59 | |||
60 | virtual void setFacade(const std::shared_ptr<void> &facade) = 0; | ||
61 | virtual void setQueryRunner(const QSharedPointer<QObject> &runner) = 0; | ||
62 | 57 | ||
63 | void setRevision(qint64 revision) | 58 | void setRevision(qint64 revision) |
64 | { | 59 | { |
@@ -74,101 +69,6 @@ private: | |||
74 | qint64 mRevision; | 69 | qint64 mRevision; |
75 | }; | 70 | }; |
76 | 71 | ||
77 | template<class T, class Ptr> | ||
78 | class ModelResultProvider : public ResultProviderInterface<Ptr> { | ||
79 | public: | ||
80 | ModelResultProvider(QWeakPointer<ModelResult<T, Ptr> > model) | ||
81 | : ResultProviderInterface<Ptr>(), | ||
82 | mModel(model) | ||
83 | { | ||
84 | |||
85 | } | ||
86 | |||
87 | void add(const Ptr &value) | ||
88 | { | ||
89 | if (auto model = mModel.toStrongRef()) { | ||
90 | model->add(value); | ||
91 | } | ||
92 | } | ||
93 | |||
94 | void modify(const Ptr &value) | ||
95 | { | ||
96 | if (auto model = mModel.toStrongRef()) { | ||
97 | model->modify(value); | ||
98 | } | ||
99 | } | ||
100 | |||
101 | void remove(const Ptr &value) | ||
102 | { | ||
103 | if (auto model = mModel.toStrongRef()) { | ||
104 | model->remove(value); | ||
105 | } | ||
106 | } | ||
107 | |||
108 | void initialResultSetComplete() | ||
109 | { | ||
110 | // mResultEmitter->initialResultSetComplete(); | ||
111 | } | ||
112 | |||
113 | void complete() | ||
114 | { | ||
115 | // mResultEmitter->complete(); | ||
116 | } | ||
117 | |||
118 | void clear() | ||
119 | { | ||
120 | // mResultEmitter->clear(); | ||
121 | } | ||
122 | |||
123 | /** | ||
124 | * For lifetimemanagement only. | ||
125 | * We keep the runner alive as long as the result provider exists. | ||
126 | */ | ||
127 | void setFacade(const std::shared_ptr<void> &facade) | ||
128 | { | ||
129 | mFacade = facade; | ||
130 | } | ||
131 | |||
132 | void onDone(const std::function<void()> &callback) | ||
133 | { | ||
134 | mOnDoneCallback = callback; | ||
135 | } | ||
136 | |||
137 | bool isDone() const | ||
138 | { | ||
139 | //The existance of the emitter currently defines wether we're done or not. | ||
140 | // return mResultEmitter.toStrongRef().isNull(); | ||
141 | return true; | ||
142 | } | ||
143 | |||
144 | void setFetcher(const std::function<void(const Ptr &parent)> &fetcher) | ||
145 | { | ||
146 | if (auto model = mModel.toStrongRef()) { | ||
147 | model->setFetcher(fetcher); | ||
148 | } | ||
149 | } | ||
150 | |||
151 | void setQueryRunner(const QSharedPointer<QObject> &runner) | ||
152 | { | ||
153 | mQueryRunner = runner; | ||
154 | } | ||
155 | |||
156 | private: | ||
157 | void done() | ||
158 | { | ||
159 | qWarning() << "done"; | ||
160 | if (mOnDoneCallback) { | ||
161 | mOnDoneCallback(); | ||
162 | mOnDoneCallback = std::function<void()>(); | ||
163 | } | ||
164 | } | ||
165 | |||
166 | QWeakPointer<ModelResult<T, Ptr> > mModel; | ||
167 | QSharedPointer<QObject> mQueryRunner; | ||
168 | std::shared_ptr<void> mFacade; | ||
169 | std::function<void()> mOnDoneCallback; | ||
170 | }; | ||
171 | |||
172 | /* | 72 | /* |
173 | * The promise side for the result emitter | 73 | * The promise side for the result emitter |
174 | */ | 74 | */ |
@@ -204,6 +104,12 @@ private: | |||
204 | } | 104 | } |
205 | 105 | ||
206 | public: | 106 | public: |
107 | typedef QSharedPointer<ResultProvider<T> > Ptr; | ||
108 | |||
109 | ~ResultProvider() | ||
110 | { | ||
111 | } | ||
112 | |||
207 | //Called from worker thread | 113 | //Called from worker thread |
208 | void add(const T &value) | 114 | void add(const T &value) |
209 | { | 115 | { |
@@ -261,30 +167,16 @@ public: | |||
261 | //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again | 167 | //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again |
262 | auto sharedPtr = QSharedPointer<ResultEmitter<T> >(new ResultEmitter<T>, [this](ResultEmitter<T> *emitter){ mThreadBoundary->callInMainThread([this]() {done();}); delete emitter; }); | 168 | auto sharedPtr = QSharedPointer<ResultEmitter<T> >(new ResultEmitter<T>, [this](ResultEmitter<T> *emitter){ mThreadBoundary->callInMainThread([this]() {done();}); delete emitter; }); |
263 | mResultEmitter = sharedPtr; | 169 | mResultEmitter = sharedPtr; |
170 | sharedPtr->setFetcher([this](const T &parent) { | ||
171 | Q_ASSERT(mFetcher); | ||
172 | mFetcher(parent); | ||
173 | }); | ||
264 | return sharedPtr; | 174 | return sharedPtr; |
265 | } | 175 | } |
266 | 176 | ||
267 | return mResultEmitter.toStrongRef(); | 177 | return mResultEmitter.toStrongRef(); |
268 | } | 178 | } |
269 | 179 | ||
270 | /** | ||
271 | * For lifetimemanagement only. | ||
272 | * We keep the runner alive as long as the result provider exists. | ||
273 | */ | ||
274 | void setQueryRunner(const QSharedPointer<QObject> &runner) | ||
275 | { | ||
276 | mQueryRunner = runner; | ||
277 | } | ||
278 | |||
279 | /** | ||
280 | * For lifetimemanagement only. | ||
281 | * We keep the runner alive as long as the result provider exists. | ||
282 | */ | ||
283 | void setFacade(const std::shared_ptr<void> &facade) | ||
284 | { | ||
285 | mFacade = facade; | ||
286 | } | ||
287 | |||
288 | void onDone(const std::function<void()> &callback) | 180 | void onDone(const std::function<void()> &callback) |
289 | { | 181 | { |
290 | mThreadBoundary = QSharedPointer<ThreadBoundary>::create(); | 182 | mThreadBoundary = QSharedPointer<ThreadBoundary>::create(); |
@@ -299,7 +191,7 @@ public: | |||
299 | 191 | ||
300 | void setFetcher(const std::function<void(const T &parent)> &fetcher) | 192 | void setFetcher(const std::function<void(const T &parent)> &fetcher) |
301 | { | 193 | { |
302 | fetcher(T()); | 194 | mFetcher = fetcher; |
303 | } | 195 | } |
304 | 196 | ||
305 | private: | 197 | private: |
@@ -307,16 +199,17 @@ private: | |||
307 | { | 199 | { |
308 | qWarning() << "done"; | 200 | qWarning() << "done"; |
309 | if (mOnDoneCallback) { | 201 | if (mOnDoneCallback) { |
310 | mOnDoneCallback(); | 202 | auto callback = mOnDoneCallback; |
311 | mOnDoneCallback = std::function<void()>(); | 203 | mOnDoneCallback = std::function<void()>(); |
204 | //This may delete this object | ||
205 | callback(); | ||
312 | } | 206 | } |
313 | } | 207 | } |
314 | 208 | ||
315 | QWeakPointer<ResultEmitter<T> > mResultEmitter; | 209 | QWeakPointer<ResultEmitter<T> > mResultEmitter; |
316 | QSharedPointer<QObject> mQueryRunner; | ||
317 | std::shared_ptr<void> mFacade; | ||
318 | std::function<void()> mOnDoneCallback; | 210 | std::function<void()> mOnDoneCallback; |
319 | QSharedPointer<ThreadBoundary> mThreadBoundary; | 211 | QSharedPointer<ThreadBoundary> mThreadBoundary; |
212 | std::function<void(const T &parent)> mFetcher; | ||
320 | }; | 213 | }; |
321 | 214 | ||
322 | /* | 215 | /* |
@@ -334,6 +227,8 @@ private: | |||
334 | template<class DomainType> | 227 | template<class DomainType> |
335 | class ResultEmitter { | 228 | class ResultEmitter { |
336 | public: | 229 | public: |
230 | typedef QSharedPointer<ResultEmitter<DomainType> > Ptr; | ||
231 | |||
337 | void onAdded(const std::function<void(const DomainType&)> &handler) | 232 | void onAdded(const std::function<void(const DomainType&)> &handler) |
338 | { | 233 | { |
339 | addHandler = handler; | 234 | addHandler = handler; |
@@ -394,6 +289,13 @@ public: | |||
394 | clearHandler(); | 289 | clearHandler(); |
395 | } | 290 | } |
396 | 291 | ||
292 | void setFetcher(const std::function<void(const DomainType &parent)> &fetcher) | ||
293 | { | ||
294 | mFetcher = fetcher; | ||
295 | } | ||
296 | |||
297 | std::function<void(const DomainType &parent)> mFetcher; | ||
298 | |||
397 | private: | 299 | private: |
398 | friend class ResultProvider<DomainType>; | 300 | friend class ResultProvider<DomainType>; |
399 | 301 | ||
diff --git a/common/threadboundary.cpp b/common/threadboundary.cpp index 47ec508..48fd11a 100644 --- a/common/threadboundary.cpp +++ b/common/threadboundary.cpp | |||
@@ -24,6 +24,9 @@ Q_DECLARE_METATYPE(std::function<void()>); | |||
24 | 24 | ||
25 | namespace async { | 25 | namespace async { |
26 | ThreadBoundary::ThreadBoundary(): QObject() { qRegisterMetaType<std::function<void()> >("std::function<void()>"); } | 26 | ThreadBoundary::ThreadBoundary(): QObject() { qRegisterMetaType<std::function<void()> >("std::function<void()>"); } |
27 | ThreadBoundary:: ~ThreadBoundary() {} | 27 | ThreadBoundary:: ~ThreadBoundary() |
28 | { | ||
29 | } | ||
30 | |||
28 | } | 31 | } |
29 | 32 | ||