author    | Christian Mollekopf <chrigi_1@fastmail.fm> | 2018-02-15 10:19:08 +0100
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2018-02-15 11:08:52 +0100
commit    | 531972042d4b610258c8af8a17ec3a99cd063dda (patch)
tree      | ef4356ec141f0f1ccd756e8610b08553b866bf78
parent    | f51963f057bcbdd175114433913a1c5f0eebd546 (diff)
download  | sink-531972042d4b610258c8af8a17ec3a99cd063dda.tar.gz
          | sink-531972042d4b610258c8af8a17ec3a99cd063dda.zip
Fixed crashes due to concurrently running queries.

A single QueryRunner should never have multiple workers running at the
same time. We did not properly enforce this when incremental updates
came in.

The only way I managed to reproduce the crash:
* Open a large folder with lots of unread mail in kube
* Select a mail in the maillist and hold the down button
* This will:
    * Repeatedly call fetch more
    * Trigger lots of mark-as-read modifications that result in
      notifications.
* Eventually it crashes somewhere in EntityStore, likely because of
  concurrent access to the filter structure, which is shared through
  the state.

We now ensure in the single-threaded portion of the code that we only
ever run one worker at a time. If an update comes in while a worker is
still running, we remember that change and fetch more once we're done.

To be able to call fetch again, that portion was also factored out into
a separate function.
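In essence, the fix adds two flags to the runner's single-threaded bookkeeping: one marking that a worker is running, and one remembering that another fetch was requested in the meantime. The following is a minimal, self-contained sketch of that pattern, not Sink code: the member names mirror mQueryInProgress/mRequestFetchMore from queryrunner.h below, and a queue of callbacks stands in for the asynchronous worker and its continuation.

    #include <functional>
    #include <iostream>
    #include <queue>

    // Hypothetical illustration of the pattern introduced by this commit: only one
    // worker may run at a time; a fetch requested while a worker is busy is merely
    // remembered and executed once the running worker completes.
    struct Runner {
        bool queryInProgress = false;   // mirrors mQueryInProgress
        bool requestFetchMore = false;  // mirrors mRequestFetchMore
        std::queue<std::function<void()>> completions; // stands in for the async worker

        void fetch() {
            if (queryInProgress) {
                // Never start a second worker concurrently; just take a note.
                requestFetchMore = true;
                return;
            }
            queryInProgress = true;
            std::cout << "worker started\n";
            // The continuation runs later, back in the single-threaded portion.
            completions.push([this] {
                queryInProgress = false;
                if (requestFetchMore) {
                    requestFetchMore = false;
                    fetch(); // pick up the update that arrived while we were busy
                }
            });
        }

        // Simulates the worker finishing and its continuation being delivered.
        void workerFinished() {
            auto done = completions.front();
            completions.pop();
            done();
        }
    };

    int main() {
        Runner runner;
        runner.fetch();          // starts a worker
        runner.fetch();          // worker busy: only remembered, no second worker
        runner.fetch();          // still just remembered
        runner.workerFinished(); // completion triggers exactly one follow-up fetch
        runner.workerFinished(); // nothing left to do
    }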
-rw-r--r--  common/datastorequery.cpp               |   1
-rw-r--r--  common/datastorequery.h                 |   2
-rw-r--r--  common/queryrunner.cpp                  | 153
-rw-r--r--  common/queryrunner.h                    |   4
-rw-r--r--  common/storage/entitystore.cpp          |   1
-rw-r--r--  common/storage/entitystore.h            |   1
-rw-r--r--  sinksh/syntax_modules/sink_selftest.cpp |  84

7 files changed, 169 insertions(+), 77 deletions(-)
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index 3218d1a..50158c7 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -643,6 +643,7 @@ ResultSet DataStoreQuery::execute()
 {
     SinkTraceCtx(mLogCtx) << "Executing query";
 
+    Q_ASSERT(mCollector);
     ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool {
         if (mCollector->next([this, callback](const ResultSet::Result &result) {
                 if (result.operation != Sink::Operation_Removal) {
diff --git a/common/datastorequery.h b/common/datastorequery.h
index cc501e6..8800644 100644
--- a/common/datastorequery.h
+++ b/common/datastorequery.h
@@ -115,7 +115,7 @@ public:
     virtual void updateComplete() { }
 
     FilterBase::Ptr mSource;
-    DataStoreQuery *mDatastore;
+    DataStoreQuery *mDatastore{nullptr};
     bool mIncremental = false;
 };
 
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 928e1e0..0ed4cb5 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -69,79 +69,14 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
     if (query.limit() && query.sortProperty().isEmpty()) {
         SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get.";
     }
-    auto guardPtr = QPointer<QObject>(&guard);
-    auto fetcher = [=]() {
-        SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize;
-        auto resultProvider = mResultProvider;
-        auto resultTransformation = mResultTransformation;
-        auto batchSize = mBatchSize;
-        auto resourceContext = mResourceContext;
-        auto logCtx = mLogCtx;
-        auto state = mQueryState;
-        const bool runAsync = !query.synchronousQuery();
-        //The lambda will be executed in a separate thread, so copy all arguments
-        async::run<ReplayResult>([=]() {
-            QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
-            return worker.executeInitialQuery(query, *resultProvider, batchSize, state);
-        }, runAsync)
-            .then([this, query, resultProvider, guardPtr](const ReplayResult &result) {
-                if (!guardPtr) {
-                    //Not an error, the query can vanish at any time.
-                    return;
-                }
-                mInitialQueryComplete = true;
-                mQueryState = result.queryState;
-                // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
-                if (query.liveQuery()) {
-                    mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
-                }
-                resultProvider->setRevision(result.newRevision);
-                resultProvider->initialResultSetComplete(result.replayedAll);
-            })
-            .exec();
-    };
-
     // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load.
-    mResultProvider->setFetcher(fetcher);
+    mResultProvider->setFetcher([this, query, bufferType] { fetch(query, bufferType); });
 
     // In case of a live query we keep the runner for as long alive as the result provider exists
     if (query.liveQuery()) {
         Q_ASSERT(!query.synchronousQuery());
         // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
-        setQuery([=]() -> KAsync::Job<void> {
-            auto resultProvider = mResultProvider;
-            auto resourceContext = mResourceContext;
-            auto logCtx = mLogCtx;
-            auto state = mQueryState;
-            auto resultTransformation = mResultTransformation;
-            if (!mInitialQueryComplete) {
-                SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete";
-                fetcher();
-                return KAsync::null();
-            }
-            if (mQueryInProgress) {
-                //Can happen if the revision come in quicker than we process them.
-                return KAsync::null();
-            }
-            Q_ASSERT(!mQueryInProgress);
-            return KAsync::start([&] {
-                mQueryInProgress = true;
-            })
-            .then(async::run<ReplayResult>([=]() {
-                QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
-                return worker.executeIncrementalQuery(query, *resultProvider, state);
-            }))
-            .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
-                if (!guardPtr) {
-                    //Not an error, the query can vanish at any time.
-                    return;
-                }
-                mQueryInProgress = false;
-                // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
-                mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
-                resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
-            });
-        });
+        setQuery([=]() { return incrementalFetch(query, bufferType); });
         // Ensure the connection is open, if it wasn't already opened
         // TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
         mResourceAccess->open();
@@ -158,6 +93,90 @@ QueryRunner<DomainType>::~QueryRunner()
     SinkTraceCtx(mLogCtx) << "Stopped query";
 }
 
+//This function triggers the initial fetch, and then subsequent calls will simply fetch more data of mBatchSize.
+template <class DomainType>
+void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType)
+{
+    auto guardPtr = QPointer<QObject>(&guard);
+    SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize;
+    if (mQueryInProgress) {
+        SinkTraceCtx(mLogCtx) << "Query is already in progress, postponing: " << mBatchSize;
+        mRequestFetchMore = true;
+        return;
+    }
+    mQueryInProgress = true;
+    auto resultProvider = mResultProvider;
+    auto resultTransformation = mResultTransformation;
+    auto batchSize = mBatchSize;
+    auto resourceContext = mResourceContext;
+    auto logCtx = mLogCtx;
+    auto state = mQueryState;
+    const bool runAsync = !query.synchronousQuery();
+    //The lambda will be executed in a separate thread, so copy all arguments
+    async::run<ReplayResult>([=]() {
+        QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
+        return worker.executeInitialQuery(query, *resultProvider, batchSize, state);
+    }, runAsync)
+        .then([=](const ReplayResult &result) {
+            if (!guardPtr) {
+                //Not an error, the query can vanish at any time.
+                return;
+            }
+            mInitialQueryComplete = true;
+            mQueryInProgress = false;
+            mQueryState = result.queryState;
+            // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
+            if (query.liveQuery()) {
+                mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
+            }
+            resultProvider->setRevision(result.newRevision);
+            resultProvider->initialResultSetComplete(result.replayedAll);
+            if (mRequestFetchMore) {
+                mRequestFetchMore = false;
+                fetch(query, bufferType);
+            }
+        })
+        .exec();
+}
+
+template <class DomainType>
+KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &query, const QByteArray &bufferType)
+{
+    if (!mInitialQueryComplete) {
+        SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete";
+        fetch(query, bufferType);
+        return KAsync::null();
+    }
+    if (mQueryInProgress) {
+        //Can happen if the revision come in quicker than we process them.
+        return KAsync::null();
+    }
+    auto resultProvider = mResultProvider;
+    auto resourceContext = mResourceContext;
+    auto logCtx = mLogCtx;
+    auto state = mQueryState;
+    auto resultTransformation = mResultTransformation;
+    Q_ASSERT(!mQueryInProgress);
+    auto guardPtr = QPointer<QObject>(&guard);
+    return KAsync::start([&] {
+        mQueryInProgress = true;
+    })
+    .then(async::run<ReplayResult>([=]() {
+        QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
+        return worker.executeIncrementalQuery(query, *resultProvider, state);
+    }))
+    .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
+        if (!guardPtr) {
+            //Not an error, the query can vanish at any time.
+            return;
+        }
+        mQueryInProgress = false;
+        // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
+        mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
+        resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
+    });
+}
+
 template <class DomainType>
 void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation)
 {
diff --git a/common/queryrunner.h b/common/queryrunner.h
index a567b3c..af54619 100644
--- a/common/queryrunner.h
+++ b/common/queryrunner.h
@@ -92,6 +92,9 @@ public:
     typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr emitter();
 
 private:
+    void fetch(const Sink::Query &query, const QByteArray &bufferType);
+    KAsync::Job<void> incrementalFetch(const Sink::Query &query, const QByteArray &bufferType);
+
     Sink::ResourceContext mResourceContext;
     QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess;
     QSharedPointer<Sink::ResultProvider<typename DomainType::Ptr>> mResultProvider;
@@ -102,4 +105,5 @@ private:
     Sink::Log::Context mLogCtx;
     bool mInitialQueryComplete = false;
     bool mQueryInProgress = false;
+    bool mRequestFetchMore = false;
 };
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp
index 8fbc2ad..020f3fd 100644
--- a/common/storage/entitystore.cpp
+++ b/common/storage/entitystore.cpp
@@ -459,6 +459,7 @@ void EntityStore::indexLookup(const QByteArray &type, const QByteArray &property
 
 void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback)
 {
+    Q_ASSERT(d);
     auto db = DataStore::mainDatabase(d->getTransaction(), type);
     db.findLatest(uid,
         [=](const QByteArray &key, const QByteArray &value) {
diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h
index 00241f2..3eb0b7b 100644
--- a/common/storage/entitystore.h
+++ b/common/storage/entitystore.h
@@ -37,6 +37,7 @@ class SINK_EXPORT EntityStore
 public:
     typedef QSharedPointer<EntityStore> Ptr;
     EntityStore(const ResourceContext &resourceContext, const Sink::Log::Context &);
+    ~EntityStore() = default;
 
     //Only the pipeline may call the following functions outside of tests
     bool add(const QByteArray &type, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource);
diff --git a/sinksh/syntax_modules/sink_selftest.cpp b/sinksh/syntax_modules/sink_selftest.cpp
index 21dfbff..8ad4f60 100644
--- a/sinksh/syntax_modules/sink_selftest.cpp
+++ b/sinksh/syntax_modules/sink_selftest.cpp
@@ -46,28 +46,94 @@ bool selfTest(const QStringList &args_, State &state)
         return false;
     }
 
+    using namespace Sink::ApplicationDomain;
     auto options = SyntaxTree::parseOptions(args_);
     if (options.positionalArguments.contains("stresstest")) {
         auto resource = SinkshUtils::parseUid(options.options.value("resource").first().toUtf8());
         qWarning() << "Stresstest on resource: " << resource;
-        Sink::Query query;
-        query.resourceFilter(resource);
-        query.limit(100);
-
         auto models = QSharedPointer<QList<QSharedPointer<QAbstractItemModel>>>::create();
-        for (int i = 0; i < 50; i++) {
+
+        //Simulate the maillist, where we scroll down and trigger lots of fetchMore calls
+        {
+            Sink::Query query;
+            query.resourceFilter(resource);
+            query.limit(100);
+            query.request<Mail::Subject>();
+            query.request<Mail::Sender>();
+            query.request<Mail::To>();
+            query.request<Mail::Cc>();
+            query.request<Mail::Bcc>();
+            query.request<Mail::Date>();
+            query.request<Mail::Unread>();
+            query.request<Mail::Important>();
+            query.request<Mail::Draft>();
+            query.request<Mail::Sent>();
+            query.request<Mail::Trash>();
+            query.request<Mail::Folder>();
+            query.sort<Mail::Date>();
+            query.reduce<Mail::ThreadId>(Sink::Query::Reduce::Selector::max<Mail::Date>())
+                .count("count")
+                .collect<Mail::Unread>("unreadCollected")
+                .collect<Mail::Important>("importantCollected");
+
             auto model = Sink::Store::loadModel<Sink::ApplicationDomain::Mail>(query);
-            *models << model;
+            models->append(model);
             QObject::connect(model.data(), &QAbstractItemModel::dataChanged, [models, model, &state](const QModelIndex &start, const QModelIndex &end, const QVector<int> &roles) {
                 if (roles.contains(Sink::Store::ChildrenFetchedRole)) {
-                    models->removeAll(model);
-                    qWarning() << "Model complete: " << models->count();
+                    if (!model->canFetchMore({})) {
+                        qWarning() << "Model complete: " << models->count();
+                        models->removeAll(model);
+                    } else {
+                        qWarning() << "Fetching more";
+                        //Simulate superfluous fetchMore calls
+                        for (int i = 0; i < 10; i++) {
+                            model->fetchMore({});
+                        }
+                        return;
+                    }
                     if (models->isEmpty()) {
                         state.commandFinished();
                     }
                 }
             });
+
         }
+
+        //Simluate lot's of mailviewers doing a bunch of queries
+        {
+            Sink::Query query;
+            query.resourceFilter(resource);
+            query.limit(10);
+            query.request<Mail::Subject>();
+            query.request<Mail::Sender>();
+            query.request<Mail::To>();
+            query.request<Mail::Cc>();
+            query.request<Mail::Bcc>();
+            query.request<Mail::Date>();
+            query.request<Mail::Unread>();
+            query.request<Mail::Important>();
+            query.request<Mail::Draft>();
+            query.request<Mail::Sent>();
+            query.request<Mail::Trash>();
+            query.request<Mail::Folder>();
+            query.sort<Sink::ApplicationDomain::Mail::Date>();
+            query.bloom<Sink::ApplicationDomain::Mail::ThreadId>();
+
+            for (int i = 0; i < 50; i++) {
+                auto model = Sink::Store::loadModel<Sink::ApplicationDomain::Mail>(query);
+                *models << model;
+                QObject::connect(model.data(), &QAbstractItemModel::dataChanged, [models, model, &state](const QModelIndex &start, const QModelIndex &end, const QVector<int> &roles) {
+                    if (roles.contains(Sink::Store::ChildrenFetchedRole)) {
+                        models->removeAll(model);
+                        qWarning() << "Model complete: " << models->count();
+                        if (models->isEmpty()) {
+                            state.commandFinished();
+                        }
+                    }
+                });
+            }
+        }
+
         return true;
     }
     return false;