diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-11-18 00:51:55 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-11-18 00:51:55 +0100 |
commit | 0f24357d01bd8a278f03793db863d3f71ac37ef2 (patch) | |
tree | 331c908d936d70447d7e0e4d5e65be2be74ef510 /common/facade.h | |
parent | b68a67fdbe0eb73aaef648ceb686824c7fbc1552 (diff) | |
download | sink-0f24357d01bd8a278f03793db863d3f71ac37ef2.tar.gz sink-0f24357d01bd8a278f03793db863d3f71ac37ef2.zip |
Don't use a smart pointer for the result provider
We're not doing any lifetime management anyways.
Diffstat (limited to 'common/facade.h')
-rw-r--r-- | common/facade.h | 77 |
1 files changed, 35 insertions, 42 deletions
diff --git a/common/facade.h b/common/facade.h index dcbe589..82fd5ff 100644 --- a/common/facade.h +++ b/common/facade.h | |||
@@ -169,68 +169,54 @@ public: | |||
169 | return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType()); | 169 | return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType()); |
170 | } | 170 | } |
171 | 171 | ||
172 | //TODO JOBAPI return job from sync continuation to execute it as subjob? | 172 | KAsync::Job<void> load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) Q_DECL_OVERRIDE |
173 | KAsync::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) Q_DECL_OVERRIDE | ||
174 | { | 173 | { |
175 | QWeakPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > weakResultProvider = resultProvider; | ||
176 | |||
177 | //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. | 174 | //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. |
178 | resultProvider->setFetcher([this, query, weakResultProvider](const QByteArray &parent) { | 175 | resultProvider.setFetcher([this, query, &resultProvider](const QByteArray &parent) { |
179 | if (auto resultProvider = weakResultProvider.toStrongRef()) { | 176 | const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); |
180 | const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); | 177 | mResourceAccess->sendRevisionReplayedCommand(newRevision); |
181 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | ||
182 | } else { | ||
183 | Warning() << "Tried executing query after result provider is already gone"; | ||
184 | } | ||
185 | }); | 178 | }); |
186 | 179 | ||
187 | auto runner = QSharedPointer<QueryRunner>::create(query); | ||
188 | //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | ||
189 | runner->setQuery([this, weakResultProvider, query] () -> KAsync::Job<void> { | ||
190 | return KAsync::start<void>([this, weakResultProvider, query](KAsync::Future<void> &future) { | ||
191 | Trace() << "Executing query "; | ||
192 | if (auto resultProvider = weakResultProvider.toStrongRef()) { | ||
193 | const qint64 newRevision = executeIncrementalQuery(query, resultProvider); | ||
194 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | ||
195 | } else { | ||
196 | Warning() << "Tried executing query after result provider is already gone"; | ||
197 | future.setError(0, QString()); | ||
198 | } | ||
199 | future.setFinished(); | ||
200 | }); | ||
201 | }); | ||
202 | 180 | ||
203 | //In case of a live query we keep the runner for as long alive as the result provider exists | 181 | //In case of a live query we keep the runner for as long alive as the result provider exists |
204 | if (query.liveQuery) { | 182 | if (query.liveQuery) { |
205 | resultProvider->setQueryRunner(runner); | 183 | auto runner = QSharedPointer<QueryRunner>::create(query); |
184 | //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | ||
185 | runner->setQuery([this, query, &resultProvider] () -> KAsync::Job<void> { | ||
186 | return KAsync::start<void>([this, query, &resultProvider](KAsync::Future<void> &future) { | ||
187 | Trace() << "Executing query "; | ||
188 | const qint64 newRevision = executeIncrementalQuery(query, resultProvider); | ||
189 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | ||
190 | future.setFinished(); | ||
191 | }); | ||
192 | }); | ||
193 | resultProvider.setQueryRunner(runner); | ||
206 | //Ensure the connection is open, if it wasn't already opened | 194 | //Ensure the connection is open, if it wasn't already opened |
207 | //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates | 195 | //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates |
208 | mResourceAccess->open(); | 196 | mResourceAccess->open(); |
209 | QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); | 197 | QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); |
210 | } | 198 | } |
211 | return KAsync::null<void>(); | 199 | return KAsync::null<void>(); |
212 | |||
213 | //We have to capture the runner to keep it alive | ||
214 | } | 200 | } |
215 | 201 | ||
216 | private: | 202 | private: |
217 | 203 | ||
218 | //TODO move into result provider? | 204 | //TODO move into result provider? |
219 | static void replaySet(ResultSet &resultSet, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) | 205 | static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) |
220 | { | 206 | { |
221 | while (resultSet.next([resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { | 207 | while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { |
222 | switch (operation) { | 208 | switch (operation) { |
223 | case Akonadi2::Operation_Creation: | 209 | case Akonadi2::Operation_Creation: |
224 | Trace() << "Got creation"; | 210 | Trace() << "Got creation"; |
225 | resultProvider->add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); | 211 | resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); |
226 | break; | 212 | break; |
227 | case Akonadi2::Operation_Modification: | 213 | case Akonadi2::Operation_Modification: |
228 | Trace() << "Got modification"; | 214 | Trace() << "Got modification"; |
229 | resultProvider->modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); | 215 | resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); |
230 | break; | 216 | break; |
231 | case Akonadi2::Operation_Removal: | 217 | case Akonadi2::Operation_Removal: |
232 | Trace() << "Got removal"; | 218 | Trace() << "Got removal"; |
233 | resultProvider->remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); | 219 | resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); |
234 | break; | 220 | break; |
235 | } | 221 | } |
236 | return true; | 222 | return true; |
@@ -281,6 +267,7 @@ private: | |||
281 | Trace() << "Loading incremental result set starting from revision: " << baseRevision; | 267 | Trace() << "Loading incremental result set starting from revision: " << baseRevision; |
282 | const auto bufferType = bufferTypeForDomainType(); | 268 | const auto bufferType = bufferTypeForDomainType(); |
283 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | 269 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); |
270 | remainingFilters = query.propertyFilter.keys().toSet(); | ||
284 | return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { | 271 | return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { |
285 | const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); | 272 | const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); |
286 | //Spit out the revision keys one by one. | 273 | //Spit out the revision keys one by one. |
@@ -334,16 +321,22 @@ private: | |||
334 | { | 321 | { |
335 | return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { | 322 | return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { |
336 | for (const auto &filterProperty : remainingFilters) { | 323 | for (const auto &filterProperty : remainingFilters) { |
337 | //TODO implement other comparison operators than equality | 324 | const auto property = domainObject->getProperty(filterProperty); |
338 | if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { | 325 | if (property.isValid()) { |
339 | return false; | 326 | //TODO implement other comparison operators than equality |
327 | if (property != query.propertyFilter.value(filterProperty)) { | ||
328 | Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty); | ||
329 | return false; | ||
330 | } | ||
331 | } else { | ||
332 | Warning() << "Ignored property filter because value is invalid: " << filterProperty; | ||
340 | } | 333 | } |
341 | } | 334 | } |
342 | return true; | 335 | return true; |
343 | }; | 336 | }; |
344 | } | 337 | } |
345 | 338 | ||
346 | qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) | 339 | qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) |
347 | { | 340 | { |
348 | Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); | 341 | Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); |
349 | storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { | 342 | storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { |
@@ -355,21 +348,21 @@ private: | |||
355 | auto resultSet = baseSetRetriever(transaction, remainingFilters); | 348 | auto resultSet = baseSetRetriever(transaction, remainingFilters); |
356 | auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false); | 349 | auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false); |
357 | replaySet(filteredSet, resultProvider); | 350 | replaySet(filteredSet, resultProvider); |
358 | resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); | 351 | resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); |
359 | return Akonadi2::Storage::maxRevision(transaction); | 352 | return Akonadi2::Storage::maxRevision(transaction); |
360 | } | 353 | } |
361 | 354 | ||
362 | 355 | ||
363 | qint64 executeIncrementalQuery(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) | 356 | qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) |
364 | { | 357 | { |
365 | const qint64 baseRevision = resultProvider->revision() + 1; | 358 | const qint64 baseRevision = resultProvider.revision() + 1; |
366 | Trace() << "Running incremental query " << baseRevision; | 359 | Trace() << "Running incremental query " << baseRevision; |
367 | return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet { | 360 | return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet { |
368 | return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); | 361 | return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); |
369 | }, resultProvider); | 362 | }, resultProvider); |
370 | } | 363 | } |
371 | 364 | ||
372 | qint64 executeInitialQuery(const Akonadi2::Query &query, const QByteArray &parent, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) | 365 | qint64 executeInitialQuery(const Akonadi2::Query &query, const QByteArray &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) |
373 | { | 366 | { |
374 | Trace() << "Running initial query for parent:" << parent; | 367 | Trace() << "Running initial query for parent:" << parent; |
375 | auto modifiedQuery = query; | 368 | auto modifiedQuery = query; |