summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-12-20 16:02:38 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-12-20 16:02:38 +0100
commit1864024012213dc0a17c76e9755bf50a19944ec7 (patch)
tree06690c35c05c29deec30a3dd412d4043e2c956a3
parent6625bb5a2145008ad47ae963e1546714b7342bf0 (diff)
downloadsink-1864024012213dc0a17c76e9755bf50a19944ec7.tar.gz
sink-1864024012213dc0a17c76e9755bf50a19944ec7.zip
Report when we don't have any more to fetch.
... so we can use that information in fetchMore.
-rw-r--r--common/modelresult.cpp10
-rw-r--r--common/modelresult.h1
-rw-r--r--common/queryrunner.cpp52
-rw-r--r--common/resourcefacade.cpp2
-rw-r--r--common/resultprovider.h27
-rw-r--r--common/resultset.cpp6
-rw-r--r--common/resultset.h6
-rw-r--r--common/store.cpp2
-rw-r--r--common/test.cpp2
-rw-r--r--tests/clientapitest.cpp2
-rw-r--r--tests/databasepopulationandfacadequerybenchmark.cpp2
-rw-r--r--tests/mailquerybenchmark.cpp2
12 files changed, 67 insertions, 47 deletions
diff --git a/common/modelresult.cpp b/common/modelresult.cpp
index 34e6dfc..8e92365 100644
--- a/common/modelresult.cpp
+++ b/common/modelresult.cpp
@@ -167,7 +167,10 @@ template <class T, class Ptr>
167bool ModelResult<T, Ptr>::canFetchMore(const QModelIndex &parent) const 167bool ModelResult<T, Ptr>::canFetchMore(const QModelIndex &parent) const
168{ 168{
169 const auto id = parent.internalId(); 169 const auto id = parent.internalId();
170 return !mEntityChildrenFetched.contains(id) || !mEntityChildrenFetchComplete.contains(id); 170 if (mEntityAllChildrenFetched.contains(id)) {
171 return false;
172 }
173 return true;
171} 174}
172 175
173template <class T, class Ptr> 176template <class T, class Ptr>
@@ -269,11 +272,14 @@ void ModelResult<T, Ptr>::setEmitter(const typename Sink::ResultEmitter<Ptr>::Pt
269 remove(value); 272 remove(value);
270 }); 273 });
271 }); 274 });
272 emitter->onInitialResultSetComplete([this](const Ptr &parent) { 275 emitter->onInitialResultSetComplete([this](const Ptr &parent, bool fetchedAll) {
273 SinkTrace() << "Initial result set complete"; 276 SinkTrace() << "Initial result set complete";
274 const qint64 parentId = parent ? qHash(*parent) : 0; 277 const qint64 parentId = parent ? qHash(*parent) : 0;
275 const auto parentIndex = createIndexFromId(parentId); 278 const auto parentIndex = createIndexFromId(parentId);
276 mEntityChildrenFetchComplete.insert(parentId); 279 mEntityChildrenFetchComplete.insert(parentId);
280 if (fetchedAll) {
281 mEntityAllChildrenFetched.insert(parentId);
282 }
277 emit dataChanged(parentIndex, parentIndex, QVector<int>() << ChildrenFetchedRole); 283 emit dataChanged(parentIndex, parentIndex, QVector<int>() << ChildrenFetchedRole);
278 }); 284 });
279 mEmitter = emitter; 285 mEmitter = emitter;
diff --git a/common/modelresult.h b/common/modelresult.h
index 7924e2d..b7fc0ec 100644
--- a/common/modelresult.h
+++ b/common/modelresult.h
@@ -73,6 +73,7 @@ private:
73 QMap<qint64 /* child entity id */, qint64 /* parent entity id*/> mParents; 73 QMap<qint64 /* child entity id */, qint64 /* parent entity id*/> mParents;
74 QSet<qint64 /* entity id */> mEntityChildrenFetched; 74 QSet<qint64 /* entity id */> mEntityChildrenFetched;
75 QSet<qint64 /* entity id */> mEntityChildrenFetchComplete; 75 QSet<qint64 /* entity id */> mEntityChildrenFetchComplete;
76 QSet<qint64 /* entity id */> mEntityAllChildrenFetched;
76 QList<QByteArray> mPropertyColumns; 77 QList<QByteArray> mPropertyColumns;
77 Sink::Query mQuery; 78 Sink::Query mQuery;
78 std::function<void(const Ptr &)> loadEntities; 79 std::function<void(const Ptr &)> loadEntities;
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index d6a90de..377e3b9 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -31,6 +31,12 @@ SINK_DEBUG_AREA("queryrunner")
31using namespace Sink; 31using namespace Sink;
32using namespace Sink::Storage; 32using namespace Sink::Storage;
33 33
34struct ReplayResult {
35 qint64 newRevision;
36 qint64 replayedEntities;
37 bool replayedAll;
38};
39
34/* 40/*
35 * This class wraps the actual query implementation. 41 * This class wraps the actual query implementation.
36 * 42 *
@@ -47,8 +53,8 @@ public:
47 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation); 53 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation);
48 virtual ~QueryWorker(); 54 virtual ~QueryWorker();
49 55
50 QPair<qint64, qint64> executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 56 ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
51 QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); 57 ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize);
52 58
53private: 59private:
54 void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result); 60 void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result);
@@ -64,7 +70,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
64{ 70{
65 SinkTrace() << "Starting query. Is live:" << query.liveQuery() << " Limit: " << query.limit(); 71 SinkTrace() << "Starting query. Is live:" << query.liveQuery() << " Limit: " << query.limit();
66 if (query.limit() && query.sortProperty().isEmpty()) { 72 if (query.limit() && query.sortProperty().isEmpty()) {
67 SinkWarning() << "A limited query without sorting is typically a bad idea."; 73 SinkWarning() << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get.";
68 } 74 }
69 auto guardPtr = QPointer<QObject>(&guard); 75 auto guardPtr = QPointer<QObject>(&guard);
70 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. 76 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load.
@@ -74,31 +80,33 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
74 auto resultProvider = mResultProvider; 80 auto resultProvider = mResultProvider;
75 if (query.synchronousQuery()) { 81 if (query.synchronousQuery()) {
76 QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation); 82 QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation);
77 worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); 83 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize);
78 resultProvider->initialResultSetComplete(parent); 84 mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities;
85 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
86 resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll);
79 } else { 87 } else {
80 auto resultTransformation = mResultTransformation; 88 auto resultTransformation = mResultTransformation;
81 auto offset = mOffset[parentId]; 89 auto offset = mOffset[parentId];
82 auto batchSize = mBatchSize; 90 auto batchSize = mBatchSize;
83 auto resourceContext = mResourceContext; 91 auto resourceContext = mResourceContext;
84 //The lambda will be executed in a separate thread, so we're extra careful 92 //The lambda will be executed in a separate thread, so copy all arguments
85 async::run<QPair<qint64, qint64> >([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent]() { 93 async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent]() {
86 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation); 94 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation);
87 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); 95 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize);
88 return newRevisionAndReplayedEntities; 96 return newRevisionAndReplayedEntities;
89 }) 97 })
90 .template syncThen<void, QPair<qint64, qint64>>([this, parentId, query, parent, resultProvider, guardPtr](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { 98 .template syncThen<void, ReplayResult>([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
91 if (!guardPtr) { 99 if (!guardPtr) {
92 qWarning() << "The parent object is already gone"; 100 qWarning() << "The parent object is already gone";
93 return; 101 return;
94 } 102 }
95 mOffset[parentId] += newRevisionAndReplayedEntities.second; 103 mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities;
96 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 104 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
97 if (query.liveQuery()) { 105 if (query.liveQuery()) {
98 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); 106 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
99 } 107 }
100 resultProvider->setRevision(newRevisionAndReplayedEntities.first); 108 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
101 resultProvider->initialResultSetComplete(parent); 109 resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll);
102 }) 110 })
103 .exec(); 111 .exec();
104 } 112 }
@@ -111,19 +119,19 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
111 setQuery([=]() -> KAsync::Job<void> { 119 setQuery([=]() -> KAsync::Job<void> {
112 auto resultProvider = mResultProvider; 120 auto resultProvider = mResultProvider;
113 auto resourceContext = mResourceContext; 121 auto resourceContext = mResourceContext;
114 return async::run<QPair<qint64, qint64> >([=]() { 122 return async::run<ReplayResult>([=]() {
115 QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation); 123 QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation);
116 const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); 124 const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider);
117 return newRevisionAndReplayedEntities; 125 return newRevisionAndReplayedEntities;
118 }) 126 })
119 .template syncThen<void, QPair<qint64, qint64> >([query, this, resultProvider, guardPtr](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { 127 .template syncThen<void, ReplayResult>([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
120 if (!guardPtr) { 128 if (!guardPtr) {
121 qWarning() << "The parent object is already gone"; 129 qWarning() << "The parent object is already gone";
122 return; 130 return;
123 } 131 }
124 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 132 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
125 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); 133 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
126 resultProvider->setRevision(newRevisionAndReplayedEntities.first); 134 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
127 }); 135 });
128 }); 136 });
129 // Ensure the connection is open, if it wasn't already opened 137 // Ensure the connection is open, if it wasn't already opened
@@ -195,7 +203,7 @@ void QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, S
195} 203}
196 204
197template <class DomainType> 205template <class DomainType>
198QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 206ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
199{ 207{
200 QTime time; 208 QTime time;
201 time.start(); 209 time.start();
@@ -205,16 +213,16 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sin
205 auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore}; 213 auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore};
206 auto resultSet = preparedQuery.update(baseRevision); 214 auto resultSet = preparedQuery.update(baseRevision);
207 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 215 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
208 auto replayedEntities = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { 216 auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) {
209 resultProviderCallback(query, resultProvider, result); 217 resultProviderCallback(query, resultProvider, result);
210 }); 218 });
211 219
212 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); 220 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
213 return qMakePair(entityStore.maxRevision(), replayedEntities); 221 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll};
214} 222}
215 223
216template <class DomainType> 224template <class DomainType>
217QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery( 225ReplayResult QueryWorker<DomainType>::executeInitialQuery(
218 const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize) 226 const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize)
219{ 227{
220 QTime time; 228 QTime time;
@@ -236,12 +244,12 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery(
236 auto resultSet = preparedQuery.execute(); 244 auto resultSet = preparedQuery.execute();
237 245
238 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 246 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
239 auto replayedEntities = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { 247 auto replayResult = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) {
240 resultProviderCallback(query, resultProvider, result); 248 resultProviderCallback(query, resultProvider, result);
241 }); 249 });
242 250
243 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 251 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
244 return qMakePair(entityStore.maxRevision(), replayedEntities); 252 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll};
245} 253}
246 254
247template class QueryRunner<Sink::ApplicationDomain::Folder>; 255template class QueryRunner<Sink::ApplicationDomain::Folder>;
diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp
index ea4218d..861d37a 100644
--- a/common/resourcefacade.cpp
+++ b/common/resourcefacade.cpp
@@ -111,7 +111,7 @@ LocalStorageQueryRunner<DomainType>::LocalStorageQueryRunner(const Query &query,
111 mResultProvider->add(entity); 111 mResultProvider->add(entity);
112 } 112 }
113 // TODO initialResultSetComplete should be implicit 113 // TODO initialResultSetComplete should be implicit
114 mResultProvider->initialResultSetComplete(typename DomainType::Ptr()); 114 mResultProvider->initialResultSetComplete(typename DomainType::Ptr(), true);
115 mResultProvider->complete(); 115 mResultProvider->complete();
116 }); 116 });
117 if (query.liveQuery()) { 117 if (query.liveQuery()) {
diff --git a/common/resultprovider.h b/common/resultprovider.h
index defeb6a..cda4dac 100644
--- a/common/resultprovider.h
+++ b/common/resultprovider.h
@@ -46,7 +46,7 @@ public:
46 virtual void add(const T &value) = 0; 46 virtual void add(const T &value) = 0;
47 virtual void modify(const T &value) = 0; 47 virtual void modify(const T &value) = 0;
48 virtual void remove(const T &value) = 0; 48 virtual void remove(const T &value) = 0;
49 virtual void initialResultSetComplete(const T &parent) = 0; 49 virtual void initialResultSetComplete(const T &parent, bool) = 0;
50 virtual void complete() = 0; 50 virtual void complete() = 0;
51 virtual void clear() = 0; 51 virtual void clear() = 0;
52 virtual void setFetcher(const std::function<void(const T &parent)> &fetcher) = 0; 52 virtual void setFetcher(const std::function<void(const T &parent)> &fetcher) = 0;
@@ -100,10 +100,10 @@ public:
100 } 100 }
101 } 101 }
102 102
103 void initialResultSetComplete(const T &parent) 103 void initialResultSetComplete(const T &parent, bool replayedAll)
104 { 104 {
105 if (auto strongRef = mResultEmitter.toStrongRef()) { 105 if (auto strongRef = mResultEmitter.toStrongRef()) {
106 strongRef->initialResultSetComplete(parent); 106 strongRef->initialResultSetComplete(parent, replayedAll);
107 } 107 }
108 } 108 }
109 109
@@ -211,7 +211,7 @@ public:
211 removeHandler = handler; 211 removeHandler = handler;
212 } 212 }
213 213
214 void onInitialResultSetComplete(const std::function<void(const DomainType &)> &handler) 214 void onInitialResultSetComplete(const std::function<void(const DomainType &, bool)> &handler)
215 { 215 {
216 initialResultSetCompleteHandler = handler; 216 initialResultSetCompleteHandler = handler;
217 } 217 }
@@ -241,10 +241,10 @@ public:
241 removeHandler(value); 241 removeHandler(value);
242 } 242 }
243 243
244 void initialResultSetComplete(const DomainType &parent) 244 void initialResultSetComplete(const DomainType &parent, bool replayedAll)
245 { 245 {
246 if (initialResultSetCompleteHandler) { 246 if (initialResultSetCompleteHandler) {
247 initialResultSetCompleteHandler(parent); 247 initialResultSetCompleteHandler(parent, replayedAll);
248 } 248 }
249 } 249 }
250 250
@@ -280,7 +280,7 @@ private:
280 std::function<void(const DomainType &)> addHandler; 280 std::function<void(const DomainType &)> addHandler;
281 std::function<void(const DomainType &)> modifyHandler; 281 std::function<void(const DomainType &)> modifyHandler;
282 std::function<void(const DomainType &)> removeHandler; 282 std::function<void(const DomainType &)> removeHandler;
283 std::function<void(const DomainType &)> initialResultSetCompleteHandler; 283 std::function<void(const DomainType &, bool)> initialResultSetCompleteHandler;
284 std::function<void(void)> completeHandler; 284 std::function<void(void)> completeHandler;
285 std::function<void(void)> clearHandler; 285 std::function<void(void)> clearHandler;
286 286
@@ -300,30 +300,31 @@ public:
300 emitter->onModified([this](const DomainType &value) { this->modify(value); }); 300 emitter->onModified([this](const DomainType &value) { this->modify(value); });
301 emitter->onRemoved([this](const DomainType &value) { this->remove(value); }); 301 emitter->onRemoved([this](const DomainType &value) { this->remove(value); });
302 auto ptr = emitter.data(); 302 auto ptr = emitter.data();
303 emitter->onInitialResultSetComplete([this, ptr](const DomainType &parent) { 303 emitter->onInitialResultSetComplete([this, ptr](const DomainType &parent, bool replayedAll) {
304 auto hashValue = qHash(parent); 304 auto hashValue = qHash(parent);
305 mInitialResultSetInProgress.remove(hashValue, ptr); 305 mInitialResultSetInProgress.remove(hashValue, ptr);
306 callInitialResultCompleteIfDone(parent); 306 callInitialResultCompleteIfDone(parent, replayedAll);
307 }); 307 });
308 emitter->onComplete([this]() { this->complete(); }); 308 emitter->onComplete([this]() { this->complete(); });
309 emitter->onClear([this]() { this->clear(); }); 309 emitter->onClear([this]() { this->clear(); });
310 mEmitter << emitter; 310 mEmitter << emitter;
311 } 311 }
312 312
313 void callInitialResultCompleteIfDone(const DomainType &parent) 313 void callInitialResultCompleteIfDone(const DomainType &parent, bool replayedAll)
314 { 314 {
315 auto hashValue = qHash(parent); 315 auto hashValue = qHash(parent);
316 // Normally a parent is only in a single resource, except the toplevel (invalid) parent 316 // Normally a parent is only in a single resource, except the toplevel (invalid) parent
317 if (!mInitialResultSetInProgress.contains(hashValue) && mAllResultsFetched && !mResultEmitted) { 317 if (!mInitialResultSetInProgress.contains(hashValue) && mAllResultsFetched && !mResultEmitted) {
318 mResultEmitted = true; 318 mResultEmitted = true;
319 this->initialResultSetComplete(parent); 319 //FIXME set replayed all only to true if all had set it to true
320 this->initialResultSetComplete(parent, true);
320 } 321 }
321 } 322 }
322 323
323 void fetch(const DomainType &parent) Q_DECL_OVERRIDE 324 void fetch(const DomainType &parent) Q_DECL_OVERRIDE
324 { 325 {
325 if (mEmitter.isEmpty()) { 326 if (mEmitter.isEmpty()) {
326 this->initialResultSetComplete(parent); 327 this->initialResultSetComplete(parent, true);
327 } else { 328 } else {
328 mResultEmitted = false; 329 mResultEmitted = false;
329 mAllResultsFetched = false; 330 mAllResultsFetched = false;
@@ -332,7 +333,7 @@ public:
332 emitter->fetch(parent); 333 emitter->fetch(parent);
333 } 334 }
334 mAllResultsFetched = true; 335 mAllResultsFetched = true;
335 callInitialResultCompleteIfDone(parent); 336 callInitialResultCompleteIfDone(parent, true);
336 } 337 }
337 } 338 }
338 339
diff --git a/common/resultset.cpp b/common/resultset.cpp
index 9883f44..b82b14d 100644
--- a/common/resultset.cpp
+++ b/common/resultset.cpp
@@ -98,7 +98,7 @@ void ResultSet::skip(int number)
98 } 98 }
99} 99}
100 100
101qint64 ResultSet::replaySet(int offset, int batchSize, const Callback &callback) 101ResultSet::ReplayResult ResultSet::replaySet(int offset, int batchSize, const Callback &callback)
102{ 102{
103 skip(offset); 103 skip(offset);
104 int counter = 0; 104 int counter = 0;
@@ -108,10 +108,10 @@ qint64 ResultSet::replaySet(int offset, int batchSize, const Callback &callback)
108 callback(result); 108 callback(result);
109 }); 109 });
110 if (!ret) { 110 if (!ret) {
111 break; 111 return {counter, true};
112 } 112 }
113 }; 113 };
114 return counter; 114 return {counter, false};
115} 115}
116 116
117QByteArray ResultSet::id() 117QByteArray ResultSet::id()
diff --git a/common/resultset.h b/common/resultset.h
index 4f2c278..db7d1e0 100644
--- a/common/resultset.h
+++ b/common/resultset.h
@@ -55,7 +55,11 @@ public:
55 55
56 void skip(int number); 56 void skip(int number);
57 57
58 qint64 replaySet(int offset, int batchSize, const Callback &callback); 58 struct ReplayResult {
59 qint64 replayedEntities;
60 bool replayedAll;
61 };
62 ReplayResult replaySet(int offset, int batchSize, const Callback &callback);
59 63
60 QByteArray id(); 64 QByteArray id();
61 65
diff --git a/common/store.cpp b/common/store.cpp
index dd00bfe..a70be05 100644
--- a/common/store.cpp
+++ b/common/store.cpp
@@ -162,7 +162,7 @@ QSharedPointer<QAbstractItemModel> Store::loadModel(Query query)
162 }); 162 });
163 emitter->onRemoved([](const ApplicationDomain::SinkResource::Ptr &) { 163 emitter->onRemoved([](const ApplicationDomain::SinkResource::Ptr &) {
164 }); 164 });
165 emitter->onInitialResultSetComplete([](const ApplicationDomain::SinkResource::Ptr &) { 165 emitter->onInitialResultSetComplete([](const ApplicationDomain::SinkResource::Ptr &, bool) {
166 }); 166 });
167 emitter->onComplete([query, aggregatingEmitter]() { 167 emitter->onComplete([query, aggregatingEmitter]() {
168 SinkTrace() << "Resource query complete"; 168 SinkTrace() << "Resource query complete";
diff --git a/common/test.cpp b/common/test.cpp
index dc63afc..71bb972 100644
--- a/common/test.cpp
+++ b/common/test.cpp
@@ -166,7 +166,7 @@ public:
166 resultProvider->add(res.template staticCast<T>()); 166 resultProvider->add(res.template staticCast<T>());
167 } 167 }
168 } 168 }
169 resultProvider->initialResultSetComplete(parent); 169 resultProvider->initialResultSetComplete(parent, true);
170 }); 170 });
171 auto job = KAsync::syncStart<void>([query, resultProvider]() {}); 171 auto job = KAsync::syncStart<void>([query, resultProvider]() {});
172 return qMakePair(job, emitter); 172 return qMakePair(job, emitter);
diff --git a/tests/clientapitest.cpp b/tests/clientapitest.cpp
index 4afe328..d2fb747 100644
--- a/tests/clientapitest.cpp
+++ b/tests/clientapitest.cpp
@@ -78,7 +78,7 @@ public:
78 resultProvider->add(res); 78 resultProvider->add(res);
79 } 79 }
80 } 80 }
81 resultProvider->initialResultSetComplete(parent); 81 resultProvider->initialResultSetComplete(parent, true);
82 }); 82 });
83 auto job = KAsync::syncStart<void>([query, resultProvider]() {}); 83 auto job = KAsync::syncStart<void>([query, resultProvider]() {});
84 mResultProvider = resultProvider; 84 mResultProvider = resultProvider;
diff --git a/tests/databasepopulationandfacadequerybenchmark.cpp b/tests/databasepopulationandfacadequerybenchmark.cpp
index f1904ad..6bd2051 100644
--- a/tests/databasepopulationandfacadequerybenchmark.cpp
+++ b/tests/databasepopulationandfacadequerybenchmark.cpp
@@ -112,7 +112,7 @@ class DatabasePopulationAndFacadeQueryBenchmark : public QObject
112 QList<Sink::ApplicationDomain::Event::Ptr> list; 112 QList<Sink::ApplicationDomain::Event::Ptr> list;
113 emitter->onAdded([&list](const Sink::ApplicationDomain::Event::Ptr &event) { list << event; }); 113 emitter->onAdded([&list](const Sink::ApplicationDomain::Event::Ptr &event) { list << event; });
114 bool done = false; 114 bool done = false;
115 emitter->onInitialResultSetComplete([&done](const Sink::ApplicationDomain::Event::Ptr &event) { done = true; }); 115 emitter->onInitialResultSetComplete([&done](const Sink::ApplicationDomain::Event::Ptr &event, bool) { done = true; });
116 emitter->fetch(Sink::ApplicationDomain::Event::Ptr()); 116 emitter->fetch(Sink::ApplicationDomain::Event::Ptr());
117 QTRY_VERIFY(done); 117 QTRY_VERIFY(done);
118 QCOMPARE(list.size(), count); 118 QCOMPARE(list.size(), count);
diff --git a/tests/mailquerybenchmark.cpp b/tests/mailquerybenchmark.cpp
index 0e2f6fa..d3598b2 100644
--- a/tests/mailquerybenchmark.cpp
+++ b/tests/mailquerybenchmark.cpp
@@ -99,7 +99,7 @@ class MailQueryBenchmark : public QObject
99 QList<Mail::Ptr> list; 99 QList<Mail::Ptr> list;
100 emitter->onAdded([&list](const Mail::Ptr &mail) { list << mail; }); 100 emitter->onAdded([&list](const Mail::Ptr &mail) { list << mail; });
101 bool done = false; 101 bool done = false;
102 emitter->onInitialResultSetComplete([&done](const Mail::Ptr &mail) { done = true; }); 102 emitter->onInitialResultSetComplete([&done](const Mail::Ptr &mail, bool) { done = true; });
103 emitter->fetch(Mail::Ptr()); 103 emitter->fetch(Mail::Ptr());
104 QTRY_VERIFY(done); 104 QTRY_VERIFY(done);
105 QCOMPARE(list.size(), query.limit()); 105 QCOMPARE(list.size(), query.limit());