diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-12-20 16:02:38 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-12-20 16:02:38 +0100 |
commit | 1864024012213dc0a17c76e9755bf50a19944ec7 (patch) | |
tree | 06690c35c05c29deec30a3dd412d4043e2c956a3 | |
parent | 6625bb5a2145008ad47ae963e1546714b7342bf0 (diff) | |
download | sink-1864024012213dc0a17c76e9755bf50a19944ec7.tar.gz sink-1864024012213dc0a17c76e9755bf50a19944ec7.zip |
Report when we don't have any more to fetch.
... so we can use that information in fetchMore.
-rw-r--r-- | common/modelresult.cpp | 10 | ||||
-rw-r--r-- | common/modelresult.h | 1 | ||||
-rw-r--r-- | common/queryrunner.cpp | 52 | ||||
-rw-r--r-- | common/resourcefacade.cpp | 2 | ||||
-rw-r--r-- | common/resultprovider.h | 27 | ||||
-rw-r--r-- | common/resultset.cpp | 6 | ||||
-rw-r--r-- | common/resultset.h | 6 | ||||
-rw-r--r-- | common/store.cpp | 2 | ||||
-rw-r--r-- | common/test.cpp | 2 | ||||
-rw-r--r-- | tests/clientapitest.cpp | 2 | ||||
-rw-r--r-- | tests/databasepopulationandfacadequerybenchmark.cpp | 2 | ||||
-rw-r--r-- | tests/mailquerybenchmark.cpp | 2 |
12 files changed, 67 insertions, 47 deletions
diff --git a/common/modelresult.cpp b/common/modelresult.cpp index 34e6dfc..8e92365 100644 --- a/common/modelresult.cpp +++ b/common/modelresult.cpp | |||
@@ -167,7 +167,10 @@ template <class T, class Ptr> | |||
167 | bool ModelResult<T, Ptr>::canFetchMore(const QModelIndex &parent) const | 167 | bool ModelResult<T, Ptr>::canFetchMore(const QModelIndex &parent) const |
168 | { | 168 | { |
169 | const auto id = parent.internalId(); | 169 | const auto id = parent.internalId(); |
170 | return !mEntityChildrenFetched.contains(id) || !mEntityChildrenFetchComplete.contains(id); | 170 | if (mEntityAllChildrenFetched.contains(id)) { |
171 | return false; | ||
172 | } | ||
173 | return true; | ||
171 | } | 174 | } |
172 | 175 | ||
173 | template <class T, class Ptr> | 176 | template <class T, class Ptr> |
@@ -269,11 +272,14 @@ void ModelResult<T, Ptr>::setEmitter(const typename Sink::ResultEmitter<Ptr>::Pt | |||
269 | remove(value); | 272 | remove(value); |
270 | }); | 273 | }); |
271 | }); | 274 | }); |
272 | emitter->onInitialResultSetComplete([this](const Ptr &parent) { | 275 | emitter->onInitialResultSetComplete([this](const Ptr &parent, bool fetchedAll) { |
273 | SinkTrace() << "Initial result set complete"; | 276 | SinkTrace() << "Initial result set complete"; |
274 | const qint64 parentId = parent ? qHash(*parent) : 0; | 277 | const qint64 parentId = parent ? qHash(*parent) : 0; |
275 | const auto parentIndex = createIndexFromId(parentId); | 278 | const auto parentIndex = createIndexFromId(parentId); |
276 | mEntityChildrenFetchComplete.insert(parentId); | 279 | mEntityChildrenFetchComplete.insert(parentId); |
280 | if (fetchedAll) { | ||
281 | mEntityAllChildrenFetched.insert(parentId); | ||
282 | } | ||
277 | emit dataChanged(parentIndex, parentIndex, QVector<int>() << ChildrenFetchedRole); | 283 | emit dataChanged(parentIndex, parentIndex, QVector<int>() << ChildrenFetchedRole); |
278 | }); | 284 | }); |
279 | mEmitter = emitter; | 285 | mEmitter = emitter; |
diff --git a/common/modelresult.h b/common/modelresult.h index 7924e2d..b7fc0ec 100644 --- a/common/modelresult.h +++ b/common/modelresult.h | |||
@@ -73,6 +73,7 @@ private: | |||
73 | QMap<qint64 /* child entity id */, qint64 /* parent entity id*/> mParents; | 73 | QMap<qint64 /* child entity id */, qint64 /* parent entity id*/> mParents; |
74 | QSet<qint64 /* entity id */> mEntityChildrenFetched; | 74 | QSet<qint64 /* entity id */> mEntityChildrenFetched; |
75 | QSet<qint64 /* entity id */> mEntityChildrenFetchComplete; | 75 | QSet<qint64 /* entity id */> mEntityChildrenFetchComplete; |
76 | QSet<qint64 /* entity id */> mEntityAllChildrenFetched; | ||
76 | QList<QByteArray> mPropertyColumns; | 77 | QList<QByteArray> mPropertyColumns; |
77 | Sink::Query mQuery; | 78 | Sink::Query mQuery; |
78 | std::function<void(const Ptr &)> loadEntities; | 79 | std::function<void(const Ptr &)> loadEntities; |
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index d6a90de..377e3b9 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -31,6 +31,12 @@ SINK_DEBUG_AREA("queryrunner") | |||
31 | using namespace Sink; | 31 | using namespace Sink; |
32 | using namespace Sink::Storage; | 32 | using namespace Sink::Storage; |
33 | 33 | ||
34 | struct ReplayResult { | ||
35 | qint64 newRevision; | ||
36 | qint64 replayedEntities; | ||
37 | bool replayedAll; | ||
38 | }; | ||
39 | |||
34 | /* | 40 | /* |
35 | * This class wraps the actual query implementation. | 41 | * This class wraps the actual query implementation. |
36 | * | 42 | * |
@@ -47,8 +53,8 @@ public: | |||
47 | QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation); | 53 | QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation); |
48 | virtual ~QueryWorker(); | 54 | virtual ~QueryWorker(); |
49 | 55 | ||
50 | QPair<qint64, qint64> executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | 56 | ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); |
51 | QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); | 57 | ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); |
52 | 58 | ||
53 | private: | 59 | private: |
54 | void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result); | 60 | void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result); |
@@ -64,7 +70,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
64 | { | 70 | { |
65 | SinkTrace() << "Starting query. Is live:" << query.liveQuery() << " Limit: " << query.limit(); | 71 | SinkTrace() << "Starting query. Is live:" << query.liveQuery() << " Limit: " << query.limit(); |
66 | if (query.limit() && query.sortProperty().isEmpty()) { | 72 | if (query.limit() && query.sortProperty().isEmpty()) { |
67 | SinkWarning() << "A limited query without sorting is typically a bad idea."; | 73 | SinkWarning() << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; |
68 | } | 74 | } |
69 | auto guardPtr = QPointer<QObject>(&guard); | 75 | auto guardPtr = QPointer<QObject>(&guard); |
70 | // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. | 76 | // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. |
@@ -74,31 +80,33 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
74 | auto resultProvider = mResultProvider; | 80 | auto resultProvider = mResultProvider; |
75 | if (query.synchronousQuery()) { | 81 | if (query.synchronousQuery()) { |
76 | QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation); | 82 | QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation); |
77 | worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); | 83 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); |
78 | resultProvider->initialResultSetComplete(parent); | 84 | mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; |
85 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | ||
86 | resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); | ||
79 | } else { | 87 | } else { |
80 | auto resultTransformation = mResultTransformation; | 88 | auto resultTransformation = mResultTransformation; |
81 | auto offset = mOffset[parentId]; | 89 | auto offset = mOffset[parentId]; |
82 | auto batchSize = mBatchSize; | 90 | auto batchSize = mBatchSize; |
83 | auto resourceContext = mResourceContext; | 91 | auto resourceContext = mResourceContext; |
84 | //The lambda will be executed in a separate thread, so we're extra careful | 92 | //The lambda will be executed in a separate thread, so copy all arguments |
85 | async::run<QPair<qint64, qint64> >([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent]() { | 93 | async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent]() { |
86 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation); | 94 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation); |
87 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); | 95 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); |
88 | return newRevisionAndReplayedEntities; | 96 | return newRevisionAndReplayedEntities; |
89 | }) | 97 | }) |
90 | .template syncThen<void, QPair<qint64, qint64>>([this, parentId, query, parent, resultProvider, guardPtr](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { | 98 | .template syncThen<void, ReplayResult>([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { |
91 | if (!guardPtr) { | 99 | if (!guardPtr) { |
92 | qWarning() << "The parent object is already gone"; | 100 | qWarning() << "The parent object is already gone"; |
93 | return; | 101 | return; |
94 | } | 102 | } |
95 | mOffset[parentId] += newRevisionAndReplayedEntities.second; | 103 | mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; |
96 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 104 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
97 | if (query.liveQuery()) { | 105 | if (query.liveQuery()) { |
98 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); | 106 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); |
99 | } | 107 | } |
100 | resultProvider->setRevision(newRevisionAndReplayedEntities.first); | 108 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); |
101 | resultProvider->initialResultSetComplete(parent); | 109 | resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); |
102 | }) | 110 | }) |
103 | .exec(); | 111 | .exec(); |
104 | } | 112 | } |
@@ -111,19 +119,19 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
111 | setQuery([=]() -> KAsync::Job<void> { | 119 | setQuery([=]() -> KAsync::Job<void> { |
112 | auto resultProvider = mResultProvider; | 120 | auto resultProvider = mResultProvider; |
113 | auto resourceContext = mResourceContext; | 121 | auto resourceContext = mResourceContext; |
114 | return async::run<QPair<qint64, qint64> >([=]() { | 122 | return async::run<ReplayResult>([=]() { |
115 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation); | 123 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation); |
116 | const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); | 124 | const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); |
117 | return newRevisionAndReplayedEntities; | 125 | return newRevisionAndReplayedEntities; |
118 | }) | 126 | }) |
119 | .template syncThen<void, QPair<qint64, qint64> >([query, this, resultProvider, guardPtr](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { | 127 | .template syncThen<void, ReplayResult>([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { |
120 | if (!guardPtr) { | 128 | if (!guardPtr) { |
121 | qWarning() << "The parent object is already gone"; | 129 | qWarning() << "The parent object is already gone"; |
122 | return; | 130 | return; |
123 | } | 131 | } |
124 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 132 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
125 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); | 133 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); |
126 | resultProvider->setRevision(newRevisionAndReplayedEntities.first); | 134 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); |
127 | }); | 135 | }); |
128 | }); | 136 | }); |
129 | // Ensure the connection is open, if it wasn't already opened | 137 | // Ensure the connection is open, if it wasn't already opened |
@@ -195,7 +203,7 @@ void QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, S | |||
195 | } | 203 | } |
196 | 204 | ||
197 | template <class DomainType> | 205 | template <class DomainType> |
198 | QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | 206 | ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) |
199 | { | 207 | { |
200 | QTime time; | 208 | QTime time; |
201 | time.start(); | 209 | time.start(); |
@@ -205,16 +213,16 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sin | |||
205 | auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | 213 | auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore}; |
206 | auto resultSet = preparedQuery.update(baseRevision); | 214 | auto resultSet = preparedQuery.update(baseRevision); |
207 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 215 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
208 | auto replayedEntities = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { | 216 | auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { |
209 | resultProviderCallback(query, resultProvider, result); | 217 | resultProviderCallback(query, resultProvider, result); |
210 | }); | 218 | }); |
211 | 219 | ||
212 | SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); | 220 | SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); |
213 | return qMakePair(entityStore.maxRevision(), replayedEntities); | 221 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; |
214 | } | 222 | } |
215 | 223 | ||
216 | template <class DomainType> | 224 | template <class DomainType> |
217 | QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery( | 225 | ReplayResult QueryWorker<DomainType>::executeInitialQuery( |
218 | const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize) | 226 | const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize) |
219 | { | 227 | { |
220 | QTime time; | 228 | QTime time; |
@@ -236,12 +244,12 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery( | |||
236 | auto resultSet = preparedQuery.execute(); | 244 | auto resultSet = preparedQuery.execute(); |
237 | 245 | ||
238 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 246 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
239 | auto replayedEntities = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { | 247 | auto replayResult = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { |
240 | resultProviderCallback(query, resultProvider, result); | 248 | resultProviderCallback(query, resultProvider, result); |
241 | }); | 249 | }); |
242 | 250 | ||
243 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); | 251 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); |
244 | return qMakePair(entityStore.maxRevision(), replayedEntities); | 252 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; |
245 | } | 253 | } |
246 | 254 | ||
247 | template class QueryRunner<Sink::ApplicationDomain::Folder>; | 255 | template class QueryRunner<Sink::ApplicationDomain::Folder>; |
diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp index ea4218d..861d37a 100644 --- a/common/resourcefacade.cpp +++ b/common/resourcefacade.cpp | |||
@@ -111,7 +111,7 @@ LocalStorageQueryRunner<DomainType>::LocalStorageQueryRunner(const Query &query, | |||
111 | mResultProvider->add(entity); | 111 | mResultProvider->add(entity); |
112 | } | 112 | } |
113 | // TODO initialResultSetComplete should be implicit | 113 | // TODO initialResultSetComplete should be implicit |
114 | mResultProvider->initialResultSetComplete(typename DomainType::Ptr()); | 114 | mResultProvider->initialResultSetComplete(typename DomainType::Ptr(), true); |
115 | mResultProvider->complete(); | 115 | mResultProvider->complete(); |
116 | }); | 116 | }); |
117 | if (query.liveQuery()) { | 117 | if (query.liveQuery()) { |
diff --git a/common/resultprovider.h b/common/resultprovider.h index defeb6a..cda4dac 100644 --- a/common/resultprovider.h +++ b/common/resultprovider.h | |||
@@ -46,7 +46,7 @@ public: | |||
46 | virtual void add(const T &value) = 0; | 46 | virtual void add(const T &value) = 0; |
47 | virtual void modify(const T &value) = 0; | 47 | virtual void modify(const T &value) = 0; |
48 | virtual void remove(const T &value) = 0; | 48 | virtual void remove(const T &value) = 0; |
49 | virtual void initialResultSetComplete(const T &parent) = 0; | 49 | virtual void initialResultSetComplete(const T &parent, bool) = 0; |
50 | virtual void complete() = 0; | 50 | virtual void complete() = 0; |
51 | virtual void clear() = 0; | 51 | virtual void clear() = 0; |
52 | virtual void setFetcher(const std::function<void(const T &parent)> &fetcher) = 0; | 52 | virtual void setFetcher(const std::function<void(const T &parent)> &fetcher) = 0; |
@@ -100,10 +100,10 @@ public: | |||
100 | } | 100 | } |
101 | } | 101 | } |
102 | 102 | ||
103 | void initialResultSetComplete(const T &parent) | 103 | void initialResultSetComplete(const T &parent, bool replayedAll) |
104 | { | 104 | { |
105 | if (auto strongRef = mResultEmitter.toStrongRef()) { | 105 | if (auto strongRef = mResultEmitter.toStrongRef()) { |
106 | strongRef->initialResultSetComplete(parent); | 106 | strongRef->initialResultSetComplete(parent, replayedAll); |
107 | } | 107 | } |
108 | } | 108 | } |
109 | 109 | ||
@@ -211,7 +211,7 @@ public: | |||
211 | removeHandler = handler; | 211 | removeHandler = handler; |
212 | } | 212 | } |
213 | 213 | ||
214 | void onInitialResultSetComplete(const std::function<void(const DomainType &)> &handler) | 214 | void onInitialResultSetComplete(const std::function<void(const DomainType &, bool)> &handler) |
215 | { | 215 | { |
216 | initialResultSetCompleteHandler = handler; | 216 | initialResultSetCompleteHandler = handler; |
217 | } | 217 | } |
@@ -241,10 +241,10 @@ public: | |||
241 | removeHandler(value); | 241 | removeHandler(value); |
242 | } | 242 | } |
243 | 243 | ||
244 | void initialResultSetComplete(const DomainType &parent) | 244 | void initialResultSetComplete(const DomainType &parent, bool replayedAll) |
245 | { | 245 | { |
246 | if (initialResultSetCompleteHandler) { | 246 | if (initialResultSetCompleteHandler) { |
247 | initialResultSetCompleteHandler(parent); | 247 | initialResultSetCompleteHandler(parent, replayedAll); |
248 | } | 248 | } |
249 | } | 249 | } |
250 | 250 | ||
@@ -280,7 +280,7 @@ private: | |||
280 | std::function<void(const DomainType &)> addHandler; | 280 | std::function<void(const DomainType &)> addHandler; |
281 | std::function<void(const DomainType &)> modifyHandler; | 281 | std::function<void(const DomainType &)> modifyHandler; |
282 | std::function<void(const DomainType &)> removeHandler; | 282 | std::function<void(const DomainType &)> removeHandler; |
283 | std::function<void(const DomainType &)> initialResultSetCompleteHandler; | 283 | std::function<void(const DomainType &, bool)> initialResultSetCompleteHandler; |
284 | std::function<void(void)> completeHandler; | 284 | std::function<void(void)> completeHandler; |
285 | std::function<void(void)> clearHandler; | 285 | std::function<void(void)> clearHandler; |
286 | 286 | ||
@@ -300,30 +300,31 @@ public: | |||
300 | emitter->onModified([this](const DomainType &value) { this->modify(value); }); | 300 | emitter->onModified([this](const DomainType &value) { this->modify(value); }); |
301 | emitter->onRemoved([this](const DomainType &value) { this->remove(value); }); | 301 | emitter->onRemoved([this](const DomainType &value) { this->remove(value); }); |
302 | auto ptr = emitter.data(); | 302 | auto ptr = emitter.data(); |
303 | emitter->onInitialResultSetComplete([this, ptr](const DomainType &parent) { | 303 | emitter->onInitialResultSetComplete([this, ptr](const DomainType &parent, bool replayedAll) { |
304 | auto hashValue = qHash(parent); | 304 | auto hashValue = qHash(parent); |
305 | mInitialResultSetInProgress.remove(hashValue, ptr); | 305 | mInitialResultSetInProgress.remove(hashValue, ptr); |
306 | callInitialResultCompleteIfDone(parent); | 306 | callInitialResultCompleteIfDone(parent, replayedAll); |
307 | }); | 307 | }); |
308 | emitter->onComplete([this]() { this->complete(); }); | 308 | emitter->onComplete([this]() { this->complete(); }); |
309 | emitter->onClear([this]() { this->clear(); }); | 309 | emitter->onClear([this]() { this->clear(); }); |
310 | mEmitter << emitter; | 310 | mEmitter << emitter; |
311 | } | 311 | } |
312 | 312 | ||
313 | void callInitialResultCompleteIfDone(const DomainType &parent) | 313 | void callInitialResultCompleteIfDone(const DomainType &parent, bool replayedAll) |
314 | { | 314 | { |
315 | auto hashValue = qHash(parent); | 315 | auto hashValue = qHash(parent); |
316 | // Normally a parent is only in a single resource, except the toplevel (invalid) parent | 316 | // Normally a parent is only in a single resource, except the toplevel (invalid) parent |
317 | if (!mInitialResultSetInProgress.contains(hashValue) && mAllResultsFetched && !mResultEmitted) { | 317 | if (!mInitialResultSetInProgress.contains(hashValue) && mAllResultsFetched && !mResultEmitted) { |
318 | mResultEmitted = true; | 318 | mResultEmitted = true; |
319 | this->initialResultSetComplete(parent); | 319 | //FIXME set replayed all only to true if all had set it to true |
320 | this->initialResultSetComplete(parent, true); | ||
320 | } | 321 | } |
321 | } | 322 | } |
322 | 323 | ||
323 | void fetch(const DomainType &parent) Q_DECL_OVERRIDE | 324 | void fetch(const DomainType &parent) Q_DECL_OVERRIDE |
324 | { | 325 | { |
325 | if (mEmitter.isEmpty()) { | 326 | if (mEmitter.isEmpty()) { |
326 | this->initialResultSetComplete(parent); | 327 | this->initialResultSetComplete(parent, true); |
327 | } else { | 328 | } else { |
328 | mResultEmitted = false; | 329 | mResultEmitted = false; |
329 | mAllResultsFetched = false; | 330 | mAllResultsFetched = false; |
@@ -332,7 +333,7 @@ public: | |||
332 | emitter->fetch(parent); | 333 | emitter->fetch(parent); |
333 | } | 334 | } |
334 | mAllResultsFetched = true; | 335 | mAllResultsFetched = true; |
335 | callInitialResultCompleteIfDone(parent); | 336 | callInitialResultCompleteIfDone(parent, true); |
336 | } | 337 | } |
337 | } | 338 | } |
338 | 339 | ||
diff --git a/common/resultset.cpp b/common/resultset.cpp index 9883f44..b82b14d 100644 --- a/common/resultset.cpp +++ b/common/resultset.cpp | |||
@@ -98,7 +98,7 @@ void ResultSet::skip(int number) | |||
98 | } | 98 | } |
99 | } | 99 | } |
100 | 100 | ||
101 | qint64 ResultSet::replaySet(int offset, int batchSize, const Callback &callback) | 101 | ResultSet::ReplayResult ResultSet::replaySet(int offset, int batchSize, const Callback &callback) |
102 | { | 102 | { |
103 | skip(offset); | 103 | skip(offset); |
104 | int counter = 0; | 104 | int counter = 0; |
@@ -108,10 +108,10 @@ qint64 ResultSet::replaySet(int offset, int batchSize, const Callback &callback) | |||
108 | callback(result); | 108 | callback(result); |
109 | }); | 109 | }); |
110 | if (!ret) { | 110 | if (!ret) { |
111 | break; | 111 | return {counter, true}; |
112 | } | 112 | } |
113 | }; | 113 | }; |
114 | return counter; | 114 | return {counter, false}; |
115 | } | 115 | } |
116 | 116 | ||
117 | QByteArray ResultSet::id() | 117 | QByteArray ResultSet::id() |
diff --git a/common/resultset.h b/common/resultset.h index 4f2c278..db7d1e0 100644 --- a/common/resultset.h +++ b/common/resultset.h | |||
@@ -55,7 +55,11 @@ public: | |||
55 | 55 | ||
56 | void skip(int number); | 56 | void skip(int number); |
57 | 57 | ||
58 | qint64 replaySet(int offset, int batchSize, const Callback &callback); | 58 | struct ReplayResult { |
59 | qint64 replayedEntities; | ||
60 | bool replayedAll; | ||
61 | }; | ||
62 | ReplayResult replaySet(int offset, int batchSize, const Callback &callback); | ||
59 | 63 | ||
60 | QByteArray id(); | 64 | QByteArray id(); |
61 | 65 | ||
diff --git a/common/store.cpp b/common/store.cpp index dd00bfe..a70be05 100644 --- a/common/store.cpp +++ b/common/store.cpp | |||
@@ -162,7 +162,7 @@ QSharedPointer<QAbstractItemModel> Store::loadModel(Query query) | |||
162 | }); | 162 | }); |
163 | emitter->onRemoved([](const ApplicationDomain::SinkResource::Ptr &) { | 163 | emitter->onRemoved([](const ApplicationDomain::SinkResource::Ptr &) { |
164 | }); | 164 | }); |
165 | emitter->onInitialResultSetComplete([](const ApplicationDomain::SinkResource::Ptr &) { | 165 | emitter->onInitialResultSetComplete([](const ApplicationDomain::SinkResource::Ptr &, bool) { |
166 | }); | 166 | }); |
167 | emitter->onComplete([query, aggregatingEmitter]() { | 167 | emitter->onComplete([query, aggregatingEmitter]() { |
168 | SinkTrace() << "Resource query complete"; | 168 | SinkTrace() << "Resource query complete"; |
diff --git a/common/test.cpp b/common/test.cpp index dc63afc..71bb972 100644 --- a/common/test.cpp +++ b/common/test.cpp | |||
@@ -166,7 +166,7 @@ public: | |||
166 | resultProvider->add(res.template staticCast<T>()); | 166 | resultProvider->add(res.template staticCast<T>()); |
167 | } | 167 | } |
168 | } | 168 | } |
169 | resultProvider->initialResultSetComplete(parent); | 169 | resultProvider->initialResultSetComplete(parent, true); |
170 | }); | 170 | }); |
171 | auto job = KAsync::syncStart<void>([query, resultProvider]() {}); | 171 | auto job = KAsync::syncStart<void>([query, resultProvider]() {}); |
172 | return qMakePair(job, emitter); | 172 | return qMakePair(job, emitter); |
diff --git a/tests/clientapitest.cpp b/tests/clientapitest.cpp index 4afe328..d2fb747 100644 --- a/tests/clientapitest.cpp +++ b/tests/clientapitest.cpp | |||
@@ -78,7 +78,7 @@ public: | |||
78 | resultProvider->add(res); | 78 | resultProvider->add(res); |
79 | } | 79 | } |
80 | } | 80 | } |
81 | resultProvider->initialResultSetComplete(parent); | 81 | resultProvider->initialResultSetComplete(parent, true); |
82 | }); | 82 | }); |
83 | auto job = KAsync::syncStart<void>([query, resultProvider]() {}); | 83 | auto job = KAsync::syncStart<void>([query, resultProvider]() {}); |
84 | mResultProvider = resultProvider; | 84 | mResultProvider = resultProvider; |
diff --git a/tests/databasepopulationandfacadequerybenchmark.cpp b/tests/databasepopulationandfacadequerybenchmark.cpp index f1904ad..6bd2051 100644 --- a/tests/databasepopulationandfacadequerybenchmark.cpp +++ b/tests/databasepopulationandfacadequerybenchmark.cpp | |||
@@ -112,7 +112,7 @@ class DatabasePopulationAndFacadeQueryBenchmark : public QObject | |||
112 | QList<Sink::ApplicationDomain::Event::Ptr> list; | 112 | QList<Sink::ApplicationDomain::Event::Ptr> list; |
113 | emitter->onAdded([&list](const Sink::ApplicationDomain::Event::Ptr &event) { list << event; }); | 113 | emitter->onAdded([&list](const Sink::ApplicationDomain::Event::Ptr &event) { list << event; }); |
114 | bool done = false; | 114 | bool done = false; |
115 | emitter->onInitialResultSetComplete([&done](const Sink::ApplicationDomain::Event::Ptr &event) { done = true; }); | 115 | emitter->onInitialResultSetComplete([&done](const Sink::ApplicationDomain::Event::Ptr &event, bool) { done = true; }); |
116 | emitter->fetch(Sink::ApplicationDomain::Event::Ptr()); | 116 | emitter->fetch(Sink::ApplicationDomain::Event::Ptr()); |
117 | QTRY_VERIFY(done); | 117 | QTRY_VERIFY(done); |
118 | QCOMPARE(list.size(), count); | 118 | QCOMPARE(list.size(), count); |
diff --git a/tests/mailquerybenchmark.cpp b/tests/mailquerybenchmark.cpp index 0e2f6fa..d3598b2 100644 --- a/tests/mailquerybenchmark.cpp +++ b/tests/mailquerybenchmark.cpp | |||
@@ -99,7 +99,7 @@ class MailQueryBenchmark : public QObject | |||
99 | QList<Mail::Ptr> list; | 99 | QList<Mail::Ptr> list; |
100 | emitter->onAdded([&list](const Mail::Ptr &mail) { list << mail; }); | 100 | emitter->onAdded([&list](const Mail::Ptr &mail) { list << mail; }); |
101 | bool done = false; | 101 | bool done = false; |
102 | emitter->onInitialResultSetComplete([&done](const Mail::Ptr &mail) { done = true; }); | 102 | emitter->onInitialResultSetComplete([&done](const Mail::Ptr &mail, bool) { done = true; }); |
103 | emitter->fetch(Mail::Ptr()); | 103 | emitter->fetch(Mail::Ptr()); |
104 | QTRY_VERIFY(done); | 104 | QTRY_VERIFY(done); |
105 | QCOMPARE(list.size(), query.limit()); | 105 | QCOMPARE(list.size(), query.limit()); |