summaryrefslogtreecommitdiffstats
path: root/common/entityreader.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/entityreader.cpp')
-rw-r--r--common/entityreader.cpp250
1 files changed, 35 insertions, 215 deletions
diff --git a/common/entityreader.cpp b/common/entityreader.cpp
index 01c25d2..faa154b 100644
--- a/common/entityreader.cpp
+++ b/common/entityreader.cpp
@@ -150,205 +150,41 @@ void EntityReader<DomainType>::query(const Sink::Query &query, const std::functi
150 }); 150 });
151} 151}
152 152
153template <class DomainType> 153/* template <class DomainType> */
154void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, 154/* void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, */
155 const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) 155/* const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) */
156{ 156/* { */
157 db.findLatest(key, 157/* db.findLatest(key, */
158 [=](const QByteArray &key, const QByteArray &value) -> bool { 158/* [=](const QByteArray &key, const QByteArray &value) -> bool { */
159 Sink::EntityBuffer buffer(value.data(), value.size()); 159/* Sink::EntityBuffer buffer(value.data(), value.size()); */
160 const Sink::Entity &entity = buffer.entity(); 160/* const Sink::Entity &entity = buffer.entity(); */
161 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); 161/* const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); */
162 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; 162/* const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; */
163 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; 163/* const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; */
164 auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); 164/* auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); */
165 Q_ASSERT(adaptor); 165/* Q_ASSERT(adaptor); */
166 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); 166/* resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); */
167 return false; 167/* return false; */
168 }, 168/* }, */
169 [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); 169/* [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); */
170} 170/* } */
171
172static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType)
173{
174 // TODO use a result set with an iterator, to read values on demand
175 SinkTrace() << "Looking for : " << bufferType;
176 //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate.
177 QSet<QByteArray> keys;
178 Storage::mainDatabase(transaction, bufferType)
179 .scan(QByteArray(),
180 [&](const QByteArray &key, const QByteArray &value) -> bool {
181 if (keys.contains(Sink::Storage::uidFromKey(key))) {
182 //Not something that should persist if the replay works, so we keep a message for now.
183 SinkTrace() << "Multiple revisions for key: " << key;
184 }
185 keys << Sink::Storage::uidFromKey(key);
186 return true;
187 },
188 [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; });
189
190 SinkTrace() << "Full scan retrieved " << keys.size() << " results.";
191 return ResultSet(keys.toList().toVector());
192}
193
194template <class DomainType>
195ResultSet EntityReader<DomainType>::loadInitialResultSet(const Sink::Query &query, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting)
196{
197 if (!query.ids.isEmpty()) {
198 return ResultSet(query.ids.toVector());
199 }
200 QSet<QByteArray> appliedFilters;
201 QByteArray appliedSorting;
202 auto resultSet = Sink::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, appliedSorting, mTransaction);
203 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
204 if (appliedSorting.isEmpty()) {
205 remainingSorting = query.sortProperty;
206 }
207 171
208 // We do a full scan if there were no indexes available to create the initial set.
209 if (appliedFilters.isEmpty()) {
210 // TODO this should be replaced by an index lookup as well
211 resultSet = fullScan(mTransaction, ApplicationDomain::getTypeName<DomainType>());
212 }
213 return resultSet;
214}
215
216template <class DomainType>
217ResultSet EntityReader<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, QSet<QByteArray> &remainingFilters)
218{
219 const auto bufferType = ApplicationDomain::getTypeName<DomainType>();
220 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
221 remainingFilters = query.propertyFilter.keys().toSet();
222 return ResultSet([this, bufferType, revisionCounter]() -> QByteArray {
223 const qint64 topRevision = Sink::Storage::maxRevision(mTransaction);
224 // Spit out the revision keys one by one.
225 while (*revisionCounter <= topRevision) {
226 const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter);
227 const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter);
228 // SinkTrace() << "Revision" << *revisionCounter << type << uid;
229 Q_ASSERT(!uid.isEmpty());
230 Q_ASSERT(!type.isEmpty());
231 if (type != bufferType) {
232 // Skip revision
233 *revisionCounter += 1;
234 continue;
235 }
236 const auto key = Sink::Storage::assembleKey(uid, *revisionCounter);
237 *revisionCounter += 1;
238 return key;
239 }
240 SinkTrace() << "Finished reading incremental result set:" << *revisionCounter;
241 // We're done
242 return QByteArray();
243 });
244}
245
246template <class DomainType>
247ResultSet EntityReader<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter,
248 const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty)
249{
250 const bool sortingRequired = !sortProperty.isEmpty();
251 if (initialQuery && sortingRequired) {
252 SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty;
253 // Sort the complete set by reading the sort property and filling into a sorted map
254 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create();
255 while (resultSet.next()) {
256 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
257 readEntity(db, resultSet.id(),
258 [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
259 // We're not interested in removals during the initial query
260 if ((operation != Sink::Operation_Removal) && filter(domainObject)) {
261 if (!sortProperty.isEmpty()) {
262 const auto sortValue = domainObject->getProperty(sortProperty);
263 if (sortValue.type() == QVariant::DateTime) {
264 sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier());
265 } else {
266 sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier());
267 }
268 } else {
269 sortedMap->insert(domainObject->identifier(), domainObject->identifier());
270 }
271 }
272 });
273 }
274
275 SinkTrace() << "Sorted " << sortedMap->size() << " values.";
276 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap);
277 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery](
278 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
279 if (iterator->hasNext()) {
280 readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject,
281 Sink::Operation operation) { callback(domainObject, Sink::Operation_Creation); });
282 return true;
283 }
284 return false;
285 };
286 172
287 auto skip = [iterator]() {
288 if (iterator->hasNext()) {
289 iterator->next();
290 }
291 };
292 return ResultSet(generator, skip);
293 } else {
294 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
295 ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery](
296 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
297 if (resultSetPtr->next()) {
298 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
299 readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
300 if (initialQuery) {
301 // We're not interested in removals during the initial query
302 if ((operation != Sink::Operation_Removal) && filter(domainObject)) {
303 // In the initial set every entity is new
304 callback(domainObject, Sink::Operation_Creation);
305 }
306 } else {
307 // Always remove removals, they probably don't match due to non-available properties
308 if ((operation == Sink::Operation_Removal) || filter(domainObject)) {
309 // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results)
310 callback(domainObject, operation);
311 }
312 }
313 });
314 return true;
315 }
316 return false;
317 };
318 auto skip = [resultSetPtr]() { resultSetPtr->skip(1); };
319 return ResultSet(generator, skip);
320 }
321}
322 173
323template <class DomainType> 174template <class DomainType>
324QPair<qint64, qint64> EntityReader<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, bool initialQuery, int offset, int batchSize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback) 175QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback)
325{ 176{
326 QTime time; 177 QTime time;
327 time.start(); 178 time.start();
328 179
329 auto db = Storage::mainDatabase(mTransaction, ApplicationDomain::getTypeName<DomainType>()); 180 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction);
181 auto resultSet = preparedQuery.execute();
330 182
331 QSet<QByteArray> remainingFilters;
332 QByteArray remainingSorting;
333 auto resultSet = baseSetRetriever(remainingFilters, remainingSorting);
334 SinkTrace() << "Base set retrieved. " << Log::TraceTime(time.elapsed());
335 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting);
336 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 183 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
337 auto replayedEntities = replaySet(filteredSet, offset, batchSize, callback); 184 auto replayedEntities = replaySet(resultSet, offset, batchsize, callback);
338 // SinkTrace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed());
339 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities);
340}
341 185
342template <class DomainType>
343QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback)
344{
345 QTime time;
346 time.start();
347 auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet {
348 return loadInitialResultSet(query, remainingFilters, remainingSorting);
349 }, true, offset, batchsize, callback);
350 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 186 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
351 return revisionAndReplayedEntities; 187 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities);
352} 188}
353 189
354template <class DomainType> 190template <class DomainType>
@@ -357,33 +193,15 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeIncrementalQuery(const Si
357 QTime time; 193 QTime time;
358 time.start(); 194 time.start();
359 const qint64 baseRevision = lastRevision + 1; 195 const qint64 baseRevision = lastRevision + 1;
360 auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet {
361 return loadIncrementalResultSet(baseRevision, query, remainingFilters);
362 }, false, 0, 0, callback);
363 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
364 return revisionAndReplayedEntities;
365}
366 196
367template <class DomainType> 197 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction);
368std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> 198 auto resultSet = preparedQuery.update(baseRevision);
369EntityReader<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query) 199
370{ 200 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
371 return [this, remainingFilters, query](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { 201 auto replayedEntities = replaySet(resultSet, 0, 0, callback);
372 if (!query.ids.isEmpty()) { 202
373 if (!query.ids.contains(domainObject->identifier())) { 203 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
374 return false; 204 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities);
375 }
376 }
377 for (const auto &filterProperty : remainingFilters) {
378 const auto property = domainObject->getProperty(filterProperty);
379 const auto comparator = query.propertyFilter.value(filterProperty);
380 if (!comparator.matches(property)) {
381 SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value;
382 return false;
383 }
384 }
385 return true;
386 };
387} 205}
388 206
389template <class DomainType> 207template <class DomainType>
@@ -394,9 +212,11 @@ qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int
394 int counter = 0; 212 int counter = 0;
395 while (!batchSize || (counter < batchSize)) { 213 while (!batchSize || (counter < batchSize)) {
396 const bool ret = 214 const bool ret =
397 resultSet.next([&counter, callback](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { 215 resultSet.next([this, &counter, callback](const QByteArray &uid, const Sink::EntityBuffer &value, Sink::Operation operation) -> bool {
398 counter++; 216 counter++;
399 return callback(value.staticCast<DomainType>(), operation); 217 auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(value.entity());
218 Q_ASSERT(adaptor);
219 return callback(QSharedPointer<DomainType>::create(mResourceInstanceIdentifier, uid, value.revision(), adaptor), operation);
400 }); 220 });
401 if (!ret) { 221 if (!ret) {
402 break; 222 break;