summaryrefslogtreecommitdiffstats
path: root/common/datastorequery.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-09-20 17:18:21 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-09-20 17:18:21 +0200
commitebc5c48c03b6145e604da7c313b35321d0a71142 (patch)
tree1cee00a9fa4faa4995c0a50f01703ac5672c8797 /common/datastorequery.cpp
parent4a14a6fade947aa830d3f21598a4a6ba7316b933 (diff)
downloadsink-ebc5c48c03b6145e604da7c313b35321d0a71142.tar.gz
sink-ebc5c48c03b6145e604da7c313b35321d0a71142.zip
A first draft of the threading algorithm.
Diffstat (limited to 'common/datastorequery.cpp')
-rw-r--r--common/datastorequery.cpp55
1 files changed, 43 insertions, 12 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index 3237c53..cc070be 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -122,17 +122,17 @@ QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArra
122 return mGetProperty(entity, property); 122 return mGetProperty(entity, property);
123} 123}
124 124
125ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, bool initialQuery, const QByteArray &sortProperty) 125ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty)
126{ 126{
127 const bool sortingRequired = !sortProperty.isEmpty(); 127 const bool sortingRequired = !sortProperty.isEmpty();
128 if (initialQuery && sortingRequired) { 128 if (mInitialQuery && sortingRequired) {
129 SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; 129 SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty;
130 // Sort the complete set by reading the sort property and filling into a sorted map 130 // Sort the complete set by reading the sort property and filling into a sorted map
131 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); 131 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create();
132 while (resultSet.next()) { 132 while (resultSet.next()) {
133 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) 133 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
134 readEntity(resultSet.id(), 134 readEntity(resultSet.id(),
135 [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) { 135 [this, filter, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) {
136 136
137 const auto operation = buffer.operation(); 137 const auto operation = buffer.operation();
138 138
@@ -154,10 +154,10 @@ ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFun
154 154
155 SinkTrace() << "Sorted " << sortedMap->size() << " values."; 155 SinkTrace() << "Sorted " << sortedMap->size() << " values.";
156 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap); 156 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap);
157 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter, initialQuery]( 157 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter](
158 std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entity, Sink::Operation)> callback) -> bool { 158 std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entity, Sink::Operation)> callback) -> bool {
159 if (iterator->hasNext()) { 159 if (iterator->hasNext()) {
160 readEntity(iterator->next().value(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) { 160 readEntity(iterator->next().value(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) {
161 callback(uid, buffer, Sink::Operation_Creation); 161 callback(uid, buffer, Sink::Operation_Creation);
162 }); 162 });
163 return true; 163 return true;
@@ -173,13 +173,13 @@ ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFun
173 return ResultSet(generator, skip); 173 return ResultSet(generator, skip);
174 } else { 174 } else {
175 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); 175 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
176 ResultSet::ValueGenerator generator = [this, resultSetPtr, filter, initialQuery](const ResultSet::Callback &callback) -> bool { 176 ResultSet::ValueGenerator generator = [this, resultSetPtr, filter](const ResultSet::Callback &callback) -> bool {
177 if (resultSetPtr->next()) { 177 if (resultSetPtr->next()) {
178 SinkTrace() << "Reading the next value: " << resultSetPtr->id(); 178 SinkTrace() << "Reading the next value: " << resultSetPtr->id();
179 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) 179 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
180 readEntity(resultSetPtr->id(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) { 180 readEntity(resultSetPtr->id(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) {
181 const auto operation = buffer.operation(); 181 const auto operation = buffer.operation();
182 if (initialQuery) { 182 if (mInitialQuery) {
183 // We're not interested in removals during the initial query 183 // We're not interested in removals during the initial query
184 if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { 184 if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) {
185 // In the initial set every entity is new 185 // In the initial set every entity is new
@@ -225,22 +225,53 @@ DataStoreQuery::FilterFunction DataStoreQuery::getFilter(const QSet<QByteArray>
225 }; 225 };
226} 226}
227 227
228ResultSet DataStoreQuery::createFilteredSet(ResultSet &resultSet, const std::function<bool(const QByteArray &, const Sink::EntityBuffer &buffer)> &filter)
229{
230 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
231 ResultSet::ValueGenerator generator = [this, resultSetPtr, filter](const ResultSet::Callback &callback) -> bool {
232 return resultSetPtr->next([=](const QByteArray &uid, const Sink::EntityBuffer &buffer, Sink::Operation operation) {
233 if (mInitialQuery) {
234 // We're not interested in removals during the initial query
235 if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) {
236 // In the initial set every entity is new
237 callback(uid, buffer, Sink::Operation_Creation);
238 }
239 } else {
240 // Always remove removals, they probably don't match due to non-available properties
241 if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) {
242 // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results)
243 callback(uid, buffer, operation);
244 }
245 }
246 });
247 };
248 auto skip = [resultSetPtr]() { resultSetPtr->skip(1); };
249 return ResultSet(generator, skip);
250}
251
252ResultSet DataStoreQuery::postSortFilter(ResultSet &resultSet)
253{
254 return resultSet;
255}
256
228ResultSet DataStoreQuery::update(qint64 baseRevision) 257ResultSet DataStoreQuery::update(qint64 baseRevision)
229{ 258{
230 SinkTrace() << "Executing query update"; 259 SinkTrace() << "Executing query update";
260 mInitialQuery = false;
231 QSet<QByteArray> remainingFilters; 261 QSet<QByteArray> remainingFilters;
232 QByteArray remainingSorting; 262 QByteArray remainingSorting;
233 auto resultSet = loadIncrementalResultSet(baseRevision, remainingFilters); 263 auto resultSet = loadIncrementalResultSet(baseRevision, remainingFilters);
234 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), false, remainingSorting); 264 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), remainingSorting);
235 return filteredSet; 265 return postSortFilter(filteredSet);
236} 266}
237 267
238ResultSet DataStoreQuery::execute() 268ResultSet DataStoreQuery::execute()
239{ 269{
240 SinkTrace() << "Executing query"; 270 SinkTrace() << "Executing query";
271 mInitialQuery = true;
241 QSet<QByteArray> remainingFilters; 272 QSet<QByteArray> remainingFilters;
242 QByteArray remainingSorting; 273 QByteArray remainingSorting;
243 auto resultSet = loadInitialResultSet(remainingFilters, remainingSorting); 274 auto resultSet = loadInitialResultSet(remainingFilters, remainingSorting);
244 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), true, remainingSorting); 275 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), remainingSorting);
245 return filteredSet; 276 return postSortFilter(filteredSet);
246} 277}