diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-11-13 23:31:41 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-11-13 23:31:41 +0100 |
commit | 75c231f0758603120ec562af772b48b5f6ac0e24 (patch) | |
tree | 39abf8f038bd379ee3c0640a7476e6129ed71f8c /common/facade.h | |
parent | 09aafbd1373b5d1152ac7a453a140a7f76c2e90e (diff) | |
download | sink-75c231f0758603120ec562af772b48b5f6ac0e24.tar.gz sink-75c231f0758603120ec562af772b48b5f6ac0e24.zip |
DummyResourceTest and QueryTest are passing
sync has been removed from the query code and is now a separate step
Diffstat (limited to 'common/facade.h')
-rw-r--r-- | common/facade.h | 147 |
1 files changed, 60 insertions, 87 deletions
diff --git a/common/facade.h b/common/facade.h index eb55c98..5be1c73 100644 --- a/common/facade.h +++ b/common/facade.h | |||
@@ -44,19 +44,15 @@ class QueryRunner : public QObject | |||
44 | { | 44 | { |
45 | Q_OBJECT | 45 | Q_OBJECT |
46 | public: | 46 | public: |
47 | typedef std::function<KAsync::Job<qint64>(qint64 oldRevision)> QueryFunction; | 47 | typedef std::function<KAsync::Job<void>()> QueryFunction; |
48 | 48 | ||
49 | QueryRunner(const Akonadi2::Query &query) : mLatestRevision(0) {}; | 49 | QueryRunner(const Akonadi2::Query &query) {}; |
50 | /** | 50 | /** |
51 | * Starts query | 51 | * Starts query |
52 | */ | 52 | */ |
53 | KAsync::Job<void> run(qint64 newRevision = 0) | 53 | KAsync::Job<void> run(qint64 newRevision = 0) |
54 | { | 54 | { |
55 | //TODO: JOBAPI: that last empty .then should not be necessary | 55 | return queryFunction(); |
56 | //TODO: remove newRevision | ||
57 | return queryFunction(mLatestRevision + 1).then<void, qint64>([this](qint64 revision) { | ||
58 | mLatestRevision = revision; | ||
59 | }).then<void>([](){}); | ||
60 | } | 56 | } |
61 | 57 | ||
62 | /** | 58 | /** |
@@ -74,12 +70,11 @@ public slots: | |||
74 | void revisionChanged(qint64 newRevision) | 70 | void revisionChanged(qint64 newRevision) |
75 | { | 71 | { |
76 | Trace() << "New revision: " << newRevision; | 72 | Trace() << "New revision: " << newRevision; |
77 | run(newRevision).exec(); | 73 | run().exec(); |
78 | } | 74 | } |
79 | 75 | ||
80 | private: | 76 | private: |
81 | QueryFunction queryFunction; | 77 | QueryFunction queryFunction; |
82 | qint64 mLatestRevision; | ||
83 | }; | 78 | }; |
84 | 79 | ||
85 | static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) | 80 | static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) |
@@ -125,10 +120,9 @@ public: | |||
125 | * @param resourceIdentifier is the identifier of the resource instance | 120 | * @param resourceIdentifier is the identifier of the resource instance |
126 | * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa | 121 | * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa |
127 | */ | 122 | */ |
128 | GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer<EntityStorage<DomainType> > storage = QSharedPointer<EntityStorage<DomainType> >(), const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess = QSharedPointer<Akonadi2::ResourceAccessInterface>()) | 123 | GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess = QSharedPointer<Akonadi2::ResourceAccessInterface>()) |
129 | : Akonadi2::StoreFacade<DomainType>(), | 124 | : Akonadi2::StoreFacade<DomainType>(), |
130 | mResourceAccess(resourceAccess), | 125 | mResourceAccess(resourceAccess), |
131 | mStorage(storage), | ||
132 | mDomainTypeAdaptorFactory(adaptorFactory), | 126 | mDomainTypeAdaptorFactory(adaptorFactory), |
133 | mResourceInstanceIdentifier(resourceIdentifier) | 127 | mResourceInstanceIdentifier(resourceIdentifier) |
134 | { | 128 | { |
@@ -177,48 +171,28 @@ public: | |||
177 | //TODO JOBAPI return job from sync continuation to execute it as subjob? | 171 | //TODO JOBAPI return job from sync continuation to execute it as subjob? |
178 | KAsync::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) Q_DECL_OVERRIDE | 172 | KAsync::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) Q_DECL_OVERRIDE |
179 | { | 173 | { |
180 | { | 174 | auto fetchEntities = [this, query, resultProvider](const QByteArray &parent) { |
181 | QSet<QByteArray> remainingFilters; | 175 | Trace() << "Fetching initial set for parent:" << parent; |
182 | auto filter = [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { | ||
183 | for (const auto &filterProperty : remainingFilters) { | ||
184 | //TODO implement other comparison operators than equality | ||
185 | if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { | ||
186 | return false; | ||
187 | } | ||
188 | } | ||
189 | return true; | ||
190 | }; | ||
191 | |||
192 | auto fetchEntities = [this, query, resultProvider, filter](const QByteArray &parent) { | ||
193 | Trace() << "Running fetchEntities" << parent; | ||
194 | Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); | ||
195 | storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { | ||
196 | Warning() << "Error during query: " << error.store << error.message; | ||
197 | }); | ||
198 | 176 | ||
199 | auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); | 177 | Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); |
178 | storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { | ||
179 | Warning() << "Error during query: " << error.store << error.message; | ||
180 | }); | ||
200 | 181 | ||
201 | auto modifiedQuery = query; | 182 | auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); |
202 | modifiedQuery.propertyFilter.insert("parent", parent); | ||
203 | //TODO | ||
204 | QSet<QByteArray> appliedFilters; | ||
205 | auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(modifiedQuery, mResourceInstanceIdentifier, appliedFilters, transaction); | ||
206 | QSet<QByteArray> remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; | ||
207 | 183 | ||
208 | //We do a full scan if there were no indexes available to create the initial set. | 184 | auto modifiedQuery = query; |
209 | if (appliedFilters.isEmpty()) { | 185 | modifiedQuery.propertyFilter.insert("parent", parent); |
210 | //TODO this should be replaced by an index lookup as well | 186 | |
211 | resultSet = fullScan(transaction, bufferTypeForDomainType()); | 187 | QSet<QByteArray> remainingFilters; |
212 | } | 188 | auto resultSet = loadInitialResultSet(parent, modifiedQuery, transaction, remainingFilters); |
213 | auto filteredSet = filterSet(resultSet, filter, transaction, true); | 189 | auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, true); |
214 | replaySet(filteredSet, resultProvider); | 190 | replaySet(filteredSet, resultProvider); |
215 | resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); | 191 | const qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); |
216 | qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); | 192 | resultProvider->setRevision(newRevision); |
217 | //TODO send newRevision to resource | 193 | mResourceAccess->sendRevisionReplayedCommand(newRevision); |
218 | // mResourceAccess->sendRevisionReplayedCommand(newRevision); | 194 | }; |
219 | }; | 195 | resultProvider->setFetcher(fetchEntities); |
220 | resultProvider->setFetcher(fetchEntities); | ||
221 | } | ||
222 | 196 | ||
223 | auto runner = QSharedPointer<QueryRunner>::create(query); | 197 | auto runner = QSharedPointer<QueryRunner>::create(query); |
224 | QWeakPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > weakResultProvider = resultProvider; | 198 | QWeakPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > weakResultProvider = resultProvider; |
@@ -233,8 +207,6 @@ public: | |||
233 | return; | 207 | return; |
234 | } | 208 | } |
235 | executeQuery(query, resultProvider).template then<void, qint64>([&future, this](qint64 queriedRevision) { | 209 | executeQuery(query, resultProvider).template then<void, qint64>([&future, this](qint64 queriedRevision) { |
236 | //TODO set revision in result provider? | ||
237 | //TODO update all existing results with new revision | ||
238 | mResourceAccess->sendRevisionReplayedCommand(queriedRevision); | 210 | mResourceAccess->sendRevisionReplayedCommand(queriedRevision); |
239 | future.setFinished(); | 211 | future.setFinished(); |
240 | }).exec(); | 212 | }).exec(); |
@@ -249,27 +221,12 @@ public: | |||
249 | mResourceAccess->open(); | 221 | mResourceAccess->open(); |
250 | QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); | 222 | QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); |
251 | } | 223 | } |
224 | return KAsync::null<void>(); | ||
252 | 225 | ||
253 | //We have to capture the runner to keep it alive | 226 | //We have to capture the runner to keep it alive |
254 | return synchronizeResource(query).template then<void>([runner](KAsync::Future<void> &future) { | ||
255 | future.setFinished(); | ||
256 | }, | ||
257 | [](int error, const QString &errorString) { | ||
258 | Warning() << "Error during sync " << error << errorString; | ||
259 | }); | ||
260 | } | 227 | } |
261 | 228 | ||
262 | private: | 229 | private: |
263 | KAsync::Job<void> synchronizeResource(const Akonadi2::Query &query) | ||
264 | { | ||
265 | //TODO check if a sync is necessary | ||
266 | //TODO Only sync what was requested | ||
267 | //TODO timeout | ||
268 | if (query.syncOnDemand || query.processAll) { | ||
269 | return mResourceAccess->synchronizeResource(query.syncOnDemand, query.processAll); | ||
270 | } | ||
271 | return KAsync::null<void>(); | ||
272 | } | ||
273 | 230 | ||
274 | //TODO move into result provider? | 231 | //TODO move into result provider? |
275 | void replaySet(ResultSet &resultSet, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) | 232 | void replaySet(ResultSet &resultSet, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) |
@@ -280,17 +237,14 @@ private: | |||
280 | Trace() << "Got creation"; | 237 | Trace() << "Got creation"; |
281 | //TODO Only copy in result provider | 238 | //TODO Only copy in result provider |
282 | resultProvider->add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); | 239 | resultProvider->add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); |
283 | // modelResult->add(); | ||
284 | break; | 240 | break; |
285 | case Akonadi2::Operation_Modification: | 241 | case Akonadi2::Operation_Modification: |
286 | Trace() << "Got modification"; | 242 | Trace() << "Got modification"; |
287 | resultProvider->modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); | 243 | resultProvider->modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); |
288 | // modelResult->modify(); | ||
289 | break; | 244 | break; |
290 | case Akonadi2::Operation_Removal: | 245 | case Akonadi2::Operation_Removal: |
291 | Trace() << "Got removal"; | 246 | Trace() << "Got removal"; |
292 | resultProvider->remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); | 247 | resultProvider->remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); |
293 | // modelResult->remove(); | ||
294 | break; | 248 | break; |
295 | } | 249 | } |
296 | return true; | 250 | return true; |
@@ -320,13 +274,28 @@ private: | |||
320 | }); | 274 | }); |
321 | } | 275 | } |
322 | 276 | ||
323 | ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) | 277 | ResultSet loadInitialResultSet(const QByteArray &parent, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) |
324 | { | 278 | { |
279 | Trace() << "Fetching initial set for parent:" << parent; | ||
280 | //TODO | ||
281 | QSet<QByteArray> appliedFilters; | ||
282 | auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); | ||
283 | remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; | ||
284 | |||
285 | //We do a full scan if there were no indexes available to create the initial set. | ||
286 | if (appliedFilters.isEmpty()) { | ||
287 | //TODO this should be replaced by an index lookup as well | ||
288 | resultSet = fullScan(transaction, bufferTypeForDomainType()); | ||
289 | } | ||
290 | return resultSet; | ||
291 | } | ||
325 | 292 | ||
293 | ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) | ||
294 | { | ||
295 | Trace() << "Loading incremental result set starting from revision: " << baseRevision; | ||
326 | const auto bufferType = bufferTypeForDomainType(); | 296 | const auto bufferType = bufferTypeForDomainType(); |
327 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | 297 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); |
328 | //TODO apply filter from index | 298 | return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { |
329 | return ResultSet([bufferType, revisionCounter, &transaction, this]() -> QByteArray { | ||
330 | const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); | 299 | const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); |
331 | //Spit out the revision keys one by one. | 300 | //Spit out the revision keys one by one. |
332 | while (*revisionCounter <= topRevision) { | 301 | while (*revisionCounter <= topRevision) { |
@@ -354,7 +323,7 @@ private: | |||
354 | //Read through the source values and return whatever matches the filter | 323 | //Read through the source values and return whatever matches the filter |
355 | std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool { | 324 | std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool { |
356 | while (resultSetPtr->next()) { | 325 | while (resultSetPtr->next()) { |
357 | //TODO only necessary if we actually want to filter or neew the operation type (but not a big deal if we do it always I guess) | 326 | //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) |
358 | readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { | 327 | readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { |
359 | //Always remove removals, they probably don't match due to non-available properties | 328 | //Always remove removals, they probably don't match due to non-available properties |
360 | if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { | 329 | if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { |
@@ -374,6 +343,20 @@ private: | |||
374 | return ResultSet(generator); | 343 | return ResultSet(generator); |
375 | } | 344 | } |
376 | 345 | ||
346 | |||
347 | std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query) | ||
348 | { | ||
349 | return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { | ||
350 | for (const auto &filterProperty : remainingFilters) { | ||
351 | //TODO implement other comparison operators than equality | ||
352 | if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { | ||
353 | return false; | ||
354 | } | ||
355 | } | ||
356 | return true; | ||
357 | }; | ||
358 | } | ||
359 | |||
377 | virtual KAsync::Job<qint64> executeQuery(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) | 360 | virtual KAsync::Job<qint64> executeQuery(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) |
378 | { | 361 | { |
379 | /* | 362 | /* |
@@ -384,16 +367,6 @@ private: | |||
384 | const qint64 baseRevision = resultProvider->revision() + 1; | 367 | const qint64 baseRevision = resultProvider->revision() + 1; |
385 | Trace() << "Running query " << baseRevision; | 368 | Trace() << "Running query " << baseRevision; |
386 | QSet<QByteArray> remainingFilters; | 369 | QSet<QByteArray> remainingFilters; |
387 | auto filter = [remainingFilters, query, baseRevision](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { | ||
388 | for (const auto &filterProperty : remainingFilters) { | ||
389 | //TODO implement other comparison operators than equality | ||
390 | if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { | ||
391 | return false; | ||
392 | } | ||
393 | } | ||
394 | return true; | ||
395 | }; | ||
396 | qint64 newRevision = 0; | ||
397 | 370 | ||
398 | Trace() << "Fetching updates"; | 371 | Trace() << "Fetching updates"; |
399 | Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); | 372 | Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); |
@@ -404,10 +377,10 @@ private: | |||
404 | auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); | 377 | auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); |
405 | 378 | ||
406 | auto resultSet = loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); | 379 | auto resultSet = loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); |
407 | auto filteredSet = filterSet(resultSet, filter, transaction, false); | 380 | auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false); |
408 | replaySet(filteredSet, resultProvider); | 381 | replaySet(filteredSet, resultProvider); |
409 | resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); | 382 | resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); |
410 | newRevision = Akonadi2::Storage::maxRevision(transaction); | 383 | qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); |
411 | 384 | ||
412 | return KAsync::start<qint64>([=]() -> qint64 { | 385 | return KAsync::start<qint64>([=]() -> qint64 { |
413 | return newRevision; | 386 | return newRevision; |