diff options
-rw-r--r-- | common/datastorequery.cpp | 1 | ||||
-rw-r--r-- | common/datastorequery.h | 2 | ||||
-rw-r--r-- | common/queryrunner.cpp | 153 | ||||
-rw-r--r-- | common/queryrunner.h | 4 | ||||
-rw-r--r-- | common/storage/entitystore.cpp | 1 | ||||
-rw-r--r-- | common/storage/entitystore.h | 1 | ||||
-rw-r--r-- | sinksh/syntax_modules/sink_selftest.cpp | 84 |
7 files changed, 169 insertions, 77 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 3218d1a..50158c7 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp | |||
@@ -643,6 +643,7 @@ ResultSet DataStoreQuery::execute() | |||
643 | { | 643 | { |
644 | SinkTraceCtx(mLogCtx) << "Executing query"; | 644 | SinkTraceCtx(mLogCtx) << "Executing query"; |
645 | 645 | ||
646 | Q_ASSERT(mCollector); | ||
646 | ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { | 647 | ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { |
647 | if (mCollector->next([this, callback](const ResultSet::Result &result) { | 648 | if (mCollector->next([this, callback](const ResultSet::Result &result) { |
648 | if (result.operation != Sink::Operation_Removal) { | 649 | if (result.operation != Sink::Operation_Removal) { |
diff --git a/common/datastorequery.h b/common/datastorequery.h index cc501e6..8800644 100644 --- a/common/datastorequery.h +++ b/common/datastorequery.h | |||
@@ -115,7 +115,7 @@ public: | |||
115 | virtual void updateComplete() { } | 115 | virtual void updateComplete() { } |
116 | 116 | ||
117 | FilterBase::Ptr mSource; | 117 | FilterBase::Ptr mSource; |
118 | DataStoreQuery *mDatastore; | 118 | DataStoreQuery *mDatastore{nullptr}; |
119 | bool mIncremental = false; | 119 | bool mIncremental = false; |
120 | }; | 120 | }; |
121 | 121 | ||
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 928e1e0..0ed4cb5 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -69,79 +69,14 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
69 | if (query.limit() && query.sortProperty().isEmpty()) { | 69 | if (query.limit() && query.sortProperty().isEmpty()) { |
70 | SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; | 70 | SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; |
71 | } | 71 | } |
72 | auto guardPtr = QPointer<QObject>(&guard); | ||
73 | auto fetcher = [=]() { | ||
74 | SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize; | ||
75 | auto resultProvider = mResultProvider; | ||
76 | auto resultTransformation = mResultTransformation; | ||
77 | auto batchSize = mBatchSize; | ||
78 | auto resourceContext = mResourceContext; | ||
79 | auto logCtx = mLogCtx; | ||
80 | auto state = mQueryState; | ||
81 | const bool runAsync = !query.synchronousQuery(); | ||
82 | //The lambda will be executed in a separate thread, so copy all arguments | ||
83 | async::run<ReplayResult>([=]() { | ||
84 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | ||
85 | return worker.executeInitialQuery(query, *resultProvider, batchSize, state); | ||
86 | }, runAsync) | ||
87 | .then([this, query, resultProvider, guardPtr](const ReplayResult &result) { | ||
88 | if (!guardPtr) { | ||
89 | //Not an error, the query can vanish at any time. | ||
90 | return; | ||
91 | } | ||
92 | mInitialQueryComplete = true; | ||
93 | mQueryState = result.queryState; | ||
94 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
95 | if (query.liveQuery()) { | ||
96 | mResourceAccess->sendRevisionReplayedCommand(result.newRevision); | ||
97 | } | ||
98 | resultProvider->setRevision(result.newRevision); | ||
99 | resultProvider->initialResultSetComplete(result.replayedAll); | ||
100 | }) | ||
101 | .exec(); | ||
102 | }; | ||
103 | |||
104 | // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. | 72 | // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. |
105 | mResultProvider->setFetcher(fetcher); | 73 | mResultProvider->setFetcher([this, query, bufferType] { fetch(query, bufferType); }); |
106 | 74 | ||
107 | // In case of a live query we keep the runner for as long alive as the result provider exists | 75 | // In case of a live query we keep the runner for as long alive as the result provider exists |
108 | if (query.liveQuery()) { | 76 | if (query.liveQuery()) { |
109 | Q_ASSERT(!query.synchronousQuery()); | 77 | Q_ASSERT(!query.synchronousQuery()); |
110 | // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | 78 | // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting |
111 | setQuery([=]() -> KAsync::Job<void> { | 79 | setQuery([=]() { return incrementalFetch(query, bufferType); }); |
112 | auto resultProvider = mResultProvider; | ||
113 | auto resourceContext = mResourceContext; | ||
114 | auto logCtx = mLogCtx; | ||
115 | auto state = mQueryState; | ||
116 | auto resultTransformation = mResultTransformation; | ||
117 | if (!mInitialQueryComplete) { | ||
118 | SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; | ||
119 | fetcher(); | ||
120 | return KAsync::null(); | ||
121 | } | ||
122 | if (mQueryInProgress) { | ||
123 | //Can happen if the revision come in quicker than we process them. | ||
124 | return KAsync::null(); | ||
125 | } | ||
126 | Q_ASSERT(!mQueryInProgress); | ||
127 | return KAsync::start([&] { | ||
128 | mQueryInProgress = true; | ||
129 | }) | ||
130 | .then(async::run<ReplayResult>([=]() { | ||
131 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | ||
132 | return worker.executeIncrementalQuery(query, *resultProvider, state); | ||
133 | })) | ||
134 | .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { | ||
135 | if (!guardPtr) { | ||
136 | //Not an error, the query can vanish at any time. | ||
137 | return; | ||
138 | } | ||
139 | mQueryInProgress = false; | ||
140 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
141 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); | ||
142 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | ||
143 | }); | ||
144 | }); | ||
145 | // Ensure the connection is open, if it wasn't already opened | 80 | // Ensure the connection is open, if it wasn't already opened |
146 | // TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates | 81 | // TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates |
147 | mResourceAccess->open(); | 82 | mResourceAccess->open(); |
@@ -158,6 +93,90 @@ QueryRunner<DomainType>::~QueryRunner() | |||
158 | SinkTraceCtx(mLogCtx) << "Stopped query"; | 93 | SinkTraceCtx(mLogCtx) << "Stopped query"; |
159 | } | 94 | } |
160 | 95 | ||
96 | //This function triggers the initial fetch, and then subsequent calls will simply fetch more data of mBatchSize. | ||
97 | template <class DomainType> | ||
98 | void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType) | ||
99 | { | ||
100 | auto guardPtr = QPointer<QObject>(&guard); | ||
101 | SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize; | ||
102 | if (mQueryInProgress) { | ||
103 | SinkTraceCtx(mLogCtx) << "Query is already in progress, postponing: " << mBatchSize; | ||
104 | mRequestFetchMore = true; | ||
105 | return; | ||
106 | } | ||
107 | mQueryInProgress = true; | ||
108 | auto resultProvider = mResultProvider; | ||
109 | auto resultTransformation = mResultTransformation; | ||
110 | auto batchSize = mBatchSize; | ||
111 | auto resourceContext = mResourceContext; | ||
112 | auto logCtx = mLogCtx; | ||
113 | auto state = mQueryState; | ||
114 | const bool runAsync = !query.synchronousQuery(); | ||
115 | //The lambda will be executed in a separate thread, so copy all arguments | ||
116 | async::run<ReplayResult>([=]() { | ||
117 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | ||
118 | return worker.executeInitialQuery(query, *resultProvider, batchSize, state); | ||
119 | }, runAsync) | ||
120 | .then([=](const ReplayResult &result) { | ||
121 | if (!guardPtr) { | ||
122 | //Not an error, the query can vanish at any time. | ||
123 | return; | ||
124 | } | ||
125 | mInitialQueryComplete = true; | ||
126 | mQueryInProgress = false; | ||
127 | mQueryState = result.queryState; | ||
128 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
129 | if (query.liveQuery()) { | ||
130 | mResourceAccess->sendRevisionReplayedCommand(result.newRevision); | ||
131 | } | ||
132 | resultProvider->setRevision(result.newRevision); | ||
133 | resultProvider->initialResultSetComplete(result.replayedAll); | ||
134 | if (mRequestFetchMore) { | ||
135 | mRequestFetchMore = false; | ||
136 | fetch(query, bufferType); | ||
137 | } | ||
138 | }) | ||
139 | .exec(); | ||
140 | } | ||
141 | |||
142 | template <class DomainType> | ||
143 | KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &query, const QByteArray &bufferType) | ||
144 | { | ||
145 | if (!mInitialQueryComplete) { | ||
146 | SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; | ||
147 | fetch(query, bufferType); | ||
148 | return KAsync::null(); | ||
149 | } | ||
150 | if (mQueryInProgress) { | ||
151 | //Can happen if the revision come in quicker than we process them. | ||
152 | return KAsync::null(); | ||
153 | } | ||
154 | auto resultProvider = mResultProvider; | ||
155 | auto resourceContext = mResourceContext; | ||
156 | auto logCtx = mLogCtx; | ||
157 | auto state = mQueryState; | ||
158 | auto resultTransformation = mResultTransformation; | ||
159 | Q_ASSERT(!mQueryInProgress); | ||
160 | auto guardPtr = QPointer<QObject>(&guard); | ||
161 | return KAsync::start([&] { | ||
162 | mQueryInProgress = true; | ||
163 | }) | ||
164 | .then(async::run<ReplayResult>([=]() { | ||
165 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | ||
166 | return worker.executeIncrementalQuery(query, *resultProvider, state); | ||
167 | })) | ||
168 | .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { | ||
169 | if (!guardPtr) { | ||
170 | //Not an error, the query can vanish at any time. | ||
171 | return; | ||
172 | } | ||
173 | mQueryInProgress = false; | ||
174 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
175 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); | ||
176 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | ||
177 | }); | ||
178 | } | ||
179 | |||
161 | template <class DomainType> | 180 | template <class DomainType> |
162 | void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation) | 181 | void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation) |
163 | { | 182 | { |
diff --git a/common/queryrunner.h b/common/queryrunner.h index a567b3c..af54619 100644 --- a/common/queryrunner.h +++ b/common/queryrunner.h | |||
@@ -92,6 +92,9 @@ public: | |||
92 | typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr emitter(); | 92 | typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr emitter(); |
93 | 93 | ||
94 | private: | 94 | private: |
95 | void fetch(const Sink::Query &query, const QByteArray &bufferType); | ||
96 | KAsync::Job<void> incrementalFetch(const Sink::Query &query, const QByteArray &bufferType); | ||
97 | |||
95 | Sink::ResourceContext mResourceContext; | 98 | Sink::ResourceContext mResourceContext; |
96 | QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess; | 99 | QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess; |
97 | QSharedPointer<Sink::ResultProvider<typename DomainType::Ptr>> mResultProvider; | 100 | QSharedPointer<Sink::ResultProvider<typename DomainType::Ptr>> mResultProvider; |
@@ -102,4 +105,5 @@ private: | |||
102 | Sink::Log::Context mLogCtx; | 105 | Sink::Log::Context mLogCtx; |
103 | bool mInitialQueryComplete = false; | 106 | bool mInitialQueryComplete = false; |
104 | bool mQueryInProgress = false; | 107 | bool mQueryInProgress = false; |
108 | bool mRequestFetchMore = false; | ||
105 | }; | 109 | }; |
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index 8fbc2ad..020f3fd 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp | |||
@@ -459,6 +459,7 @@ void EntityStore::indexLookup(const QByteArray &type, const QByteArray &property | |||
459 | 459 | ||
460 | void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback) | 460 | void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback) |
461 | { | 461 | { |
462 | Q_ASSERT(d); | ||
462 | auto db = DataStore::mainDatabase(d->getTransaction(), type); | 463 | auto db = DataStore::mainDatabase(d->getTransaction(), type); |
463 | db.findLatest(uid, | 464 | db.findLatest(uid, |
464 | [=](const QByteArray &key, const QByteArray &value) { | 465 | [=](const QByteArray &key, const QByteArray &value) { |
diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h index 00241f2..3eb0b7b 100644 --- a/common/storage/entitystore.h +++ b/common/storage/entitystore.h | |||
@@ -37,6 +37,7 @@ class SINK_EXPORT EntityStore | |||
37 | public: | 37 | public: |
38 | typedef QSharedPointer<EntityStore> Ptr; | 38 | typedef QSharedPointer<EntityStore> Ptr; |
39 | EntityStore(const ResourceContext &resourceContext, const Sink::Log::Context &); | 39 | EntityStore(const ResourceContext &resourceContext, const Sink::Log::Context &); |
40 | ~EntityStore() = default; | ||
40 | 41 | ||
41 | //Only the pipeline may call the following functions outside of tests | 42 | //Only the pipeline may call the following functions outside of tests |
42 | bool add(const QByteArray &type, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource); | 43 | bool add(const QByteArray &type, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource); |
diff --git a/sinksh/syntax_modules/sink_selftest.cpp b/sinksh/syntax_modules/sink_selftest.cpp index 21dfbff..8ad4f60 100644 --- a/sinksh/syntax_modules/sink_selftest.cpp +++ b/sinksh/syntax_modules/sink_selftest.cpp | |||
@@ -46,28 +46,94 @@ bool selfTest(const QStringList &args_, State &state) | |||
46 | return false; | 46 | return false; |
47 | } | 47 | } |
48 | 48 | ||
49 | using namespace Sink::ApplicationDomain; | ||
49 | auto options = SyntaxTree::parseOptions(args_); | 50 | auto options = SyntaxTree::parseOptions(args_); |
50 | if (options.positionalArguments.contains("stresstest")) { | 51 | if (options.positionalArguments.contains("stresstest")) { |
51 | auto resource = SinkshUtils::parseUid(options.options.value("resource").first().toUtf8()); | 52 | auto resource = SinkshUtils::parseUid(options.options.value("resource").first().toUtf8()); |
52 | qWarning() << "Stresstest on resource: " << resource; | 53 | qWarning() << "Stresstest on resource: " << resource; |
53 | Sink::Query query; | ||
54 | query.resourceFilter(resource); | ||
55 | query.limit(100); | ||
56 | |||
57 | auto models = QSharedPointer<QList<QSharedPointer<QAbstractItemModel>>>::create(); | 54 | auto models = QSharedPointer<QList<QSharedPointer<QAbstractItemModel>>>::create(); |
58 | for (int i = 0; i < 50; i++) { | 55 | |
56 | //Simulate the maillist, where we scroll down and trigger lots of fetchMore calls | ||
57 | { | ||
58 | Sink::Query query; | ||
59 | query.resourceFilter(resource); | ||
60 | query.limit(100); | ||
61 | query.request<Mail::Subject>(); | ||
62 | query.request<Mail::Sender>(); | ||
63 | query.request<Mail::To>(); | ||
64 | query.request<Mail::Cc>(); | ||
65 | query.request<Mail::Bcc>(); | ||
66 | query.request<Mail::Date>(); | ||
67 | query.request<Mail::Unread>(); | ||
68 | query.request<Mail::Important>(); | ||
69 | query.request<Mail::Draft>(); | ||
70 | query.request<Mail::Sent>(); | ||
71 | query.request<Mail::Trash>(); | ||
72 | query.request<Mail::Folder>(); | ||
73 | query.sort<Mail::Date>(); | ||
74 | query.reduce<Mail::ThreadId>(Sink::Query::Reduce::Selector::max<Mail::Date>()) | ||
75 | .count("count") | ||
76 | .collect<Mail::Unread>("unreadCollected") | ||
77 | .collect<Mail::Important>("importantCollected"); | ||
78 | |||
59 | auto model = Sink::Store::loadModel<Sink::ApplicationDomain::Mail>(query); | 79 | auto model = Sink::Store::loadModel<Sink::ApplicationDomain::Mail>(query); |
60 | *models << model; | 80 | models->append(model); |
61 | QObject::connect(model.data(), &QAbstractItemModel::dataChanged, [models, model, &state](const QModelIndex &start, const QModelIndex &end, const QVector<int> &roles) { | 81 | QObject::connect(model.data(), &QAbstractItemModel::dataChanged, [models, model, &state](const QModelIndex &start, const QModelIndex &end, const QVector<int> &roles) { |
62 | if (roles.contains(Sink::Store::ChildrenFetchedRole)) { | 82 | if (roles.contains(Sink::Store::ChildrenFetchedRole)) { |
63 | models->removeAll(model); | 83 | if (!model->canFetchMore({})) { |
64 | qWarning() << "Model complete: " << models->count(); | 84 | qWarning() << "Model complete: " << models->count(); |
85 | models->removeAll(model); | ||
86 | } else { | ||
87 | qWarning() << "Fetching more"; | ||
88 | //Simulate superfluous fetchMore calls | ||
89 | for (int i = 0; i < 10; i++) { | ||
90 | model->fetchMore({}); | ||
91 | } | ||
92 | return; | ||
93 | } | ||
65 | if (models->isEmpty()) { | 94 | if (models->isEmpty()) { |
66 | state.commandFinished(); | 95 | state.commandFinished(); |
67 | } | 96 | } |
68 | } | 97 | } |
69 | }); | 98 | }); |
99 | |||
70 | } | 100 | } |
101 | |||
102 | //Simluate lot's of mailviewers doing a bunch of queries | ||
103 | { | ||
104 | Sink::Query query; | ||
105 | query.resourceFilter(resource); | ||
106 | query.limit(10); | ||
107 | query.request<Mail::Subject>(); | ||
108 | query.request<Mail::Sender>(); | ||
109 | query.request<Mail::To>(); | ||
110 | query.request<Mail::Cc>(); | ||
111 | query.request<Mail::Bcc>(); | ||
112 | query.request<Mail::Date>(); | ||
113 | query.request<Mail::Unread>(); | ||
114 | query.request<Mail::Important>(); | ||
115 | query.request<Mail::Draft>(); | ||
116 | query.request<Mail::Sent>(); | ||
117 | query.request<Mail::Trash>(); | ||
118 | query.request<Mail::Folder>(); | ||
119 | query.sort<Sink::ApplicationDomain::Mail::Date>(); | ||
120 | query.bloom<Sink::ApplicationDomain::Mail::ThreadId>(); | ||
121 | |||
122 | for (int i = 0; i < 50; i++) { | ||
123 | auto model = Sink::Store::loadModel<Sink::ApplicationDomain::Mail>(query); | ||
124 | *models << model; | ||
125 | QObject::connect(model.data(), &QAbstractItemModel::dataChanged, [models, model, &state](const QModelIndex &start, const QModelIndex &end, const QVector<int> &roles) { | ||
126 | if (roles.contains(Sink::Store::ChildrenFetchedRole)) { | ||
127 | models->removeAll(model); | ||
128 | qWarning() << "Model complete: " << models->count(); | ||
129 | if (models->isEmpty()) { | ||
130 | state.commandFinished(); | ||
131 | } | ||
132 | } | ||
133 | }); | ||
134 | } | ||
135 | } | ||
136 | |||
71 | return true; | 137 | return true; |
72 | } | 138 | } |
73 | return false; | 139 | return false; |