diff options
Diffstat (limited to 'common/datastorequery.cpp')
-rw-r--r-- | common/datastorequery.cpp | 520 |
1 files changed, 333 insertions, 187 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index cc070be..95df1a0 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp | |||
@@ -27,13 +27,173 @@ using namespace Sink; | |||
27 | 27 | ||
28 | SINK_DEBUG_AREA("datastorequery") | 28 | SINK_DEBUG_AREA("datastorequery") |
29 | 29 | ||
30 | class Source : public FilterBase { | ||
31 | public: | ||
32 | typedef QSharedPointer<Source> Ptr; | ||
33 | |||
34 | QVector<QByteArray> mIds; | ||
35 | QVector<QByteArray>::ConstIterator mIt; | ||
36 | |||
37 | Source (const QVector<QByteArray> &ids, DataStoreQuery *store) | ||
38 | : FilterBase(store), | ||
39 | mIds(ids), | ||
40 | mIt(mIds.constBegin()) | ||
41 | { | ||
42 | |||
43 | } | ||
44 | |||
45 | virtual ~Source(){} | ||
46 | |||
47 | virtual void skip() Q_DECL_OVERRIDE | ||
48 | { | ||
49 | if (mIt != mIds.constEnd()) { | ||
50 | mIt++; | ||
51 | } | ||
52 | }; | ||
53 | |||
54 | void add(const QVector<QByteArray> &ids) | ||
55 | { | ||
56 | mIds = ids; | ||
57 | mIt = mIds.constBegin(); | ||
58 | } | ||
59 | |||
60 | bool next(const std::function<void(Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> &callback) Q_DECL_OVERRIDE | ||
61 | { | ||
62 | if (mIt == mIds.constEnd()) { | ||
63 | return false; | ||
64 | } | ||
65 | readEntity(*mIt, [callback](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { | ||
66 | callback(entityBuffer.operation(), uid, entityBuffer); | ||
67 | }); | ||
68 | mIt++; | ||
69 | return mIt != mIds.constEnd(); | ||
70 | } | ||
71 | }; | ||
72 | |||
73 | class Collector : public FilterBase { | ||
74 | public: | ||
75 | typedef QSharedPointer<Collector> Ptr; | ||
76 | |||
77 | Collector(FilterBase::Ptr source, DataStoreQuery *store) | ||
78 | : FilterBase(source, store) | ||
79 | { | ||
80 | |||
81 | } | ||
82 | virtual ~Collector(){} | ||
83 | |||
84 | bool next(const std::function<void(Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> &callback) Q_DECL_OVERRIDE | ||
85 | { | ||
86 | return mSource->next(callback); | ||
87 | } | ||
88 | }; | ||
89 | |||
90 | class Filter : public FilterBase { | ||
91 | public: | ||
92 | typedef QSharedPointer<Filter> Ptr; | ||
93 | |||
94 | QHash<QByteArray, Sink::Query::Comparator> propertyFilter; | ||
95 | |||
96 | Filter(FilterBase::Ptr source, DataStoreQuery *store) | ||
97 | : FilterBase(source, store) | ||
98 | { | ||
99 | |||
100 | } | ||
101 | |||
102 | virtual ~Filter(){} | ||
103 | |||
104 | bool next(const std::function<void(Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> &callback) Q_DECL_OVERRIDE { | ||
105 | bool foundValue = false; | ||
106 | while(!foundValue && mSource->next([this, callback, &foundValue](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { | ||
107 | SinkTrace() << "Filter: " << uid << operation; | ||
108 | |||
109 | //Always accept removals. They can't match the filter since the data is gone. | ||
110 | if (operation == Sink::Operation_Removal) { | ||
111 | SinkTrace() << "Removal: " << uid << operation; | ||
112 | callback(operation, uid, entityBuffer); | ||
113 | foundValue = true; | ||
114 | } else if (matchesFilter(uid, entityBuffer)) { | ||
115 | SinkTrace() << "Accepted: " << uid << operation; | ||
116 | callback(operation, uid, entityBuffer); | ||
117 | foundValue = true; | ||
118 | //TODO if something did not match the filter so far but does now, turn into an add operation. | ||
119 | } else { | ||
120 | SinkTrace() << "Rejected: " << uid << operation; | ||
121 | //TODO emit a removal if we had the uid in the result set and this is a modification. | ||
122 | //We don't know if this results in a removal from the dataset, so we emit a removal notification anyways | ||
123 | callback(Sink::Operation_Removal, uid, entityBuffer); | ||
124 | } | ||
125 | return false; | ||
126 | })) | ||
127 | {} | ||
128 | return foundValue; | ||
129 | } | ||
130 | |||
131 | bool matchesFilter(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { | ||
132 | for (const auto &filterProperty : propertyFilter.keys()) { | ||
133 | const auto property = getProperty(entityBuffer.entity(), filterProperty); | ||
134 | const auto comparator = propertyFilter.value(filterProperty); | ||
135 | if (!comparator.matches(property)) { | ||
136 | SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; | ||
137 | return false; | ||
138 | } | ||
139 | } | ||
140 | return true; | ||
141 | } | ||
142 | }; | ||
143 | |||
144 | /* class Reduction : public FilterBase { */ | ||
145 | /* public: */ | ||
146 | /* typedef QSharedPointer<Reduction> Ptr; */ | ||
147 | |||
148 | /* QHash<QByteArray, QDateTime> aggregateValues; */ | ||
149 | |||
150 | /* Reduction(FilterBase::Ptr source, DataStoreQuery *store) */ | ||
151 | /* : FilterBase(source, store) */ | ||
152 | /* { */ | ||
153 | |||
154 | /* } */ | ||
155 | |||
156 | /* virtual ~Reduction(){} */ | ||
157 | |||
158 | /* bool next(const std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> &callback) Q_DECL_OVERRIDE { */ | ||
159 | /* bool foundValue = false; */ | ||
160 | /* while(!foundValue && mSource->next([this, callback, &foundValue](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { */ | ||
161 | /* const auto operation = entityBuffer.operation(); */ | ||
162 | /* SinkTrace() << "Filter: " << uid << operation; */ | ||
163 | /* //Always accept removals. They can't match the filter since the data is gone. */ | ||
164 | /* if (operation == Sink::Operation_Removal) { */ | ||
165 | /* callback(uid, entityBuffer); */ | ||
166 | /* foundValue = true; */ | ||
167 | /* } else if (matchesFilter(uid, entityBuffer)) { */ | ||
168 | /* callback(uid, entityBuffer); */ | ||
169 | /* foundValue = true; */ | ||
170 | /* } */ | ||
171 | /* return false; */ | ||
172 | /* })) */ | ||
173 | /* {} */ | ||
174 | /* return foundValue; */ | ||
175 | /* } */ | ||
176 | |||
177 | /* bool matchesFilter(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { */ | ||
178 | /* for (const auto &filterProperty : propertyFilter.keys()) { */ | ||
179 | /* const auto property = getProperty(entityBuffer.entity(), filterProperty); */ | ||
180 | /* const auto comparator = propertyFilter.value(filterProperty); */ | ||
181 | /* if (!comparator.matches(property)) { */ | ||
182 | /* SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; */ | ||
183 | /* return false; */ | ||
184 | /* } */ | ||
185 | /* } */ | ||
186 | /* return true; */ | ||
187 | /* } */ | ||
188 | /* }; */ | ||
189 | |||
30 | DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty) | 190 | DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty) |
31 | : mQuery(query), mTransaction(transaction), mType(type), mTypeIndex(typeIndex), mDb(Storage::mainDatabase(mTransaction, mType)), mGetProperty(getProperty) | 191 | : mQuery(query), mTransaction(transaction), mType(type), mTypeIndex(typeIndex), mDb(Storage::mainDatabase(mTransaction, mType)), mGetProperty(getProperty) |
32 | { | 192 | { |
33 | 193 | setupQuery(); | |
34 | } | 194 | } |
35 | 195 | ||
36 | static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) | 196 | static inline QVector<QByteArray> fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) |
37 | { | 197 | { |
38 | // TODO use a result set with an iterator, to read values on demand | 198 | // TODO use a result set with an iterator, to read values on demand |
39 | SinkTrace() << "Looking for : " << bufferType; | 199 | SinkTrace() << "Looking for : " << bufferType; |
@@ -52,59 +212,7 @@ static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, | |||
52 | [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; }); | 212 | [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; }); |
53 | 213 | ||
54 | SinkTrace() << "Full scan retrieved " << keys.size() << " results."; | 214 | SinkTrace() << "Full scan retrieved " << keys.size() << " results."; |
55 | return ResultSet(keys.toList().toVector()); | 215 | return keys.toList().toVector(); |
56 | } | ||
57 | |||
58 | ResultSet DataStoreQuery::loadInitialResultSet(QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) | ||
59 | { | ||
60 | if (!mQuery.ids.isEmpty()) { | ||
61 | return ResultSet(mQuery.ids.toVector()); | ||
62 | } | ||
63 | QSet<QByteArray> appliedFilters; | ||
64 | QByteArray appliedSorting; | ||
65 | |||
66 | auto resultSet = mTypeIndex.query(mQuery, appliedFilters, appliedSorting, mTransaction); | ||
67 | |||
68 | remainingFilters = mQuery.propertyFilter.keys().toSet() - appliedFilters; | ||
69 | if (appliedSorting.isEmpty()) { | ||
70 | remainingSorting = mQuery.sortProperty; | ||
71 | } | ||
72 | |||
73 | // We do a full scan if there were no indexes available to create the initial set. | ||
74 | if (appliedFilters.isEmpty()) { | ||
75 | // TODO this should be replaced by an index lookup as well | ||
76 | resultSet = fullScan(mTransaction, mType); | ||
77 | } | ||
78 | return resultSet; | ||
79 | } | ||
80 | |||
81 | ResultSet DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision, QSet<QByteArray> &remainingFilters) | ||
82 | { | ||
83 | const auto bufferType = mType; | ||
84 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | ||
85 | remainingFilters = mQuery.propertyFilter.keys().toSet(); | ||
86 | return ResultSet([this, bufferType, revisionCounter]() -> QByteArray { | ||
87 | const qint64 topRevision = Sink::Storage::maxRevision(mTransaction); | ||
88 | // Spit out the revision keys one by one. | ||
89 | while (*revisionCounter <= topRevision) { | ||
90 | const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter); | ||
91 | const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter); | ||
92 | // SinkTrace() << "Revision" << *revisionCounter << type << uid; | ||
93 | Q_ASSERT(!uid.isEmpty()); | ||
94 | Q_ASSERT(!type.isEmpty()); | ||
95 | if (type != bufferType) { | ||
96 | // Skip revision | ||
97 | *revisionCounter += 1; | ||
98 | continue; | ||
99 | } | ||
100 | const auto key = Sink::Storage::assembleKey(uid, *revisionCounter); | ||
101 | *revisionCounter += 1; | ||
102 | return key; | ||
103 | } | ||
104 | SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; | ||
105 | // We're done | ||
106 | return QByteArray(); | ||
107 | }); | ||
108 | } | 216 | } |
109 | 217 | ||
110 | void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) | 218 | void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) |
@@ -122,156 +230,194 @@ QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArra | |||
122 | return mGetProperty(entity, property); | 230 | return mGetProperty(entity, property); |
123 | } | 231 | } |
124 | 232 | ||
125 | ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) | 233 | /* ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) */ |
126 | { | 234 | /* { */ |
127 | const bool sortingRequired = !sortProperty.isEmpty(); | 235 | /* const bool sortingRequired = !sortProperty.isEmpty(); */ |
128 | if (mInitialQuery && sortingRequired) { | 236 | /* if (mInitialQuery && sortingRequired) { */ |
129 | SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; | 237 | /* SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; */ |
130 | // Sort the complete set by reading the sort property and filling into a sorted map | 238 | /* // Sort the complete set by reading the sort property and filling into a sorted map */ |
131 | auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); | 239 | /* auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); */ |
132 | while (resultSet.next()) { | 240 | /* while (resultSet.next()) { */ |
133 | // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | 241 | /* // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) */ |
134 | readEntity(resultSet.id(), | 242 | /* readEntity(resultSet.id(), */ |
135 | [this, filter, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) { | 243 | /* [this, filter, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) { */ |
136 | |||
137 | const auto operation = buffer.operation(); | ||
138 | |||
139 | // We're not interested in removals during the initial query | ||
140 | if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { | ||
141 | if (!sortProperty.isEmpty()) { | ||
142 | const auto sortValue = getProperty(buffer.entity(), sortProperty); | ||
143 | if (sortValue.type() == QVariant::DateTime) { | ||
144 | sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), uid); | ||
145 | } else { | ||
146 | sortedMap->insert(sortValue.toString().toLatin1(), uid); | ||
147 | } | ||
148 | } else { | ||
149 | sortedMap->insert(uid, uid); | ||
150 | } | ||
151 | } | ||
152 | }); | ||
153 | } | ||
154 | 244 | ||
155 | SinkTrace() << "Sorted " << sortedMap->size() << " values."; | 245 | /* const auto operation = buffer.operation(); */ |
156 | auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap); | ||
157 | ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter]( | ||
158 | std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entity, Sink::Operation)> callback) -> bool { | ||
159 | if (iterator->hasNext()) { | ||
160 | readEntity(iterator->next().value(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) { | ||
161 | callback(uid, buffer, Sink::Operation_Creation); | ||
162 | }); | ||
163 | return true; | ||
164 | } | ||
165 | return false; | ||
166 | }; | ||
167 | 246 | ||
168 | auto skip = [iterator]() { | 247 | /* // We're not interested in removals during the initial query */ |
169 | if (iterator->hasNext()) { | 248 | /* if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { */ |
170 | iterator->next(); | 249 | /* if (!sortProperty.isEmpty()) { */ |
171 | } | 250 | /* const auto sortValue = getProperty(buffer.entity(), sortProperty); */ |
172 | }; | 251 | /* if (sortValue.type() == QVariant::DateTime) { */ |
173 | return ResultSet(generator, skip); | 252 | /* sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), uid); */ |
174 | } else { | 253 | /* } else { */ |
175 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | 254 | /* sortedMap->insert(sortValue.toString().toLatin1(), uid); */ |
176 | ResultSet::ValueGenerator generator = [this, resultSetPtr, filter](const ResultSet::Callback &callback) -> bool { | 255 | /* } */ |
177 | if (resultSetPtr->next()) { | 256 | /* } else { */ |
178 | SinkTrace() << "Reading the next value: " << resultSetPtr->id(); | 257 | /* sortedMap->insert(uid, uid); */ |
179 | // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | 258 | /* } */ |
180 | readEntity(resultSetPtr->id(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) { | 259 | /* } */ |
181 | const auto operation = buffer.operation(); | 260 | /* }); */ |
182 | if (mInitialQuery) { | 261 | /* } */ |
183 | // We're not interested in removals during the initial query | ||
184 | if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { | ||
185 | // In the initial set every entity is new | ||
186 | callback(uid, buffer, Sink::Operation_Creation); | ||
187 | } | ||
188 | } else { | ||
189 | // Always remove removals, they probably don't match due to non-available properties | ||
190 | if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) { | ||
191 | // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) | ||
192 | callback(uid, buffer, operation); | ||
193 | } | ||
194 | } | ||
195 | }); | ||
196 | return true; | ||
197 | } | ||
198 | return false; | ||
199 | }; | ||
200 | auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; | ||
201 | return ResultSet(generator, skip); | ||
202 | } | ||
203 | } | ||
204 | 262 | ||
263 | /* SinkTrace() << "Sorted " << sortedMap->size() << " values."; */ | ||
264 | /* auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap); */ | ||
265 | /* ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter]( */ | ||
266 | /* std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entity, Sink::Operation)> callback) -> bool { */ | ||
267 | /* if (iterator->hasNext()) { */ | ||
268 | /* readEntity(iterator->next().value(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) { */ | ||
269 | /* callback(uid, buffer, Sink::Operation_Creation); */ | ||
270 | /* }); */ | ||
271 | /* return true; */ | ||
272 | /* } */ | ||
273 | /* return false; */ | ||
274 | /* }; */ | ||
205 | 275 | ||
206 | DataStoreQuery::FilterFunction DataStoreQuery::getFilter(const QSet<QByteArray> &remainingFilters) | 276 | /* auto skip = [iterator]() { */ |
277 | /* if (iterator->hasNext()) { */ | ||
278 | /* iterator->next(); */ | ||
279 | /* } */ | ||
280 | /* }; */ | ||
281 | /* return ResultSet(generator, skip); */ | ||
282 | /* } else { */ | ||
283 | /* auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); */ | ||
284 | /* ResultSet::ValueGenerator generator = [this, resultSetPtr, filter](const ResultSet::Callback &callback) -> bool { */ | ||
285 | /* if (resultSetPtr->next()) { */ | ||
286 | /* SinkTrace() << "Reading the next value: " << resultSetPtr->id(); */ | ||
287 | /* // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) */ | ||
288 | /* readEntity(resultSetPtr->id(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) { */ | ||
289 | /* const auto operation = buffer.operation(); */ | ||
290 | /* if (mInitialQuery) { */ | ||
291 | /* // We're not interested in removals during the initial query */ | ||
292 | /* if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { */ | ||
293 | /* // In the initial set every entity is new */ | ||
294 | /* callback(uid, buffer, Sink::Operation_Creation); */ | ||
295 | /* } */ | ||
296 | /* } else { */ | ||
297 | /* // Always remove removals, they probably don't match due to non-available properties */ | ||
298 | /* if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) { */ | ||
299 | /* // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) */ | ||
300 | /* callback(uid, buffer, operation); */ | ||
301 | /* } */ | ||
302 | /* } */ | ||
303 | /* }); */ | ||
304 | /* return true; */ | ||
305 | /* } */ | ||
306 | /* return false; */ | ||
307 | /* }; */ | ||
308 | /* auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; */ | ||
309 | /* return ResultSet(generator, skip); */ | ||
310 | /* } */ | ||
311 | /* } */ | ||
312 | |||
313 | void DataStoreQuery::setupQuery() | ||
207 | { | 314 | { |
208 | auto query = mQuery; | 315 | FilterBase::Ptr baseSet; |
209 | return [this, remainingFilters, query](const QByteArray &uid, const Sink::EntityBuffer &entity) -> bool { | 316 | QSet<QByteArray> remainingFilters; |
210 | if (!query.ids.isEmpty()) { | 317 | QByteArray appliedSorting; |
211 | if (!query.ids.contains(uid)) { | 318 | if (!mQuery.ids.isEmpty()) { |
212 | SinkTrace() << "Filter by uid: " << uid; | 319 | mSource = Source::Ptr::create(mQuery.ids.toVector(), this); |
213 | return false; | 320 | baseSet = mSource; |
214 | } | 321 | remainingFilters = mQuery.propertyFilter.keys().toSet(); |
215 | } | 322 | } else { |
216 | for (const auto &filterProperty : remainingFilters) { | 323 | QSet<QByteArray> appliedFilters; |
217 | const auto property = getProperty(entity.entity(), filterProperty); | 324 | |
218 | const auto comparator = query.propertyFilter.value(filterProperty); | 325 | auto resultSet = mTypeIndex.query(mQuery, appliedFilters, appliedSorting, mTransaction); |
219 | if (!comparator.matches(property)) { | 326 | remainingFilters = mQuery.propertyFilter.keys().toSet() - appliedFilters; |
220 | SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; | 327 | |
221 | return false; | 328 | // We do a full scan if there were no indexes available to create the initial set. |
222 | } | 329 | if (appliedFilters.isEmpty()) { |
330 | // TODO this should be replaced by an index lookup on the uid index | ||
331 | mSource = Source::Ptr::create(fullScan(mTransaction, mType), this); | ||
332 | } else { | ||
333 | mSource = Source::Ptr::create(resultSet, this); | ||
223 | } | 334 | } |
224 | return true; | 335 | baseSet = mSource; |
225 | }; | 336 | } |
226 | } | 337 | if (!mQuery.propertyFilter.isEmpty()) { |
338 | auto filter = Filter::Ptr::create(baseSet, this); | ||
339 | filter->propertyFilter = mQuery.propertyFilter; | ||
340 | /* for (const auto &f : remainingFilters) { */ | ||
341 | /* filter->propertyFilter.insert(f, mQuery.propertyFilter.value(f)); */ | ||
342 | /* } */ | ||
343 | baseSet = filter; | ||
344 | } | ||
345 | /* if (appliedSorting.isEmpty() && !mQuery.sortProperty.isEmpty()) { */ | ||
346 | /* //Apply manual sorting */ | ||
347 | /* baseSet = Sort::Ptr::create(baseSet, mQuery.sortProperty); */ | ||
348 | /* } */ | ||
227 | 349 | ||
228 | ResultSet DataStoreQuery::createFilteredSet(ResultSet &resultSet, const std::function<bool(const QByteArray &, const Sink::EntityBuffer &buffer)> &filter) | 350 | /* if (mQuery.threadLeaderOnly) { */ |
229 | { | 351 | /* auto reduce = Reduce::Ptr::create(baseSet, this); */ |
230 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | 352 | |
231 | ResultSet::ValueGenerator generator = [this, resultSetPtr, filter](const ResultSet::Callback &callback) -> bool { | 353 | /* baseSet = reduce; */ |
232 | return resultSetPtr->next([=](const QByteArray &uid, const Sink::EntityBuffer &buffer, Sink::Operation operation) { | 354 | /* } */ |
233 | if (mInitialQuery) { | 355 | |
234 | // We're not interested in removals during the initial query | 356 | mCollector = Collector::Ptr::create(baseSet, this); |
235 | if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { | ||
236 | // In the initial set every entity is new | ||
237 | callback(uid, buffer, Sink::Operation_Creation); | ||
238 | } | ||
239 | } else { | ||
240 | // Always remove removals, they probably don't match due to non-available properties | ||
241 | if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) { | ||
242 | // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) | ||
243 | callback(uid, buffer, operation); | ||
244 | } | ||
245 | } | ||
246 | }); | ||
247 | }; | ||
248 | auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; | ||
249 | return ResultSet(generator, skip); | ||
250 | } | 357 | } |
251 | 358 | ||
252 | ResultSet DataStoreQuery::postSortFilter(ResultSet &resultSet) | 359 | QVector<QByteArray> DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision) |
253 | { | 360 | { |
254 | return resultSet; | 361 | const auto bufferType = mType; |
362 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | ||
363 | QVector<QByteArray> changedKeys; | ||
364 | const qint64 topRevision = Sink::Storage::maxRevision(mTransaction); | ||
365 | // Spit out the revision keys one by one. | ||
366 | while (*revisionCounter <= topRevision) { | ||
367 | const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter); | ||
368 | const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter); | ||
369 | // SinkTrace() << "Revision" << *revisionCounter << type << uid; | ||
370 | Q_ASSERT(!uid.isEmpty()); | ||
371 | Q_ASSERT(!type.isEmpty()); | ||
372 | if (type != bufferType) { | ||
373 | // Skip revision | ||
374 | *revisionCounter += 1; | ||
375 | continue; | ||
376 | } | ||
377 | const auto key = Sink::Storage::assembleKey(uid, *revisionCounter); | ||
378 | *revisionCounter += 1; | ||
379 | changedKeys << key; | ||
380 | } | ||
381 | SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; | ||
382 | return changedKeys; | ||
255 | } | 383 | } |
256 | 384 | ||
385 | |||
257 | ResultSet DataStoreQuery::update(qint64 baseRevision) | 386 | ResultSet DataStoreQuery::update(qint64 baseRevision) |
258 | { | 387 | { |
259 | SinkTrace() << "Executing query update"; | 388 | SinkTrace() << "Executing query update"; |
260 | mInitialQuery = false; | 389 | auto incrementalResultSet = loadIncrementalResultSet(baseRevision); |
261 | QSet<QByteArray> remainingFilters; | 390 | SinkTrace() << "Changed: " << incrementalResultSet; |
262 | QByteArray remainingSorting; | 391 | mSource->add(incrementalResultSet); |
263 | auto resultSet = loadIncrementalResultSet(baseRevision, remainingFilters); | 392 | ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { |
264 | auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), remainingSorting); | 393 | if (mCollector->next([callback](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &buffer) { |
265 | return postSortFilter(filteredSet); | 394 | SinkTrace() << "Got incremental result: " << uid << operation; |
395 | callback(uid, buffer, operation); | ||
396 | })) | ||
397 | { | ||
398 | return true; | ||
399 | } | ||
400 | return false; | ||
401 | }; | ||
402 | return ResultSet(generator, [this]() { mCollector->skip(); }); | ||
266 | } | 403 | } |
267 | 404 | ||
405 | |||
268 | ResultSet DataStoreQuery::execute() | 406 | ResultSet DataStoreQuery::execute() |
269 | { | 407 | { |
270 | SinkTrace() << "Executing query"; | 408 | SinkTrace() << "Executing query"; |
271 | mInitialQuery = true; | 409 | |
272 | QSet<QByteArray> remainingFilters; | 410 | ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { |
273 | QByteArray remainingSorting; | 411 | if (mCollector->next([callback](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &buffer) { |
274 | auto resultSet = loadInitialResultSet(remainingFilters, remainingSorting); | 412 | if (operation != Sink::Operation_Removal) { |
275 | auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), remainingSorting); | 413 | SinkTrace() << "Got initial result: " << uid << operation; |
276 | return postSortFilter(filteredSet); | 414 | callback(uid, buffer, Sink::Operation_Creation); |
415 | } | ||
416 | })) | ||
417 | { | ||
418 | return true; | ||
419 | } | ||
420 | return false; | ||
421 | }; | ||
422 | return ResultSet(generator, [this]() { mCollector->skip(); }); | ||
277 | } | 423 | } |