From 1864024012213dc0a17c76e9755bf50a19944ec7 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 20 Dec 2016 16:02:38 +0100 Subject: Report when we don't have any more to fetch. ... so we can use that information in fetchMore. --- common/modelresult.cpp | 10 +++++++-- common/modelresult.h | 1 + common/queryrunner.cpp | 52 +++++++++++++++++++++++++++-------------------- common/resourcefacade.cpp | 2 +- common/resultprovider.h | 27 ++++++++++++------------ common/resultset.cpp | 6 +++--- common/resultset.h | 6 +++++- common/store.cpp | 2 +- common/test.cpp | 2 +- 9 files changed, 64 insertions(+), 44 deletions(-) (limited to 'common') diff --git a/common/modelresult.cpp b/common/modelresult.cpp index 34e6dfc..8e92365 100644 --- a/common/modelresult.cpp +++ b/common/modelresult.cpp @@ -167,7 +167,10 @@ template bool ModelResult::canFetchMore(const QModelIndex &parent) const { const auto id = parent.internalId(); - return !mEntityChildrenFetched.contains(id) || !mEntityChildrenFetchComplete.contains(id); + if (mEntityAllChildrenFetched.contains(id)) { + return false; + } + return true; } template @@ -269,11 +272,14 @@ void ModelResult::setEmitter(const typename Sink::ResultEmitter::Pt remove(value); }); }); - emitter->onInitialResultSetComplete([this](const Ptr &parent) { + emitter->onInitialResultSetComplete([this](const Ptr &parent, bool fetchedAll) { SinkTrace() << "Initial result set complete"; const qint64 parentId = parent ? qHash(*parent) : 0; const auto parentIndex = createIndexFromId(parentId); mEntityChildrenFetchComplete.insert(parentId); + if (fetchedAll) { + mEntityAllChildrenFetched.insert(parentId); + } emit dataChanged(parentIndex, parentIndex, QVector() << ChildrenFetchedRole); }); mEmitter = emitter; diff --git a/common/modelresult.h b/common/modelresult.h index 7924e2d..b7fc0ec 100644 --- a/common/modelresult.h +++ b/common/modelresult.h @@ -73,6 +73,7 @@ private: QMap mParents; QSet mEntityChildrenFetched; QSet mEntityChildrenFetchComplete; + QSet mEntityAllChildrenFetched; QList mPropertyColumns; Sink::Query mQuery; std::function loadEntities; diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index d6a90de..377e3b9 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -31,6 +31,12 @@ SINK_DEBUG_AREA("queryrunner") using namespace Sink; using namespace Sink::Storage; +struct ReplayResult { + qint64 newRevision; + qint64 replayedEntities; + bool replayedAll; +}; + /* * This class wraps the actual query implementation. * @@ -47,8 +53,8 @@ public: QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation); virtual ~QueryWorker(); - QPair executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider); - QPair executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider, int offset, int batchsize); + ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider); + ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider, int offset, int batchsize); private: void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider, const ResultSet::Result &result); @@ -64,7 +70,7 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou { SinkTrace() << "Starting query. Is live:" << query.liveQuery() << " Limit: " << query.limit(); if (query.limit() && query.sortProperty().isEmpty()) { - SinkWarning() << "A limited query without sorting is typically a bad idea."; + SinkWarning() << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; } auto guardPtr = QPointer(&guard); // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. @@ -74,31 +80,33 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou auto resultProvider = mResultProvider; if (query.synchronousQuery()) { QueryWorker worker(query, mResourceContext, bufferType, mResultTransformation); - worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); - resultProvider->initialResultSetComplete(parent); + const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); + mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; + resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); + resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); } else { auto resultTransformation = mResultTransformation; auto offset = mOffset[parentId]; auto batchSize = mBatchSize; auto resourceContext = mResourceContext; - //The lambda will be executed in a separate thread, so we're extra careful - async::run >([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent]() { + //The lambda will be executed in a separate thread, so copy all arguments + async::run([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent]() { QueryWorker worker(query, resourceContext, bufferType, resultTransformation); const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); return newRevisionAndReplayedEntities; }) - .template syncThen>([this, parentId, query, parent, resultProvider, guardPtr](const QPair &newRevisionAndReplayedEntities) { + .template syncThen([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { if (!guardPtr) { qWarning() << "The parent object is already gone"; return; } - mOffset[parentId] += newRevisionAndReplayedEntities.second; + mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. if (query.liveQuery()) { - mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); + mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); } - resultProvider->setRevision(newRevisionAndReplayedEntities.first); - resultProvider->initialResultSetComplete(parent); + resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); + resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); }) .exec(); } @@ -111,19 +119,19 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou setQuery([=]() -> KAsync::Job { auto resultProvider = mResultProvider; auto resourceContext = mResourceContext; - return async::run >([=]() { + return async::run([=]() { QueryWorker worker(query, resourceContext, bufferType, mResultTransformation); const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); return newRevisionAndReplayedEntities; }) - .template syncThen >([query, this, resultProvider, guardPtr](const QPair &newRevisionAndReplayedEntities) { + .template syncThen([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { if (!guardPtr) { qWarning() << "The parent object is already gone"; return; } // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. - mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); - resultProvider->setRevision(newRevisionAndReplayedEntities.first); + mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); + resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); }); }); // Ensure the connection is open, if it wasn't already opened @@ -195,7 +203,7 @@ void QueryWorker::resultProviderCallback(const Sink::Query &query, S } template -QPair QueryWorker::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider) +ReplayResult QueryWorker::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider) { QTime time; time.start(); @@ -205,16 +213,16 @@ QPair QueryWorker::executeIncrementalQuery(const Sin auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName(), entityStore}; auto resultSet = preparedQuery.update(baseRevision); SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); - auto replayedEntities = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { + auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { resultProviderCallback(query, resultProvider, result); }); SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); - return qMakePair(entityStore.maxRevision(), replayedEntities); + return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; } template -QPair QueryWorker::executeInitialQuery( +ReplayResult QueryWorker::executeInitialQuery( const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider, int offset, int batchsize) { QTime time; @@ -236,12 +244,12 @@ QPair QueryWorker::executeInitialQuery( auto resultSet = preparedQuery.execute(); SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); - auto replayedEntities = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { + auto replayResult = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { resultProviderCallback(query, resultProvider, result); }); SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); - return qMakePair(entityStore.maxRevision(), replayedEntities); + return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; } template class QueryRunner; diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp index ea4218d..861d37a 100644 --- a/common/resourcefacade.cpp +++ b/common/resourcefacade.cpp @@ -111,7 +111,7 @@ LocalStorageQueryRunner::LocalStorageQueryRunner(const Query &query, mResultProvider->add(entity); } // TODO initialResultSetComplete should be implicit - mResultProvider->initialResultSetComplete(typename DomainType::Ptr()); + mResultProvider->initialResultSetComplete(typename DomainType::Ptr(), true); mResultProvider->complete(); }); if (query.liveQuery()) { diff --git a/common/resultprovider.h b/common/resultprovider.h index defeb6a..cda4dac 100644 --- a/common/resultprovider.h +++ b/common/resultprovider.h @@ -46,7 +46,7 @@ public: virtual void add(const T &value) = 0; virtual void modify(const T &value) = 0; virtual void remove(const T &value) = 0; - virtual void initialResultSetComplete(const T &parent) = 0; + virtual void initialResultSetComplete(const T &parent, bool) = 0; virtual void complete() = 0; virtual void clear() = 0; virtual void setFetcher(const std::function &fetcher) = 0; @@ -100,10 +100,10 @@ public: } } - void initialResultSetComplete(const T &parent) + void initialResultSetComplete(const T &parent, bool replayedAll) { if (auto strongRef = mResultEmitter.toStrongRef()) { - strongRef->initialResultSetComplete(parent); + strongRef->initialResultSetComplete(parent, replayedAll); } } @@ -211,7 +211,7 @@ public: removeHandler = handler; } - void onInitialResultSetComplete(const std::function &handler) + void onInitialResultSetComplete(const std::function &handler) { initialResultSetCompleteHandler = handler; } @@ -241,10 +241,10 @@ public: removeHandler(value); } - void initialResultSetComplete(const DomainType &parent) + void initialResultSetComplete(const DomainType &parent, bool replayedAll) { if (initialResultSetCompleteHandler) { - initialResultSetCompleteHandler(parent); + initialResultSetCompleteHandler(parent, replayedAll); } } @@ -280,7 +280,7 @@ private: std::function addHandler; std::function modifyHandler; std::function removeHandler; - std::function initialResultSetCompleteHandler; + std::function initialResultSetCompleteHandler; std::function completeHandler; std::function clearHandler; @@ -300,30 +300,31 @@ public: emitter->onModified([this](const DomainType &value) { this->modify(value); }); emitter->onRemoved([this](const DomainType &value) { this->remove(value); }); auto ptr = emitter.data(); - emitter->onInitialResultSetComplete([this, ptr](const DomainType &parent) { + emitter->onInitialResultSetComplete([this, ptr](const DomainType &parent, bool replayedAll) { auto hashValue = qHash(parent); mInitialResultSetInProgress.remove(hashValue, ptr); - callInitialResultCompleteIfDone(parent); + callInitialResultCompleteIfDone(parent, replayedAll); }); emitter->onComplete([this]() { this->complete(); }); emitter->onClear([this]() { this->clear(); }); mEmitter << emitter; } - void callInitialResultCompleteIfDone(const DomainType &parent) + void callInitialResultCompleteIfDone(const DomainType &parent, bool replayedAll) { auto hashValue = qHash(parent); // Normally a parent is only in a single resource, except the toplevel (invalid) parent if (!mInitialResultSetInProgress.contains(hashValue) && mAllResultsFetched && !mResultEmitted) { mResultEmitted = true; - this->initialResultSetComplete(parent); + //FIXME set replayed all only to true if all had set it to true + this->initialResultSetComplete(parent, true); } } void fetch(const DomainType &parent) Q_DECL_OVERRIDE { if (mEmitter.isEmpty()) { - this->initialResultSetComplete(parent); + this->initialResultSetComplete(parent, true); } else { mResultEmitted = false; mAllResultsFetched = false; @@ -332,7 +333,7 @@ public: emitter->fetch(parent); } mAllResultsFetched = true; - callInitialResultCompleteIfDone(parent); + callInitialResultCompleteIfDone(parent, true); } } diff --git a/common/resultset.cpp b/common/resultset.cpp index 9883f44..b82b14d 100644 --- a/common/resultset.cpp +++ b/common/resultset.cpp @@ -98,7 +98,7 @@ void ResultSet::skip(int number) } } -qint64 ResultSet::replaySet(int offset, int batchSize, const Callback &callback) +ResultSet::ReplayResult ResultSet::replaySet(int offset, int batchSize, const Callback &callback) { skip(offset); int counter = 0; @@ -108,10 +108,10 @@ qint64 ResultSet::replaySet(int offset, int batchSize, const Callback &callback) callback(result); }); if (!ret) { - break; + return {counter, true}; } }; - return counter; + return {counter, false}; } QByteArray ResultSet::id() diff --git a/common/resultset.h b/common/resultset.h index 4f2c278..db7d1e0 100644 --- a/common/resultset.h +++ b/common/resultset.h @@ -55,7 +55,11 @@ public: void skip(int number); - qint64 replaySet(int offset, int batchSize, const Callback &callback); + struct ReplayResult { + qint64 replayedEntities; + bool replayedAll; + }; + ReplayResult replaySet(int offset, int batchSize, const Callback &callback); QByteArray id(); diff --git a/common/store.cpp b/common/store.cpp index dd00bfe..a70be05 100644 --- a/common/store.cpp +++ b/common/store.cpp @@ -162,7 +162,7 @@ QSharedPointer Store::loadModel(Query query) }); emitter->onRemoved([](const ApplicationDomain::SinkResource::Ptr &) { }); - emitter->onInitialResultSetComplete([](const ApplicationDomain::SinkResource::Ptr &) { + emitter->onInitialResultSetComplete([](const ApplicationDomain::SinkResource::Ptr &, bool) { }); emitter->onComplete([query, aggregatingEmitter]() { SinkTrace() << "Resource query complete"; diff --git a/common/test.cpp b/common/test.cpp index dc63afc..71bb972 100644 --- a/common/test.cpp +++ b/common/test.cpp @@ -166,7 +166,7 @@ public: resultProvider->add(res.template staticCast()); } } - resultProvider->initialResultSetComplete(parent); + resultProvider->initialResultSetComplete(parent, true); }); auto job = KAsync::syncStart([query, resultProvider]() {}); return qMakePair(job, emitter); -- cgit v1.2.3