summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-02-15 09:35:43 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-02-15 09:35:43 +0100
commitfb0df0d8008ef05cfa94936e19c22dec0faaa8e4 (patch)
tree0918986d77c4951f394e12d3e114c126eec18094
parente5dd64ba354aef193c5516d9a58e1d724220c55b (diff)
downloadsink-fb0df0d8008ef05cfa94936e19c22dec0faaa8e4.tar.gz
sink-fb0df0d8008ef05cfa94936e19c22dec0faaa8e4.zip
Prepared querying of batches and added a switch to enable/disable
sorting
-rw-r--r--common/query.h1
-rw-r--r--common/queryrunner.cpp67
-rw-r--r--common/queryrunner.h2
-rw-r--r--common/resultset.h25
4 files changed, 69 insertions, 26 deletions
diff --git a/common/query.h b/common/query.h
index a0a50ba..bb826aa 100644
--- a/common/query.h
+++ b/common/query.h
@@ -131,6 +131,7 @@ public:
131 QHash<QByteArray, QVariant> propertyFilter; 131 QHash<QByteArray, QVariant> propertyFilter;
132 QByteArrayList requestedProperties; 132 QByteArrayList requestedProperties;
133 QByteArray parentProperty; 133 QByteArray parentProperty;
134 QByteArray sortProperty;
134 bool liveQuery; 135 bool liveQuery;
135 bool syncOnDemand; 136 bool syncOnDemand;
136 bool processAll; 137 bool processAll;
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index d4ace86..1f645e8 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -45,10 +45,10 @@ public:
45 virtual ~QueryWorker(); 45 virtual ~QueryWorker();
46 46
47 qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 47 qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
48 qint64 executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 48 qint64 executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize);
49 49
50private: 50private:
51 void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties); 51 void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize);
52 52
53 void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback); 53 void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback);
54 54
@@ -57,7 +57,7 @@ private:
57 57
58 ResultSet filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty); 58 ResultSet filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty);
59 std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query); 59 std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query);
60 qint64 load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery); 60 qint64 load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize);
61 61
62private: 62private:
63 QueryRunnerBase::ResultTransformation mResultTransformation; 63 QueryRunnerBase::ResultTransformation mResultTransformation;
@@ -72,7 +72,9 @@ template<class DomainType>
72QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) 72QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType)
73 : QueryRunnerBase(), 73 : QueryRunnerBase(),
74 mResourceAccess(resourceAccess), 74 mResourceAccess(resourceAccess),
75 mResultProvider(new ResultProvider<typename DomainType::Ptr>) 75 mResultProvider(new ResultProvider<typename DomainType::Ptr>),
76 mOffset(0),
77 mBatchSize(0)
76{ 78{
77 Trace() << "Starting query"; 79 Trace() << "Starting query";
78 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. 80 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
@@ -81,7 +83,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
81 auto resultProvider = mResultProvider; 83 auto resultProvider = mResultProvider;
82 async::run<qint64>([=]() -> qint64 { 84 async::run<qint64>([=]() -> qint64 {
83 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); 85 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation);
84 const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider); 86 const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider, mOffset, mBatchSize);
85 return newRevision; 87 return newRevision;
86 }) 88 })
87 .template then<void, qint64>([query, this](qint64 newRevision) { 89 .template then<void, qint64>([query, this](qint64 newRevision) {
@@ -150,7 +152,6 @@ static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction,
150 Warning() << "Error during query: " << error.message; 152 Warning() << "Error during query: " << error.message;
151 }); 153 });
152 154
153 Trace() << "Full scan on " << bufferType << " found " << keys.size() << " results";
154 return ResultSet(keys); 155 return ResultSet(keys);
155} 156}
156 157
@@ -174,15 +175,19 @@ QueryWorker<DomainType>::~QueryWorker()
174} 175}
175 176
176template<class DomainType> 177template<class DomainType>
177void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties) 178void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize)
178{ 179{
179 int counter = 0; 180 int counter = 0;
180 while (resultSet.next([this, &resultProvider, &counter, &properties](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { 181 resultSet.skip(offset);
182 while (resultSet.next([this, &resultProvider, &counter, &properties, batchSize](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool {
181 //FIXME allow maildir resource to set the mimeMessage property 183 //FIXME allow maildir resource to set the mimeMessage property
182 auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>(); 184 auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>();
183 if (mResultTransformation) { 185 if (mResultTransformation) {
184 mResultTransformation(*valueCopy); 186 mResultTransformation(*valueCopy);
185 } 187 }
188 if (batchSize && counter >= batchSize) {
189 return false;
190 }
186 counter++; 191 counter++;
187 switch (operation) { 192 switch (operation) {
188 case Sink::Operation_Creation: 193 case Sink::Operation_Creation:
@@ -217,7 +222,8 @@ void QueryWorker<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db,
217 Q_ASSERT(metadataBuffer); 222 Q_ASSERT(metadataBuffer);
218 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; 223 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
219 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; 224 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
220 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), operation); 225 auto adaptor = mDomainTypeAdaptorFactory->createAdaptor(entity);
226 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation);
221 return false; 227 return false;
222 }, 228 },
223 [](const Sink::Storage::Error &error) { 229 [](const Sink::Storage::Error &error) {
@@ -274,9 +280,10 @@ ResultSet QueryWorker<DomainType>::loadIncrementalResultSet(qint64 baseRevision,
274template<class DomainType> 280template<class DomainType>
275ResultSet QueryWorker<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty) 281ResultSet QueryWorker<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty)
276{ 282{
277 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); 283 bool sortingRequired = false;
278 284 if (initialQuery && sortingRequired) {
279 if (initialQuery) { 285 //Sort the complete set by reading the sort property and filling into a sorted map
286 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create();
280 while (resultSet.next()) { 287 while (resultSet.next()) {
281 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) 288 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
282 readEntity(db, resultSet.id(), [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { 289 readEntity(db, resultSet.id(), [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
@@ -301,23 +308,39 @@ ResultSet QueryWorker<DomainType>::filterAndSortSet(ResultSet &resultSet, const
301 } 308 }
302 return false; 309 return false;
303 }; 310 };
304 return ResultSet(generator); 311
312 auto skip = [iterator]() {
313 iterator->next();
314 };
315 return ResultSet(generator, skip);
305 } else { 316 } else {
306 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); 317 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
307 ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery](std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { 318 ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery](std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
308 if (resultSetPtr->next()) { 319 if (resultSetPtr->next()) {
309 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) 320 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
310 readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { 321 readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
311 //Always remove removals, they probably don't match due to non-available properties 322 if (initialQuery) {
312 if ((operation == Sink::Operation_Removal) || filter(domainObject)) { 323 //We're not interested in removals during the initial query
313 callback(domainObject, operation); 324 if ((operation != Sink::Operation_Removal) && filter(domainObject)) {
325 //In the initial set every entity is new
326 callback(domainObject, Sink::Operation_Creation);
327 }
328 } else {
329 //Always remove removals, they probably don't match due to non-available properties
330 if ((operation == Sink::Operation_Removal) || filter(domainObject)) {
331 //TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results)
332 callback(domainObject, operation);
333 }
314 } 334 }
315 }); 335 });
316 return true; 336 return true;
317 } 337 }
318 return false; 338 return false;
319 }; 339 };
320 return ResultSet(generator); 340 auto skip = [resultSetPtr]() {
341 resultSetPtr->skip(1);
342 };
343 return ResultSet(generator, skip);
321 } 344 }
322} 345}
323 346
@@ -347,7 +370,7 @@ std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &do
347} 370}
348 371
349template<class DomainType> 372template<class DomainType>
350qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery) 373qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize)
351{ 374{
352 QTime time; 375 QTime time;
353 time.start(); 376 time.start();
@@ -364,7 +387,7 @@ qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::functi
364 Trace() << "Base set retrieved. " << time.elapsed(); 387 Trace() << "Base set retrieved. " << time.elapsed();
365 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, query.sortProperty); 388 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, query.sortProperty);
366 Trace() << "Filtered set retrieved. " << time.elapsed(); 389 Trace() << "Filtered set retrieved. " << time.elapsed();
367 replaySet(filteredSet, resultProvider, query.requestedProperties); 390 replaySet(filteredSet, resultProvider, query.requestedProperties, offset, batchSize);
368 Trace() << "Filtered set replayed. " << time.elapsed(); 391 Trace() << "Filtered set replayed. " << time.elapsed();
369 resultProvider.setRevision(Sink::Storage::maxRevision(transaction)); 392 resultProvider.setRevision(Sink::Storage::maxRevision(transaction));
370 return Sink::Storage::maxRevision(transaction); 393 return Sink::Storage::maxRevision(transaction);
@@ -380,13 +403,13 @@ qint64 QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query
380 Trace() << "Running incremental query " << baseRevision; 403 Trace() << "Running incremental query " << baseRevision;
381 auto revision = load(query, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet { 404 auto revision = load(query, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
382 return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); 405 return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters);
383 }, resultProvider, false); 406 }, resultProvider, false, 0, 0);
384 Trace() << "Incremental query took: " << time.elapsed() << " ms"; 407 Trace() << "Incremental query took: " << time.elapsed() << " ms";
385 return revision; 408 return revision;
386} 409}
387 410
388template<class DomainType> 411template<class DomainType>
389qint64 QueryWorker<DomainType>::executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 412qint64 QueryWorker<DomainType>::executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize)
390{ 413{
391 QTime time; 414 QTime time;
392 time.start(); 415 time.start();
@@ -403,7 +426,7 @@ qint64 QueryWorker<DomainType>::executeInitialQuery(const Sink::Query &query, co
403 } 426 }
404 auto revision = load(modifiedQuery, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet { 427 auto revision = load(modifiedQuery, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
405 return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); 428 return loadInitialResultSet(modifiedQuery, transaction, remainingFilters);
406 }, resultProvider, true); 429 }, resultProvider, true, offset, batchsize);
407 Trace() << "Initial query took: " << time.elapsed() << " ms"; 430 Trace() << "Initial query took: " << time.elapsed() << " ms";
408 resultProvider.initialResultSetComplete(parent); 431 resultProvider.initialResultSetComplete(parent);
409 return revision; 432 return revision;
diff --git a/common/queryrunner.h b/common/queryrunner.h
index 04e4587..436e2e0 100644
--- a/common/queryrunner.h
+++ b/common/queryrunner.h
@@ -96,5 +96,7 @@ private:
96 QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess; 96 QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess;
97 QSharedPointer<Sink::ResultProvider<typename DomainType::Ptr> > mResultProvider; 97 QSharedPointer<Sink::ResultProvider<typename DomainType::Ptr> > mResultProvider;
98 ResultTransformation mResultTransformation; 98 ResultTransformation mResultTransformation;
99 int mOffset;
100 int mBatchSize;
99}; 101};
100 102
diff --git a/common/resultset.h b/common/resultset.h
index 2ca8800..8a0720d 100644
--- a/common/resultset.h
+++ b/common/resultset.h
@@ -32,6 +32,7 @@ class ResultSet {
32 public: 32 public:
33 typedef std::function<bool(std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)>)> ValueGenerator; 33 typedef std::function<bool(std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)>)> ValueGenerator;
34 typedef std::function<QByteArray()> IdGenerator; 34 typedef std::function<QByteArray()> IdGenerator;
35 typedef std::function<void()> SkipValue;
35 36
36 ResultSet() 37 ResultSet()
37 : mIt(nullptr) 38 : mIt(nullptr)
@@ -39,23 +40,30 @@ class ResultSet {
39 40
40 } 41 }
41 42
42 ResultSet(const ValueGenerator &generator) 43 ResultSet(const ValueGenerator &generator, const SkipValue &skip)
43 : mIt(nullptr), 44 : mIt(nullptr),
44 mValueGenerator(generator) 45 mValueGenerator(generator),
46 mSkip(skip)
45 { 47 {
46 48
47 } 49 }
48 50
49 ResultSet(const IdGenerator &generator) 51 ResultSet(const IdGenerator &generator)
50 : mIt(nullptr), 52 : mIt(nullptr),
51 mGenerator(generator) 53 mGenerator(generator),
54 mSkip([this]() {
55 mGenerator();
56 })
52 { 57 {
53 58
54 } 59 }
55 60
56 ResultSet(const QVector<QByteArray> &resultSet) 61 ResultSet(const QVector<QByteArray> &resultSet)
57 : mResultSet(resultSet), 62 : mResultSet(resultSet),
58 mIt(nullptr) 63 mIt(nullptr),
64 mSkip([this]() {
65 mGenerator();
66 })
59 { 67 {
60 68
61 } 69 }
@@ -99,6 +107,14 @@ class ResultSet {
99 return false; 107 return false;
100 } 108 }
101 109
110 void skip(int number)
111 {
112 Q_ASSERT(mSkip);
113 for (int i = 0; i < number; i++) {
114 mSkip();
115 }
116 }
117
102 QByteArray id() 118 QByteArray id()
103 { 119 {
104 if (mIt) { 120 if (mIt) {
@@ -119,5 +135,6 @@ class ResultSet {
119 QByteArray mCurrentValue; 135 QByteArray mCurrentValue;
120 IdGenerator mGenerator; 136 IdGenerator mGenerator;
121 ValueGenerator mValueGenerator; 137 ValueGenerator mValueGenerator;
138 SkipValue mSkip;
122}; 139};
123 140