summaryrefslogtreecommitdiffstats
path: root/common/datastorequery.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-09-23 01:35:13 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-09-23 01:35:13 +0200
commit52ad48c8bd755a2fde249296d6017853538f478f (patch)
treed3adbee13e49d2525720069cb7d9ca6b5876dbd8 /common/datastorequery.cpp
parent6fc76bc690e5a2e7748936fa835338d820c7e7de (diff)
downloadsink-52ad48c8bd755a2fde249296d6017853538f478f.tar.gz
sink-52ad48c8bd755a2fde249296d6017853538f478f.zip
A new query system
Diffstat (limited to 'common/datastorequery.cpp')
-rw-r--r--common/datastorequery.cpp520
1 files changed, 333 insertions, 187 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index cc070be..95df1a0 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -27,13 +27,173 @@ using namespace Sink;
27 27
28SINK_DEBUG_AREA("datastorequery") 28SINK_DEBUG_AREA("datastorequery")
29 29
30class Source : public FilterBase {
31 public:
32 typedef QSharedPointer<Source> Ptr;
33
34 QVector<QByteArray> mIds;
35 QVector<QByteArray>::ConstIterator mIt;
36
37 Source (const QVector<QByteArray> &ids, DataStoreQuery *store)
38 : FilterBase(store),
39 mIds(ids),
40 mIt(mIds.constBegin())
41 {
42
43 }
44
45 virtual ~Source(){}
46
47 virtual void skip() Q_DECL_OVERRIDE
48 {
49 if (mIt != mIds.constEnd()) {
50 mIt++;
51 }
52 };
53
54 void add(const QVector<QByteArray> &ids)
55 {
56 mIds = ids;
57 mIt = mIds.constBegin();
58 }
59
60 bool next(const std::function<void(Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> &callback) Q_DECL_OVERRIDE
61 {
62 if (mIt == mIds.constEnd()) {
63 return false;
64 }
65 readEntity(*mIt, [callback](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) {
66 callback(entityBuffer.operation(), uid, entityBuffer);
67 });
68 mIt++;
69 return mIt != mIds.constEnd();
70 }
71};
72
73class Collector : public FilterBase {
74public:
75 typedef QSharedPointer<Collector> Ptr;
76
77 Collector(FilterBase::Ptr source, DataStoreQuery *store)
78 : FilterBase(source, store)
79 {
80
81 }
82 virtual ~Collector(){}
83
84 bool next(const std::function<void(Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> &callback) Q_DECL_OVERRIDE
85 {
86 return mSource->next(callback);
87 }
88};
89
90class Filter : public FilterBase {
91public:
92 typedef QSharedPointer<Filter> Ptr;
93
94 QHash<QByteArray, Sink::Query::Comparator> propertyFilter;
95
96 Filter(FilterBase::Ptr source, DataStoreQuery *store)
97 : FilterBase(source, store)
98 {
99
100 }
101
102 virtual ~Filter(){}
103
104 bool next(const std::function<void(Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> &callback) Q_DECL_OVERRIDE {
105 bool foundValue = false;
106 while(!foundValue && mSource->next([this, callback, &foundValue](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) {
107 SinkTrace() << "Filter: " << uid << operation;
108
109 //Always accept removals. They can't match the filter since the data is gone.
110 if (operation == Sink::Operation_Removal) {
111 SinkTrace() << "Removal: " << uid << operation;
112 callback(operation, uid, entityBuffer);
113 foundValue = true;
114 } else if (matchesFilter(uid, entityBuffer)) {
115 SinkTrace() << "Accepted: " << uid << operation;
116 callback(operation, uid, entityBuffer);
117 foundValue = true;
118 //TODO if something did not match the filter so far but does now, turn into an add operation.
119 } else {
120 SinkTrace() << "Rejected: " << uid << operation;
121 //TODO emit a removal if we had the uid in the result set and this is a modification.
122 //We don't know if this results in a removal from the dataset, so we emit a removal notification anyways
123 callback(Sink::Operation_Removal, uid, entityBuffer);
124 }
125 return false;
126 }))
127 {}
128 return foundValue;
129 }
130
131 bool matchesFilter(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) {
132 for (const auto &filterProperty : propertyFilter.keys()) {
133 const auto property = getProperty(entityBuffer.entity(), filterProperty);
134 const auto comparator = propertyFilter.value(filterProperty);
135 if (!comparator.matches(property)) {
136 SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value;
137 return false;
138 }
139 }
140 return true;
141 }
142};
143
144/* class Reduction : public FilterBase { */
145/* public: */
146/* typedef QSharedPointer<Reduction> Ptr; */
147
148/* QHash<QByteArray, QDateTime> aggregateValues; */
149
150/* Reduction(FilterBase::Ptr source, DataStoreQuery *store) */
151/* : FilterBase(source, store) */
152/* { */
153
154/* } */
155
156/* virtual ~Reduction(){} */
157
158/* bool next(const std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> &callback) Q_DECL_OVERRIDE { */
159/* bool foundValue = false; */
160/* while(!foundValue && mSource->next([this, callback, &foundValue](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { */
161/* const auto operation = entityBuffer.operation(); */
162/* SinkTrace() << "Filter: " << uid << operation; */
163/* //Always accept removals. They can't match the filter since the data is gone. */
164/* if (operation == Sink::Operation_Removal) { */
165/* callback(uid, entityBuffer); */
166/* foundValue = true; */
167/* } else if (matchesFilter(uid, entityBuffer)) { */
168/* callback(uid, entityBuffer); */
169/* foundValue = true; */
170/* } */
171/* return false; */
172/* })) */
173/* {} */
174/* return foundValue; */
175/* } */
176
177/* bool matchesFilter(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { */
178/* for (const auto &filterProperty : propertyFilter.keys()) { */
179/* const auto property = getProperty(entityBuffer.entity(), filterProperty); */
180/* const auto comparator = propertyFilter.value(filterProperty); */
181/* if (!comparator.matches(property)) { */
182/* SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; */
183/* return false; */
184/* } */
185/* } */
186/* return true; */
187/* } */
188/* }; */
189
30DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty) 190DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty)
31 : mQuery(query), mTransaction(transaction), mType(type), mTypeIndex(typeIndex), mDb(Storage::mainDatabase(mTransaction, mType)), mGetProperty(getProperty) 191 : mQuery(query), mTransaction(transaction), mType(type), mTypeIndex(typeIndex), mDb(Storage::mainDatabase(mTransaction, mType)), mGetProperty(getProperty)
32{ 192{
33 193 setupQuery();
34} 194}
35 195
36static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) 196static inline QVector<QByteArray> fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType)
37{ 197{
38 // TODO use a result set with an iterator, to read values on demand 198 // TODO use a result set with an iterator, to read values on demand
39 SinkTrace() << "Looking for : " << bufferType; 199 SinkTrace() << "Looking for : " << bufferType;
@@ -52,59 +212,7 @@ static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction,
52 [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; }); 212 [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; });
53 213
54 SinkTrace() << "Full scan retrieved " << keys.size() << " results."; 214 SinkTrace() << "Full scan retrieved " << keys.size() << " results.";
55 return ResultSet(keys.toList().toVector()); 215 return keys.toList().toVector();
56}
57
58ResultSet DataStoreQuery::loadInitialResultSet(QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting)
59{
60 if (!mQuery.ids.isEmpty()) {
61 return ResultSet(mQuery.ids.toVector());
62 }
63 QSet<QByteArray> appliedFilters;
64 QByteArray appliedSorting;
65
66 auto resultSet = mTypeIndex.query(mQuery, appliedFilters, appliedSorting, mTransaction);
67
68 remainingFilters = mQuery.propertyFilter.keys().toSet() - appliedFilters;
69 if (appliedSorting.isEmpty()) {
70 remainingSorting = mQuery.sortProperty;
71 }
72
73 // We do a full scan if there were no indexes available to create the initial set.
74 if (appliedFilters.isEmpty()) {
75 // TODO this should be replaced by an index lookup as well
76 resultSet = fullScan(mTransaction, mType);
77 }
78 return resultSet;
79}
80
81ResultSet DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision, QSet<QByteArray> &remainingFilters)
82{
83 const auto bufferType = mType;
84 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
85 remainingFilters = mQuery.propertyFilter.keys().toSet();
86 return ResultSet([this, bufferType, revisionCounter]() -> QByteArray {
87 const qint64 topRevision = Sink::Storage::maxRevision(mTransaction);
88 // Spit out the revision keys one by one.
89 while (*revisionCounter <= topRevision) {
90 const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter);
91 const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter);
92 // SinkTrace() << "Revision" << *revisionCounter << type << uid;
93 Q_ASSERT(!uid.isEmpty());
94 Q_ASSERT(!type.isEmpty());
95 if (type != bufferType) {
96 // Skip revision
97 *revisionCounter += 1;
98 continue;
99 }
100 const auto key = Sink::Storage::assembleKey(uid, *revisionCounter);
101 *revisionCounter += 1;
102 return key;
103 }
104 SinkTrace() << "Finished reading incremental result set:" << *revisionCounter;
105 // We're done
106 return QByteArray();
107 });
108} 216}
109 217
110void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) 218void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback)
@@ -122,156 +230,194 @@ QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArra
122 return mGetProperty(entity, property); 230 return mGetProperty(entity, property);
123} 231}
124 232
125ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) 233/* ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) */
126{ 234/* { */
127 const bool sortingRequired = !sortProperty.isEmpty(); 235/* const bool sortingRequired = !sortProperty.isEmpty(); */
128 if (mInitialQuery && sortingRequired) { 236/* if (mInitialQuery && sortingRequired) { */
129 SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; 237/* SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; */
130 // Sort the complete set by reading the sort property and filling into a sorted map 238/* // Sort the complete set by reading the sort property and filling into a sorted map */
131 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); 239/* auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); */
132 while (resultSet.next()) { 240/* while (resultSet.next()) { */
133 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) 241/* // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) */
134 readEntity(resultSet.id(), 242/* readEntity(resultSet.id(), */
135 [this, filter, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) { 243/* [this, filter, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) { */
136
137 const auto operation = buffer.operation();
138
139 // We're not interested in removals during the initial query
140 if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) {
141 if (!sortProperty.isEmpty()) {
142 const auto sortValue = getProperty(buffer.entity(), sortProperty);
143 if (sortValue.type() == QVariant::DateTime) {
144 sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), uid);
145 } else {
146 sortedMap->insert(sortValue.toString().toLatin1(), uid);
147 }
148 } else {
149 sortedMap->insert(uid, uid);
150 }
151 }
152 });
153 }
154 244
155 SinkTrace() << "Sorted " << sortedMap->size() << " values."; 245/* const auto operation = buffer.operation(); */
156 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap);
157 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter](
158 std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entity, Sink::Operation)> callback) -> bool {
159 if (iterator->hasNext()) {
160 readEntity(iterator->next().value(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) {
161 callback(uid, buffer, Sink::Operation_Creation);
162 });
163 return true;
164 }
165 return false;
166 };
167 246
168 auto skip = [iterator]() { 247/* // We're not interested in removals during the initial query */
169 if (iterator->hasNext()) { 248/* if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { */
170 iterator->next(); 249/* if (!sortProperty.isEmpty()) { */
171 } 250/* const auto sortValue = getProperty(buffer.entity(), sortProperty); */
172 }; 251/* if (sortValue.type() == QVariant::DateTime) { */
173 return ResultSet(generator, skip); 252/* sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), uid); */
174 } else { 253/* } else { */
175 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); 254/* sortedMap->insert(sortValue.toString().toLatin1(), uid); */
176 ResultSet::ValueGenerator generator = [this, resultSetPtr, filter](const ResultSet::Callback &callback) -> bool { 255/* } */
177 if (resultSetPtr->next()) { 256/* } else { */
178 SinkTrace() << "Reading the next value: " << resultSetPtr->id(); 257/* sortedMap->insert(uid, uid); */
179 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) 258/* } */
180 readEntity(resultSetPtr->id(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) { 259/* } */
181 const auto operation = buffer.operation(); 260/* }); */
182 if (mInitialQuery) { 261/* } */
183 // We're not interested in removals during the initial query
184 if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) {
185 // In the initial set every entity is new
186 callback(uid, buffer, Sink::Operation_Creation);
187 }
188 } else {
189 // Always remove removals, they probably don't match due to non-available properties
190 if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) {
191 // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results)
192 callback(uid, buffer, operation);
193 }
194 }
195 });
196 return true;
197 }
198 return false;
199 };
200 auto skip = [resultSetPtr]() { resultSetPtr->skip(1); };
201 return ResultSet(generator, skip);
202 }
203}
204 262
263/* SinkTrace() << "Sorted " << sortedMap->size() << " values."; */
264/* auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap); */
265/* ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter]( */
266/* std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entity, Sink::Operation)> callback) -> bool { */
267/* if (iterator->hasNext()) { */
268/* readEntity(iterator->next().value(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) { */
269/* callback(uid, buffer, Sink::Operation_Creation); */
270/* }); */
271/* return true; */
272/* } */
273/* return false; */
274/* }; */
205 275
206DataStoreQuery::FilterFunction DataStoreQuery::getFilter(const QSet<QByteArray> &remainingFilters) 276/* auto skip = [iterator]() { */
277/* if (iterator->hasNext()) { */
278/* iterator->next(); */
279/* } */
280/* }; */
281/* return ResultSet(generator, skip); */
282/* } else { */
283/* auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); */
284/* ResultSet::ValueGenerator generator = [this, resultSetPtr, filter](const ResultSet::Callback &callback) -> bool { */
285/* if (resultSetPtr->next()) { */
286/* SinkTrace() << "Reading the next value: " << resultSetPtr->id(); */
287/* // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) */
288/* readEntity(resultSetPtr->id(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) { */
289/* const auto operation = buffer.operation(); */
290/* if (mInitialQuery) { */
291/* // We're not interested in removals during the initial query */
292/* if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { */
293/* // In the initial set every entity is new */
294/* callback(uid, buffer, Sink::Operation_Creation); */
295/* } */
296/* } else { */
297/* // Always remove removals, they probably don't match due to non-available properties */
298/* if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) { */
299/* // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) */
300/* callback(uid, buffer, operation); */
301/* } */
302/* } */
303/* }); */
304/* return true; */
305/* } */
306/* return false; */
307/* }; */
308/* auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; */
309/* return ResultSet(generator, skip); */
310/* } */
311/* } */
312
313void DataStoreQuery::setupQuery()
207{ 314{
208 auto query = mQuery; 315 FilterBase::Ptr baseSet;
209 return [this, remainingFilters, query](const QByteArray &uid, const Sink::EntityBuffer &entity) -> bool { 316 QSet<QByteArray> remainingFilters;
210 if (!query.ids.isEmpty()) { 317 QByteArray appliedSorting;
211 if (!query.ids.contains(uid)) { 318 if (!mQuery.ids.isEmpty()) {
212 SinkTrace() << "Filter by uid: " << uid; 319 mSource = Source::Ptr::create(mQuery.ids.toVector(), this);
213 return false; 320 baseSet = mSource;
214 } 321 remainingFilters = mQuery.propertyFilter.keys().toSet();
215 } 322 } else {
216 for (const auto &filterProperty : remainingFilters) { 323 QSet<QByteArray> appliedFilters;
217 const auto property = getProperty(entity.entity(), filterProperty); 324
218 const auto comparator = query.propertyFilter.value(filterProperty); 325 auto resultSet = mTypeIndex.query(mQuery, appliedFilters, appliedSorting, mTransaction);
219 if (!comparator.matches(property)) { 326 remainingFilters = mQuery.propertyFilter.keys().toSet() - appliedFilters;
220 SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; 327
221 return false; 328 // We do a full scan if there were no indexes available to create the initial set.
222 } 329 if (appliedFilters.isEmpty()) {
330 // TODO this should be replaced by an index lookup on the uid index
331 mSource = Source::Ptr::create(fullScan(mTransaction, mType), this);
332 } else {
333 mSource = Source::Ptr::create(resultSet, this);
223 } 334 }
224 return true; 335 baseSet = mSource;
225 }; 336 }
226} 337 if (!mQuery.propertyFilter.isEmpty()) {
338 auto filter = Filter::Ptr::create(baseSet, this);
339 filter->propertyFilter = mQuery.propertyFilter;
340 /* for (const auto &f : remainingFilters) { */
341 /* filter->propertyFilter.insert(f, mQuery.propertyFilter.value(f)); */
342 /* } */
343 baseSet = filter;
344 }
345 /* if (appliedSorting.isEmpty() && !mQuery.sortProperty.isEmpty()) { */
346 /* //Apply manual sorting */
347 /* baseSet = Sort::Ptr::create(baseSet, mQuery.sortProperty); */
348 /* } */
227 349
228ResultSet DataStoreQuery::createFilteredSet(ResultSet &resultSet, const std::function<bool(const QByteArray &, const Sink::EntityBuffer &buffer)> &filter) 350 /* if (mQuery.threadLeaderOnly) { */
229{ 351 /* auto reduce = Reduce::Ptr::create(baseSet, this); */
230 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); 352
231 ResultSet::ValueGenerator generator = [this, resultSetPtr, filter](const ResultSet::Callback &callback) -> bool { 353 /* baseSet = reduce; */
232 return resultSetPtr->next([=](const QByteArray &uid, const Sink::EntityBuffer &buffer, Sink::Operation operation) { 354 /* } */
233 if (mInitialQuery) { 355
234 // We're not interested in removals during the initial query 356 mCollector = Collector::Ptr::create(baseSet, this);
235 if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) {
236 // In the initial set every entity is new
237 callback(uid, buffer, Sink::Operation_Creation);
238 }
239 } else {
240 // Always remove removals, they probably don't match due to non-available properties
241 if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) {
242 // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results)
243 callback(uid, buffer, operation);
244 }
245 }
246 });
247 };
248 auto skip = [resultSetPtr]() { resultSetPtr->skip(1); };
249 return ResultSet(generator, skip);
250} 357}
251 358
252ResultSet DataStoreQuery::postSortFilter(ResultSet &resultSet) 359QVector<QByteArray> DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision)
253{ 360{
254 return resultSet; 361 const auto bufferType = mType;
362 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
363 QVector<QByteArray> changedKeys;
364 const qint64 topRevision = Sink::Storage::maxRevision(mTransaction);
365 // Spit out the revision keys one by one.
366 while (*revisionCounter <= topRevision) {
367 const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter);
368 const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter);
369 // SinkTrace() << "Revision" << *revisionCounter << type << uid;
370 Q_ASSERT(!uid.isEmpty());
371 Q_ASSERT(!type.isEmpty());
372 if (type != bufferType) {
373 // Skip revision
374 *revisionCounter += 1;
375 continue;
376 }
377 const auto key = Sink::Storage::assembleKey(uid, *revisionCounter);
378 *revisionCounter += 1;
379 changedKeys << key;
380 }
381 SinkTrace() << "Finished reading incremental result set:" << *revisionCounter;
382 return changedKeys;
255} 383}
256 384
385
257ResultSet DataStoreQuery::update(qint64 baseRevision) 386ResultSet DataStoreQuery::update(qint64 baseRevision)
258{ 387{
259 SinkTrace() << "Executing query update"; 388 SinkTrace() << "Executing query update";
260 mInitialQuery = false; 389 auto incrementalResultSet = loadIncrementalResultSet(baseRevision);
261 QSet<QByteArray> remainingFilters; 390 SinkTrace() << "Changed: " << incrementalResultSet;
262 QByteArray remainingSorting; 391 mSource->add(incrementalResultSet);
263 auto resultSet = loadIncrementalResultSet(baseRevision, remainingFilters); 392 ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool {
264 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), remainingSorting); 393 if (mCollector->next([callback](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &buffer) {
265 return postSortFilter(filteredSet); 394 SinkTrace() << "Got incremental result: " << uid << operation;
395 callback(uid, buffer, operation);
396 }))
397 {
398 return true;
399 }
400 return false;
401 };
402 return ResultSet(generator, [this]() { mCollector->skip(); });
266} 403}
267 404
405
268ResultSet DataStoreQuery::execute() 406ResultSet DataStoreQuery::execute()
269{ 407{
270 SinkTrace() << "Executing query"; 408 SinkTrace() << "Executing query";
271 mInitialQuery = true; 409
272 QSet<QByteArray> remainingFilters; 410 ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool {
273 QByteArray remainingSorting; 411 if (mCollector->next([callback](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &buffer) {
274 auto resultSet = loadInitialResultSet(remainingFilters, remainingSorting); 412 if (operation != Sink::Operation_Removal) {
275 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), remainingSorting); 413 SinkTrace() << "Got initial result: " << uid << operation;
276 return postSortFilter(filteredSet); 414 callback(uid, buffer, Sink::Operation_Creation);
415 }
416 }))
417 {
418 return true;
419 }
420 return false;
421 };
422 return ResultSet(generator, [this]() { mCollector->skip(); });
277} 423}