diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-07-11 11:55:29 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-07-11 11:55:29 +0200 |
commit | 3a3118e768e1447dc7524328e84b8d7faef81fe1 (patch) | |
tree | af5582170ed6164fffc9365f34b17bf449c0db40 /common/entityreader.cpp | |
parent | f9379318d801df204cc50385c5eca1f28e91755e (diff) | |
parent | ce2fd2666f084eebe443598f6f3740a02913091e (diff) | |
download | sink-3a3118e768e1447dc7524328e84b8d7faef81fe1.tar.gz sink-3a3118e768e1447dc7524328e84b8d7faef81fe1.zip |
Merge branch 'feature/notifications' into develop
Diffstat (limited to 'common/entityreader.cpp')
-rw-r--r-- | common/entityreader.cpp | 50 |
1 files changed, 26 insertions, 24 deletions
diff --git a/common/entityreader.cpp b/common/entityreader.cpp index c15f73f..411e7e4 100644 --- a/common/entityreader.cpp +++ b/common/entityreader.cpp | |||
@@ -23,6 +23,8 @@ | |||
23 | #include "storage.h" | 23 | #include "storage.h" |
24 | #include "query.h" | 24 | #include "query.h" |
25 | 25 | ||
26 | SINK_DEBUG_AREA("entityreader") | ||
27 | |||
26 | using namespace Sink; | 28 | using namespace Sink; |
27 | 29 | ||
28 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) | 30 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) |
@@ -32,15 +34,15 @@ QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLat | |||
32 | [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { | 34 | [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { |
33 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 35 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
34 | if (!buffer.isValid()) { | 36 | if (!buffer.isValid()) { |
35 | Warning() << "Read invalid buffer from disk"; | 37 | SinkWarning() << "Read invalid buffer from disk"; |
36 | } else { | 38 | } else { |
37 | Trace() << "Found value " << key; | 39 | SinkTrace() << "Found value " << key; |
38 | current = adaptorFactory.createAdaptor(buffer.entity()); | 40 | current = adaptorFactory.createAdaptor(buffer.entity()); |
39 | retrievedRevision = Sink::Storage::revisionFromKey(key); | 41 | retrievedRevision = Sink::Storage::revisionFromKey(key); |
40 | } | 42 | } |
41 | return false; | 43 | return false; |
42 | }, | 44 | }, |
43 | [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); | 45 | [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); |
44 | return current; | 46 | return current; |
45 | } | 47 | } |
46 | 48 | ||
@@ -51,14 +53,14 @@ QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::get(co | |||
51 | [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { | 53 | [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { |
52 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 54 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
53 | if (!buffer.isValid()) { | 55 | if (!buffer.isValid()) { |
54 | Warning() << "Read invalid buffer from disk"; | 56 | SinkWarning() << "Read invalid buffer from disk"; |
55 | } else { | 57 | } else { |
56 | current = adaptorFactory.createAdaptor(buffer.entity()); | 58 | current = adaptorFactory.createAdaptor(buffer.entity()); |
57 | retrievedRevision = Sink::Storage::revisionFromKey(key); | 59 | retrievedRevision = Sink::Storage::revisionFromKey(key); |
58 | } | 60 | } |
59 | return false; | 61 | return false; |
60 | }, | 62 | }, |
61 | [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); | 63 | [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); |
62 | return current; | 64 | return current; |
63 | } | 65 | } |
64 | 66 | ||
@@ -74,7 +76,7 @@ QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getPre | |||
74 | } | 76 | } |
75 | return true; | 77 | return true; |
76 | }, | 78 | }, |
77 | [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }, true); | 79 | [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true); |
78 | return get(db, Sink::Storage::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); | 80 | return get(db, Sink::Storage::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); |
79 | } | 81 | } |
80 | 82 | ||
@@ -86,7 +88,7 @@ EntityReader<DomainType>::EntityReader(const QByteArray &resourceType, const QBy | |||
86 | mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr) | 88 | mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr) |
87 | { | 89 | { |
88 | Q_ASSERT(!resourceType.isEmpty()); | 90 | Q_ASSERT(!resourceType.isEmpty()); |
89 | Trace() << "resourceType " << resourceType; | 91 | SinkTrace() << "resourceType " << resourceType; |
90 | Q_ASSERT(mDomainTypeAdaptorFactoryPtr); | 92 | Q_ASSERT(mDomainTypeAdaptorFactoryPtr); |
91 | } | 93 | } |
92 | 94 | ||
@@ -165,13 +167,13 @@ void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db | |||
165 | resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); | 167 | resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); |
166 | return false; | 168 | return false; |
167 | }, | 169 | }, |
168 | [&](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message << key; }); | 170 | [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); |
169 | } | 171 | } |
170 | 172 | ||
171 | static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) | 173 | static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) |
172 | { | 174 | { |
173 | // TODO use a result set with an iterator, to read values on demand | 175 | // TODO use a result set with an iterator, to read values on demand |
174 | Trace() << "Looking for : " << bufferType; | 176 | SinkTrace() << "Looking for : " << bufferType; |
175 | //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. | 177 | //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. |
176 | QSet<QByteArray> keys; | 178 | QSet<QByteArray> keys; |
177 | Storage::mainDatabase(transaction, bufferType) | 179 | Storage::mainDatabase(transaction, bufferType) |
@@ -179,14 +181,14 @@ static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, | |||
179 | [&](const QByteArray &key, const QByteArray &value) -> bool { | 181 | [&](const QByteArray &key, const QByteArray &value) -> bool { |
180 | if (keys.contains(Sink::Storage::uidFromKey(key))) { | 182 | if (keys.contains(Sink::Storage::uidFromKey(key))) { |
181 | //Not something that should persist if the replay works, so we keep a message for now. | 183 | //Not something that should persist if the replay works, so we keep a message for now. |
182 | Trace() << "Multiple revisions for key: " << key; | 184 | SinkTrace() << "Multiple revisions for key: " << key; |
183 | } | 185 | } |
184 | keys << Sink::Storage::uidFromKey(key); | 186 | keys << Sink::Storage::uidFromKey(key); |
185 | return true; | 187 | return true; |
186 | }, | 188 | }, |
187 | [](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message; }); | 189 | [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; }); |
188 | 190 | ||
189 | Trace() << "Full scan retrieved " << keys.size() << " results."; | 191 | SinkTrace() << "Full scan retrieved " << keys.size() << " results."; |
190 | return ResultSet(keys.toList().toVector()); | 192 | return ResultSet(keys.toList().toVector()); |
191 | } | 193 | } |
192 | 194 | ||
@@ -224,7 +226,7 @@ ResultSet EntityReader<DomainType>::loadIncrementalResultSet(qint64 baseRevision | |||
224 | while (*revisionCounter <= topRevision) { | 226 | while (*revisionCounter <= topRevision) { |
225 | const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter); | 227 | const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter); |
226 | const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter); | 228 | const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter); |
227 | // Trace() << "Revision" << *revisionCounter << type << uid; | 229 | // SinkTrace() << "Revision" << *revisionCounter << type << uid; |
228 | Q_ASSERT(!uid.isEmpty()); | 230 | Q_ASSERT(!uid.isEmpty()); |
229 | Q_ASSERT(!type.isEmpty()); | 231 | Q_ASSERT(!type.isEmpty()); |
230 | if (type != bufferType) { | 232 | if (type != bufferType) { |
@@ -236,7 +238,7 @@ ResultSet EntityReader<DomainType>::loadIncrementalResultSet(qint64 baseRevision | |||
236 | *revisionCounter += 1; | 238 | *revisionCounter += 1; |
237 | return key; | 239 | return key; |
238 | } | 240 | } |
239 | Trace() << "Finished reading incremental result set:" << *revisionCounter; | 241 | SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; |
240 | // We're done | 242 | // We're done |
241 | return QByteArray(); | 243 | return QByteArray(); |
242 | }); | 244 | }); |
@@ -248,7 +250,7 @@ ResultSet EntityReader<DomainType>::filterAndSortSet(ResultSet &resultSet, const | |||
248 | { | 250 | { |
249 | const bool sortingRequired = !sortProperty.isEmpty(); | 251 | const bool sortingRequired = !sortProperty.isEmpty(); |
250 | if (initialQuery && sortingRequired) { | 252 | if (initialQuery && sortingRequired) { |
251 | Trace() << "Sorting the resultset in memory according to property: " << sortProperty; | 253 | SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; |
252 | // Sort the complete set by reading the sort property and filling into a sorted map | 254 | // Sort the complete set by reading the sort property and filling into a sorted map |
253 | auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); | 255 | auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); |
254 | while (resultSet.next()) { | 256 | while (resultSet.next()) { |
@@ -271,7 +273,7 @@ ResultSet EntityReader<DomainType>::filterAndSortSet(ResultSet &resultSet, const | |||
271 | }); | 273 | }); |
272 | } | 274 | } |
273 | 275 | ||
274 | Trace() << "Sorted " << sortedMap->size() << " values."; | 276 | SinkTrace() << "Sorted " << sortedMap->size() << " values."; |
275 | auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap); | 277 | auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap); |
276 | ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery]( | 278 | ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery]( |
277 | std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { | 279 | std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { |
@@ -330,11 +332,11 @@ QPair<qint64, qint64> EntityReader<DomainType>::load(const Sink::Query &query, c | |||
330 | QSet<QByteArray> remainingFilters; | 332 | QSet<QByteArray> remainingFilters; |
331 | QByteArray remainingSorting; | 333 | QByteArray remainingSorting; |
332 | auto resultSet = baseSetRetriever(remainingFilters, remainingSorting); | 334 | auto resultSet = baseSetRetriever(remainingFilters, remainingSorting); |
333 | Trace() << "Base set retrieved. " << Log::TraceTime(time.elapsed()); | 335 | SinkTrace() << "Base set retrieved. " << Log::TraceTime(time.elapsed()); |
334 | auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting); | 336 | auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting); |
335 | Trace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 337 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
336 | auto replayedEntities = replaySet(filteredSet, offset, batchSize, callback); | 338 | auto replayedEntities = replaySet(filteredSet, offset, batchSize, callback); |
337 | // Trace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed()); | 339 | // SinkTrace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed()); |
338 | return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); | 340 | return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); |
339 | } | 341 | } |
340 | 342 | ||
@@ -346,7 +348,7 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink:: | |||
346 | auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { | 348 | auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { |
347 | return loadInitialResultSet(query, remainingFilters, remainingSorting); | 349 | return loadInitialResultSet(query, remainingFilters, remainingSorting); |
348 | }, true, offset, batchsize, callback); | 350 | }, true, offset, batchsize, callback); |
349 | Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); | 351 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); |
350 | return revisionAndReplayedEntities; | 352 | return revisionAndReplayedEntities; |
351 | } | 353 | } |
352 | 354 | ||
@@ -359,7 +361,7 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeIncrementalQuery(const Si | |||
359 | auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { | 361 | auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { |
360 | return loadIncrementalResultSet(baseRevision, query, remainingFilters); | 362 | return loadIncrementalResultSet(baseRevision, query, remainingFilters); |
361 | }, false, 0, 0, callback); | 363 | }, false, 0, 0, callback); |
362 | Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); | 364 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); |
363 | return revisionAndReplayedEntities; | 365 | return revisionAndReplayedEntities; |
364 | } | 366 | } |
365 | 367 | ||
@@ -377,7 +379,7 @@ EntityReader<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, con | |||
377 | const auto property = domainObject->getProperty(filterProperty); | 379 | const auto property = domainObject->getProperty(filterProperty); |
378 | const auto comparator = query.propertyFilter.value(filterProperty); | 380 | const auto comparator = query.propertyFilter.value(filterProperty); |
379 | if (!comparator.matches(property)) { | 381 | if (!comparator.matches(property)) { |
380 | Trace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; | 382 | SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; |
381 | return false; | 383 | return false; |
382 | } | 384 | } |
383 | } | 385 | } |
@@ -388,7 +390,7 @@ EntityReader<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, con | |||
388 | template <class DomainType> | 390 | template <class DomainType> |
389 | qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int batchSize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback) | 391 | qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int batchSize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback) |
390 | { | 392 | { |
391 | Trace() << "Skipping over " << offset << " results"; | 393 | SinkTrace() << "Skipping over " << offset << " results"; |
392 | resultSet.skip(offset); | 394 | resultSet.skip(offset); |
393 | int counter = 0; | 395 | int counter = 0; |
394 | while (!batchSize || (counter < batchSize)) { | 396 | while (!batchSize || (counter < batchSize)) { |
@@ -401,7 +403,7 @@ qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int | |||
401 | break; | 403 | break; |
402 | } | 404 | } |
403 | }; | 405 | }; |
404 | Trace() << "Replayed " << counter << " results." | 406 | SinkTrace() << "Replayed " << counter << " results." |
405 | << "Limit " << batchSize; | 407 | << "Limit " << batchSize; |
406 | return counter; | 408 | return counter; |
407 | } | 409 | } |