From 531972042d4b610258c8af8a17ec3a99cd063dda Mon Sep 17 00:00:00 2001
From: Christian Mollekopf
Date: Thu, 15 Feb 2018 10:19:08 +0100
Subject: Fixed crashes due to concurrently running queries.

A single QueryRunner should never have multiple workers running at the same
time. We did not properly enforce this in the case of incremental updates
coming in.

The only way I managed to reproduce the crash:
* Open a large folder with lots of unread mail in kube
* Select a mail in the maillist and hold the down button
* This will:
    * Repeatedly call fetch more
    * Trigger lots of mark-as-read modifications that result in notifications.
* Eventually it crashes somewhere in EntityStore, likely because of concurrent
  access of the filter structure, which is shared through the state.

We now ensure in the single-threaded portion of the code that we only ever run
one worker at a time. If an update comes in while a worker is running, we
remember that change and fetch more once we're done.

To be able to call fetch again, that portion was also factored out into a
separate function.
---
 common/datastorequery.cpp               |   1 +
 common/datastorequery.h                 |   2 +-
 common/queryrunner.cpp                  | 153 ++++++++++++++++++--------------
 common/queryrunner.h                    |   4 +
 common/storage/entitystore.cpp          |   1 +
 common/storage/entitystore.h            |   1 +
 sinksh/syntax_modules/sink_selftest.cpp |  84 ++++++++++++++++--
 7 files changed, 169 insertions(+), 77 deletions(-)

diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index 3218d1a..50158c7 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -643,6 +643,7 @@ ResultSet DataStoreQuery::execute()
 {
     SinkTraceCtx(mLogCtx) << "Executing query";
 
+    Q_ASSERT(mCollector);
     ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool {
         if (mCollector->next([this, callback](const ResultSet::Result &result) {
                 if (result.operation != Sink::Operation_Removal) {
diff --git a/common/datastorequery.h b/common/datastorequery.h
index cc501e6..8800644 100644
--- a/common/datastorequery.h
+++ b/common/datastorequery.h
@@ -115,7 +115,7 @@ public:
     virtual void updateComplete() { }
 
     FilterBase::Ptr mSource;
-    DataStoreQuery *mDatastore;
+    DataStoreQuery *mDatastore{nullptr};
     bool mIncremental = false;
 };
 
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 928e1e0..0ed4cb5 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -69,79 +69,14 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou
     if (query.limit() && query.sortProperty().isEmpty()) {
         SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get.";
     }
-    auto guardPtr = QPointer(&guard);
-    auto fetcher = [=]() {
-        SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize;
-        auto resultProvider = mResultProvider;
-        auto resultTransformation = mResultTransformation;
-        auto batchSize = mBatchSize;
-        auto resourceContext = mResourceContext;
-        auto logCtx = mLogCtx;
-        auto state = mQueryState;
-        const bool runAsync = !query.synchronousQuery();
-        //The lambda will be executed in a separate thread, so copy all arguments
-        async::run([=]() {
-            QueryWorker worker(query, resourceContext, bufferType, resultTransformation, logCtx);
-            return worker.executeInitialQuery(query, *resultProvider, batchSize, state);
-        }, runAsync)
-        .then([this, query, resultProvider, guardPtr](const ReplayResult &result) {
-            if (!guardPtr) {
-                //Not an error, the query can vanish at any time.
-                return;
-            }
-            mInitialQueryComplete = true;
-            mQueryState = result.queryState;
-            // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
-            if (query.liveQuery()) {
-                mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
-            }
-            resultProvider->setRevision(result.newRevision);
-            resultProvider->initialResultSetComplete(result.replayedAll);
-        })
-        .exec();
-    };
-    // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load.
-    mResultProvider->setFetcher(fetcher);
+    mResultProvider->setFetcher([this, query, bufferType] { fetch(query, bufferType); });
 
     // In case of a live query we keep the runner for as long alive as the result provider exists
     if (query.liveQuery()) {
         Q_ASSERT(!query.synchronousQuery());
         // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
-        setQuery([=]() -> KAsync::Job {
-            auto resultProvider = mResultProvider;
-            auto resourceContext = mResourceContext;
-            auto logCtx = mLogCtx;
-            auto state = mQueryState;
-            auto resultTransformation = mResultTransformation;
-            if (!mInitialQueryComplete) {
-                SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete";
-                fetcher();
-                return KAsync::null();
-            }
-            if (mQueryInProgress) {
-                //Can happen if the revision come in quicker than we process them.
-                return KAsync::null();
-            }
-            Q_ASSERT(!mQueryInProgress);
-            return KAsync::start([&] {
-                mQueryInProgress = true;
-            })
-            .then(async::run([=]() {
-                QueryWorker worker(query, resourceContext, bufferType, resultTransformation, logCtx);
-                return worker.executeIncrementalQuery(query, *resultProvider, state);
-            }))
-            .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
-                if (!guardPtr) {
-                    //Not an error, the query can vanish at any time.
-                    return;
-                }
-                mQueryInProgress = false;
-                // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
-                mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
-                resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
-            });
-        });
+        setQuery([=]() { return incrementalFetch(query, bufferType); });
         // Ensure the connection is open, if it wasn't already opened
         // TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
         mResourceAccess->open();
@@ -158,6 +93,90 @@ QueryRunner::~QueryRunner()
     SinkTraceCtx(mLogCtx) << "Stopped query";
 }
 
+//This function triggers the initial fetch, and then subsequent calls will simply fetch more data of mBatchSize.
+template
+void QueryRunner::fetch(const Sink::Query &query, const QByteArray &bufferType)
+{
+    auto guardPtr = QPointer(&guard);
+    SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize;
+    if (mQueryInProgress) {
+        SinkTraceCtx(mLogCtx) << "Query is already in progress, postponing: " << mBatchSize;
+        mRequestFetchMore = true;
+        return;
+    }
+    mQueryInProgress = true;
+    auto resultProvider = mResultProvider;
+    auto resultTransformation = mResultTransformation;
+    auto batchSize = mBatchSize;
+    auto resourceContext = mResourceContext;
+    auto logCtx = mLogCtx;
+    auto state = mQueryState;
+    const bool runAsync = !query.synchronousQuery();
+    //The lambda will be executed in a separate thread, so copy all arguments
+    async::run([=]() {
+        QueryWorker worker(query, resourceContext, bufferType, resultTransformation, logCtx);
+        return worker.executeInitialQuery(query, *resultProvider, batchSize, state);
+    }, runAsync)
+    .then([=](const ReplayResult &result) {
+        if (!guardPtr) {
+            //Not an error, the query can vanish at any time.
+            return;
+        }
+        mInitialQueryComplete = true;
+        mQueryInProgress = false;
+        mQueryState = result.queryState;
+        // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
+        if (query.liveQuery()) {
+            mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
+        }
+        resultProvider->setRevision(result.newRevision);
+        resultProvider->initialResultSetComplete(result.replayedAll);
+        if (mRequestFetchMore) {
+            mRequestFetchMore = false;
+            fetch(query, bufferType);
+        }
+    })
+    .exec();
+}
+
+template
+KAsync::Job QueryRunner::incrementalFetch(const Sink::Query &query, const QByteArray &bufferType)
+{
+    if (!mInitialQueryComplete) {
+        SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete";
+        fetch(query, bufferType);
+        return KAsync::null();
+    }
+    if (mQueryInProgress) {
+        //Can happen if the revision come in quicker than we process them.
+        return KAsync::null();
+    }
+    auto resultProvider = mResultProvider;
+    auto resourceContext = mResourceContext;
+    auto logCtx = mLogCtx;
+    auto state = mQueryState;
+    auto resultTransformation = mResultTransformation;
+    Q_ASSERT(!mQueryInProgress);
+    auto guardPtr = QPointer(&guard);
+    return KAsync::start([&] {
+        mQueryInProgress = true;
+    })
+    .then(async::run([=]() {
+        QueryWorker worker(query, resourceContext, bufferType, resultTransformation, logCtx);
+        return worker.executeIncrementalQuery(query, *resultProvider, state);
+    }))
+    .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
+        if (!guardPtr) {
+            //Not an error, the query can vanish at any time.
+            return;
+        }
+        mQueryInProgress = false;
+        // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
+        mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
+        resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
+    });
+}
+
 template
 void QueryRunner::setResultTransformation(const ResultTransformation &transformation)
 {
diff --git a/common/queryrunner.h b/common/queryrunner.h
index a567b3c..af54619 100644
--- a/common/queryrunner.h
+++ b/common/queryrunner.h
@@ -92,6 +92,9 @@ public:
     typename Sink::ResultEmitter::Ptr emitter();
 
 private:
+    void fetch(const Sink::Query &query, const QByteArray &bufferType);
+    KAsync::Job incrementalFetch(const Sink::Query &query, const QByteArray &bufferType);
+
     Sink::ResourceContext mResourceContext;
     QSharedPointer mResourceAccess;
     QSharedPointer> mResultProvider;
@@ -102,4 +105,5 @@ private:
     Sink::Log::Context mLogCtx;
     bool mInitialQueryComplete = false;
     bool mQueryInProgress = false;
+    bool mRequestFetchMore = false;
 };
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp
index 8fbc2ad..020f3fd 100644
--- a/common/storage/entitystore.cpp
+++ b/common/storage/entitystore.cpp
@@ -459,6 +459,7 @@ void EntityStore::indexLookup(const QByteArray &type, const QByteArray &property
 
 void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function callback)
 {
+    Q_ASSERT(d);
     auto db = DataStore::mainDatabase(d->getTransaction(), type);
     db.findLatest(uid, [=](const QByteArray &key, const QByteArray &value) {
diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h
index 00241f2..3eb0b7b 100644
--- a/common/storage/entitystore.h
+++ b/common/storage/entitystore.h
@@ -37,6 +37,7 @@ class SINK_EXPORT EntityStore
 public:
     typedef QSharedPointer Ptr;
     EntityStore(const ResourceContext &resourceContext, const Sink::Log::Context &);
+    ~EntityStore() = default;
 
     //Only the pipeline may call the following functions outside of tests
     bool add(const QByteArray &type, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource);
diff --git a/sinksh/syntax_modules/sink_selftest.cpp b/sinksh/syntax_modules/sink_selftest.cpp
index 21dfbff..8ad4f60 100644
--- a/sinksh/syntax_modules/sink_selftest.cpp
+++ b/sinksh/syntax_modules/sink_selftest.cpp
@@ -46,28 +46,94 @@ bool selfTest(const QStringList &args_, State &state)
         return false;
     }
 
+    using namespace Sink::ApplicationDomain;
     auto options = SyntaxTree::parseOptions(args_);
     if (options.positionalArguments.contains("stresstest")) {
         auto resource = SinkshUtils::parseUid(options.options.value("resource").first().toUtf8());
         qWarning() << "Stresstest on resource: " << resource;
-        Sink::Query query;
-        query.resourceFilter(resource);
-        query.limit(100);
-
         auto models = QSharedPointer>>::create();
-        for (int i = 0; i < 50; i++) {
+
+        //Simulate the maillist, where we scroll down and trigger lots of fetchMore calls
+        {
+            Sink::Query query;
+            query.resourceFilter(resource);
+            query.limit(100);
+            query.request();
+            query.request();
+            query.request();
+            query.request();
+            query.request();
+            query.request();
+            query.request();
+            query.request();
+            query.request();
+            query.request();
+            query.request();
+            query.request();
+            query.sort();
+            query.reduce(Sink::Query::Reduce::Selector::max())
+                .count("count")
+                .collect("unreadCollected")
+                .collect("importantCollected");
+            auto model = Sink::Store::loadModel(query);
-            *models << model;
+            models->append(model);
             QObject::connect(model.data(), &QAbstractItemModel::dataChanged, [models, model, &state](const QModelIndex &start, const QModelIndex &end, const QVector &roles) {
                 if (roles.contains(Sink::Store::ChildrenFetchedRole)) {
-                    models->removeAll(model);
-                    qWarning() << "Model complete: " << models->count();
+                    if (!model->canFetchMore({})) {
+                        qWarning() << "Model complete: " << models->count();
+                        models->removeAll(model);
+                    } else {
+                        qWarning() << "Fetching more";
+                        //Simulate superfluous fetchMore calls
+                        for (int i = 0; i < 10; i++) {
+                            model->fetchMore({});
+                        }
+                        return;
+                    }
                     if (models->isEmpty()) {
                         state.commandFinished();
                     }
                 }
-            });
+            });
+        }
+
+        //Simluate lot's of mailviewers doing a bunch of queries
+        {
+            Sink::Query query;
+            query.resourceFilter(resource);
+            query.limit(10);
+            query.request();
+            query.request();
+            query.request();
+            query.request();
+            query.request();
+            query.request();
+            query.request();
+            query.request();
+            query.request();
+            query.request();
+            query.request();
+            query.request();
+            query.sort();
+            query.bloom();
+
+            for (int i = 0; i < 50; i++) {
+                auto model = Sink::Store::loadModel(query);
+                *models << model;
+                QObject::connect(model.data(), &QAbstractItemModel::dataChanged, [models, model, &state](const QModelIndex &start, const QModelIndex &end, const QVector &roles) {
+                    if (roles.contains(Sink::Store::ChildrenFetchedRole)) {
+                        models->removeAll(model);
+                        qWarning() << "Model complete: " << models->count();
+                        if (models->isEmpty()) {
+                            state.commandFinished();
+                        }
+                    }
+                });
+            }
+        }
+
         return true;
     }
     return false;
-- 
cgit v1.2.3
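
The heart of the fix is the single-worker discipline in QueryRunner::fetch(): a fetch request that arrives while a QueryWorker is still running only sets mRequestFetchMore, and the worker's completion continuation re-issues the fetch once the result has been delivered. The self-contained sketch below illustrates that state machine in isolation; the names (FetchGuard, requestFetch, onWorkerDone) are illustrative placeholders and not part of the Sink API, and the real code runs the worker via async::run on a separate thread.

// Minimal sketch of the "only one worker at a time" rule described above.
// All names in this sketch are illustrative; they are not Sink API.
#include <iostream>

class FetchGuard
{
public:
    // Called from the single-threaded side whenever more data is wanted
    // (initial fetch, fetchMore, or an incoming incremental update).
    void requestFetch()
    {
        if (mWorkerRunning) {
            // A worker is already executing; just remember that another
            // pass is needed instead of starting a second worker.
            mFetchPending = true;
            return;
        }
        startWorker();
    }

    // Called from the single-threaded side once the asynchronous worker
    // has delivered its result (the .then() continuation in QueryRunner).
    void onWorkerDone()
    {
        mWorkerRunning = false;
        if (mFetchPending) {
            mFetchPending = false;
            startWorker();
        }
    }

private:
    void startWorker()
    {
        mWorkerRunning = true;
        std::cout << "worker started" << std::endl;
        // This is where QueryRunner would hand a QueryWorker to async::run()
        // and arrange for onWorkerDone() to run in the completion handler.
    }

    bool mWorkerRunning = false;
    bool mFetchPending = false;
};

int main()
{
    FetchGuard guard;
    guard.requestFetch(); // starts a worker
    guard.requestFetch(); // worker busy: only records that more is needed
    guard.onWorkerDone(); // first worker done: the pending request starts the next one
    guard.onWorkerDone(); // nothing pending: stays idle
    return 0;
}

Because both flags are only touched from the single-threaded side (the continuation runs back on the caller's thread), no additional locking is needed, which is why the commit enforces the rule there rather than inside the worker.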