summaryrefslogtreecommitdiffstats
path: root/common/facade.h
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-11-18 00:51:55 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-11-18 00:51:55 +0100
commit0f24357d01bd8a278f03793db863d3f71ac37ef2 (patch)
tree331c908d936d70447d7e0e4d5e65be2be74ef510 /common/facade.h
parentb68a67fdbe0eb73aaef648ceb686824c7fbc1552 (diff)
downloadsink-0f24357d01bd8a278f03793db863d3f71ac37ef2.tar.gz
sink-0f24357d01bd8a278f03793db863d3f71ac37ef2.zip
Don't use a smart pointer for the result provider
We're not doing any lifetime management anyways.
Diffstat (limited to 'common/facade.h')
-rw-r--r--common/facade.h77
1 files changed, 35 insertions, 42 deletions
diff --git a/common/facade.h b/common/facade.h
index dcbe589..82fd5ff 100644
--- a/common/facade.h
+++ b/common/facade.h
@@ -169,68 +169,54 @@ public:
169 return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType()); 169 return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType());
170 } 170 }
171 171
172 //TODO JOBAPI return job from sync continuation to execute it as subjob? 172 KAsync::Job<void> load(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) Q_DECL_OVERRIDE
173 KAsync::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) Q_DECL_OVERRIDE
174 { 173 {
175 QWeakPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > weakResultProvider = resultProvider;
176
177 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. 174 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
178 resultProvider->setFetcher([this, query, weakResultProvider](const QByteArray &parent) { 175 resultProvider.setFetcher([this, query, &resultProvider](const QByteArray &parent) {
179 if (auto resultProvider = weakResultProvider.toStrongRef()) { 176 const qint64 newRevision = executeInitialQuery(query, parent, resultProvider);
180 const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); 177 mResourceAccess->sendRevisionReplayedCommand(newRevision);
181 mResourceAccess->sendRevisionReplayedCommand(newRevision);
182 } else {
183 Warning() << "Tried executing query after result provider is already gone";
184 }
185 }); 178 });
186 179
187 auto runner = QSharedPointer<QueryRunner>::create(query);
188 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
189 runner->setQuery([this, weakResultProvider, query] () -> KAsync::Job<void> {
190 return KAsync::start<void>([this, weakResultProvider, query](KAsync::Future<void> &future) {
191 Trace() << "Executing query ";
192 if (auto resultProvider = weakResultProvider.toStrongRef()) {
193 const qint64 newRevision = executeIncrementalQuery(query, resultProvider);
194 mResourceAccess->sendRevisionReplayedCommand(newRevision);
195 } else {
196 Warning() << "Tried executing query after result provider is already gone";
197 future.setError(0, QString());
198 }
199 future.setFinished();
200 });
201 });
202 180
203 //In case of a live query we keep the runner for as long alive as the result provider exists 181 //In case of a live query we keep the runner for as long alive as the result provider exists
204 if (query.liveQuery) { 182 if (query.liveQuery) {
205 resultProvider->setQueryRunner(runner); 183 auto runner = QSharedPointer<QueryRunner>::create(query);
184 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
185 runner->setQuery([this, query, &resultProvider] () -> KAsync::Job<void> {
186 return KAsync::start<void>([this, query, &resultProvider](KAsync::Future<void> &future) {
187 Trace() << "Executing query ";
188 const qint64 newRevision = executeIncrementalQuery(query, resultProvider);
189 mResourceAccess->sendRevisionReplayedCommand(newRevision);
190 future.setFinished();
191 });
192 });
193 resultProvider.setQueryRunner(runner);
206 //Ensure the connection is open, if it wasn't already opened 194 //Ensure the connection is open, if it wasn't already opened
207 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates 195 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
208 mResourceAccess->open(); 196 mResourceAccess->open();
209 QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); 197 QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged);
210 } 198 }
211 return KAsync::null<void>(); 199 return KAsync::null<void>();
212
213 //We have to capture the runner to keep it alive
214 } 200 }
215 201
216private: 202private:
217 203
218 //TODO move into result provider? 204 //TODO move into result provider?
219 static void replaySet(ResultSet &resultSet, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) 205 static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
220 { 206 {
221 while (resultSet.next([resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { 207 while (resultSet.next([&resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool {
222 switch (operation) { 208 switch (operation) {
223 case Akonadi2::Operation_Creation: 209 case Akonadi2::Operation_Creation:
224 Trace() << "Got creation"; 210 Trace() << "Got creation";
225 resultProvider->add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); 211 resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
226 break; 212 break;
227 case Akonadi2::Operation_Modification: 213 case Akonadi2::Operation_Modification:
228 Trace() << "Got modification"; 214 Trace() << "Got modification";
229 resultProvider->modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); 215 resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
230 break; 216 break;
231 case Akonadi2::Operation_Removal: 217 case Akonadi2::Operation_Removal:
232 Trace() << "Got removal"; 218 Trace() << "Got removal";
233 resultProvider->remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); 219 resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
234 break; 220 break;
235 } 221 }
236 return true; 222 return true;
@@ -281,6 +267,7 @@ private:
281 Trace() << "Loading incremental result set starting from revision: " << baseRevision; 267 Trace() << "Loading incremental result set starting from revision: " << baseRevision;
282 const auto bufferType = bufferTypeForDomainType(); 268 const auto bufferType = bufferTypeForDomainType();
283 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); 269 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
270 remainingFilters = query.propertyFilter.keys().toSet();
284 return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { 271 return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray {
285 const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); 272 const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction);
286 //Spit out the revision keys one by one. 273 //Spit out the revision keys one by one.
@@ -334,16 +321,22 @@ private:
334 { 321 {
335 return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { 322 return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
336 for (const auto &filterProperty : remainingFilters) { 323 for (const auto &filterProperty : remainingFilters) {
337 //TODO implement other comparison operators than equality 324 const auto property = domainObject->getProperty(filterProperty);
338 if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { 325 if (property.isValid()) {
339 return false; 326 //TODO implement other comparison operators than equality
327 if (property != query.propertyFilter.value(filterProperty)) {
328 Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty);
329 return false;
330 }
331 } else {
332 Warning() << "Ignored property filter because value is invalid: " << filterProperty;
340 } 333 }
341 } 334 }
342 return true; 335 return true;
343 }; 336 };
344 } 337 }
345 338
346 qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) 339 qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
347 { 340 {
348 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); 341 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
349 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { 342 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
@@ -355,21 +348,21 @@ private:
355 auto resultSet = baseSetRetriever(transaction, remainingFilters); 348 auto resultSet = baseSetRetriever(transaction, remainingFilters);
356 auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false); 349 auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false);
357 replaySet(filteredSet, resultProvider); 350 replaySet(filteredSet, resultProvider);
358 resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); 351 resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction));
359 return Akonadi2::Storage::maxRevision(transaction); 352 return Akonadi2::Storage::maxRevision(transaction);
360 } 353 }
361 354
362 355
363 qint64 executeIncrementalQuery(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) 356 qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
364 { 357 {
365 const qint64 baseRevision = resultProvider->revision() + 1; 358 const qint64 baseRevision = resultProvider.revision() + 1;
366 Trace() << "Running incremental query " << baseRevision; 359 Trace() << "Running incremental query " << baseRevision;
367 return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet { 360 return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
368 return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); 361 return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters);
369 }, resultProvider); 362 }, resultProvider);
370 } 363 }
371 364
372 qint64 executeInitialQuery(const Akonadi2::Query &query, const QByteArray &parent, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) 365 qint64 executeInitialQuery(const Akonadi2::Query &query, const QByteArray &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
373 { 366 {
374 Trace() << "Running initial query for parent:" << parent; 367 Trace() << "Running initial query for parent:" << parent;
375 auto modifiedQuery = query; 368 auto modifiedQuery = query;