summaryrefslogtreecommitdiffstats
path: root/common/entityreader.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-11 11:55:29 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-11 11:55:29 +0200
commit3a3118e768e1447dc7524328e84b8d7faef81fe1 (patch)
treeaf5582170ed6164fffc9365f34b17bf449c0db40 /common/entityreader.cpp
parentf9379318d801df204cc50385c5eca1f28e91755e (diff)
parentce2fd2666f084eebe443598f6f3740a02913091e (diff)
downloadsink-3a3118e768e1447dc7524328e84b8d7faef81fe1.tar.gz
sink-3a3118e768e1447dc7524328e84b8d7faef81fe1.zip
Merge branch 'feature/notifications' into develop
Diffstat (limited to 'common/entityreader.cpp')
-rw-r--r--common/entityreader.cpp50
1 files changed, 26 insertions, 24 deletions
diff --git a/common/entityreader.cpp b/common/entityreader.cpp
index c15f73f..411e7e4 100644
--- a/common/entityreader.cpp
+++ b/common/entityreader.cpp
@@ -23,6 +23,8 @@
23#include "storage.h" 23#include "storage.h"
24#include "query.h" 24#include "query.h"
25 25
26SINK_DEBUG_AREA("entityreader")
27
26using namespace Sink; 28using namespace Sink;
27 29
28QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) 30QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision)
@@ -32,15 +34,15 @@ QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLat
32 [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { 34 [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool {
33 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 35 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
34 if (!buffer.isValid()) { 36 if (!buffer.isValid()) {
35 Warning() << "Read invalid buffer from disk"; 37 SinkWarning() << "Read invalid buffer from disk";
36 } else { 38 } else {
37 Trace() << "Found value " << key; 39 SinkTrace() << "Found value " << key;
38 current = adaptorFactory.createAdaptor(buffer.entity()); 40 current = adaptorFactory.createAdaptor(buffer.entity());
39 retrievedRevision = Sink::Storage::revisionFromKey(key); 41 retrievedRevision = Sink::Storage::revisionFromKey(key);
40 } 42 }
41 return false; 43 return false;
42 }, 44 },
43 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); 45 [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; });
44 return current; 46 return current;
45} 47}
46 48
@@ -51,14 +53,14 @@ QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::get(co
51 [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { 53 [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool {
52 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 54 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
53 if (!buffer.isValid()) { 55 if (!buffer.isValid()) {
54 Warning() << "Read invalid buffer from disk"; 56 SinkWarning() << "Read invalid buffer from disk";
55 } else { 57 } else {
56 current = adaptorFactory.createAdaptor(buffer.entity()); 58 current = adaptorFactory.createAdaptor(buffer.entity());
57 retrievedRevision = Sink::Storage::revisionFromKey(key); 59 retrievedRevision = Sink::Storage::revisionFromKey(key);
58 } 60 }
59 return false; 61 return false;
60 }, 62 },
61 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); 63 [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; });
62 return current; 64 return current;
63} 65}
64 66
@@ -74,7 +76,7 @@ QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getPre
74 } 76 }
75 return true; 77 return true;
76 }, 78 },
77 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }, true); 79 [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true);
78 return get(db, Sink::Storage::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); 80 return get(db, Sink::Storage::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision);
79} 81}
80 82
@@ -86,7 +88,7 @@ EntityReader<DomainType>::EntityReader(const QByteArray &resourceType, const QBy
86 mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr) 88 mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr)
87{ 89{
88 Q_ASSERT(!resourceType.isEmpty()); 90 Q_ASSERT(!resourceType.isEmpty());
89 Trace() << "resourceType " << resourceType; 91 SinkTrace() << "resourceType " << resourceType;
90 Q_ASSERT(mDomainTypeAdaptorFactoryPtr); 92 Q_ASSERT(mDomainTypeAdaptorFactoryPtr);
91} 93}
92 94
@@ -165,13 +167,13 @@ void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db
165 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); 167 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation);
166 return false; 168 return false;
167 }, 169 },
168 [&](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message << key; }); 170 [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; });
169} 171}
170 172
171static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) 173static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType)
172{ 174{
173 // TODO use a result set with an iterator, to read values on demand 175 // TODO use a result set with an iterator, to read values on demand
174 Trace() << "Looking for : " << bufferType; 176 SinkTrace() << "Looking for : " << bufferType;
175 //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. 177 //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate.
176 QSet<QByteArray> keys; 178 QSet<QByteArray> keys;
177 Storage::mainDatabase(transaction, bufferType) 179 Storage::mainDatabase(transaction, bufferType)
@@ -179,14 +181,14 @@ static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction,
179 [&](const QByteArray &key, const QByteArray &value) -> bool { 181 [&](const QByteArray &key, const QByteArray &value) -> bool {
180 if (keys.contains(Sink::Storage::uidFromKey(key))) { 182 if (keys.contains(Sink::Storage::uidFromKey(key))) {
181 //Not something that should persist if the replay works, so we keep a message for now. 183 //Not something that should persist if the replay works, so we keep a message for now.
182 Trace() << "Multiple revisions for key: " << key; 184 SinkTrace() << "Multiple revisions for key: " << key;
183 } 185 }
184 keys << Sink::Storage::uidFromKey(key); 186 keys << Sink::Storage::uidFromKey(key);
185 return true; 187 return true;
186 }, 188 },
187 [](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message; }); 189 [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; });
188 190
189 Trace() << "Full scan retrieved " << keys.size() << " results."; 191 SinkTrace() << "Full scan retrieved " << keys.size() << " results.";
190 return ResultSet(keys.toList().toVector()); 192 return ResultSet(keys.toList().toVector());
191} 193}
192 194
@@ -224,7 +226,7 @@ ResultSet EntityReader<DomainType>::loadIncrementalResultSet(qint64 baseRevision
224 while (*revisionCounter <= topRevision) { 226 while (*revisionCounter <= topRevision) {
225 const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter); 227 const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter);
226 const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter); 228 const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter);
227 // Trace() << "Revision" << *revisionCounter << type << uid; 229 // SinkTrace() << "Revision" << *revisionCounter << type << uid;
228 Q_ASSERT(!uid.isEmpty()); 230 Q_ASSERT(!uid.isEmpty());
229 Q_ASSERT(!type.isEmpty()); 231 Q_ASSERT(!type.isEmpty());
230 if (type != bufferType) { 232 if (type != bufferType) {
@@ -236,7 +238,7 @@ ResultSet EntityReader<DomainType>::loadIncrementalResultSet(qint64 baseRevision
236 *revisionCounter += 1; 238 *revisionCounter += 1;
237 return key; 239 return key;
238 } 240 }
239 Trace() << "Finished reading incremental result set:" << *revisionCounter; 241 SinkTrace() << "Finished reading incremental result set:" << *revisionCounter;
240 // We're done 242 // We're done
241 return QByteArray(); 243 return QByteArray();
242 }); 244 });
@@ -248,7 +250,7 @@ ResultSet EntityReader<DomainType>::filterAndSortSet(ResultSet &resultSet, const
248{ 250{
249 const bool sortingRequired = !sortProperty.isEmpty(); 251 const bool sortingRequired = !sortProperty.isEmpty();
250 if (initialQuery && sortingRequired) { 252 if (initialQuery && sortingRequired) {
251 Trace() << "Sorting the resultset in memory according to property: " << sortProperty; 253 SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty;
252 // Sort the complete set by reading the sort property and filling into a sorted map 254 // Sort the complete set by reading the sort property and filling into a sorted map
253 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); 255 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create();
254 while (resultSet.next()) { 256 while (resultSet.next()) {
@@ -271,7 +273,7 @@ ResultSet EntityReader<DomainType>::filterAndSortSet(ResultSet &resultSet, const
271 }); 273 });
272 } 274 }
273 275
274 Trace() << "Sorted " << sortedMap->size() << " values."; 276 SinkTrace() << "Sorted " << sortedMap->size() << " values.";
275 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap); 277 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap);
276 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery]( 278 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery](
277 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { 279 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
@@ -330,11 +332,11 @@ QPair<qint64, qint64> EntityReader<DomainType>::load(const Sink::Query &query, c
330 QSet<QByteArray> remainingFilters; 332 QSet<QByteArray> remainingFilters;
331 QByteArray remainingSorting; 333 QByteArray remainingSorting;
332 auto resultSet = baseSetRetriever(remainingFilters, remainingSorting); 334 auto resultSet = baseSetRetriever(remainingFilters, remainingSorting);
333 Trace() << "Base set retrieved. " << Log::TraceTime(time.elapsed()); 335 SinkTrace() << "Base set retrieved. " << Log::TraceTime(time.elapsed());
334 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting); 336 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting);
335 Trace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 337 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
336 auto replayedEntities = replaySet(filteredSet, offset, batchSize, callback); 338 auto replayedEntities = replaySet(filteredSet, offset, batchSize, callback);
337 // Trace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed()); 339 // SinkTrace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed());
338 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); 340 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities);
339} 341}
340 342
@@ -346,7 +348,7 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::
346 auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { 348 auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet {
347 return loadInitialResultSet(query, remainingFilters, remainingSorting); 349 return loadInitialResultSet(query, remainingFilters, remainingSorting);
348 }, true, offset, batchsize, callback); 350 }, true, offset, batchsize, callback);
349 Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 351 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
350 return revisionAndReplayedEntities; 352 return revisionAndReplayedEntities;
351} 353}
352 354
@@ -359,7 +361,7 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeIncrementalQuery(const Si
359 auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { 361 auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet {
360 return loadIncrementalResultSet(baseRevision, query, remainingFilters); 362 return loadIncrementalResultSet(baseRevision, query, remainingFilters);
361 }, false, 0, 0, callback); 363 }, false, 0, 0, callback);
362 Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 364 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
363 return revisionAndReplayedEntities; 365 return revisionAndReplayedEntities;
364} 366}
365 367
@@ -377,7 +379,7 @@ EntityReader<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, con
377 const auto property = domainObject->getProperty(filterProperty); 379 const auto property = domainObject->getProperty(filterProperty);
378 const auto comparator = query.propertyFilter.value(filterProperty); 380 const auto comparator = query.propertyFilter.value(filterProperty);
379 if (!comparator.matches(property)) { 381 if (!comparator.matches(property)) {
380 Trace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; 382 SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value;
381 return false; 383 return false;
382 } 384 }
383 } 385 }
@@ -388,7 +390,7 @@ EntityReader<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, con
388template <class DomainType> 390template <class DomainType>
389qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int batchSize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback) 391qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int batchSize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback)
390{ 392{
391 Trace() << "Skipping over " << offset << " results"; 393 SinkTrace() << "Skipping over " << offset << " results";
392 resultSet.skip(offset); 394 resultSet.skip(offset);
393 int counter = 0; 395 int counter = 0;
394 while (!batchSize || (counter < batchSize)) { 396 while (!batchSize || (counter < batchSize)) {
@@ -401,7 +403,7 @@ qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int
401 break; 403 break;
402 } 404 }
403 }; 405 };
404 Trace() << "Replayed " << counter << " results." 406 SinkTrace() << "Replayed " << counter << " results."
405 << "Limit " << batchSize; 407 << "Limit " << batchSize;
406 return counter; 408 return counter;
407} 409}