summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2018-02-21 15:47:50 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2018-02-21 15:47:50 +0100
commitd2d8a85aa6c56195368f7ec563a98afb6861acd9 (patch)
treebabd7454902b7e3a83b82b04cf5ffca5720aca12 /common
parent17f0dd179e9fd57b41b29abe84e7e49f9a7ac8a8 (diff)
downloadsink-d2d8a85aa6c56195368f7ec563a98afb6861acd9.tar.gz
sink-d2d8a85aa6c56195368f7ec563a98afb6861acd9.zip
Remember aggregated ids
Diffstat (limited to 'common')
-rw-r--r--common/datastorequery.cpp41
-rw-r--r--common/domain/applicationdomaintype.cpp16
-rw-r--r--common/domain/applicationdomaintype.h6
-rw-r--r--common/queryrunner.cpp2
-rw-r--r--common/resultset.h3
5 files changed, 48 insertions, 20 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index 50158c7..218796f 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -252,11 +252,18 @@ public:
252 return false; 252 return false;
253 } 253 }
254 254
255 QByteArray reduceOnValue(const QVariant &reductionValue, QMap<QByteArray, QVariant> &aggregateValues) 255 struct ReductionResult {
256 QByteArray selection;
257 QVector<QByteArray> aggregateIds;
258 QMap<QByteArray, QVariant> aggregateValues;
259 };
260
261 ReductionResult reduceOnValue(const QVariant &reductionValue)
256 { 262 {
263 QMap<QByteArray, QVariant> aggregateValues;
257 QVariant selectionResultValue; 264 QVariant selectionResultValue;
258 QByteArray selectionResult; 265 QByteArray selectionResult;
259 auto results = indexLookup(mReductionProperty, reductionValue); 266 const auto results = indexLookup(mReductionProperty, reductionValue);
260 for (auto &aggregator : mAggregators) { 267 for (auto &aggregator : mAggregators) {
261 aggregator.reset(); 268 aggregator.reset();
262 } 269 }
@@ -287,7 +294,7 @@ public:
287 for (auto &aggregator : mAggregators) { 294 for (auto &aggregator : mAggregators) {
288 aggregateValues.insert(aggregator.resultProperty, aggregator.result()); 295 aggregateValues.insert(aggregator.resultProperty, aggregator.result());
289 } 296 }
290 return selectionResult; 297 return {selectionResult, results, aggregateValues};
291 } 298 }
292 299
293 bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE { 300 bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE {
@@ -302,12 +309,11 @@ public:
302 if (!mReducedValues.contains(reductionValueBa)) { 309 if (!mReducedValues.contains(reductionValueBa)) {
303 //Only reduce every value once. 310 //Only reduce every value once.
304 mReducedValues.insert(reductionValueBa); 311 mReducedValues.insert(reductionValueBa);
305 QMap<QByteArray, QVariant> aggregateValues; 312 auto reductionResult = reduceOnValue(reductionValue);
306 auto selectionResult = reduceOnValue(reductionValue, aggregateValues);
307 313
308 mSelectedValues.insert(reductionValueBa, selectionResult); 314 mSelectedValues.insert(reductionValueBa, reductionResult.selection);
309 readEntity(selectionResult, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { 315 readEntity(reductionResult.selection, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
310 callback({entity, operation, aggregateValues}); 316 callback({entity, operation, reductionResult.aggregateValues, reductionResult.aggregateIds});
311 foundValue = true; 317 foundValue = true;
312 }); 318 });
313 } else { 319 } else {
@@ -317,15 +323,14 @@ public:
317 if (mIncremental && !mIncrementallyReducedValues.contains(reductionValueBa)) { 323 if (mIncremental && !mIncrementallyReducedValues.contains(reductionValueBa)) {
318 mIncrementallyReducedValues.insert(reductionValueBa); 324 mIncrementallyReducedValues.insert(reductionValueBa);
319 //Redo the reduction to find new aggregated values 325 //Redo the reduction to find new aggregated values
320 QMap<QByteArray, QVariant> aggregateValues; 326 auto selectionResult = reduceOnValue(reductionValue);
321 auto selectionResult = reduceOnValue(reductionValue, aggregateValues);
322 327
323 //TODO if old and new are the same a modification would be enough 328 //TODO if old and new are the same a modification would be enough
324 auto oldSelectionResult = mSelectedValues.take(reductionValueBa); 329 auto oldSelectionResult = mSelectedValues.take(reductionValueBa);
325 if (oldSelectionResult == selectionResult) { 330 if (oldSelectionResult == selectionResult.selection) {
326 mSelectedValues.insert(reductionValueBa, selectionResult); 331 mSelectedValues.insert(reductionValueBa, selectionResult.selection);
327 readEntity(selectionResult, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { 332 readEntity(selectionResult.selection, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) {
328 callback({entity, Sink::Operation_Modification, aggregateValues}); 333 callback({entity, Sink::Operation_Modification, selectionResult.aggregateValues, selectionResult.aggregateIds});
329 }); 334 });
330 } else { 335 } else {
331 //remove old result 336 //remove old result
@@ -334,9 +339,9 @@ public:
334 }); 339 });
335 340
336 //add new result 341 //add new result
337 mSelectedValues.insert(reductionValueBa, selectionResult); 342 mSelectedValues.insert(reductionValueBa, selectionResult.selection);
338 readEntity(selectionResult, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { 343 readEntity(selectionResult.selection, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) {
339 callback({entity, Sink::Operation_Creation, aggregateValues}); 344 callback({entity, Sink::Operation_Creation, selectionResult.aggregateValues, selectionResult.aggregateIds});
340 }); 345 });
341 } 346 }
342 } 347 }
@@ -648,7 +653,7 @@ ResultSet DataStoreQuery::execute()
648 if (mCollector->next([this, callback](const ResultSet::Result &result) { 653 if (mCollector->next([this, callback](const ResultSet::Result &result) {
649 if (result.operation != Sink::Operation_Removal) { 654 if (result.operation != Sink::Operation_Removal) {
650 SinkTraceCtx(mLogCtx) << "Got initial result: " << result.entity.identifier() << result.operation; 655 SinkTraceCtx(mLogCtx) << "Got initial result: " << result.entity.identifier() << result.operation;
651 callback(ResultSet::Result{result.entity, Sink::Operation_Creation, result.aggregateValues}); 656 callback(ResultSet::Result{result.entity, Sink::Operation_Creation, result.aggregateValues, result.aggregateIds});
652 } 657 }
653 })) 658 }))
654 { 659 {
diff --git a/common/domain/applicationdomaintype.cpp b/common/domain/applicationdomaintype.cpp
index 8bb74e3..c315e33 100644
--- a/common/domain/applicationdomaintype.cpp
+++ b/common/domain/applicationdomaintype.cpp
@@ -184,6 +184,7 @@ ApplicationDomainType& ApplicationDomainType::operator=(const ApplicationDomainT
184 mResourceInstanceIdentifier = other.mResourceInstanceIdentifier; 184 mResourceInstanceIdentifier = other.mResourceInstanceIdentifier;
185 mIdentifier = other.mIdentifier; 185 mIdentifier = other.mIdentifier;
186 mRevision = other.mRevision; 186 mRevision = other.mRevision;
187 mAggreatedIds = other.mAggreatedIds;
187 return *this; 188 return *this;
188} 189}
189 190
@@ -262,6 +263,21 @@ QByteArray ApplicationDomainType::identifier() const
262 return mIdentifier; 263 return mIdentifier;
263} 264}
264 265
266bool ApplicationDomainType::isAggregate() const
267{
268 return !mAggreatedIds.isEmpty();
269}
270
271QVector<QByteArray> ApplicationDomainType::aggregatedIds() const
272{
273 return mAggreatedIds;
274}
275
276QVector<QByteArray> &ApplicationDomainType::aggregatedIds()
277{
278 return mAggreatedIds;
279}
280
265SinkResource::SinkResource(const QByteArray &identifier) 281SinkResource::SinkResource(const QByteArray &identifier)
266 : ApplicationDomainType("", identifier, 0, QSharedPointer<BufferAdaptor>(new MemoryBufferAdaptor())) 282 : ApplicationDomainType("", identifier, 0, QSharedPointer<BufferAdaptor>(new MemoryBufferAdaptor()))
267{ 283{
diff --git a/common/domain/applicationdomaintype.h b/common/domain/applicationdomaintype.h
index b4db54e..dcd401c 100644
--- a/common/domain/applicationdomaintype.h
+++ b/common/domain/applicationdomaintype.h
@@ -268,6 +268,10 @@ public:
268 void setResource(const QByteArray &identifier); 268 void setResource(const QByteArray &identifier);
269 QByteArray identifier() const; 269 QByteArray identifier() const;
270 270
271 bool isAggregate() const;
272 QVector<QByteArray> aggregatedIds() const;
273 QVector<QByteArray> &aggregatedIds();
274
271private: 275private:
272 friend QDebug operator<<(QDebug, const ApplicationDomainType &); 276 friend QDebug operator<<(QDebug, const ApplicationDomainType &);
273 QSharedPointer<BufferAdaptor> mAdaptor; 277 QSharedPointer<BufferAdaptor> mAdaptor;
@@ -278,6 +282,8 @@ private:
278 QByteArray mResourceInstanceIdentifier; 282 QByteArray mResourceInstanceIdentifier;
279 QByteArray mIdentifier; 283 QByteArray mIdentifier;
280 qint64 mRevision; 284 qint64 mRevision;
285
286 QVector<QByteArray> mAggreatedIds;
281}; 287};
282 288
283/* 289/*
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 0ed4cb5..2062828 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -45,7 +45,6 @@ struct ReplayResult {
45template <typename DomainType> 45template <typename DomainType>
46class QueryWorker : public QObject 46class QueryWorker : public QObject
47{ 47{
48 typedef std::function<bool(const typename DomainType::Ptr &domainObject, Sink::Operation operation, const QMap<QByteArray, QVariant> &aggregateValues)> ResultCallback;
49public: 48public:
50 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx); 49 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx);
51 virtual ~QueryWorker(); 50 virtual ~QueryWorker();
@@ -210,6 +209,7 @@ void QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, S
210 for (auto it = result.aggregateValues.constBegin(); it != result.aggregateValues.constEnd(); it++) { 209 for (auto it = result.aggregateValues.constBegin(); it != result.aggregateValues.constEnd(); it++) {
211 valueCopy->setProperty(it.key(), it.value()); 210 valueCopy->setProperty(it.key(), it.value());
212 } 211 }
212 valueCopy->aggregatedIds() = result.aggregateIds;
213 if (mResultTransformation) { 213 if (mResultTransformation) {
214 mResultTransformation(*valueCopy); 214 mResultTransformation(*valueCopy);
215 } 215 }
diff --git a/common/resultset.h b/common/resultset.h
index 707bc7e..5587c54 100644
--- a/common/resultset.h
+++ b/common/resultset.h
@@ -35,10 +35,11 @@ class ResultSet
35{ 35{
36public: 36public:
37 struct Result { 37 struct Result {
38 Result(const Sink::ApplicationDomain::ApplicationDomainType &e, Sink::Operation op, const QMap<QByteArray, QVariant> &v = QMap<QByteArray, QVariant>{}) : entity(e), operation(op), aggregateValues(v) {} 38 Result(const Sink::ApplicationDomain::ApplicationDomainType &e, Sink::Operation op, const QMap<QByteArray, QVariant> &v = {}, const QVector<QByteArray> &a = {}) : entity(e), operation(op), aggregateValues(v), aggregateIds(a) {}
39 Sink::ApplicationDomain::ApplicationDomainType entity; 39 Sink::ApplicationDomain::ApplicationDomainType entity;
40 Sink::Operation operation; 40 Sink::Operation operation;
41 QMap<QByteArray, QVariant> aggregateValues; 41 QMap<QByteArray, QVariant> aggregateValues;
42 QVector<QByteArray> aggregateIds;
42 }; 43 };
43 typedef std::function<void(const Result &)> Callback; 44 typedef std::function<void(const Result &)> Callback;
44 typedef std::function<bool(Callback)> ValueGenerator; 45 typedef std::function<bool(Callback)> ValueGenerator;