diff options
Diffstat (limited to 'common/entityreader.cpp')
-rw-r--r-- | common/entityreader.cpp | 250 |
1 files changed, 35 insertions, 215 deletions
diff --git a/common/entityreader.cpp b/common/entityreader.cpp index 01c25d2..faa154b 100644 --- a/common/entityreader.cpp +++ b/common/entityreader.cpp | |||
@@ -150,205 +150,41 @@ void EntityReader<DomainType>::query(const Sink::Query &query, const std::functi | |||
150 | }); | 150 | }); |
151 | } | 151 | } |
152 | 152 | ||
153 | template <class DomainType> | 153 | /* template <class DomainType> */ |
154 | void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, | 154 | /* void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, */ |
155 | const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) | 155 | /* const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) */ |
156 | { | 156 | /* { */ |
157 | db.findLatest(key, | 157 | /* db.findLatest(key, */ |
158 | [=](const QByteArray &key, const QByteArray &value) -> bool { | 158 | /* [=](const QByteArray &key, const QByteArray &value) -> bool { */ |
159 | Sink::EntityBuffer buffer(value.data(), value.size()); | 159 | /* Sink::EntityBuffer buffer(value.data(), value.size()); */ |
160 | const Sink::Entity &entity = buffer.entity(); | 160 | /* const Sink::Entity &entity = buffer.entity(); */ |
161 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | 161 | /* const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); */ |
162 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | 162 | /* const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; */ |
163 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | 163 | /* const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; */ |
164 | auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); | 164 | /* auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); */ |
165 | Q_ASSERT(adaptor); | 165 | /* Q_ASSERT(adaptor); */ |
166 | resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); | 166 | /* resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); */ |
167 | return false; | 167 | /* return false; */ |
168 | }, | 168 | /* }, */ |
169 | [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); | 169 | /* [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); */ |
170 | } | 170 | /* } */ |
171 | |||
172 | static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) | ||
173 | { | ||
174 | // TODO use a result set with an iterator, to read values on demand | ||
175 | SinkTrace() << "Looking for : " << bufferType; | ||
176 | //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. | ||
177 | QSet<QByteArray> keys; | ||
178 | Storage::mainDatabase(transaction, bufferType) | ||
179 | .scan(QByteArray(), | ||
180 | [&](const QByteArray &key, const QByteArray &value) -> bool { | ||
181 | if (keys.contains(Sink::Storage::uidFromKey(key))) { | ||
182 | //Not something that should persist if the replay works, so we keep a message for now. | ||
183 | SinkTrace() << "Multiple revisions for key: " << key; | ||
184 | } | ||
185 | keys << Sink::Storage::uidFromKey(key); | ||
186 | return true; | ||
187 | }, | ||
188 | [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; }); | ||
189 | |||
190 | SinkTrace() << "Full scan retrieved " << keys.size() << " results."; | ||
191 | return ResultSet(keys.toList().toVector()); | ||
192 | } | ||
193 | |||
194 | template <class DomainType> | ||
195 | ResultSet EntityReader<DomainType>::loadInitialResultSet(const Sink::Query &query, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) | ||
196 | { | ||
197 | if (!query.ids.isEmpty()) { | ||
198 | return ResultSet(query.ids.toVector()); | ||
199 | } | ||
200 | QSet<QByteArray> appliedFilters; | ||
201 | QByteArray appliedSorting; | ||
202 | auto resultSet = Sink::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, appliedSorting, mTransaction); | ||
203 | remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; | ||
204 | if (appliedSorting.isEmpty()) { | ||
205 | remainingSorting = query.sortProperty; | ||
206 | } | ||
207 | 171 | ||
208 | // We do a full scan if there were no indexes available to create the initial set. | ||
209 | if (appliedFilters.isEmpty()) { | ||
210 | // TODO this should be replaced by an index lookup as well | ||
211 | resultSet = fullScan(mTransaction, ApplicationDomain::getTypeName<DomainType>()); | ||
212 | } | ||
213 | return resultSet; | ||
214 | } | ||
215 | |||
216 | template <class DomainType> | ||
217 | ResultSet EntityReader<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, QSet<QByteArray> &remainingFilters) | ||
218 | { | ||
219 | const auto bufferType = ApplicationDomain::getTypeName<DomainType>(); | ||
220 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | ||
221 | remainingFilters = query.propertyFilter.keys().toSet(); | ||
222 | return ResultSet([this, bufferType, revisionCounter]() -> QByteArray { | ||
223 | const qint64 topRevision = Sink::Storage::maxRevision(mTransaction); | ||
224 | // Spit out the revision keys one by one. | ||
225 | while (*revisionCounter <= topRevision) { | ||
226 | const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter); | ||
227 | const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter); | ||
228 | // SinkTrace() << "Revision" << *revisionCounter << type << uid; | ||
229 | Q_ASSERT(!uid.isEmpty()); | ||
230 | Q_ASSERT(!type.isEmpty()); | ||
231 | if (type != bufferType) { | ||
232 | // Skip revision | ||
233 | *revisionCounter += 1; | ||
234 | continue; | ||
235 | } | ||
236 | const auto key = Sink::Storage::assembleKey(uid, *revisionCounter); | ||
237 | *revisionCounter += 1; | ||
238 | return key; | ||
239 | } | ||
240 | SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; | ||
241 | // We're done | ||
242 | return QByteArray(); | ||
243 | }); | ||
244 | } | ||
245 | |||
246 | template <class DomainType> | ||
247 | ResultSet EntityReader<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, | ||
248 | const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty) | ||
249 | { | ||
250 | const bool sortingRequired = !sortProperty.isEmpty(); | ||
251 | if (initialQuery && sortingRequired) { | ||
252 | SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; | ||
253 | // Sort the complete set by reading the sort property and filling into a sorted map | ||
254 | auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); | ||
255 | while (resultSet.next()) { | ||
256 | // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | ||
257 | readEntity(db, resultSet.id(), | ||
258 | [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { | ||
259 | // We're not interested in removals during the initial query | ||
260 | if ((operation != Sink::Operation_Removal) && filter(domainObject)) { | ||
261 | if (!sortProperty.isEmpty()) { | ||
262 | const auto sortValue = domainObject->getProperty(sortProperty); | ||
263 | if (sortValue.type() == QVariant::DateTime) { | ||
264 | sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier()); | ||
265 | } else { | ||
266 | sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier()); | ||
267 | } | ||
268 | } else { | ||
269 | sortedMap->insert(domainObject->identifier(), domainObject->identifier()); | ||
270 | } | ||
271 | } | ||
272 | }); | ||
273 | } | ||
274 | |||
275 | SinkTrace() << "Sorted " << sortedMap->size() << " values."; | ||
276 | auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap); | ||
277 | ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery]( | ||
278 | std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { | ||
279 | if (iterator->hasNext()) { | ||
280 | readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, | ||
281 | Sink::Operation operation) { callback(domainObject, Sink::Operation_Creation); }); | ||
282 | return true; | ||
283 | } | ||
284 | return false; | ||
285 | }; | ||
286 | 172 | ||
287 | auto skip = [iterator]() { | ||
288 | if (iterator->hasNext()) { | ||
289 | iterator->next(); | ||
290 | } | ||
291 | }; | ||
292 | return ResultSet(generator, skip); | ||
293 | } else { | ||
294 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | ||
295 | ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery]( | ||
296 | std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { | ||
297 | if (resultSetPtr->next()) { | ||
298 | // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | ||
299 | readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { | ||
300 | if (initialQuery) { | ||
301 | // We're not interested in removals during the initial query | ||
302 | if ((operation != Sink::Operation_Removal) && filter(domainObject)) { | ||
303 | // In the initial set every entity is new | ||
304 | callback(domainObject, Sink::Operation_Creation); | ||
305 | } | ||
306 | } else { | ||
307 | // Always remove removals, they probably don't match due to non-available properties | ||
308 | if ((operation == Sink::Operation_Removal) || filter(domainObject)) { | ||
309 | // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) | ||
310 | callback(domainObject, operation); | ||
311 | } | ||
312 | } | ||
313 | }); | ||
314 | return true; | ||
315 | } | ||
316 | return false; | ||
317 | }; | ||
318 | auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; | ||
319 | return ResultSet(generator, skip); | ||
320 | } | ||
321 | } | ||
322 | 173 | ||
323 | template <class DomainType> | 174 | template <class DomainType> |
324 | QPair<qint64, qint64> EntityReader<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, bool initialQuery, int offset, int batchSize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback) | 175 | QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback) |
325 | { | 176 | { |
326 | QTime time; | 177 | QTime time; |
327 | time.start(); | 178 | time.start(); |
328 | 179 | ||
329 | auto db = Storage::mainDatabase(mTransaction, ApplicationDomain::getTypeName<DomainType>()); | 180 | auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction); |
181 | auto resultSet = preparedQuery.execute(); | ||
330 | 182 | ||
331 | QSet<QByteArray> remainingFilters; | ||
332 | QByteArray remainingSorting; | ||
333 | auto resultSet = baseSetRetriever(remainingFilters, remainingSorting); | ||
334 | SinkTrace() << "Base set retrieved. " << Log::TraceTime(time.elapsed()); | ||
335 | auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting); | ||
336 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 183 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
337 | auto replayedEntities = replaySet(filteredSet, offset, batchSize, callback); | 184 | auto replayedEntities = replaySet(resultSet, offset, batchsize, callback); |
338 | // SinkTrace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed()); | ||
339 | return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); | ||
340 | } | ||
341 | 185 | ||
342 | template <class DomainType> | ||
343 | QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback) | ||
344 | { | ||
345 | QTime time; | ||
346 | time.start(); | ||
347 | auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { | ||
348 | return loadInitialResultSet(query, remainingFilters, remainingSorting); | ||
349 | }, true, offset, batchsize, callback); | ||
350 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); | 186 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); |
351 | return revisionAndReplayedEntities; | 187 | return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); |
352 | } | 188 | } |
353 | 189 | ||
354 | template <class DomainType> | 190 | template <class DomainType> |
@@ -357,33 +193,15 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeIncrementalQuery(const Si | |||
357 | QTime time; | 193 | QTime time; |
358 | time.start(); | 194 | time.start(); |
359 | const qint64 baseRevision = lastRevision + 1; | 195 | const qint64 baseRevision = lastRevision + 1; |
360 | auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { | ||
361 | return loadIncrementalResultSet(baseRevision, query, remainingFilters); | ||
362 | }, false, 0, 0, callback); | ||
363 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); | ||
364 | return revisionAndReplayedEntities; | ||
365 | } | ||
366 | 196 | ||
367 | template <class DomainType> | 197 | auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction); |
368 | std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> | 198 | auto resultSet = preparedQuery.update(baseRevision); |
369 | EntityReader<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query) | 199 | |
370 | { | 200 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
371 | return [this, remainingFilters, query](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { | 201 | auto replayedEntities = replaySet(resultSet, 0, 0, callback); |
372 | if (!query.ids.isEmpty()) { | 202 | |
373 | if (!query.ids.contains(domainObject->identifier())) { | 203 | SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); |
374 | return false; | 204 | return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); |
375 | } | ||
376 | } | ||
377 | for (const auto &filterProperty : remainingFilters) { | ||
378 | const auto property = domainObject->getProperty(filterProperty); | ||
379 | const auto comparator = query.propertyFilter.value(filterProperty); | ||
380 | if (!comparator.matches(property)) { | ||
381 | SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; | ||
382 | return false; | ||
383 | } | ||
384 | } | ||
385 | return true; | ||
386 | }; | ||
387 | } | 205 | } |
388 | 206 | ||
389 | template <class DomainType> | 207 | template <class DomainType> |
@@ -394,9 +212,11 @@ qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int | |||
394 | int counter = 0; | 212 | int counter = 0; |
395 | while (!batchSize || (counter < batchSize)) { | 213 | while (!batchSize || (counter < batchSize)) { |
396 | const bool ret = | 214 | const bool ret = |
397 | resultSet.next([&counter, callback](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { | 215 | resultSet.next([this, &counter, callback](const QByteArray &uid, const Sink::EntityBuffer &value, Sink::Operation operation) -> bool { |
398 | counter++; | 216 | counter++; |
399 | return callback(value.staticCast<DomainType>(), operation); | 217 | auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(value.entity()); |
218 | Q_ASSERT(adaptor); | ||
219 | return callback(QSharedPointer<DomainType>::create(mResourceInstanceIdentifier, uid, value.revision(), adaptor), operation); | ||
400 | }); | 220 | }); |
401 | if (!ret) { | 221 | if (!ret) { |
402 | break; | 222 | break; |