summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/clientapi.cpp2
-rw-r--r--common/facade.h147
-rw-r--r--common/modelresult.h8
-rw-r--r--common/resultprovider.h189
4 files changed, 83 insertions, 263 deletions
diff --git a/common/clientapi.cpp b/common/clientapi.cpp
index f99ebb8..839e77b 100644
--- a/common/clientapi.cpp
+++ b/common/clientapi.cpp
@@ -95,7 +95,7 @@ KAsync::Job<void> Store::synchronize(const Akonadi2::Query &query)
95 .template each<void, QByteArray>([query](const QByteArray &resource, KAsync::Future<void> &future) { 95 .template each<void, QByteArray>([query](const QByteArray &resource, KAsync::Future<void> &future) {
96 auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(resource); 96 auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(resource);
97 resourceAccess->open(); 97 resourceAccess->open();
98 resourceAccess->synchronizeResource(true, false).then<void>([&future]() { 98 resourceAccess->synchronizeResource(query.syncOnDemand, query.processAll).then<void>([&future, resourceAccess]() {
99 future.setFinished(); 99 future.setFinished();
100 }).exec(); 100 }).exec();
101 }) 101 })
diff --git a/common/facade.h b/common/facade.h
index eb55c98..5be1c73 100644
--- a/common/facade.h
+++ b/common/facade.h
@@ -44,19 +44,15 @@ class QueryRunner : public QObject
44{ 44{
45 Q_OBJECT 45 Q_OBJECT
46public: 46public:
47 typedef std::function<KAsync::Job<qint64>(qint64 oldRevision)> QueryFunction; 47 typedef std::function<KAsync::Job<void>()> QueryFunction;
48 48
49 QueryRunner(const Akonadi2::Query &query) : mLatestRevision(0) {}; 49 QueryRunner(const Akonadi2::Query &query) {};
50 /** 50 /**
51 * Starts query 51 * Starts query
52 */ 52 */
53 KAsync::Job<void> run(qint64 newRevision = 0) 53 KAsync::Job<void> run(qint64 newRevision = 0)
54 { 54 {
55 //TODO: JOBAPI: that last empty .then should not be necessary 55 return queryFunction();
56 //TODO: remove newRevision
57 return queryFunction(mLatestRevision + 1).then<void, qint64>([this](qint64 revision) {
58 mLatestRevision = revision;
59 }).then<void>([](){});
60 } 56 }
61 57
62 /** 58 /**
@@ -74,12 +70,11 @@ public slots:
74 void revisionChanged(qint64 newRevision) 70 void revisionChanged(qint64 newRevision)
75 { 71 {
76 Trace() << "New revision: " << newRevision; 72 Trace() << "New revision: " << newRevision;
77 run(newRevision).exec(); 73 run().exec();
78 } 74 }
79 75
80private: 76private:
81 QueryFunction queryFunction; 77 QueryFunction queryFunction;
82 qint64 mLatestRevision;
83}; 78};
84 79
85static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) 80static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType)
@@ -125,10 +120,9 @@ public:
125 * @param resourceIdentifier is the identifier of the resource instance 120 * @param resourceIdentifier is the identifier of the resource instance
126 * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa 121 * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa
127 */ 122 */
128 GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer<EntityStorage<DomainType> > storage = QSharedPointer<EntityStorage<DomainType> >(), const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess = QSharedPointer<Akonadi2::ResourceAccessInterface>()) 123 GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess = QSharedPointer<Akonadi2::ResourceAccessInterface>())
129 : Akonadi2::StoreFacade<DomainType>(), 124 : Akonadi2::StoreFacade<DomainType>(),
130 mResourceAccess(resourceAccess), 125 mResourceAccess(resourceAccess),
131 mStorage(storage),
132 mDomainTypeAdaptorFactory(adaptorFactory), 126 mDomainTypeAdaptorFactory(adaptorFactory),
133 mResourceInstanceIdentifier(resourceIdentifier) 127 mResourceInstanceIdentifier(resourceIdentifier)
134 { 128 {
@@ -177,48 +171,28 @@ public:
177 //TODO JOBAPI return job from sync continuation to execute it as subjob? 171 //TODO JOBAPI return job from sync continuation to execute it as subjob?
178 KAsync::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) Q_DECL_OVERRIDE 172 KAsync::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) Q_DECL_OVERRIDE
179 { 173 {
180 { 174 auto fetchEntities = [this, query, resultProvider](const QByteArray &parent) {
181 QSet<QByteArray> remainingFilters; 175 Trace() << "Fetching initial set for parent:" << parent;
182 auto filter = [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
183 for (const auto &filterProperty : remainingFilters) {
184 //TODO implement other comparison operators than equality
185 if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) {
186 return false;
187 }
188 }
189 return true;
190 };
191
192 auto fetchEntities = [this, query, resultProvider, filter](const QByteArray &parent) {
193 Trace() << "Running fetchEntities" << parent;
194 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
195 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
196 Warning() << "Error during query: " << error.store << error.message;
197 });
198 176
199 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); 177 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
178 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
179 Warning() << "Error during query: " << error.store << error.message;
180 });
200 181
201 auto modifiedQuery = query; 182 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly);
202 modifiedQuery.propertyFilter.insert("parent", parent);
203 //TODO
204 QSet<QByteArray> appliedFilters;
205 auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(modifiedQuery, mResourceInstanceIdentifier, appliedFilters, transaction);
206 QSet<QByteArray> remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
207 183
208 //We do a full scan if there were no indexes available to create the initial set. 184 auto modifiedQuery = query;
209 if (appliedFilters.isEmpty()) { 185 modifiedQuery.propertyFilter.insert("parent", parent);
210 //TODO this should be replaced by an index lookup as well 186
211 resultSet = fullScan(transaction, bufferTypeForDomainType()); 187 QSet<QByteArray> remainingFilters;
212 } 188 auto resultSet = loadInitialResultSet(parent, modifiedQuery, transaction, remainingFilters);
213 auto filteredSet = filterSet(resultSet, filter, transaction, true); 189 auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, true);
214 replaySet(filteredSet, resultProvider); 190 replaySet(filteredSet, resultProvider);
215 resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); 191 const qint64 newRevision = Akonadi2::Storage::maxRevision(transaction);
216 qint64 newRevision = Akonadi2::Storage::maxRevision(transaction); 192 resultProvider->setRevision(newRevision);
217 //TODO send newRevision to resource 193 mResourceAccess->sendRevisionReplayedCommand(newRevision);
218 // mResourceAccess->sendRevisionReplayedCommand(newRevision); 194 };
219 }; 195 resultProvider->setFetcher(fetchEntities);
220 resultProvider->setFetcher(fetchEntities);
221 }
222 196
223 auto runner = QSharedPointer<QueryRunner>::create(query); 197 auto runner = QSharedPointer<QueryRunner>::create(query);
224 QWeakPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > weakResultProvider = resultProvider; 198 QWeakPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > weakResultProvider = resultProvider;
@@ -233,8 +207,6 @@ public:
233 return; 207 return;
234 } 208 }
235 executeQuery(query, resultProvider).template then<void, qint64>([&future, this](qint64 queriedRevision) { 209 executeQuery(query, resultProvider).template then<void, qint64>([&future, this](qint64 queriedRevision) {
236 //TODO set revision in result provider?
237 //TODO update all existing results with new revision
238 mResourceAccess->sendRevisionReplayedCommand(queriedRevision); 210 mResourceAccess->sendRevisionReplayedCommand(queriedRevision);
239 future.setFinished(); 211 future.setFinished();
240 }).exec(); 212 }).exec();
@@ -249,27 +221,12 @@ public:
249 mResourceAccess->open(); 221 mResourceAccess->open();
250 QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); 222 QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged);
251 } 223 }
224 return KAsync::null<void>();
252 225
253 //We have to capture the runner to keep it alive 226 //We have to capture the runner to keep it alive
254 return synchronizeResource(query).template then<void>([runner](KAsync::Future<void> &future) {
255 future.setFinished();
256 },
257 [](int error, const QString &errorString) {
258 Warning() << "Error during sync " << error << errorString;
259 });
260 } 227 }
261 228
262private: 229private:
263 KAsync::Job<void> synchronizeResource(const Akonadi2::Query &query)
264 {
265 //TODO check if a sync is necessary
266 //TODO Only sync what was requested
267 //TODO timeout
268 if (query.syncOnDemand || query.processAll) {
269 return mResourceAccess->synchronizeResource(query.syncOnDemand, query.processAll);
270 }
271 return KAsync::null<void>();
272 }
273 230
274 //TODO move into result provider? 231 //TODO move into result provider?
275 void replaySet(ResultSet &resultSet, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) 232 void replaySet(ResultSet &resultSet, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider)
@@ -280,17 +237,14 @@ private:
280 Trace() << "Got creation"; 237 Trace() << "Got creation";
281 //TODO Only copy in result provider 238 //TODO Only copy in result provider
282 resultProvider->add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); 239 resultProvider->add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
283 // modelResult->add();
284 break; 240 break;
285 case Akonadi2::Operation_Modification: 241 case Akonadi2::Operation_Modification:
286 Trace() << "Got modification"; 242 Trace() << "Got modification";
287 resultProvider->modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); 243 resultProvider->modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
288 // modelResult->modify();
289 break; 244 break;
290 case Akonadi2::Operation_Removal: 245 case Akonadi2::Operation_Removal:
291 Trace() << "Got removal"; 246 Trace() << "Got removal";
292 resultProvider->remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); 247 resultProvider->remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
293 // modelResult->remove();
294 break; 248 break;
295 } 249 }
296 return true; 250 return true;
@@ -320,13 +274,28 @@ private:
320 }); 274 });
321 } 275 }
322 276
323 ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) 277 ResultSet loadInitialResultSet(const QByteArray &parent, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
324 { 278 {
279 Trace() << "Fetching initial set for parent:" << parent;
280 //TODO
281 QSet<QByteArray> appliedFilters;
282 auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction);
283 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
284
285 //We do a full scan if there were no indexes available to create the initial set.
286 if (appliedFilters.isEmpty()) {
287 //TODO this should be replaced by an index lookup as well
288 resultSet = fullScan(transaction, bufferTypeForDomainType());
289 }
290 return resultSet;
291 }
325 292
293 ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
294 {
295 Trace() << "Loading incremental result set starting from revision: " << baseRevision;
326 const auto bufferType = bufferTypeForDomainType(); 296 const auto bufferType = bufferTypeForDomainType();
327 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); 297 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
328 //TODO apply filter from index 298 return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray {
329 return ResultSet([bufferType, revisionCounter, &transaction, this]() -> QByteArray {
330 const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); 299 const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction);
331 //Spit out the revision keys one by one. 300 //Spit out the revision keys one by one.
332 while (*revisionCounter <= topRevision) { 301 while (*revisionCounter <= topRevision) {
@@ -354,7 +323,7 @@ private:
354 //Read through the source values and return whatever matches the filter 323 //Read through the source values and return whatever matches the filter
355 std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool { 324 std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool {
356 while (resultSetPtr->next()) { 325 while (resultSetPtr->next()) {
357 //TODO only necessary if we actually want to filter or neew the operation type (but not a big deal if we do it always I guess) 326 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
358 readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { 327 readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) {
359 //Always remove removals, they probably don't match due to non-available properties 328 //Always remove removals, they probably don't match due to non-available properties
360 if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { 329 if (filter(domainObject) || operation == Akonadi2::Operation_Removal) {
@@ -374,6 +343,20 @@ private:
374 return ResultSet(generator); 343 return ResultSet(generator);
375 } 344 }
376 345
346
347 std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query)
348 {
349 return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
350 for (const auto &filterProperty : remainingFilters) {
351 //TODO implement other comparison operators than equality
352 if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) {
353 return false;
354 }
355 }
356 return true;
357 };
358 }
359
377 virtual KAsync::Job<qint64> executeQuery(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider) 360 virtual KAsync::Job<qint64> executeQuery(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProviderInterface<typename DomainType::Ptr> > &resultProvider)
378 { 361 {
379 /* 362 /*
@@ -384,16 +367,6 @@ private:
384 const qint64 baseRevision = resultProvider->revision() + 1; 367 const qint64 baseRevision = resultProvider->revision() + 1;
385 Trace() << "Running query " << baseRevision; 368 Trace() << "Running query " << baseRevision;
386 QSet<QByteArray> remainingFilters; 369 QSet<QByteArray> remainingFilters;
387 auto filter = [remainingFilters, query, baseRevision](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
388 for (const auto &filterProperty : remainingFilters) {
389 //TODO implement other comparison operators than equality
390 if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) {
391 return false;
392 }
393 }
394 return true;
395 };
396 qint64 newRevision = 0;
397 370
398 Trace() << "Fetching updates"; 371 Trace() << "Fetching updates";
399 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); 372 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
@@ -404,10 +377,10 @@ private:
404 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); 377 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly);
405 378
406 auto resultSet = loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); 379 auto resultSet = loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters);
407 auto filteredSet = filterSet(resultSet, filter, transaction, false); 380 auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), transaction, false);
408 replaySet(filteredSet, resultProvider); 381 replaySet(filteredSet, resultProvider);
409 resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction)); 382 resultProvider->setRevision(Akonadi2::Storage::maxRevision(transaction));
410 newRevision = Akonadi2::Storage::maxRevision(transaction); 383 qint64 newRevision = Akonadi2::Storage::maxRevision(transaction);
411 384
412 return KAsync::start<qint64>([=]() -> qint64 { 385 return KAsync::start<qint64>([=]() -> qint64 {
413 return newRevision; 386 return newRevision;
diff --git a/common/modelresult.h b/common/modelresult.h
index 756f4d6..eabb868 100644
--- a/common/modelresult.h
+++ b/common/modelresult.h
@@ -111,6 +111,10 @@ public:
111 { 111 {
112 auto childId = qHash(value->identifier()); 112 auto childId = qHash(value->identifier());
113 auto id = parentId(value); 113 auto id = parentId(value);
114 //Ignore updates we get before the initial fetch is done
115 if (!mEntityChildrenFetched[id]) {
116 return;
117 }
114 auto parent = createIndexFromId(id); 118 auto parent = createIndexFromId(id);
115 qDebug() << "Added entity " << childId; 119 qDebug() << "Added entity " << childId;
116 const auto keys = mTree[id]; 120 const auto keys = mTree[id];
@@ -131,6 +135,10 @@ public:
131 { 135 {
132 auto childId = qHash(value->identifier()); 136 auto childId = qHash(value->identifier());
133 auto id = parentId(value); 137 auto id = parentId(value);
138 //Ignore updates we get before the initial fetch is done
139 if (!mEntityChildrenFetched[id]) {
140 return;
141 }
134 auto parent = createIndexFromId(id); 142 auto parent = createIndexFromId(id);
135 qDebug() << "Modified entity" << childId; 143 qDebug() << "Modified entity" << childId;
136 auto i = mTree[id].indexOf(childId); 144 auto i = mTree[id].indexOf(childId);
diff --git a/common/resultprovider.h b/common/resultprovider.h
index 43d21a4..0d23127 100644
--- a/common/resultprovider.h
+++ b/common/resultprovider.h
@@ -23,6 +23,9 @@
23#include <functional> 23#include <functional>
24#include <memory> 24#include <memory>
25#include "threadboundary.h" 25#include "threadboundary.h"
26#include "resultset.h"
27#include "log.h"
28#include "modelresult.h"
26 29
27using namespace async; 30using namespace async;
28 31
@@ -117,18 +120,6 @@ public:
117 // mResultEmitter->clear(); 120 // mResultEmitter->clear();
118 } 121 }
119 122
120 // QSharedPointer<ResultEmitter<T> > emitter()
121 // {
122 // if (!mResultEmitter) {
123 // //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again
124 // auto sharedPtr = QSharedPointer<ResultEmitter<T> >(new ResultEmitter<T>, [this](ResultEmitter<T> *emitter){ done(); delete emitter; });
125 // mResultEmitter = sharedPtr;
126 // return sharedPtr;
127 // }
128 //
129 // return mResultEmitter.toStrongRef();
130 // }
131
132 /** 123 /**
133 * For lifetimemanagement only. 124 * For lifetimemanagement only.
134 * We keep the runner alive as long as the result provider exists. 125 * We keep the runner alive as long as the result provider exists.
@@ -162,40 +153,6 @@ public:
162 mQueryRunner = runner; 153 mQueryRunner = runner;
163 } 154 }
164 155
165 // qint64 fetch(const ResultSet &resultSet)
166 // {
167 // //Fetch a bunch
168 // //
169 // // Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
170 // // storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
171 // // Warning() << "Error during query: " << error.store << error.message;
172 // // });
173 // //
174 // // auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly);
175 //
176 // // Log() << "Querying" << baseRevision << Akonadi2::Storage::maxRevision(transaction);
177 // // auto resultSet = getResultSet(query, transaction, baseRevision);
178 // while (resultSet.next([this](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool {
179 // switch (operation) {
180 // case Akonadi2::Operation_Creation:
181 // Trace() << "Got creation";
182 // //TODO Only copy in result provider
183 // add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<T>(*value).template staticCast<T>());
184 // break;
185 // case Akonadi2::Operation_Modification:
186 // Trace() << "Got modification";
187 // modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<T>(*value).template staticCast<T>());
188 // break;
189 // case Akonadi2::Operation_Removal:
190 // Trace() << "Got removal";
191 // remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<T>(*value).template staticCast<T>());
192 // break;
193 // }
194 // return true;
195 // })){};
196 // // return Akonadi2::Storage::maxRevision(transaction);
197 // }
198
199private: 156private:
200 void done() 157 void done()
201 { 158 {
@@ -212,129 +169,6 @@ private:
212 std::function<void()> mOnDoneCallback; 169 std::function<void()> mOnDoneCallback;
213}; 170};
214 171
215
216
217
218
219
220template<class T>
221class SyncResultProvider : public ResultProviderInterface<T> {
222public:
223 void add(const T &value)
224 {
225 mResultEmitter->addHandler(value);
226 }
227
228 void modify(const T &value)
229 {
230 mResultEmitter->modifyHandler(value);
231 }
232
233 void remove(const T &value)
234 {
235 mResultEmitter->removeHandler(value);
236 }
237
238 void initialResultSetComplete()
239 {
240 mResultEmitter->initialResultSetComplete();
241 }
242
243 void complete()
244 {
245 mResultEmitter->complete();
246 }
247
248 void clear()
249 {
250 mResultEmitter->clear();
251 }
252
253 QSharedPointer<ResultEmitter<T> > emitter()
254 {
255 if (!mResultEmitter) {
256 //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again
257 auto sharedPtr = QSharedPointer<ResultEmitter<T> >(new ResultEmitter<T>, [this](ResultEmitter<T> *emitter){ done(); delete emitter; });
258 mResultEmitter = sharedPtr;
259 return sharedPtr;
260 }
261
262 return mResultEmitter.toStrongRef();
263 }
264
265 /**
266 * For lifetimemanagement only.
267 * We keep the runner alive as long as the result provider exists.
268 */
269 void setFacade(const std::shared_ptr<void> &facade)
270 {
271 mFacade = facade;
272 }
273
274 void onDone(const std::function<void()> &callback)
275 {
276 mOnDoneCallback = callback;
277 }
278
279 bool isDone() const
280 {
281 //The existance of the emitter currently defines wether we're done or not.
282 return mResultEmitter.toStrongRef().isNull();
283 }
284
285 // qint64 fetch(const ResultSet &resultSet)
286 // {
287 // //Fetch a bunch
288 // //
289 // // Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
290 // // storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
291 // // Warning() << "Error during query: " << error.store << error.message;
292 // // });
293 // //
294 // // auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly);
295 //
296 // // Log() << "Querying" << baseRevision << Akonadi2::Storage::maxRevision(transaction);
297 // // auto resultSet = getResultSet(query, transaction, baseRevision);
298 // while (resultSet.next([this](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool {
299 // switch (operation) {
300 // case Akonadi2::Operation_Creation:
301 // Trace() << "Got creation";
302 // //TODO Only copy in result provider
303 // add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<T>(*value).template staticCast<T>());
304 // break;
305 // case Akonadi2::Operation_Modification:
306 // Trace() << "Got modification";
307 // modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<T>(*value).template staticCast<T>());
308 // break;
309 // case Akonadi2::Operation_Removal:
310 // Trace() << "Got removal";
311 // remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<T>(*value).template staticCast<T>());
312 // break;
313 // }
314 // return true;
315 // })){};
316 // // return Akonadi2::Storage::maxRevision(transaction);
317 // }
318
319private:
320 void done()
321 {
322 qWarning() << "done";
323 if (mOnDoneCallback) {
324 mOnDoneCallback();
325 mOnDoneCallback = std::function<void()>();
326 }
327 }
328
329 QWeakPointer<ResultEmitter<T> > mResultEmitter;
330 std::shared_ptr<void> mFacade;
331 std::function<void()> mOnDoneCallback;
332 QSharedPointer<ThreadBoundary> mThreadBoundary;
333};
334
335
336
337
338/* 172/*
339* The promise side for the result emitter 173* The promise side for the result emitter
340*/ 174*/
@@ -434,18 +268,18 @@ public:
434 } 268 }
435 269
436 /** 270 /**
437 * For lifetimemanagement only. 271 * For lifetimemanagement only.
438 * We keep the runner alive as long as the result provider exists. 272 * We keep the runner alive as long as the result provider exists.
439 */ 273 */
440 void setQueryRunner(const QSharedPointer<QObject> &runner) 274 void setQueryRunner(const QSharedPointer<QObject> &runner)
441 { 275 {
442 mQueryRunner = runner; 276 mQueryRunner = runner;
443 } 277 }
444 278
445 /** 279 /**
446 * For lifetimemanagement only. 280 * For lifetimemanagement only.
447 * We keep the runner alive as long as the result provider exists. 281 * We keep the runner alive as long as the result provider exists.
448 */ 282 */
449 void setFacade(const std::shared_ptr<void> &facade) 283 void setFacade(const std::shared_ptr<void> &facade)
450 { 284 {
451 mFacade = facade; 285 mFacade = facade;
@@ -463,6 +297,11 @@ public:
463 return mResultEmitter.toStrongRef().isNull(); 297 return mResultEmitter.toStrongRef().isNull();
464 } 298 }
465 299
300 void setFetcher(const std::function<void(const QByteArray &parent)> &fetcher)
301 {
302 fetcher(QByteArray());
303 }
304
466private: 305private:
467 void done() 306 void done()
468 { 307 {