diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-09-19 18:55:21 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-09-19 18:55:21 +0200 |
commit | 4a14a6fade947aa830d3f21598a4a6ba7316b933 (patch) | |
tree | c6b340bf1c6284e5501d371f65b58e3a69391a26 /common/entityreader.cpp | |
parent | 1deac558af4b1c9f04352ede7f8e172f11a70a6b (diff) | |
download | sink-4a14a6fade947aa830d3f21598a4a6ba7316b933.tar.gz sink-4a14a6fade947aa830d3f21598a4a6ba7316b933.zip |
Refactored the query part of the entity reader into DataStoreQuery.
DataStoreQuery now encapsulates the low-level query that operates
directly on the storage. It no longer has access to the resource
buffers, and is instantiated by the type implementation, so we can
specialize the query alogorithm per type, but not per resource.
This will allow us to implement the threading queries for the mailtype.
Diffstat (limited to 'common/entityreader.cpp')
-rw-r--r-- | common/entityreader.cpp | 250 |
1 files changed, 35 insertions, 215 deletions
diff --git a/common/entityreader.cpp b/common/entityreader.cpp index 01c25d2..faa154b 100644 --- a/common/entityreader.cpp +++ b/common/entityreader.cpp | |||
@@ -150,205 +150,41 @@ void EntityReader<DomainType>::query(const Sink::Query &query, const std::functi | |||
150 | }); | 150 | }); |
151 | } | 151 | } |
152 | 152 | ||
153 | template <class DomainType> | 153 | /* template <class DomainType> */ |
154 | void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, | 154 | /* void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, */ |
155 | const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) | 155 | /* const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) */ |
156 | { | 156 | /* { */ |
157 | db.findLatest(key, | 157 | /* db.findLatest(key, */ |
158 | [=](const QByteArray &key, const QByteArray &value) -> bool { | 158 | /* [=](const QByteArray &key, const QByteArray &value) -> bool { */ |
159 | Sink::EntityBuffer buffer(value.data(), value.size()); | 159 | /* Sink::EntityBuffer buffer(value.data(), value.size()); */ |
160 | const Sink::Entity &entity = buffer.entity(); | 160 | /* const Sink::Entity &entity = buffer.entity(); */ |
161 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | 161 | /* const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); */ |
162 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | 162 | /* const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; */ |
163 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | 163 | /* const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; */ |
164 | auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); | 164 | /* auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); */ |
165 | Q_ASSERT(adaptor); | 165 | /* Q_ASSERT(adaptor); */ |
166 | resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); | 166 | /* resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); */ |
167 | return false; | 167 | /* return false; */ |
168 | }, | 168 | /* }, */ |
169 | [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); | 169 | /* [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); */ |
170 | } | 170 | /* } */ |
171 | |||
172 | static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) | ||
173 | { | ||
174 | // TODO use a result set with an iterator, to read values on demand | ||
175 | SinkTrace() << "Looking for : " << bufferType; | ||
176 | //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. | ||
177 | QSet<QByteArray> keys; | ||
178 | Storage::mainDatabase(transaction, bufferType) | ||
179 | .scan(QByteArray(), | ||
180 | [&](const QByteArray &key, const QByteArray &value) -> bool { | ||
181 | if (keys.contains(Sink::Storage::uidFromKey(key))) { | ||
182 | //Not something that should persist if the replay works, so we keep a message for now. | ||
183 | SinkTrace() << "Multiple revisions for key: " << key; | ||
184 | } | ||
185 | keys << Sink::Storage::uidFromKey(key); | ||
186 | return true; | ||
187 | }, | ||
188 | [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; }); | ||
189 | |||
190 | SinkTrace() << "Full scan retrieved " << keys.size() << " results."; | ||
191 | return ResultSet(keys.toList().toVector()); | ||
192 | } | ||
193 | |||
194 | template <class DomainType> | ||
195 | ResultSet EntityReader<DomainType>::loadInitialResultSet(const Sink::Query &query, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) | ||
196 | { | ||
197 | if (!query.ids.isEmpty()) { | ||
198 | return ResultSet(query.ids.toVector()); | ||
199 | } | ||
200 | QSet<QByteArray> appliedFilters; | ||
201 | QByteArray appliedSorting; | ||
202 | auto resultSet = Sink::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, appliedSorting, mTransaction); | ||
203 | remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; | ||
204 | if (appliedSorting.isEmpty()) { | ||
205 | remainingSorting = query.sortProperty; | ||
206 | } | ||
207 | 171 | ||
208 | // We do a full scan if there were no indexes available to create the initial set. | ||
209 | if (appliedFilters.isEmpty()) { | ||
210 | // TODO this should be replaced by an index lookup as well | ||
211 | resultSet = fullScan(mTransaction, ApplicationDomain::getTypeName<DomainType>()); | ||
212 | } | ||
213 | return resultSet; | ||
214 | } | ||
215 | |||
216 | template <class DomainType> | ||
217 | ResultSet EntityReader<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, QSet<QByteArray> &remainingFilters) | ||
218 | { | ||
219 | const auto bufferType = ApplicationDomain::getTypeName<DomainType>(); | ||
220 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | ||
221 | remainingFilters = query.propertyFilter.keys().toSet(); | ||
222 | return ResultSet([this, bufferType, revisionCounter]() -> QByteArray { | ||
223 | const qint64 topRevision = Sink::Storage::maxRevision(mTransaction); | ||
224 | // Spit out the revision keys one by one. | ||
225 | while (*revisionCounter <= topRevision) { | ||
226 | const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter); | ||
227 | const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter); | ||
228 | // SinkTrace() << "Revision" << *revisionCounter << type << uid; | ||
229 | Q_ASSERT(!uid.isEmpty()); | ||
230 | Q_ASSERT(!type.isEmpty()); | ||
231 | if (type != bufferType) { | ||
232 | // Skip revision | ||
233 | *revisionCounter += 1; | ||
234 | continue; | ||
235 | } | ||
236 | const auto key = Sink::Storage::assembleKey(uid, *revisionCounter); | ||
237 | *revisionCounter += 1; | ||
238 | return key; | ||
239 | } | ||
240 | SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; | ||
241 | // We're done | ||
242 | return QByteArray(); | ||
243 | }); | ||
244 | } | ||
245 | |||
246 | template <class DomainType> | ||
247 | ResultSet EntityReader<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, | ||
248 | const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty) | ||
249 | { | ||
250 | const bool sortingRequired = !sortProperty.isEmpty(); | ||
251 | if (initialQuery && sortingRequired) { | ||
252 | SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; | ||
253 | // Sort the complete set by reading the sort property and filling into a sorted map | ||
254 | auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); | ||
255 | while (resultSet.next()) { | ||
256 | // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | ||
257 | readEntity(db, resultSet.id(), | ||
258 | [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { | ||
259 | // We're not interested in removals during the initial query | ||
260 | if ((operation != Sink::Operation_Removal) && filter(domainObject)) { | ||
261 | if (!sortProperty.isEmpty()) { | ||
262 | const auto sortValue = domainObject->getProperty(sortProperty); | ||
263 | if (sortValue.type() == QVariant::DateTime) { | ||
264 | sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier()); | ||
265 | } else { | ||
266 | sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier()); | ||
267 | } | ||
268 | } else { | ||
269 | sortedMap->insert(domainObject->identifier(), domainObject->identifier()); | ||
270 | } | ||
271 | } | ||
272 | }); | ||
273 | } | ||
274 | |||
275 | SinkTrace() << "Sorted " << sortedMap->size() << " values."; | ||
276 | auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap); | ||
277 | ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery]( | ||
278 | std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { | ||
279 | if (iterator->hasNext()) { | ||
280 | readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, | ||
281 | Sink::Operation operation) { callback(domainObject, Sink::Operation_Creation); }); | ||
282 | return true; | ||
283 | } | ||
284 | return false; | ||
285 | }; | ||
286 | 172 | ||
287 | auto skip = [iterator]() { | ||
288 | if (iterator->hasNext()) { | ||
289 | iterator->next(); | ||
290 | } | ||
291 | }; | ||
292 | return ResultSet(generator, skip); | ||
293 | } else { | ||
294 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | ||
295 | ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery]( | ||
296 | std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { | ||
297 | if (resultSetPtr->next()) { | ||
298 | // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | ||
299 | readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { | ||
300 | if (initialQuery) { | ||
301 | // We're not interested in removals during the initial query | ||
302 | if ((operation != Sink::Operation_Removal) && filter(domainObject)) { | ||
303 | // In the initial set every entity is new | ||
304 | callback(domainObject, Sink::Operation_Creation); | ||
305 | } | ||
306 | } else { | ||
307 | // Always remove removals, they probably don't match due to non-available properties | ||
308 | if ((operation == Sink::Operation_Removal) || filter(domainObject)) { | ||
309 | // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) | ||
310 | callback(domainObject, operation); | ||
311 | } | ||
312 | } | ||
313 | }); | ||
314 | return true; | ||
315 | } | ||
316 | return false; | ||
317 | }; | ||
318 | auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; | ||
319 | return ResultSet(generator, skip); | ||
320 | } | ||
321 | } | ||
322 | 173 | ||
323 | template <class DomainType> | 174 | template <class DomainType> |
324 | QPair<qint64, qint64> EntityReader<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, bool initialQuery, int offset, int batchSize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback) | 175 | QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback) |
325 | { | 176 | { |
326 | QTime time; | 177 | QTime time; |
327 | time.start(); | 178 | time.start(); |
328 | 179 | ||
329 | auto db = Storage::mainDatabase(mTransaction, ApplicationDomain::getTypeName<DomainType>()); | 180 | auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction); |
181 | auto resultSet = preparedQuery.execute(); | ||
330 | 182 | ||
331 | QSet<QByteArray> remainingFilters; | ||
332 | QByteArray remainingSorting; | ||
333 | auto resultSet = baseSetRetriever(remainingFilters, remainingSorting); | ||
334 | SinkTrace() << "Base set retrieved. " << Log::TraceTime(time.elapsed()); | ||
335 | auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting); | ||
336 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 183 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
337 | auto replayedEntities = replaySet(filteredSet, offset, batchSize, callback); | 184 | auto replayedEntities = replaySet(resultSet, offset, batchsize, callback); |
338 | // SinkTrace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed()); | ||
339 | return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); | ||
340 | } | ||
341 | 185 | ||
342 | template <class DomainType> | ||
343 | QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback) | ||
344 | { | ||
345 | QTime time; | ||
346 | time.start(); | ||
347 | auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { | ||
348 | return loadInitialResultSet(query, remainingFilters, remainingSorting); | ||
349 | }, true, offset, batchsize, callback); | ||
350 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); | 186 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); |
351 | return revisionAndReplayedEntities; | 187 | return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); |
352 | } | 188 | } |
353 | 189 | ||
354 | template <class DomainType> | 190 | template <class DomainType> |
@@ -357,33 +193,15 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeIncrementalQuery(const Si | |||
357 | QTime time; | 193 | QTime time; |
358 | time.start(); | 194 | time.start(); |
359 | const qint64 baseRevision = lastRevision + 1; | 195 | const qint64 baseRevision = lastRevision + 1; |
360 | auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { | ||
361 | return loadIncrementalResultSet(baseRevision, query, remainingFilters); | ||
362 | }, false, 0, 0, callback); | ||
363 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); | ||
364 | return revisionAndReplayedEntities; | ||
365 | } | ||
366 | 196 | ||
367 | template <class DomainType> | 197 | auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction); |
368 | std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> | 198 | auto resultSet = preparedQuery.update(baseRevision); |
369 | EntityReader<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query) | 199 | |
370 | { | 200 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
371 | return [this, remainingFilters, query](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { | 201 | auto replayedEntities = replaySet(resultSet, 0, 0, callback); |
372 | if (!query.ids.isEmpty()) { | 202 | |
373 | if (!query.ids.contains(domainObject->identifier())) { | 203 | SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); |
374 | return false; | 204 | return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); |
375 | } | ||
376 | } | ||
377 | for (const auto &filterProperty : remainingFilters) { | ||
378 | const auto property = domainObject->getProperty(filterProperty); | ||
379 | const auto comparator = query.propertyFilter.value(filterProperty); | ||
380 | if (!comparator.matches(property)) { | ||
381 | SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; | ||
382 | return false; | ||
383 | } | ||
384 | } | ||
385 | return true; | ||
386 | }; | ||
387 | } | 205 | } |
388 | 206 | ||
389 | template <class DomainType> | 207 | template <class DomainType> |
@@ -394,9 +212,11 @@ qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int | |||
394 | int counter = 0; | 212 | int counter = 0; |
395 | while (!batchSize || (counter < batchSize)) { | 213 | while (!batchSize || (counter < batchSize)) { |
396 | const bool ret = | 214 | const bool ret = |
397 | resultSet.next([&counter, callback](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { | 215 | resultSet.next([this, &counter, callback](const QByteArray &uid, const Sink::EntityBuffer &value, Sink::Operation operation) -> bool { |
398 | counter++; | 216 | counter++; |
399 | return callback(value.staticCast<DomainType>(), operation); | 217 | auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(value.entity()); |
218 | Q_ASSERT(adaptor); | ||
219 | return callback(QSharedPointer<DomainType>::create(mResourceInstanceIdentifier, uid, value.revision(), adaptor), operation); | ||
400 | }); | 220 | }); |
401 | if (!ret) { | 221 | if (!ret) { |
402 | break; | 222 | break; |