summaryrefslogtreecommitdiffstats
path: root/common/facade.h
diff options
context:
space:
mode:
Diffstat (limited to 'common/facade.h')
-rw-r--r--common/facade.h147
1 files changed, 60 insertions, 87 deletions
diff --git a/common/facade.h b/common/facade.h
index eb55c98..5be1c73 100644
--- a/common/facade.h
+++ b/common/facade.h
@@ -44,19 +44,15 @@ class QueryRunner : public QObject
44{ 44{
45 Q_OBJECT 45 Q_OBJECT
46public: 46public:
47 typedef std::function<KAsync::Job<qint64>(qint64 oldRevision)> QueryFunction; 47 typedef std::function<KAsync::Job<void>()> QueryFunction;
48 48
49 QueryRunner(const Akonadi2::Query &query) : mLatestRevision(0) {}; 49 QueryRunner(const Akonadi2::Query &query) {};
50 /** 50 /**
51 * Starts query 51 * Starts query
52 */ 52 */
53 KAsync::Job<void> run(qint64 newRevision = 0) 53 KAsync::Job<void> run(qint64 newRevision = 0)
54 { 54 {
55 //TODO: JOBAPI: that last empty .then should not be necessary 55 return queryFunction();
56 //TODO: remove newRevision
57 return queryFunction(mLatestRevision + 1).then<void, qint64>([this](qint64 revision) {
58 mLatestRevision = revision;
59 }).then<void>([](){});
60 } 56 }
61 57
62 /** 58 /**
@@ -74,12 +70,11 @@ public slots:
74 void revisionChanged(qint64 newRevision) 70 void revisionChanged(qint64 newRevision)
75 { 71 {
76 Trace() << "New revision: " << newRevision; 72 Trace() << "New revision: " << newRevision;
77 run(newRevision).exec(); 73 run().exec();
78 } 74 }
79 75
80private: 76private:
81 QueryFunction queryFunction; 77 QueryFunction queryFunction;
82 qint64 mLatestRevision;
83}; 78};
84 79
85static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) 80static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType)
@@ -125,10 +120,9 @@ public:
125 * @param resourceIdentifier is the identifier of the resource instance 120 * @param resourceIdentifier is the identifier of the resource instance
126 * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa 121 * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa
127 */ 122 */
128 GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer<EntityStorage<DomainType> > storage = QSharedPointer<EntityStorage<DomainType> >(), const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess = QSharedPointer<Akonadi2::ResourceAccessInterface>()) 123 GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess = QSharedPointer<Akonadi2::ResourceAccessInterface>())
129 : Akonadi2::StoreFacade<DomainType>(), 124 : Akonadi2::StoreFacade<DomainType>(),
130 mResourceAccess(resourceAccess), 125 mResourceAccess(resourceAccess),
131 mStorage(storage),
132 mDomainTypeAdaptorFactory(adaptorFactory), 126 mDomainTypeAdaptorFactory(adaptorFactory),
133 mResourceInstanceIdentifier(resourceIdentifier) 127 mResourceInstanceIdentifier(resourceIdentifier)
134 { 128 {
@@ -177,48 +171,28 @@ public:
177 //TODO JOBAPI return job from sync continuation to execute it as subjob? 171 //TODO JOBAPI return job from sync continuation to execute it as subjob?
178 KAsync::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) Q_DECL_OVERRIDE 172 KAsync::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) Q_DECL_OVERRIDE
179 { 173 {
180 { 174 auto fetchEntities = [this, query, resultProvider](const QByteArray &parent) {
181 QSet<QByteArray> remainingFilters; 175 Trace() << "Fetching initial set for parent:" << parent;
182 auto filter = [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
183 for (const auto &filterProperty : remainingFilters) {
184 //TODO implement other comparison operators than equality
185 if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) {
186 return false;
187 }
188 }
189 return true;
190 };
191
192 auto fetchEntities = [this, query, resultProvider, filter](const QByteArray &parent) {
193 Trace() << "Running fetchEntities" << parent;
194 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
195 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
196 Warning() << "Error during query: " << error.store << error.message;
197 });
198 176
199 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); 177 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
178 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
179 Warning() << "Error during query: " << error.store << error.message;
180 });
200 181
201 auto modifiedQuery = query; 182 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly);
202 modifiedQuery.propertyFilter.insert("parent", parent);
203 //TODO
204 QSet<QByteArray> appliedFilters;
205 auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(modifiedQuery, mResourceInstanceIdentifier, appliedFilters, transaction);
206 QSet<QByteArray> remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
207 183
208 //We do a full scan if there were no indexes available to create the initial set. 184 auto modifiedQuery = query;
209 if (appliedFilters.isEmpty()) { 185 modifiedQuery.propertyFilter.insert("parent", parent);
210 //TODO this should be replaced by an index lookup as well 186
211 resultSet = fullScan(transaction, bufferTypeForDomainType()); 187 QSet<QByteArray> remainingFilters;
212 } 188 auto resultSet = loadInitialResultSet(parent, modifiedQuery, transaction, remainingFilters);
213 auto filteredSet = filterSet(resultSet, filter, transaction, true); 189 auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, true);
214 replaySet(filteredSet, resultProvider); 190 replaySet(filteredSet, resultProvider);
215 resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); 191 const qint64 newRevision = Akonadi2::Storage::maxRevision(transaction);
216 qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); 192 resultProvider->setRevision(newRevision);
217 //TODO send newRevision to resource 193 mResourceAccess->sendRevisionReplayedCommand(newRevision);
218 // mResourceAccess->sendRevisionReplayedCommand(newRevision); 194 };
219 }; 195 resultProvider->setFetcher(fetchEntities);
220 resultProvider->setFetcher(fetchEntities);
221 }
222 196
223 auto runner = QSharedPointer<QueryRunner>::create(query); 197 auto runner = QSharedPointer<QueryRunner>::create(query);
224 QWeakPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > weakResultProvider = resultProvider; 198 QWeakPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > weakResultProvider = resultProvider;
@@ -233,8 +207,6 @@ public:
233 return; 207 return;
234 } 208 }
235 executeQuery(query, resultProvider).template then<void, qint64>([&future, this](qint64 queriedRevision) { 209 executeQuery(query, resultProvider).template then<void, qint64>([&future, this](qint64 queriedRevision) {
236 //TODO set revision in result provider?
237 //TODO update all existing results with new revision
238 mResourceAccess->sendRevisionReplayedCommand(queriedRevision); 210 mResourceAccess->sendRevisionReplayedCommand(queriedRevision);
239 future.setFinished(); 211 future.setFinished();
240 }).exec(); 212 }).exec();
@@ -249,27 +221,12 @@ public:
249 mResourceAccess->open(); 221 mResourceAccess->open();
250 QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); 222 QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged);
251 } 223 }
224 return KAsync::null<void>();
252 225
253 //We have to capture the runner to keep it alive 226 //We have to capture the runner to keep it alive
254 return synchronizeResource(query).template then<void>([runner](KAsync::Future<void> &future) {
255 future.setFinished();
256 },
257 [](int error, const QString &errorString) {
258 Warning() << "Error during sync " << error << errorString;
259 });
260 } 227 }
261 228
262private: 229private:
263 KAsync::Job<void> synchronizeResource(const Akonadi2::Query &query)
264 {
265 //TODO check if a sync is necessary
266 //TODO Only sync what was requested
267 //TODO timeout
268 if (query.syncOnDemand || query.processAll) {
269 return mResourceAccess->synchronizeResource(query.syncOnDemand, query.processAll);
270 }
271 return KAsync::null<void>();
272 }
273 230
274 //TODO move into result provider? 231 //TODO move into result provider?
275 void replaySet(ResultSet &resultSet, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) 232 void replaySet(ResultSet &resultSet, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider)
@@ -280,17 +237,14 @@ private:
280 Trace() << "Got creation"; 237 Trace() << "Got creation";
281 //TODO Only copy in result provider 238 //TODO Only copy in result provider
282 resultProvider->add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); 239 resultProvider->add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
283 // modelResult->add();
284 break; 240 break;
285 case Akonadi2::Operation_Modification: 241 case Akonadi2::Operation_Modification:
286 Trace() << "Got modification"; 242 Trace() << "Got modification";
287 resultProvider->modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); 243 resultProvider->modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
288 // modelResult->modify();
289 break; 244 break;
290 case Akonadi2::Operation_Removal: 245 case Akonadi2::Operation_Removal:
291 Trace() << "Got removal"; 246 Trace() << "Got removal";
292 resultProvider->remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); 247 resultProvider->remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
293 // modelResult->remove();
294 break; 248 break;
295 } 249 }
296 return true; 250 return true;
@@ -320,13 +274,28 @@ private:
320 }); 274 });
321 } 275 }
322 276
323 ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) 277 ResultSet loadInitialResultSet(const QByteArray &parent, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
324 { 278 {
279 Trace() << "Fetching initial set for parent:" << parent;
280 //TODO
281 QSet<QByteArray> appliedFilters;
282 auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction);
283 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
284
285 //We do a full scan if there were no indexes available to create the initial set.
286 if (appliedFilters.isEmpty()) {
287 //TODO this should be replaced by an index lookup as well
288 resultSet = fullScan(transaction, bufferTypeForDomainType());
289 }
290 return resultSet;
291 }
325 292
293 ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
294 {
295 Trace() << "Loading incremental result set starting from revision: " << baseRevision;
326 const auto bufferType = bufferTypeForDomainType(); 296 const auto bufferType = bufferTypeForDomainType();
327 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); 297 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
328 //TODO apply filter from index 298 return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray {
329 return ResultSet([bufferType, revisionCounter, &transaction, this]() -> QByteArray {
330 const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); 299 const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction);
331 //Spit out the revision keys one by one. 300 //Spit out the revision keys one by one.
332 while (*revisionCounter <= topRevision) { 301 while (*revisionCounter <= topRevision) {
@@ -354,7 +323,7 @@ private:
354 //Read through the source values and return whatever matches the filter 323 //Read through the source values and return whatever matches the filter
355 std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool { 324 std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool {
356 while (resultSetPtr->next()) { 325 while (resultSetPtr->next()) {
357 //TODO only necessary if we actually want to filter or neew the operation type (but not a big deal if we do it always I guess) 326 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
358 readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { 327 readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) {
359 //Always remove removals, they probably don't match due to non-available properties 328 //Always remove removals, they probably don't match due to non-available properties
360 if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { 329 if (filter(domainObject) || operation == Akonadi2::Operation_Removal) {
@@ -374,6 +343,20 @@ private:
374 return ResultSet(generator); 343 return ResultSet(generator);
375 } 344 }
376 345
346
347 std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query)
348 {
349 return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
350 for (const auto &filterProperty : remainingFilters) {
351 //TODO implement other comparison operators than equality
352 if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) {
353 return false;
354 }
355 }
356 return true;
357 };
358 }
359
377 virtual KAsync::Job<qint64> executeQuery(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) 360 virtual KAsync::Job<qint64> executeQuery(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider)
378 { 361 {
379 /* 362 /*
@@ -384,16 +367,6 @@ private:
384 const qint64 baseRevision = resultProvider->revision() + 1; 367 const qint64 baseRevision = resultProvider->revision() + 1;
385 Trace() << "Running query " << baseRevision; 368 Trace() << "Running query " << baseRevision;
386 QSet<QByteArray> remainingFilters; 369 QSet<QByteArray> remainingFilters;
387 auto filter = [remainingFilters, query, baseRevision](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
388 for (const auto &filterProperty : remainingFilters) {
389 //TODO implement other comparison operators than equality
390 if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) {
391 return false;
392 }
393 }
394 return true;
395 };
396 qint64 newRevision = 0;
397 370
398 Trace() << "Fetching updates"; 371 Trace() << "Fetching updates";
399 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); 372 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
@@ -404,10 +377,10 @@ private:
404 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); 377 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly);
405 378
406 auto resultSet = loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); 379 auto resultSet = loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters);
407 auto filteredSet = filterSet(resultSet, filter, transaction, false); 380 auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false);
408 replaySet(filteredSet, resultProvider); 381 replaySet(filteredSet, resultProvider);
409 resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); 382 resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction));
410 newRevision = Akonadi2::Storage::maxRevision(transaction); 383 qint64 newRevision = Akonadi2::Storage::maxRevision(transaction);
411 384
412 return KAsync::start<qint64>([=]() -> qint64 { 385 return KAsync::start<qint64>([=]() -> qint64 {
413 return newRevision; 386 return newRevision;