summaryrefslogtreecommitdiffstats
path: root/common/datastorequery.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-10 10:26:22 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-10 10:26:22 +0200
commit816c0cd6182b1f2cf552fc6ed47139ed7a973dac (patch)
tree83033af514622d35e0b72300b8af7d57cb7eca5f /common/datastorequery.cpp
parent082029f3dedadaceef1524fec214e053b0a26d10 (diff)
downloadsink-816c0cd6182b1f2cf552fc6ed47139ed7a973dac.tar.gz
sink-816c0cd6182b1f2cf552fc6ed47139ed7a973dac.zip
Support for generic aggregations.
Diffstat (limited to 'common/datastorequery.cpp')
-rw-r--r--common/datastorequery.cpp66
1 files changed, 58 insertions, 8 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index 8b14951..7b7d3a3 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -150,11 +150,47 @@ class Reduce : public FilterBase {
150public: 150public:
151 typedef QSharedPointer<Reduce> Ptr; 151 typedef QSharedPointer<Reduce> Ptr;
152 152
153 struct Aggregator {
154 Query::Reduce::Aggregator::Operation operation;
155 QByteArray property;
156 QByteArray resultProperty;
157
158 Aggregator(Query::Reduce::Aggregator::Operation o, const QByteArray &property_, const QByteArray &resultProperty_)
159 : operation(o), property(property_), resultProperty(resultProperty_)
160 {
161
162 }
163
164 void process() {
165 if (operation == Query::Reduce::Aggregator::Count) {
166 mResult = mResult.toInt() + 1;
167 } else {
168 Q_ASSERT(false);
169 }
170 }
171
172 void process(const QVariant &value) {
173 if (operation == Query::Reduce::Aggregator::Collect) {
174 mResult = mResult.toList() << value;
175 } else {
176 Q_ASSERT(false);
177 }
178 }
179
180 QVariant result() const
181 {
182 return mResult;
183 }
184 private:
185 QVariant mResult;
186 };
187
153 QHash<QByteArray, QVariant> mAggregateValues; 188 QHash<QByteArray, QVariant> mAggregateValues;
154 QSet<QByteArray> mReducedValues; 189 QSet<QByteArray> mReducedValues;
155 QByteArray mReductionProperty; 190 QByteArray mReductionProperty;
156 QByteArray mSelectionProperty; 191 QByteArray mSelectionProperty;
157 Query::Reduce::Selector::Comparator mSelectionComparator; 192 Query::Reduce::Selector::Comparator mSelectionComparator;
193 QList<Aggregator> mAggregators;
158 194
159 Reduce(const QByteArray &reductionProperty, const QByteArray &selectionProperty, Query::Reduce::Selector::Comparator comparator, FilterBase::Ptr source, DataStoreQuery *store) 195 Reduce(const QByteArray &reductionProperty, const QByteArray &selectionProperty, Query::Reduce::Selector::Comparator comparator, FilterBase::Ptr source, DataStoreQuery *store)
160 : FilterBase(source, store), 196 : FilterBase(source, store),
@@ -167,8 +203,7 @@ public:
167 203
168 virtual ~Reduce(){} 204 virtual ~Reduce(){}
169 205
170 static QByteArray getByteArray(const QVariant &value) 206 static QByteArray getByteArray(const QVariant &value) {
171 {
172 if (value.type() == QVariant::DateTime) { 207 if (value.type() == QVariant::DateTime) {
173 return value.toDateTime().toString().toLatin1(); 208 return value.toDateTime().toString().toLatin1();
174 } 209 }
@@ -178,8 +213,7 @@ public:
178 return QByteArray(); 213 return QByteArray();
179 } 214 }
180 215
181 static bool compare(const QVariant &left, const QVariant &right, Query::Reduce::Selector::Comparator comparator) 216 static bool compare(const QVariant &left, const QVariant &right, Query::Reduce::Selector::Comparator comparator) {
182 {
183 if (comparator == Query::Reduce::Selector::Max) { 217 if (comparator == Query::Reduce::Selector::Max) {
184 return left > right; 218 return left > right;
185 } 219 }
@@ -196,8 +230,17 @@ public:
196 QVariant selectionResultValue; 230 QVariant selectionResultValue;
197 QByteArray selectionResult; 231 QByteArray selectionResult;
198 auto results = indexLookup(mReductionProperty, reductionValue); 232 auto results = indexLookup(mReductionProperty, reductionValue);
233
234 QVariantList list;
199 for (const auto r : results) { 235 for (const auto r : results) {
200 readEntity(r, [&, this](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { 236 readEntity(r, [&, this](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) {
237 for (auto &aggregator : mAggregators) {
238 if (!aggregator.property.isEmpty()) {
239 aggregator.process(getProperty(entityBuffer.entity(), aggregator.property));
240 } else {
241 aggregator.process();
242 }
243 }
201 auto selectionValue = getProperty(entityBuffer.entity(), mSelectionProperty); 244 auto selectionValue = getProperty(entityBuffer.entity(), mSelectionProperty);
202 if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) { 245 if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) {
203 selectionResultValue = selectionValue; 246 selectionResultValue = selectionValue;
@@ -205,10 +248,13 @@ public:
205 } 248 }
206 }); 249 });
207 } 250 }
208 int count = results.size(); 251
252 QMap<QByteArray, QVariant> aggregateValues;
253 for (auto &aggregator : mAggregators) {
254 aggregateValues.insert(aggregator.resultProperty, aggregator.result());
255 }
256
209 readEntity(selectionResult, [&, this](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { 257 readEntity(selectionResult, [&, this](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) {
210 QMap<QByteArray, QVariant> aggregateValues;
211 aggregateValues.insert("count", count);
212 callback({uid, entityBuffer, Sink::Operation_Creation, aggregateValues}); 258 callback({uid, entityBuffer, Sink::Operation_Creation, aggregateValues});
213 foundValue = true; 259 foundValue = true;
214 }); 260 });
@@ -462,7 +508,11 @@ void DataStoreQuery::setupQuery()
462 f->propertyFilter = filter->propertyFilter; 508 f->propertyFilter = filter->propertyFilter;
463 baseSet = f; 509 baseSet = f;
464 } else if (auto filter = stage.dynamicCast<Query::Reduce>()) { 510 } else if (auto filter = stage.dynamicCast<Query::Reduce>()) {
465 baseSet = Reduce::Ptr::create(filter->property, filter->selector.property, filter->selector.comparator, baseSet, this); 511 auto reduction = Reduce::Ptr::create(filter->property, filter->selector.property, filter->selector.comparator, baseSet, this);
512 for (const auto &aggregator : filter->aggregators) {
513 reduction->mAggregators << Reduce::Aggregator(aggregator.operation, aggregator.propertyToCollect, aggregator.resultProperty);
514 }
515 baseSet = reduction;
466 } else if (auto filter = stage.dynamicCast<Query::Bloom>()) { 516 } else if (auto filter = stage.dynamicCast<Query::Bloom>()) {
467 baseSet = Bloom::Ptr::create(filter->property, baseSet, this); 517 baseSet = Bloom::Ptr::create(filter->property, baseSet, this);
468 } 518 }