diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-10-06 17:52:52 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-10-06 17:52:52 +0200 |
commit | 87695f52d5ac627cdd710f37c275fccdf920af0b (patch) | |
tree | 733a7e66fafd3a0ae747b050427f2d7762bde793 /common/datastorequery.cpp | |
parent | f1e496f7c12ebc787ed47a4c048015f2098e65d9 (diff) | |
download | sink-87695f52d5ac627cdd710f37c275fccdf920af0b.tar.gz sink-87695f52d5ac627cdd710f37c275fccdf920af0b.zip |
count as a first aggregation function
Diffstat (limited to 'common/datastorequery.cpp')
-rw-r--r-- | common/datastorequery.cpp | 65 |
1 files changed, 34 insertions, 31 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 7341675..8b14951 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp | |||
@@ -62,13 +62,13 @@ class Source : public FilterBase { | |||
62 | mIt = mIds.constBegin(); | 62 | mIt = mIds.constBegin(); |
63 | } | 63 | } |
64 | 64 | ||
65 | bool next(const std::function<void(Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> &callback) Q_DECL_OVERRIDE | 65 | bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE |
66 | { | 66 | { |
67 | if (mIt == mIds.constEnd()) { | 67 | if (mIt == mIds.constEnd()) { |
68 | return false; | 68 | return false; |
69 | } | 69 | } |
70 | readEntity(*mIt, [callback](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { | 70 | readEntity(*mIt, [callback](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { |
71 | callback(entityBuffer.operation(), uid, entityBuffer); | 71 | callback({uid, entityBuffer, entityBuffer.operation()}); |
72 | }); | 72 | }); |
73 | mIt++; | 73 | mIt++; |
74 | return mIt != mIds.constEnd(); | 74 | return mIt != mIds.constEnd(); |
@@ -86,7 +86,7 @@ public: | |||
86 | } | 86 | } |
87 | virtual ~Collector(){} | 87 | virtual ~Collector(){} |
88 | 88 | ||
89 | bool next(const std::function<void(Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> &callback) Q_DECL_OVERRIDE | 89 | bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE |
90 | { | 90 | { |
91 | return mSource->next(callback); | 91 | return mSource->next(callback); |
92 | } | 92 | } |
@@ -106,26 +106,26 @@ public: | |||
106 | 106 | ||
107 | virtual ~Filter(){} | 107 | virtual ~Filter(){} |
108 | 108 | ||
109 | bool next(const std::function<void(Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> &callback) Q_DECL_OVERRIDE { | 109 | bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE { |
110 | bool foundValue = false; | 110 | bool foundValue = false; |
111 | while(!foundValue && mSource->next([this, callback, &foundValue](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { | 111 | while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { |
112 | SinkTrace() << "Filter: " << uid << operation; | 112 | SinkTrace() << "Filter: " << result.uid << result.operation; |
113 | 113 | ||
114 | //Always accept removals. They can't match the filter since the data is gone. | 114 | //Always accept removals. They can't match the filter since the data is gone. |
115 | if (operation == Sink::Operation_Removal) { | 115 | if (result.operation == Sink::Operation_Removal) { |
116 | SinkTrace() << "Removal: " << uid << operation; | 116 | SinkTrace() << "Removal: " << result.uid << result.operation; |
117 | callback(operation, uid, entityBuffer); | 117 | callback(result); |
118 | foundValue = true; | 118 | foundValue = true; |
119 | } else if (matchesFilter(uid, entityBuffer)) { | 119 | } else if (matchesFilter(result.uid, result.buffer)) { |
120 | SinkTrace() << "Accepted: " << uid << operation; | 120 | SinkTrace() << "Accepted: " << result.uid << result.operation; |
121 | callback(operation, uid, entityBuffer); | 121 | callback(result); |
122 | foundValue = true; | 122 | foundValue = true; |
123 | //TODO if something did not match the filter so far but does now, turn into an add operation. | 123 | //TODO if something did not match the filter so far but does now, turn into an add operation. |
124 | } else { | 124 | } else { |
125 | SinkTrace() << "Rejected: " << uid << operation; | 125 | SinkTrace() << "Rejected: " << result.uid << result.operation; |
126 | //TODO emit a removal if we had the uid in the result set and this is a modification. | 126 | //TODO emit a removal if we had the uid in the result set and this is a modification. |
127 | //We don't know if this results in a removal from the dataset, so we emit a removal notification anyways | 127 | //We don't know if this results in a removal from the dataset, so we emit a removal notification anyways |
128 | callback(Sink::Operation_Removal, uid, entityBuffer); | 128 | callback({result.uid, result.buffer, Sink::Operation_Removal, result.aggregateValues}); |
129 | } | 129 | } |
130 | return false; | 130 | return false; |
131 | })) | 131 | })) |
@@ -186,10 +186,10 @@ public: | |||
186 | return false; | 186 | return false; |
187 | } | 187 | } |
188 | 188 | ||
189 | bool next(const std::function<void(Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> &callback) Q_DECL_OVERRIDE { | 189 | bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE { |
190 | bool foundValue = false; | 190 | bool foundValue = false; |
191 | while(!foundValue && mSource->next([this, callback, &foundValue](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { | 191 | while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { |
192 | auto reductionValue = getProperty(entityBuffer.entity(), mReductionProperty); | 192 | auto reductionValue = getProperty(result.buffer.entity(), mReductionProperty); |
193 | if (!mReducedValues.contains(getByteArray(reductionValue))) { | 193 | if (!mReducedValues.contains(getByteArray(reductionValue))) { |
194 | //Only reduce every value once. | 194 | //Only reduce every value once. |
195 | mReducedValues.insert(getByteArray(reductionValue)); | 195 | mReducedValues.insert(getByteArray(reductionValue)); |
@@ -205,8 +205,11 @@ public: | |||
205 | } | 205 | } |
206 | }); | 206 | }); |
207 | } | 207 | } |
208 | int count = results.size(); | ||
208 | readEntity(selectionResult, [&, this](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { | 209 | readEntity(selectionResult, [&, this](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { |
209 | callback(Sink::Operation_Creation, uid, entityBuffer); | 210 | QMap<QByteArray, QVariant> aggregateValues; |
211 | aggregateValues.insert("count", count); | ||
212 | callback({uid, entityBuffer, Sink::Operation_Creation, aggregateValues}); | ||
210 | foundValue = true; | 213 | foundValue = true; |
211 | }); | 214 | }); |
212 | } | 215 | } |
@@ -232,14 +235,14 @@ public: | |||
232 | 235 | ||
233 | virtual ~Bloom(){} | 236 | virtual ~Bloom(){} |
234 | 237 | ||
235 | bool next(const std::function<void(Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> &callback) Q_DECL_OVERRIDE { | 238 | bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE { |
236 | bool foundValue = false; | 239 | bool foundValue = false; |
237 | while(!foundValue && mSource->next([this, callback, &foundValue](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { | 240 | while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { |
238 | auto bloomValue = getProperty(entityBuffer.entity(), mBloomProperty); | 241 | auto bloomValue = getProperty(result.buffer.entity(), mBloomProperty); |
239 | auto results = indexLookup(mBloomProperty, bloomValue); | 242 | auto results = indexLookup(mBloomProperty, bloomValue); |
240 | for (const auto r : results) { | 243 | for (const auto r : results) { |
241 | readEntity(r, [&, this](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { | 244 | readEntity(r, [&, this](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { |
242 | callback(Sink::Operation_Creation, uid, entityBuffer); | 245 | callback({uid, entityBuffer, Sink::Operation_Creation}); |
243 | foundValue = true; | 246 | foundValue = true; |
244 | }); | 247 | }); |
245 | } | 248 | } |
@@ -398,8 +401,8 @@ QByteArrayList DataStoreQuery::executeSubquery(const Query &subquery) | |||
398 | auto sub = prepareQuery(subquery.type, subquery, mTransaction); | 401 | auto sub = prepareQuery(subquery.type, subquery, mTransaction); |
399 | auto result = sub->execute(); | 402 | auto result = sub->execute(); |
400 | QByteArrayList ids; | 403 | QByteArrayList ids; |
401 | while (result.next([&ids](const QByteArray &uid, const Sink::EntityBuffer &, Sink::Operation) { | 404 | while (result.next([&ids](const ResultSet::Result &result) { |
402 | ids << uid; | 405 | ids << result.uid; |
403 | })) | 406 | })) |
404 | {} | 407 | {} |
405 | return ids; | 408 | return ids; |
@@ -502,9 +505,9 @@ ResultSet DataStoreQuery::update(qint64 baseRevision) | |||
502 | SinkTrace() << "Changed: " << incrementalResultSet; | 505 | SinkTrace() << "Changed: " << incrementalResultSet; |
503 | mSource->add(incrementalResultSet); | 506 | mSource->add(incrementalResultSet); |
504 | ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { | 507 | ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { |
505 | if (mCollector->next([this, callback](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &buffer) { | 508 | if (mCollector->next([this, callback](const ResultSet::Result &result) { |
506 | SinkTrace() << "Got incremental result: " << uid << operation; | 509 | SinkTrace() << "Got incremental result: " << result.uid << result.operation; |
507 | callback(uid, buffer, operation); | 510 | callback(result); |
508 | })) | 511 | })) |
509 | { | 512 | { |
510 | return true; | 513 | return true; |
@@ -520,10 +523,10 @@ ResultSet DataStoreQuery::execute() | |||
520 | SinkTrace() << "Executing query"; | 523 | SinkTrace() << "Executing query"; |
521 | 524 | ||
522 | ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { | 525 | ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { |
523 | if (mCollector->next([this, callback](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &buffer) { | 526 | if (mCollector->next([this, callback](const ResultSet::Result &result) { |
524 | if (operation != Sink::Operation_Removal) { | 527 | if (result.operation != Sink::Operation_Removal) { |
525 | SinkTrace() << "Got initial result: " << uid << operation; | 528 | SinkTrace() << "Got initial result: " << result.uid << result.operation; |
526 | callback(uid, buffer, Sink::Operation_Creation); | 529 | callback(ResultSet::Result{result.uid, result.buffer, Sink::Operation_Creation, result.aggregateValues}); |
527 | } | 530 | } |
528 | })) | 531 | })) |
529 | { | 532 | { |