summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2018-02-15 10:19:08 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2018-02-15 11:08:52 +0100
commit531972042d4b610258c8af8a17ec3a99cd063dda (patch)
treeef4356ec141f0f1ccd756e8610b08553b866bf78
parentf51963f057bcbdd175114433913a1c5f0eebd546 (diff)
downloadsink-531972042d4b610258c8af8a17ec3a99cd063dda.tar.gz
sink-531972042d4b610258c8af8a17ec3a99cd063dda.zip
Fixed crashes due to concurrently running queries.
A single QueryRunner should never have multiple workers running at the same time. We did not properly enforce this in case of incremental updates coming in. The only way I managed to reproduce the crash: * Open a large folder with lots of unread mail in kube * Select a mail in the maillist and hold the down button * This will: * Repeatedly call fetch more * Trigger lot's of mark as read modifications that result in notifications. * Eventually it crashes somewhere in EntityStore, likely because of concurrent access of the filter structure which is shared through the state. We now ensure in the single threaded portion of the code that we only ever run one worker at a time. If we did receive an update during, we remember that change and fetch more once we're done. To be able to call fetch again that portion was also factored out into a separate function.
-rw-r--r--common/datastorequery.cpp1
-rw-r--r--common/datastorequery.h2
-rw-r--r--common/queryrunner.cpp153
-rw-r--r--common/queryrunner.h4
-rw-r--r--common/storage/entitystore.cpp1
-rw-r--r--common/storage/entitystore.h1
-rw-r--r--sinksh/syntax_modules/sink_selftest.cpp84
7 files changed, 169 insertions, 77 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index 3218d1a..50158c7 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -643,6 +643,7 @@ ResultSet DataStoreQuery::execute()
643{ 643{
644 SinkTraceCtx(mLogCtx) << "Executing query"; 644 SinkTraceCtx(mLogCtx) << "Executing query";
645 645
646 Q_ASSERT(mCollector);
646 ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { 647 ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool {
647 if (mCollector->next([this, callback](const ResultSet::Result &result) { 648 if (mCollector->next([this, callback](const ResultSet::Result &result) {
648 if (result.operation != Sink::Operation_Removal) { 649 if (result.operation != Sink::Operation_Removal) {
diff --git a/common/datastorequery.h b/common/datastorequery.h
index cc501e6..8800644 100644
--- a/common/datastorequery.h
+++ b/common/datastorequery.h
@@ -115,7 +115,7 @@ public:
115 virtual void updateComplete() { } 115 virtual void updateComplete() { }
116 116
117 FilterBase::Ptr mSource; 117 FilterBase::Ptr mSource;
118 DataStoreQuery *mDatastore; 118 DataStoreQuery *mDatastore{nullptr};
119 bool mIncremental = false; 119 bool mIncremental = false;
120}; 120};
121 121
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 928e1e0..0ed4cb5 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -69,79 +69,14 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
69 if (query.limit() && query.sortProperty().isEmpty()) { 69 if (query.limit() && query.sortProperty().isEmpty()) {
70 SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; 70 SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get.";
71 } 71 }
72 auto guardPtr = QPointer<QObject>(&guard);
73 auto fetcher = [=]() {
74 SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize;
75 auto resultProvider = mResultProvider;
76 auto resultTransformation = mResultTransformation;
77 auto batchSize = mBatchSize;
78 auto resourceContext = mResourceContext;
79 auto logCtx = mLogCtx;
80 auto state = mQueryState;
81 const bool runAsync = !query.synchronousQuery();
82 //The lambda will be executed in a separate thread, so copy all arguments
83 async::run<ReplayResult>([=]() {
84 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
85 return worker.executeInitialQuery(query, *resultProvider, batchSize, state);
86 }, runAsync)
87 .then([this, query, resultProvider, guardPtr](const ReplayResult &result) {
88 if (!guardPtr) {
89 //Not an error, the query can vanish at any time.
90 return;
91 }
92 mInitialQueryComplete = true;
93 mQueryState = result.queryState;
94 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
95 if (query.liveQuery()) {
96 mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
97 }
98 resultProvider->setRevision(result.newRevision);
99 resultProvider->initialResultSetComplete(result.replayedAll);
100 })
101 .exec();
102 };
103
104 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. 72 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load.
105 mResultProvider->setFetcher(fetcher); 73 mResultProvider->setFetcher([this, query, bufferType] { fetch(query, bufferType); });
106 74
107 // In case of a live query we keep the runner for as long alive as the result provider exists 75 // In case of a live query we keep the runner for as long alive as the result provider exists
108 if (query.liveQuery()) { 76 if (query.liveQuery()) {
109 Q_ASSERT(!query.synchronousQuery()); 77 Q_ASSERT(!query.synchronousQuery());
110 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 78 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
111 setQuery([=]() -> KAsync::Job<void> { 79 setQuery([=]() { return incrementalFetch(query, bufferType); });
112 auto resultProvider = mResultProvider;
113 auto resourceContext = mResourceContext;
114 auto logCtx = mLogCtx;
115 auto state = mQueryState;
116 auto resultTransformation = mResultTransformation;
117 if (!mInitialQueryComplete) {
118 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete";
119 fetcher();
120 return KAsync::null();
121 }
122 if (mQueryInProgress) {
123 //Can happen if the revision come in quicker than we process them.
124 return KAsync::null();
125 }
126 Q_ASSERT(!mQueryInProgress);
127 return KAsync::start([&] {
128 mQueryInProgress = true;
129 })
130 .then(async::run<ReplayResult>([=]() {
131 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
132 return worker.executeIncrementalQuery(query, *resultProvider, state);
133 }))
134 .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
135 if (!guardPtr) {
136 //Not an error, the query can vanish at any time.
137 return;
138 }
139 mQueryInProgress = false;
140 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
141 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
142 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
143 });
144 });
145 // Ensure the connection is open, if it wasn't already opened 80 // Ensure the connection is open, if it wasn't already opened
146 // TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates 81 // TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
147 mResourceAccess->open(); 82 mResourceAccess->open();
@@ -158,6 +93,90 @@ QueryRunner<DomainType>::~QueryRunner()
158 SinkTraceCtx(mLogCtx) << "Stopped query"; 93 SinkTraceCtx(mLogCtx) << "Stopped query";
159} 94}
160 95
96//This function triggers the initial fetch, and then subsequent calls will simply fetch more data of mBatchSize.
97template <class DomainType>
98void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType)
99{
100 auto guardPtr = QPointer<QObject>(&guard);
101 SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize;
102 if (mQueryInProgress) {
103 SinkTraceCtx(mLogCtx) << "Query is already in progress, postponing: " << mBatchSize;
104 mRequestFetchMore = true;
105 return;
106 }
107 mQueryInProgress = true;
108 auto resultProvider = mResultProvider;
109 auto resultTransformation = mResultTransformation;
110 auto batchSize = mBatchSize;
111 auto resourceContext = mResourceContext;
112 auto logCtx = mLogCtx;
113 auto state = mQueryState;
114 const bool runAsync = !query.synchronousQuery();
115 //The lambda will be executed in a separate thread, so copy all arguments
116 async::run<ReplayResult>([=]() {
117 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
118 return worker.executeInitialQuery(query, *resultProvider, batchSize, state);
119 }, runAsync)
120 .then([=](const ReplayResult &result) {
121 if (!guardPtr) {
122 //Not an error, the query can vanish at any time.
123 return;
124 }
125 mInitialQueryComplete = true;
126 mQueryInProgress = false;
127 mQueryState = result.queryState;
128 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
129 if (query.liveQuery()) {
130 mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
131 }
132 resultProvider->setRevision(result.newRevision);
133 resultProvider->initialResultSetComplete(result.replayedAll);
134 if (mRequestFetchMore) {
135 mRequestFetchMore = false;
136 fetch(query, bufferType);
137 }
138 })
139 .exec();
140}
141
142template <class DomainType>
143KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &query, const QByteArray &bufferType)
144{
145 if (!mInitialQueryComplete) {
146 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete";
147 fetch(query, bufferType);
148 return KAsync::null();
149 }
150 if (mQueryInProgress) {
151 //Can happen if the revision come in quicker than we process them.
152 return KAsync::null();
153 }
154 auto resultProvider = mResultProvider;
155 auto resourceContext = mResourceContext;
156 auto logCtx = mLogCtx;
157 auto state = mQueryState;
158 auto resultTransformation = mResultTransformation;
159 Q_ASSERT(!mQueryInProgress);
160 auto guardPtr = QPointer<QObject>(&guard);
161 return KAsync::start([&] {
162 mQueryInProgress = true;
163 })
164 .then(async::run<ReplayResult>([=]() {
165 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
166 return worker.executeIncrementalQuery(query, *resultProvider, state);
167 }))
168 .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
169 if (!guardPtr) {
170 //Not an error, the query can vanish at any time.
171 return;
172 }
173 mQueryInProgress = false;
174 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
175 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
176 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
177 });
178}
179
161template <class DomainType> 180template <class DomainType>
162void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation) 181void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation)
163{ 182{
diff --git a/common/queryrunner.h b/common/queryrunner.h
index a567b3c..af54619 100644
--- a/common/queryrunner.h
+++ b/common/queryrunner.h
@@ -92,6 +92,9 @@ public:
92 typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr emitter(); 92 typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr emitter();
93 93
94private: 94private:
95 void fetch(const Sink::Query &query, const QByteArray &bufferType);
96 KAsync::Job<void> incrementalFetch(const Sink::Query &query, const QByteArray &bufferType);
97
95 Sink::ResourceContext mResourceContext; 98 Sink::ResourceContext mResourceContext;
96 QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess; 99 QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess;
97 QSharedPointer<Sink::ResultProvider<typename DomainType::Ptr>> mResultProvider; 100 QSharedPointer<Sink::ResultProvider<typename DomainType::Ptr>> mResultProvider;
@@ -102,4 +105,5 @@ private:
102 Sink::Log::Context mLogCtx; 105 Sink::Log::Context mLogCtx;
103 bool mInitialQueryComplete = false; 106 bool mInitialQueryComplete = false;
104 bool mQueryInProgress = false; 107 bool mQueryInProgress = false;
108 bool mRequestFetchMore = false;
105}; 109};
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp
index 8fbc2ad..020f3fd 100644
--- a/common/storage/entitystore.cpp
+++ b/common/storage/entitystore.cpp
@@ -459,6 +459,7 @@ void EntityStore::indexLookup(const QByteArray &type, const QByteArray &property
459 459
460void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback) 460void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback)
461{ 461{
462 Q_ASSERT(d);
462 auto db = DataStore::mainDatabase(d->getTransaction(), type); 463 auto db = DataStore::mainDatabase(d->getTransaction(), type);
463 db.findLatest(uid, 464 db.findLatest(uid,
464 [=](const QByteArray &key, const QByteArray &value) { 465 [=](const QByteArray &key, const QByteArray &value) {
diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h
index 00241f2..3eb0b7b 100644
--- a/common/storage/entitystore.h
+++ b/common/storage/entitystore.h
@@ -37,6 +37,7 @@ class SINK_EXPORT EntityStore
37public: 37public:
38 typedef QSharedPointer<EntityStore> Ptr; 38 typedef QSharedPointer<EntityStore> Ptr;
39 EntityStore(const ResourceContext &resourceContext, const Sink::Log::Context &); 39 EntityStore(const ResourceContext &resourceContext, const Sink::Log::Context &);
40 ~EntityStore() = default;
40 41
41 //Only the pipeline may call the following functions outside of tests 42 //Only the pipeline may call the following functions outside of tests
42 bool add(const QByteArray &type, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource); 43 bool add(const QByteArray &type, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource);
diff --git a/sinksh/syntax_modules/sink_selftest.cpp b/sinksh/syntax_modules/sink_selftest.cpp
index 21dfbff..8ad4f60 100644
--- a/sinksh/syntax_modules/sink_selftest.cpp
+++ b/sinksh/syntax_modules/sink_selftest.cpp
@@ -46,28 +46,94 @@ bool selfTest(const QStringList &args_, State &state)
46 return false; 46 return false;
47 } 47 }
48 48
49 using namespace Sink::ApplicationDomain;
49 auto options = SyntaxTree::parseOptions(args_); 50 auto options = SyntaxTree::parseOptions(args_);
50 if (options.positionalArguments.contains("stresstest")) { 51 if (options.positionalArguments.contains("stresstest")) {
51 auto resource = SinkshUtils::parseUid(options.options.value("resource").first().toUtf8()); 52 auto resource = SinkshUtils::parseUid(options.options.value("resource").first().toUtf8());
52 qWarning() << "Stresstest on resource: " << resource; 53 qWarning() << "Stresstest on resource: " << resource;
53 Sink::Query query;
54 query.resourceFilter(resource);
55 query.limit(100);
56
57 auto models = QSharedPointer<QList<QSharedPointer<QAbstractItemModel>>>::create(); 54 auto models = QSharedPointer<QList<QSharedPointer<QAbstractItemModel>>>::create();
58 for (int i = 0; i < 50; i++) { 55
56 //Simulate the maillist, where we scroll down and trigger lots of fetchMore calls
57 {
58 Sink::Query query;
59 query.resourceFilter(resource);
60 query.limit(100);
61 query.request<Mail::Subject>();
62 query.request<Mail::Sender>();
63 query.request<Mail::To>();
64 query.request<Mail::Cc>();
65 query.request<Mail::Bcc>();
66 query.request<Mail::Date>();
67 query.request<Mail::Unread>();
68 query.request<Mail::Important>();
69 query.request<Mail::Draft>();
70 query.request<Mail::Sent>();
71 query.request<Mail::Trash>();
72 query.request<Mail::Folder>();
73 query.sort<Mail::Date>();
74 query.reduce<Mail::ThreadId>(Sink::Query::Reduce::Selector::max<Mail::Date>())
75 .count("count")
76 .collect<Mail::Unread>("unreadCollected")
77 .collect<Mail::Important>("importantCollected");
78
59 auto model = Sink::Store::loadModel<Sink::ApplicationDomain::Mail>(query); 79 auto model = Sink::Store::loadModel<Sink::ApplicationDomain::Mail>(query);
60 *models << model; 80 models->append(model);
61 QObject::connect(model.data(), &QAbstractItemModel::dataChanged, [models, model, &state](const QModelIndex &start, const QModelIndex &end, const QVector<int> &roles) { 81 QObject::connect(model.data(), &QAbstractItemModel::dataChanged, [models, model, &state](const QModelIndex &start, const QModelIndex &end, const QVector<int> &roles) {
62 if (roles.contains(Sink::Store::ChildrenFetchedRole)) { 82 if (roles.contains(Sink::Store::ChildrenFetchedRole)) {
63 models->removeAll(model); 83 if (!model->canFetchMore({})) {
64 qWarning() << "Model complete: " << models->count(); 84 qWarning() << "Model complete: " << models->count();
85 models->removeAll(model);
86 } else {
87 qWarning() << "Fetching more";
88 //Simulate superfluous fetchMore calls
89 for (int i = 0; i < 10; i++) {
90 model->fetchMore({});
91 }
92 return;
93 }
65 if (models->isEmpty()) { 94 if (models->isEmpty()) {
66 state.commandFinished(); 95 state.commandFinished();
67 } 96 }
68 } 97 }
69 }); 98 });
99
70 } 100 }
101
102 //Simluate lot's of mailviewers doing a bunch of queries
103 {
104 Sink::Query query;
105 query.resourceFilter(resource);
106 query.limit(10);
107 query.request<Mail::Subject>();
108 query.request<Mail::Sender>();
109 query.request<Mail::To>();
110 query.request<Mail::Cc>();
111 query.request<Mail::Bcc>();
112 query.request<Mail::Date>();
113 query.request<Mail::Unread>();
114 query.request<Mail::Important>();
115 query.request<Mail::Draft>();
116 query.request<Mail::Sent>();
117 query.request<Mail::Trash>();
118 query.request<Mail::Folder>();
119 query.sort<Sink::ApplicationDomain::Mail::Date>();
120 query.bloom<Sink::ApplicationDomain::Mail::ThreadId>();
121
122 for (int i = 0; i < 50; i++) {
123 auto model = Sink::Store::loadModel<Sink::ApplicationDomain::Mail>(query);
124 *models << model;
125 QObject::connect(model.data(), &QAbstractItemModel::dataChanged, [models, model, &state](const QModelIndex &start, const QModelIndex &end, const QVector<int> &roles) {
126 if (roles.contains(Sink::Store::ChildrenFetchedRole)) {
127 models->removeAll(model);
128 qWarning() << "Model complete: " << models->count();
129 if (models->isEmpty()) {
130 state.commandFinished();
131 }
132 }
133 });
134 }
135 }
136
71 return true; 137 return true;
72 } 138 }
73 return false; 139 return false;