summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/facade.h91
1 files changed, 41 insertions, 50 deletions
diff --git a/common/facade.h b/common/facade.h
index 6e45e08..dcbe589 100644
--- a/common/facade.h
+++ b/common/facade.h
@@ -172,45 +172,31 @@ public:
172 //TODO JOBAPI return job from sync continuation to execute it as subjob? 172 //TODO JOBAPI return job from sync continuation to execute it as subjob?
173 KAsync::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) Q_DECL_OVERRIDE 173 KAsync::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) Q_DECL_OVERRIDE
174 { 174 {
175 auto fetchEntities = [this, query, resultProvider](const QByteArray &parent) { 175 QWeakPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > weakResultProvider = resultProvider;
176 Trace() << "Fetching initial set for parent:" << parent;
177
178 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
179 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
180 Warning() << "Error during query: " << error.store << error.message;
181 });
182
183 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly);
184
185 auto modifiedQuery = query;
186 modifiedQuery.propertyFilter.insert("parent", parent);
187 176
188 QSet<QByteArray> remainingFilters; 177 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
189 auto resultSet = loadInitialResultSet(parent, modifiedQuery, transaction, remainingFilters); 178 resultProvider->setFetcher([this, query, weakResultProvider](const QByteArray &parent) {
190 auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, true); 179 if (auto resultProvider = weakResultProvider.toStrongRef()) {
191 replaySet(filteredSet, resultProvider); 180 const qint64 newRevision = executeInitialQuery(query, parent, resultProvider);
192 const qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); 181 mResourceAccess->sendRevisionReplayedCommand(newRevision);
193 resultProvider->setRevision(newRevision); 182 } else {
194 mResourceAccess->sendRevisionReplayedCommand(newRevision); 183 Warning() << "Tried executing query after result provider is already gone";
195 }; 184 }
196 resultProvider->setFetcher(fetchEntities); 185 });
197 186
198 auto runner = QSharedPointer<QueryRunner>::create(query); 187 auto runner = QSharedPointer<QueryRunner>::create(query);
199 QWeakPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > weakResultProvider = resultProvider; 188 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
200 runner->setQuery([this, weakResultProvider, query] () -> KAsync::Job<void> { 189 runner->setQuery([this, weakResultProvider, query] () -> KAsync::Job<void> {
201 return KAsync::start<void>([this, weakResultProvider, query](KAsync::Future<void> &future) { 190 return KAsync::start<void>([this, weakResultProvider, query](KAsync::Future<void> &future) {
202 Trace() << "Executing query "; 191 Trace() << "Executing query ";
203 auto resultProvider = weakResultProvider.toStrongRef(); 192 if (auto resultProvider = weakResultProvider.toStrongRef()) {
204 if (!resultProvider) { 193 const qint64 newRevision = executeIncrementalQuery(query, resultProvider);
194 mResourceAccess->sendRevisionReplayedCommand(newRevision);
195 } else {
205 Warning() << "Tried executing query after result provider is already gone"; 196 Warning() << "Tried executing query after result provider is already gone";
206 future.setError(0, QString()); 197 future.setError(0, QString());
207 future.setFinished();
208 return;
209 } 198 }
210 executeQuery(query, resultProvider).template then<void, qint64>([&future, this](qint64 queriedRevision) { 199 future.setFinished();
211 mResourceAccess->sendRevisionReplayedCommand(queriedRevision);
212 future.setFinished();
213 }).exec();
214 }); 200 });
215 }); 201 });
216 202
@@ -230,13 +216,12 @@ public:
230private: 216private:
231 217
232 //TODO move into result provider? 218 //TODO move into result provider?
233 void replaySet(ResultSet &resultSet, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) 219 static void replaySet(ResultSet &resultSet, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider)
234 { 220 {
235 while (resultSet.next([this, resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { 221 while (resultSet.next([resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool {
236 switch (operation) { 222 switch (operation) {
237 case Akonadi2::Operation_Creation: 223 case Akonadi2::Operation_Creation:
238 Trace() << "Got creation"; 224 Trace() << "Got creation";
239 //TODO Only copy in result provider
240 resultProvider->add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); 225 resultProvider->add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
241 break; 226 break;
242 case Akonadi2::Operation_Modification: 227 case Akonadi2::Operation_Modification:
@@ -358,34 +343,40 @@ private:
358 }; 343 };
359 } 344 }
360 345
361 virtual KAsync::Job<qint64> executeQuery(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) 346 qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider)
362 { 347 {
363 /*
364 * This method gets called initially, and after every revision change.
365 * * We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
366 * * Incremental updates are loaded directly, leaving it up to the model to discard the changes if they are not interesting
367 */
368 const qint64 baseRevision = resultProvider->revision() + 1;
369 Trace() << "Running query " << baseRevision;
370 QSet<QByteArray> remainingFilters;
371
372 Trace() << "Fetching updates";
373 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); 348 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
374 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { 349 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
375 Warning() << "Error during query: " << error.store << error.message; 350 Warning() << "Error during query: " << error.store << error.message;
376 }); 351 });
377
378 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); 352 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly);
379 353
380 auto resultSet = loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); 354 QSet<QByteArray> remainingFilters;
355 auto resultSet = baseSetRetriever(transaction, remainingFilters);
381 auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false); 356 auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false);
382 replaySet(filteredSet, resultProvider); 357 replaySet(filteredSet, resultProvider);
383 resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); 358 resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction));
384 qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); 359 return Akonadi2::Storage::maxRevision(transaction);
360 }
385 361
386 return KAsync::start<qint64>([=]() -> qint64 { 362
387 return newRevision; 363 qint64 executeIncrementalQuery(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider)
388 }); 364 {
365 const qint64 baseRevision = resultProvider->revision() + 1;
366 Trace() << "Running incremental query " << baseRevision;
367 return load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
368 return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters);
369 }, resultProvider);
370 }
371
372 qint64 executeInitialQuery(const Akonadi2::Query &query, const QByteArray &parent, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider)
373 {
374 Trace() << "Running initial query for parent:" << parent;
375 auto modifiedQuery = query;
376 modifiedQuery.propertyFilter.insert("parent", parent);
377 return load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
378 return loadInitialResultSet(parent, modifiedQuery, transaction, remainingFilters);
379 }, resultProvider);
389 } 380 }
390 381
391protected: 382protected: