summaryrefslogtreecommitdiffstats
path: root/common/entityreader.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-09-19 18:55:21 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-09-19 18:55:21 +0200
commit4a14a6fade947aa830d3f21598a4a6ba7316b933 (patch)
treec6b340bf1c6284e5501d371f65b58e3a69391a26 /common/entityreader.cpp
parent1deac558af4b1c9f04352ede7f8e172f11a70a6b (diff)
downloadsink-4a14a6fade947aa830d3f21598a4a6ba7316b933.tar.gz
sink-4a14a6fade947aa830d3f21598a4a6ba7316b933.zip
Refactored the query part of the entity reader into DataStoreQuery.
DataStoreQuery now encapsulates the low-level query that operates directly on the storage. It no longer has access to the resource buffers, and is instantiated by the type implementation, so we can specialize the query algorithm per type, but not per resource. This will allow us to implement the threading queries for the mail type.
Diffstat (limited to 'common/entityreader.cpp')
-rw-r--r--common/entityreader.cpp250
1 files changed, 35 insertions, 215 deletions
diff --git a/common/entityreader.cpp b/common/entityreader.cpp
index 01c25d2..faa154b 100644
--- a/common/entityreader.cpp
+++ b/common/entityreader.cpp
@@ -150,205 +150,41 @@ void EntityReader<DomainType>::query(const Sink::Query &query, const std::functi
150 }); 150 });
151} 151}
152 152
153template <class DomainType> 153/* template <class DomainType> */
154void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, 154/* void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, */
155 const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) 155/* const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) */
156{ 156/* { */
157 db.findLatest(key, 157/* db.findLatest(key, */
158 [=](const QByteArray &key, const QByteArray &value) -> bool { 158/* [=](const QByteArray &key, const QByteArray &value) -> bool { */
159 Sink::EntityBuffer buffer(value.data(), value.size()); 159/* Sink::EntityBuffer buffer(value.data(), value.size()); */
160 const Sink::Entity &entity = buffer.entity(); 160/* const Sink::Entity &entity = buffer.entity(); */
161 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); 161/* const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); */
162 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; 162/* const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; */
163 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; 163/* const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; */
164 auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); 164/* auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); */
165 Q_ASSERT(adaptor); 165/* Q_ASSERT(adaptor); */
166 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); 166/* resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); */
167 return false; 167/* return false; */
168 }, 168/* }, */
169 [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); 169/* [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); */
170} 170/* } */
171
172static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType)
173{
174 // TODO use a result set with an iterator, to read values on demand
175 SinkTrace() << "Looking for : " << bufferType;
176 //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate.
177 QSet<QByteArray> keys;
178 Storage::mainDatabase(transaction, bufferType)
179 .scan(QByteArray(),
180 [&](const QByteArray &key, const QByteArray &value) -> bool {
181 if (keys.contains(Sink::Storage::uidFromKey(key))) {
182 //Not something that should persist if the replay works, so we keep a message for now.
183 SinkTrace() << "Multiple revisions for key: " << key;
184 }
185 keys << Sink::Storage::uidFromKey(key);
186 return true;
187 },
188 [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; });
189
190 SinkTrace() << "Full scan retrieved " << keys.size() << " results.";
191 return ResultSet(keys.toList().toVector());
192}
193
194template <class DomainType>
195ResultSet EntityReader<DomainType>::loadInitialResultSet(const Sink::Query &query, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting)
196{
197 if (!query.ids.isEmpty()) {
198 return ResultSet(query.ids.toVector());
199 }
200 QSet<QByteArray> appliedFilters;
201 QByteArray appliedSorting;
202 auto resultSet = Sink::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, appliedSorting, mTransaction);
203 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
204 if (appliedSorting.isEmpty()) {
205 remainingSorting = query.sortProperty;
206 }
207 171
208 // We do a full scan if there were no indexes available to create the initial set.
209 if (appliedFilters.isEmpty()) {
210 // TODO this should be replaced by an index lookup as well
211 resultSet = fullScan(mTransaction, ApplicationDomain::getTypeName<DomainType>());
212 }
213 return resultSet;
214}
215
216template <class DomainType>
217ResultSet EntityReader<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, QSet<QByteArray> &remainingFilters)
218{
219 const auto bufferType = ApplicationDomain::getTypeName<DomainType>();
220 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
221 remainingFilters = query.propertyFilter.keys().toSet();
222 return ResultSet([this, bufferType, revisionCounter]() -> QByteArray {
223 const qint64 topRevision = Sink::Storage::maxRevision(mTransaction);
224 // Spit out the revision keys one by one.
225 while (*revisionCounter <= topRevision) {
226 const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter);
227 const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter);
228 // SinkTrace() << "Revision" << *revisionCounter << type << uid;
229 Q_ASSERT(!uid.isEmpty());
230 Q_ASSERT(!type.isEmpty());
231 if (type != bufferType) {
232 // Skip revision
233 *revisionCounter += 1;
234 continue;
235 }
236 const auto key = Sink::Storage::assembleKey(uid, *revisionCounter);
237 *revisionCounter += 1;
238 return key;
239 }
240 SinkTrace() << "Finished reading incremental result set:" << *revisionCounter;
241 // We're done
242 return QByteArray();
243 });
244}
245
246template <class DomainType>
247ResultSet EntityReader<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter,
248 const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty)
249{
250 const bool sortingRequired = !sortProperty.isEmpty();
251 if (initialQuery && sortingRequired) {
252 SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty;
253 // Sort the complete set by reading the sort property and filling into a sorted map
254 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create();
255 while (resultSet.next()) {
256 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
257 readEntity(db, resultSet.id(),
258 [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
259 // We're not interested in removals during the initial query
260 if ((operation != Sink::Operation_Removal) && filter(domainObject)) {
261 if (!sortProperty.isEmpty()) {
262 const auto sortValue = domainObject->getProperty(sortProperty);
263 if (sortValue.type() == QVariant::DateTime) {
264 sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier());
265 } else {
266 sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier());
267 }
268 } else {
269 sortedMap->insert(domainObject->identifier(), domainObject->identifier());
270 }
271 }
272 });
273 }
274
275 SinkTrace() << "Sorted " << sortedMap->size() << " values.";
276 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap);
277 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery](
278 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
279 if (iterator->hasNext()) {
280 readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject,
281 Sink::Operation operation) { callback(domainObject, Sink::Operation_Creation); });
282 return true;
283 }
284 return false;
285 };
286 172
287 auto skip = [iterator]() {
288 if (iterator->hasNext()) {
289 iterator->next();
290 }
291 };
292 return ResultSet(generator, skip);
293 } else {
294 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
295 ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery](
296 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
297 if (resultSetPtr->next()) {
298 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
299 readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
300 if (initialQuery) {
301 // We're not interested in removals during the initial query
302 if ((operation != Sink::Operation_Removal) && filter(domainObject)) {
303 // In the initial set every entity is new
304 callback(domainObject, Sink::Operation_Creation);
305 }
306 } else {
307 // Always remove removals, they probably don't match due to non-available properties
308 if ((operation == Sink::Operation_Removal) || filter(domainObject)) {
309 // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results)
310 callback(domainObject, operation);
311 }
312 }
313 });
314 return true;
315 }
316 return false;
317 };
318 auto skip = [resultSetPtr]() { resultSetPtr->skip(1); };
319 return ResultSet(generator, skip);
320 }
321}
322 173
323template <class DomainType> 174template <class DomainType>
324QPair<qint64, qint64> EntityReader<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, bool initialQuery, int offset, int batchSize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback) 175QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback)
325{ 176{
326 QTime time; 177 QTime time;
327 time.start(); 178 time.start();
328 179
329 auto db = Storage::mainDatabase(mTransaction, ApplicationDomain::getTypeName<DomainType>()); 180 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction);
181 auto resultSet = preparedQuery.execute();
330 182
331 QSet<QByteArray> remainingFilters;
332 QByteArray remainingSorting;
333 auto resultSet = baseSetRetriever(remainingFilters, remainingSorting);
334 SinkTrace() << "Base set retrieved. " << Log::TraceTime(time.elapsed());
335 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting);
336 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 183 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
337 auto replayedEntities = replaySet(filteredSet, offset, batchSize, callback); 184 auto replayedEntities = replaySet(resultSet, offset, batchsize, callback);
338 // SinkTrace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed());
339 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities);
340}
341 185
342template <class DomainType>
343QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback)
344{
345 QTime time;
346 time.start();
347 auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet {
348 return loadInitialResultSet(query, remainingFilters, remainingSorting);
349 }, true, offset, batchsize, callback);
350 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 186 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
351 return revisionAndReplayedEntities; 187 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities);
352} 188}
353 189
354template <class DomainType> 190template <class DomainType>
@@ -357,33 +193,15 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeIncrementalQuery(const Si
357 QTime time; 193 QTime time;
358 time.start(); 194 time.start();
359 const qint64 baseRevision = lastRevision + 1; 195 const qint64 baseRevision = lastRevision + 1;
360 auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet {
361 return loadIncrementalResultSet(baseRevision, query, remainingFilters);
362 }, false, 0, 0, callback);
363 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
364 return revisionAndReplayedEntities;
365}
366 196
367template <class DomainType> 197 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction);
368std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> 198 auto resultSet = preparedQuery.update(baseRevision);
369EntityReader<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query) 199
370{ 200 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
371 return [this, remainingFilters, query](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { 201 auto replayedEntities = replaySet(resultSet, 0, 0, callback);
372 if (!query.ids.isEmpty()) { 202
373 if (!query.ids.contains(domainObject->identifier())) { 203 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
374 return false; 204 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities);
375 }
376 }
377 for (const auto &filterProperty : remainingFilters) {
378 const auto property = domainObject->getProperty(filterProperty);
379 const auto comparator = query.propertyFilter.value(filterProperty);
380 if (!comparator.matches(property)) {
381 SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value;
382 return false;
383 }
384 }
385 return true;
386 };
387} 205}
388 206
389template <class DomainType> 207template <class DomainType>
@@ -394,9 +212,11 @@ qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int
394 int counter = 0; 212 int counter = 0;
395 while (!batchSize || (counter < batchSize)) { 213 while (!batchSize || (counter < batchSize)) {
396 const bool ret = 214 const bool ret =
397 resultSet.next([&counter, callback](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { 215 resultSet.next([this, &counter, callback](const QByteArray &uid, const Sink::EntityBuffer &value, Sink::Operation operation) -> bool {
398 counter++; 216 counter++;
399 return callback(value.staticCast<DomainType>(), operation); 217 auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(value.entity());
218 Q_ASSERT(adaptor);
219 return callback(QSharedPointer<DomainType>::create(mResourceInstanceIdentifier, uid, value.revision(), adaptor), operation);
400 }); 220 });
401 if (!ret) { 221 if (!ret) {
402 break; 222 break;