summaryrefslogtreecommitdiffstats
path: root/common/facade.h
diff options
context:
space:
mode:
Diffstat (limited to 'common/facade.h')
-rw-r--r--common/facade.h263
1 files changed, 188 insertions, 75 deletions
diff --git a/common/facade.h b/common/facade.h
index 643ebec..eb55c98 100644
--- a/common/facade.h
+++ b/common/facade.h
@@ -135,68 +135,6 @@ public:
135 if (!mResourceAccess) { 135 if (!mResourceAccess) {
136 mResourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(resourceIdentifier); 136 mResourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(resourceIdentifier);
137 } 137 }
138 if (!mStorage) {
139 mStorage = QSharedPointer<EntityStorage<DomainType> >::create(resourceIdentifier);
140 const auto bufferType = bufferTypeForDomainType();
141
142 mStorage->readEntity = [bufferType, this] (const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback)
143 {
144 //This only works for a 1:1 mapping of resource to domain types.
145 //Not i.e. for tags that are stored as flags in each entity of an imap store.
146 //additional properties that don't have a 1:1 mapping (such as separately stored tags),
147 //could be added to the adaptor.
148 transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool {
149 Akonadi2::EntityBuffer buffer(value.data(), value.size());
150 const Akonadi2::Entity &entity = buffer.entity();
151 const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata());
152 Q_ASSERT(metadataBuffer);
153 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
154 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation());
155 return false;
156 },
157 [](const Akonadi2::Storage::Error &error) {
158 qWarning() << "Error during query: " << error.message;
159 });
160 };
161
162 mStorage->loadInitialResultSet = [bufferType, this] (const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet
163 {
164 QSet<QByteArray> appliedFilters;
165 auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction);
166 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
167
168 //We do a full scan if there were no indexes available to create the initial set.
169 if (appliedFilters.isEmpty()) {
170 //TODO this should be replaced by an index lookup as well
171 return fullScan(transaction, bufferType);
172 }
173 return resultSet;
174 };
175
176 mStorage->loadIncrementalResultSet = [bufferType, this] (qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet
177 {
178 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
179 return ResultSet([bufferType, revisionCounter, &transaction, this]() -> QByteArray {
180 const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction);
181 //Spit out the revision keys one by one.
182 while (*revisionCounter <= topRevision) {
183 const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter);
184 const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter);
185 Trace() << "Revision" << *revisionCounter << type << uid;
186 if (type != bufferType) {
187 //Skip revision
188 *revisionCounter += 1;
189 continue;
190 }
191 const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter);
192 *revisionCounter += 1;
193 return key;
194 }
195 //We're done
196 return QByteArray();
197 });
198 };
199 }
200 } 138 }
201 139
202 ~GenericFacade() 140 ~GenericFacade()
@@ -237,13 +175,56 @@ public:
237 } 175 }
238 176
239 //TODO JOBAPI return job from sync continuation to execute it as subjob? 177 //TODO JOBAPI return job from sync continuation to execute it as subjob?
240 KAsync::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider) Q_DECL_OVERRIDE 178 KAsync::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) Q_DECL_OVERRIDE
241 { 179 {
180 {
181 QSet<QByteArray> remainingFilters;
182 auto filter = [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
183 for (const auto &filterProperty : remainingFilters) {
184 //TODO implement other comparison operators than equality
185 if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) {
186 return false;
187 }
188 }
189 return true;
190 };
191
192 auto fetchEntities = [this, query, resultProvider, filter](const QByteArray &parent) {
193 Trace() << "Running fetchEntities" << parent;
194 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
195 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
196 Warning() << "Error during query: " << error.store << error.message;
197 });
198
199 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly);
200
201 auto modifiedQuery = query;
202 modifiedQuery.propertyFilter.insert("parent", parent);
203 //TODO
204 QSet<QByteArray> appliedFilters;
205 auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(modifiedQuery, mResourceInstanceIdentifier, appliedFilters, transaction);
206 QSet<QByteArray> remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
207
208 //We do a full scan if there were no indexes available to create the initial set.
209 if (appliedFilters.isEmpty()) {
210 //TODO this should be replaced by an index lookup as well
211 resultSet = fullScan(transaction, bufferTypeForDomainType());
212 }
213 auto filteredSet = filterSet(resultSet, filter, transaction, true);
214 replaySet(filteredSet, resultProvider);
215 resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction));
216 qint64 newRevision = Akonadi2::Storage::maxRevision(transaction);
217 //TODO send newRevision to resource
218 // mResourceAccess->sendRevisionReplayedCommand(newRevision);
219 };
220 resultProvider->setFetcher(fetchEntities);
221 }
222
242 auto runner = QSharedPointer<QueryRunner>::create(query); 223 auto runner = QSharedPointer<QueryRunner>::create(query);
243 QWeakPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > weakResultProvider = resultProvider; 224 QWeakPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > weakResultProvider = resultProvider;
244 runner->setQuery([this, weakResultProvider, query] (qint64 oldRevision) -> KAsync::Job<qint64> { 225 runner->setQuery([this, weakResultProvider, query] () -> KAsync::Job<void> {
245 return KAsync::start<qint64>([this, weakResultProvider, query, oldRevision](KAsync::Future<qint64> &future) { 226 return KAsync::start<void>([this, weakResultProvider, query](KAsync::Future<void> &future) {
246 Trace() << "Executing query " << oldRevision; 227 Trace() << "Executing query ";
247 auto resultProvider = weakResultProvider.toStrongRef(); 228 auto resultProvider = weakResultProvider.toStrongRef();
248 if (!resultProvider) { 229 if (!resultProvider) {
249 Warning() << "Tried executing query after result provider is already gone"; 230 Warning() << "Tried executing query after result provider is already gone";
@@ -251,11 +232,10 @@ public:
251 future.setFinished(); 232 future.setFinished();
252 return; 233 return;
253 } 234 }
254 load(query, resultProvider, oldRevision).template then<void, qint64>([&future, this](qint64 queriedRevision) { 235 executeQuery(query, resultProvider).template then<void, qint64>([&future, this](qint64 queriedRevision) {
255 //TODO set revision in result provider? 236 //TODO set revision in result provider?
256 //TODO update all existing results with new revision 237 //TODO update all existing results with new revision
257 mResourceAccess->sendRevisionReplayedCommand(queriedRevision); 238 mResourceAccess->sendRevisionReplayedCommand(queriedRevision);
258 future.setValue(queriedRevision);
259 future.setFinished(); 239 future.setFinished();
260 }).exec(); 240 }).exec();
261 }); 241 });
@@ -272,9 +252,7 @@ public:
272 252
273 //We have to capture the runner to keep it alive 253 //We have to capture the runner to keep it alive
274 return synchronizeResource(query).template then<void>([runner](KAsync::Future<void> &future) { 254 return synchronizeResource(query).template then<void>([runner](KAsync::Future<void> &future) {
275 runner->run().then<void>([&future]() { 255 future.setFinished();
276 future.setFinished();
277 }).exec();
278 }, 256 },
279 [](int error, const QString &errorString) { 257 [](int error, const QString &errorString) {
280 Warning() << "Error during sync " << error << errorString; 258 Warning() << "Error during sync " << error << errorString;
@@ -293,17 +271,152 @@ private:
293 return KAsync::null<void>(); 271 return KAsync::null<void>();
294 } 272 }
295 273
296 virtual KAsync::Job<qint64> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider, qint64 oldRevision) 274 //TODO move into result provider?
275 void replaySet(ResultSet &resultSet, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider)
276 {
277 while (resultSet.next([this, resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool {
278 switch (operation) {
279 case Akonadi2::Operation_Creation:
280 Trace() << "Got creation";
281 //TODO Only copy in result provider
282 resultProvider->add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
283 // modelResult->add();
284 break;
285 case Akonadi2::Operation_Modification:
286 Trace() << "Got modification";
287 resultProvider->modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
288 // modelResult->modify();
289 break;
290 case Akonadi2::Operation_Removal:
291 Trace() << "Got removal";
292 resultProvider->remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
293 // modelResult->remove();
294 break;
295 }
296 return true;
297 })){};
298 }
299
300 void readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback)
301 {
302 const auto bufferType = bufferTypeForDomainType();
303 //This only works for a 1:1 mapping of resource to domain types.
304 //Not i.e. for tags that are stored as flags in each entity of an imap store.
305 //additional properties that don't have a 1:1 mapping (such as separately stored tags),
306 //could be added to the adaptor.
307 //
308 // Akonadi2::Storage::getLatest(transaction, bufferTye, key);
309 transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool {
310 Akonadi2::EntityBuffer buffer(value.data(), value.size());
311 const Akonadi2::Entity &entity = buffer.entity();
312 const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata());
313 Q_ASSERT(metadataBuffer);
314 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
315 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation());
316 return false;
317 },
318 [](const Akonadi2::Storage::Error &error) {
319 qWarning() << "Error during query: " << error.message;
320 });
321 }
322
323 ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
324 {
325
326 const auto bufferType = bufferTypeForDomainType();
327 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
328 //TODO apply filter from index
329 return ResultSet([bufferType, revisionCounter, &transaction, this]() -> QByteArray {
330 const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction);
331 //Spit out the revision keys one by one.
332 while (*revisionCounter <= topRevision) {
333 const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter);
334 const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter);
335 Trace() << "Revision" << *revisionCounter << type << uid;
336 if (type != bufferType) {
337 //Skip revision
338 *revisionCounter += 1;
339 continue;
340 }
341 const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter);
342 *revisionCounter += 1;
343 return key;
344 }
345 //We're done
346 return QByteArray();
347 });
348 }
349
350 ResultSet filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery)
297 { 351 {
352 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
353
354 //Read through the source values and return whatever matches the filter
355 std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool {
356 while (resultSetPtr->next()) {
357 //TODO only necessary if we actually want to filter or neew the operation type (but not a big deal if we do it always I guess)
358 readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) {
359 //Always remove removals, they probably don't match due to non-available properties
360 if (filter(domainObject) || operation == Akonadi2::Operation_Removal) {
361 if (initialQuery) {
362 //We're not interested in removals during the initial query
363 if (operation != Akonadi2::Operation_Removal) {
364 callback(domainObject, Akonadi2::Operation_Creation);
365 }
366 } else {
367 callback(domainObject, operation);
368 }
369 }
370 });
371 }
372 return false;
373 };
374 return ResultSet(generator);
375 }
376
377 virtual KAsync::Job<qint64> executeQuery(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider)
378 {
379 /*
380 * This method gets called initially, and after every revision change.
381 * * We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
382 * * Incremental updates are loaded directly, leaving it up to the model to discard the changes if they are not interesting
383 */
384 const qint64 baseRevision = resultProvider->revision() + 1;
385 Trace() << "Running query " << baseRevision;
386 QSet<QByteArray> remainingFilters;
387 auto filter = [remainingFilters, query, baseRevision](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
388 for (const auto &filterProperty : remainingFilters) {
389 //TODO implement other comparison operators than equality
390 if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) {
391 return false;
392 }
393 }
394 return true;
395 };
396 qint64 newRevision = 0;
397
398 Trace() << "Fetching updates";
399 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
400 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
401 Warning() << "Error during query: " << error.store << error.message;
402 });
403
404 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly);
405
406 auto resultSet = loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters);
407 auto filteredSet = filterSet(resultSet, filter, transaction, false);
408 replaySet(filteredSet, resultProvider);
409 resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction));
410 newRevision = Akonadi2::Storage::maxRevision(transaction);
411
298 return KAsync::start<qint64>([=]() -> qint64 { 412 return KAsync::start<qint64>([=]() -> qint64 {
299 return mStorage->read(query, oldRevision, resultProvider); 413 return newRevision;
300 }); 414 });
301 } 415 }
302 416
303protected: 417protected:
304 //TODO use one resource access instance per application & per resource 418 //TODO use one resource access instance per application & per resource
305 QSharedPointer<Akonadi2::ResourceAccessInterface> mResourceAccess; 419 QSharedPointer<Akonadi2::ResourceAccessInterface> mResourceAccess;
306 QSharedPointer<EntityStorage<DomainType> > mStorage;
307 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; 420 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory;
308 QByteArray mResourceInstanceIdentifier; 421 QByteArray mResourceInstanceIdentifier;
309}; 422};