diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2018-02-21 15:47:50 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2018-02-21 15:47:50 +0100 |
commit | d2d8a85aa6c56195368f7ec563a98afb6861acd9 (patch) | |
tree | babd7454902b7e3a83b82b04cf5ffca5720aca12 /common | |
parent | 17f0dd179e9fd57b41b29abe84e7e49f9a7ac8a8 (diff) | |
download | sink-d2d8a85aa6c56195368f7ec563a98afb6861acd9.tar.gz sink-d2d8a85aa6c56195368f7ec563a98afb6861acd9.zip |
Remember aggregated ids
Diffstat (limited to 'common')
-rw-r--r-- | common/datastorequery.cpp | 41 | ||||
-rw-r--r-- | common/domain/applicationdomaintype.cpp | 16 | ||||
-rw-r--r-- | common/domain/applicationdomaintype.h | 6 | ||||
-rw-r--r-- | common/queryrunner.cpp | 2 | ||||
-rw-r--r-- | common/resultset.h | 3 |
5 files changed, 48 insertions, 20 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 50158c7..218796f 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp | |||
@@ -252,11 +252,18 @@ public: | |||
252 | return false; | 252 | return false; |
253 | } | 253 | } |
254 | 254 | ||
255 | QByteArray reduceOnValue(const QVariant &reductionValue, QMap<QByteArray, QVariant> &aggregateValues) | 255 | struct ReductionResult { |
256 | QByteArray selection; | ||
257 | QVector<QByteArray> aggregateIds; | ||
258 | QMap<QByteArray, QVariant> aggregateValues; | ||
259 | }; | ||
260 | |||
261 | ReductionResult reduceOnValue(const QVariant &reductionValue) | ||
256 | { | 262 | { |
263 | QMap<QByteArray, QVariant> aggregateValues; | ||
257 | QVariant selectionResultValue; | 264 | QVariant selectionResultValue; |
258 | QByteArray selectionResult; | 265 | QByteArray selectionResult; |
259 | auto results = indexLookup(mReductionProperty, reductionValue); | 266 | const auto results = indexLookup(mReductionProperty, reductionValue); |
260 | for (auto &aggregator : mAggregators) { | 267 | for (auto &aggregator : mAggregators) { |
261 | aggregator.reset(); | 268 | aggregator.reset(); |
262 | } | 269 | } |
@@ -287,7 +294,7 @@ public: | |||
287 | for (auto &aggregator : mAggregators) { | 294 | for (auto &aggregator : mAggregators) { |
288 | aggregateValues.insert(aggregator.resultProperty, aggregator.result()); | 295 | aggregateValues.insert(aggregator.resultProperty, aggregator.result()); |
289 | } | 296 | } |
290 | return selectionResult; | 297 | return {selectionResult, results, aggregateValues}; |
291 | } | 298 | } |
292 | 299 | ||
293 | bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE { | 300 | bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE { |
@@ -302,12 +309,11 @@ public: | |||
302 | if (!mReducedValues.contains(reductionValueBa)) { | 309 | if (!mReducedValues.contains(reductionValueBa)) { |
303 | //Only reduce every value once. | 310 | //Only reduce every value once. |
304 | mReducedValues.insert(reductionValueBa); | 311 | mReducedValues.insert(reductionValueBa); |
305 | QMap<QByteArray, QVariant> aggregateValues; | 312 | auto reductionResult = reduceOnValue(reductionValue); |
306 | auto selectionResult = reduceOnValue(reductionValue, aggregateValues); | ||
307 | 313 | ||
308 | mSelectedValues.insert(reductionValueBa, selectionResult); | 314 | mSelectedValues.insert(reductionValueBa, reductionResult.selection); |
309 | readEntity(selectionResult, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { | 315 | readEntity(reductionResult.selection, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { |
310 | callback({entity, operation, aggregateValues}); | 316 | callback({entity, operation, reductionResult.aggregateValues, reductionResult.aggregateIds}); |
311 | foundValue = true; | 317 | foundValue = true; |
312 | }); | 318 | }); |
313 | } else { | 319 | } else { |
@@ -317,15 +323,14 @@ public: | |||
317 | if (mIncremental && !mIncrementallyReducedValues.contains(reductionValueBa)) { | 323 | if (mIncremental && !mIncrementallyReducedValues.contains(reductionValueBa)) { |
318 | mIncrementallyReducedValues.insert(reductionValueBa); | 324 | mIncrementallyReducedValues.insert(reductionValueBa); |
319 | //Redo the reduction to find new aggregated values | 325 | //Redo the reduction to find new aggregated values |
320 | QMap<QByteArray, QVariant> aggregateValues; | 326 | auto selectionResult = reduceOnValue(reductionValue); |
321 | auto selectionResult = reduceOnValue(reductionValue, aggregateValues); | ||
322 | 327 | ||
323 | //TODO if old and new are the same a modification would be enough | 328 | //TODO if old and new are the same a modification would be enough |
324 | auto oldSelectionResult = mSelectedValues.take(reductionValueBa); | 329 | auto oldSelectionResult = mSelectedValues.take(reductionValueBa); |
325 | if (oldSelectionResult == selectionResult) { | 330 | if (oldSelectionResult == selectionResult.selection) { |
326 | mSelectedValues.insert(reductionValueBa, selectionResult); | 331 | mSelectedValues.insert(reductionValueBa, selectionResult.selection); |
327 | readEntity(selectionResult, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { | 332 | readEntity(selectionResult.selection, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { |
328 | callback({entity, Sink::Operation_Modification, aggregateValues}); | 333 | callback({entity, Sink::Operation_Modification, selectionResult.aggregateValues, selectionResult.aggregateIds}); |
329 | }); | 334 | }); |
330 | } else { | 335 | } else { |
331 | //remove old result | 336 | //remove old result |
@@ -334,9 +339,9 @@ public: | |||
334 | }); | 339 | }); |
335 | 340 | ||
336 | //add new result | 341 | //add new result |
337 | mSelectedValues.insert(reductionValueBa, selectionResult); | 342 | mSelectedValues.insert(reductionValueBa, selectionResult.selection); |
338 | readEntity(selectionResult, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { | 343 | readEntity(selectionResult.selection, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { |
339 | callback({entity, Sink::Operation_Creation, aggregateValues}); | 344 | callback({entity, Sink::Operation_Creation, selectionResult.aggregateValues, selectionResult.aggregateIds}); |
340 | }); | 345 | }); |
341 | } | 346 | } |
342 | } | 347 | } |
@@ -648,7 +653,7 @@ ResultSet DataStoreQuery::execute() | |||
648 | if (mCollector->next([this, callback](const ResultSet::Result &result) { | 653 | if (mCollector->next([this, callback](const ResultSet::Result &result) { |
649 | if (result.operation != Sink::Operation_Removal) { | 654 | if (result.operation != Sink::Operation_Removal) { |
650 | SinkTraceCtx(mLogCtx) << "Got initial result: " << result.entity.identifier() << result.operation; | 655 | SinkTraceCtx(mLogCtx) << "Got initial result: " << result.entity.identifier() << result.operation; |
651 | callback(ResultSet::Result{result.entity, Sink::Operation_Creation, result.aggregateValues}); | 656 | callback(ResultSet::Result{result.entity, Sink::Operation_Creation, result.aggregateValues, result.aggregateIds}); |
652 | } | 657 | } |
653 | })) | 658 | })) |
654 | { | 659 | { |
diff --git a/common/domain/applicationdomaintype.cpp b/common/domain/applicationdomaintype.cpp index 8bb74e3..c315e33 100644 --- a/common/domain/applicationdomaintype.cpp +++ b/common/domain/applicationdomaintype.cpp | |||
@@ -184,6 +184,7 @@ ApplicationDomainType& ApplicationDomainType::operator=(const ApplicationDomainT | |||
184 | mResourceInstanceIdentifier = other.mResourceInstanceIdentifier; | 184 | mResourceInstanceIdentifier = other.mResourceInstanceIdentifier; |
185 | mIdentifier = other.mIdentifier; | 185 | mIdentifier = other.mIdentifier; |
186 | mRevision = other.mRevision; | 186 | mRevision = other.mRevision; |
187 | mAggreatedIds = other.mAggreatedIds; | ||
187 | return *this; | 188 | return *this; |
188 | } | 189 | } |
189 | 190 | ||
@@ -262,6 +263,21 @@ QByteArray ApplicationDomainType::identifier() const | |||
262 | return mIdentifier; | 263 | return mIdentifier; |
263 | } | 264 | } |
264 | 265 | ||
266 | bool ApplicationDomainType::isAggregate() const | ||
267 | { | ||
268 | return !mAggreatedIds.isEmpty(); | ||
269 | } | ||
270 | |||
271 | QVector<QByteArray> ApplicationDomainType::aggregatedIds() const | ||
272 | { | ||
273 | return mAggreatedIds; | ||
274 | } | ||
275 | |||
276 | QVector<QByteArray> &ApplicationDomainType::aggregatedIds() | ||
277 | { | ||
278 | return mAggreatedIds; | ||
279 | } | ||
280 | |||
265 | SinkResource::SinkResource(const QByteArray &identifier) | 281 | SinkResource::SinkResource(const QByteArray &identifier) |
266 | : ApplicationDomainType("", identifier, 0, QSharedPointer<BufferAdaptor>(new MemoryBufferAdaptor())) | 282 | : ApplicationDomainType("", identifier, 0, QSharedPointer<BufferAdaptor>(new MemoryBufferAdaptor())) |
267 | { | 283 | { |
diff --git a/common/domain/applicationdomaintype.h b/common/domain/applicationdomaintype.h index b4db54e..dcd401c 100644 --- a/common/domain/applicationdomaintype.h +++ b/common/domain/applicationdomaintype.h | |||
@@ -268,6 +268,10 @@ public: | |||
268 | void setResource(const QByteArray &identifier); | 268 | void setResource(const QByteArray &identifier); |
269 | QByteArray identifier() const; | 269 | QByteArray identifier() const; |
270 | 270 | ||
271 | bool isAggregate() const; | ||
272 | QVector<QByteArray> aggregatedIds() const; | ||
273 | QVector<QByteArray> &aggregatedIds(); | ||
274 | |||
271 | private: | 275 | private: |
272 | friend QDebug operator<<(QDebug, const ApplicationDomainType &); | 276 | friend QDebug operator<<(QDebug, const ApplicationDomainType &); |
273 | QSharedPointer<BufferAdaptor> mAdaptor; | 277 | QSharedPointer<BufferAdaptor> mAdaptor; |
@@ -278,6 +282,8 @@ private: | |||
278 | QByteArray mResourceInstanceIdentifier; | 282 | QByteArray mResourceInstanceIdentifier; |
279 | QByteArray mIdentifier; | 283 | QByteArray mIdentifier; |
280 | qint64 mRevision; | 284 | qint64 mRevision; |
285 | |||
286 | QVector<QByteArray> mAggreatedIds; | ||
281 | }; | 287 | }; |
282 | 288 | ||
283 | /* | 289 | /* |
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 0ed4cb5..2062828 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -45,7 +45,6 @@ struct ReplayResult { | |||
45 | template <typename DomainType> | 45 | template <typename DomainType> |
46 | class QueryWorker : public QObject | 46 | class QueryWorker : public QObject |
47 | { | 47 | { |
48 | typedef std::function<bool(const typename DomainType::Ptr &domainObject, Sink::Operation operation, const QMap<QByteArray, QVariant> &aggregateValues)> ResultCallback; | ||
49 | public: | 48 | public: |
50 | QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx); | 49 | QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx); |
51 | virtual ~QueryWorker(); | 50 | virtual ~QueryWorker(); |
@@ -210,6 +209,7 @@ void QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, S | |||
210 | for (auto it = result.aggregateValues.constBegin(); it != result.aggregateValues.constEnd(); it++) { | 209 | for (auto it = result.aggregateValues.constBegin(); it != result.aggregateValues.constEnd(); it++) { |
211 | valueCopy->setProperty(it.key(), it.value()); | 210 | valueCopy->setProperty(it.key(), it.value()); |
212 | } | 211 | } |
212 | valueCopy->aggregatedIds() = result.aggregateIds; | ||
213 | if (mResultTransformation) { | 213 | if (mResultTransformation) { |
214 | mResultTransformation(*valueCopy); | 214 | mResultTransformation(*valueCopy); |
215 | } | 215 | } |
diff --git a/common/resultset.h b/common/resultset.h index 707bc7e..5587c54 100644 --- a/common/resultset.h +++ b/common/resultset.h | |||
@@ -35,10 +35,11 @@ class ResultSet | |||
35 | { | 35 | { |
36 | public: | 36 | public: |
37 | struct Result { | 37 | struct Result { |
38 | Result(const Sink::ApplicationDomain::ApplicationDomainType &e, Sink::Operation op, const QMap<QByteArray, QVariant> &v = QMap<QByteArray, QVariant>{}) : entity(e), operation(op), aggregateValues(v) {} | 38 | Result(const Sink::ApplicationDomain::ApplicationDomainType &e, Sink::Operation op, const QMap<QByteArray, QVariant> &v = {}, const QVector<QByteArray> &a = {}) : entity(e), operation(op), aggregateValues(v), aggregateIds(a) {} |
39 | Sink::ApplicationDomain::ApplicationDomainType entity; | 39 | Sink::ApplicationDomain::ApplicationDomainType entity; |
40 | Sink::Operation operation; | 40 | Sink::Operation operation; |
41 | QMap<QByteArray, QVariant> aggregateValues; | 41 | QMap<QByteArray, QVariant> aggregateValues; |
42 | QVector<QByteArray> aggregateIds; | ||
42 | }; | 43 | }; |
43 | typedef std::function<void(const Result &)> Callback; | 44 | typedef std::function<void(const Result &)> Callback; |
44 | typedef std::function<bool(Callback)> ValueGenerator; | 45 | typedef std::function<bool(Callback)> ValueGenerator; |