summaryrefslogtreecommitdiffstats
path: root/common/datastorequery.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-06 17:52:52 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-06 17:52:52 +0200
commit87695f52d5ac627cdd710f37c275fccdf920af0b (patch)
tree733a7e66fafd3a0ae747b050427f2d7762bde793 /common/datastorequery.cpp
parentf1e496f7c12ebc787ed47a4c048015f2098e65d9 (diff)
downloadsink-87695f52d5ac627cdd710f37c275fccdf920af0b.tar.gz
sink-87695f52d5ac627cdd710f37c275fccdf920af0b.zip
count as a first aggregation function
Diffstat (limited to 'common/datastorequery.cpp')
-rw-r--r--common/datastorequery.cpp65
1 files changed, 34 insertions, 31 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index 7341675..8b14951 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -62,13 +62,13 @@ class Source : public FilterBase {
62 mIt = mIds.constBegin(); 62 mIt = mIds.constBegin();
63 } 63 }
64 64
65 bool next(const std::function<void(Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> &callback) Q_DECL_OVERRIDE 65 bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE
66 { 66 {
67 if (mIt == mIds.constEnd()) { 67 if (mIt == mIds.constEnd()) {
68 return false; 68 return false;
69 } 69 }
70 readEntity(*mIt, [callback](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { 70 readEntity(*mIt, [callback](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) {
71 callback(entityBuffer.operation(), uid, entityBuffer); 71 callback({uid, entityBuffer, entityBuffer.operation()});
72 }); 72 });
73 mIt++; 73 mIt++;
74 return mIt != mIds.constEnd(); 74 return mIt != mIds.constEnd();
@@ -86,7 +86,7 @@ public:
86 } 86 }
87 virtual ~Collector(){} 87 virtual ~Collector(){}
88 88
89 bool next(const std::function<void(Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> &callback) Q_DECL_OVERRIDE 89 bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE
90 { 90 {
91 return mSource->next(callback); 91 return mSource->next(callback);
92 } 92 }
@@ -106,26 +106,26 @@ public:
106 106
107 virtual ~Filter(){} 107 virtual ~Filter(){}
108 108
109 bool next(const std::function<void(Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> &callback) Q_DECL_OVERRIDE { 109 bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE {
110 bool foundValue = false; 110 bool foundValue = false;
111 while(!foundValue && mSource->next([this, callback, &foundValue](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { 111 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) {
112 SinkTrace() << "Filter: " << uid << operation; 112 SinkTrace() << "Filter: " << result.uid << result.operation;
113 113
114 //Always accept removals. They can't match the filter since the data is gone. 114 //Always accept removals. They can't match the filter since the data is gone.
115 if (operation == Sink::Operation_Removal) { 115 if (result.operation == Sink::Operation_Removal) {
116 SinkTrace() << "Removal: " << uid << operation; 116 SinkTrace() << "Removal: " << result.uid << result.operation;
117 callback(operation, uid, entityBuffer); 117 callback(result);
118 foundValue = true; 118 foundValue = true;
119 } else if (matchesFilter(uid, entityBuffer)) { 119 } else if (matchesFilter(result.uid, result.buffer)) {
120 SinkTrace() << "Accepted: " << uid << operation; 120 SinkTrace() << "Accepted: " << result.uid << result.operation;
121 callback(operation, uid, entityBuffer); 121 callback(result);
122 foundValue = true; 122 foundValue = true;
123 //TODO if something did not match the filter so far but does now, turn into an add operation. 123 //TODO if something did not match the filter so far but does now, turn into an add operation.
124 } else { 124 } else {
125 SinkTrace() << "Rejected: " << uid << operation; 125 SinkTrace() << "Rejected: " << result.uid << result.operation;
126 //TODO emit a removal if we had the uid in the result set and this is a modification. 126 //TODO emit a removal if we had the uid in the result set and this is a modification.
127 //We don't know if this results in a removal from the dataset, so we emit a removal notification anyways 127 //We don't know if this results in a removal from the dataset, so we emit a removal notification anyways
128 callback(Sink::Operation_Removal, uid, entityBuffer); 128 callback({result.uid, result.buffer, Sink::Operation_Removal, result.aggregateValues});
129 } 129 }
130 return false; 130 return false;
131 })) 131 }))
@@ -186,10 +186,10 @@ public:
186 return false; 186 return false;
187 } 187 }
188 188
189 bool next(const std::function<void(Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> &callback) Q_DECL_OVERRIDE { 189 bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE {
190 bool foundValue = false; 190 bool foundValue = false;
191 while(!foundValue && mSource->next([this, callback, &foundValue](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { 191 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) {
192 auto reductionValue = getProperty(entityBuffer.entity(), mReductionProperty); 192 auto reductionValue = getProperty(result.buffer.entity(), mReductionProperty);
193 if (!mReducedValues.contains(getByteArray(reductionValue))) { 193 if (!mReducedValues.contains(getByteArray(reductionValue))) {
194 //Only reduce every value once. 194 //Only reduce every value once.
195 mReducedValues.insert(getByteArray(reductionValue)); 195 mReducedValues.insert(getByteArray(reductionValue));
@@ -205,8 +205,11 @@ public:
205 } 205 }
206 }); 206 });
207 } 207 }
208 int count = results.size();
208 readEntity(selectionResult, [&, this](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { 209 readEntity(selectionResult, [&, this](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) {
209 callback(Sink::Operation_Creation, uid, entityBuffer); 210 QMap<QByteArray, QVariant> aggregateValues;
211 aggregateValues.insert("count", count);
212 callback({uid, entityBuffer, Sink::Operation_Creation, aggregateValues});
210 foundValue = true; 213 foundValue = true;
211 }); 214 });
212 } 215 }
@@ -232,14 +235,14 @@ public:
232 235
233 virtual ~Bloom(){} 236 virtual ~Bloom(){}
234 237
235 bool next(const std::function<void(Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> &callback) Q_DECL_OVERRIDE { 238 bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE {
236 bool foundValue = false; 239 bool foundValue = false;
237 while(!foundValue && mSource->next([this, callback, &foundValue](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { 240 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) {
238 auto bloomValue = getProperty(entityBuffer.entity(), mBloomProperty); 241 auto bloomValue = getProperty(result.buffer.entity(), mBloomProperty);
239 auto results = indexLookup(mBloomProperty, bloomValue); 242 auto results = indexLookup(mBloomProperty, bloomValue);
240 for (const auto r : results) { 243 for (const auto r : results) {
241 readEntity(r, [&, this](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { 244 readEntity(r, [&, this](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) {
242 callback(Sink::Operation_Creation, uid, entityBuffer); 245 callback({uid, entityBuffer, Sink::Operation_Creation});
243 foundValue = true; 246 foundValue = true;
244 }); 247 });
245 } 248 }
@@ -398,8 +401,8 @@ QByteArrayList DataStoreQuery::executeSubquery(const Query &subquery)
398 auto sub = prepareQuery(subquery.type, subquery, mTransaction); 401 auto sub = prepareQuery(subquery.type, subquery, mTransaction);
399 auto result = sub->execute(); 402 auto result = sub->execute();
400 QByteArrayList ids; 403 QByteArrayList ids;
401 while (result.next([&ids](const QByteArray &uid, const Sink::EntityBuffer &, Sink::Operation) { 404 while (result.next([&ids](const ResultSet::Result &result) {
402 ids << uid; 405 ids << result.uid;
403 })) 406 }))
404 {} 407 {}
405 return ids; 408 return ids;
@@ -502,9 +505,9 @@ ResultSet DataStoreQuery::update(qint64 baseRevision)
502 SinkTrace() << "Changed: " << incrementalResultSet; 505 SinkTrace() << "Changed: " << incrementalResultSet;
503 mSource->add(incrementalResultSet); 506 mSource->add(incrementalResultSet);
504 ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { 507 ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool {
505 if (mCollector->next([this, callback](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &buffer) { 508 if (mCollector->next([this, callback](const ResultSet::Result &result) {
506 SinkTrace() << "Got incremental result: " << uid << operation; 509 SinkTrace() << "Got incremental result: " << result.uid << result.operation;
507 callback(uid, buffer, operation); 510 callback(result);
508 })) 511 }))
509 { 512 {
510 return true; 513 return true;
@@ -520,10 +523,10 @@ ResultSet DataStoreQuery::execute()
520 SinkTrace() << "Executing query"; 523 SinkTrace() << "Executing query";
521 524
522 ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { 525 ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool {
523 if (mCollector->next([this, callback](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &buffer) { 526 if (mCollector->next([this, callback](const ResultSet::Result &result) {
524 if (operation != Sink::Operation_Removal) { 527 if (result.operation != Sink::Operation_Removal) {
525 SinkTrace() << "Got initial result: " << uid << operation; 528 SinkTrace() << "Got initial result: " << result.uid << result.operation;
526 callback(uid, buffer, Sink::Operation_Creation); 529 callback(ResultSet::Result{result.uid, result.buffer, Sink::Operation_Creation, result.aggregateValues});
527 } 530 }
528 })) 531 }))
529 { 532 {