summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/datastorequery.cpp1
-rw-r--r--common/datastorequery.h2
-rw-r--r--common/queryrunner.cpp153
-rw-r--r--common/queryrunner.h4
-rw-r--r--common/storage/entitystore.cpp1
-rw-r--r--common/storage/entitystore.h1
-rw-r--r--sinksh/syntax_modules/sink_selftest.cpp84
7 files changed, 169 insertions, 77 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index 3218d1a..50158c7 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -643,6 +643,7 @@ ResultSet DataStoreQuery::execute()
643{ 643{
644 SinkTraceCtx(mLogCtx) << "Executing query"; 644 SinkTraceCtx(mLogCtx) << "Executing query";
645 645
646 Q_ASSERT(mCollector);
646 ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { 647 ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool {
647 if (mCollector->next([this, callback](const ResultSet::Result &result) { 648 if (mCollector->next([this, callback](const ResultSet::Result &result) {
648 if (result.operation != Sink::Operation_Removal) { 649 if (result.operation != Sink::Operation_Removal) {
diff --git a/common/datastorequery.h b/common/datastorequery.h
index cc501e6..8800644 100644
--- a/common/datastorequery.h
+++ b/common/datastorequery.h
@@ -115,7 +115,7 @@ public:
115 virtual void updateComplete() { } 115 virtual void updateComplete() { }
116 116
117 FilterBase::Ptr mSource; 117 FilterBase::Ptr mSource;
118 DataStoreQuery *mDatastore; 118 DataStoreQuery *mDatastore{nullptr};
119 bool mIncremental = false; 119 bool mIncremental = false;
120}; 120};
121 121
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 928e1e0..0ed4cb5 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -69,79 +69,14 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
69 if (query.limit() && query.sortProperty().isEmpty()) { 69 if (query.limit() && query.sortProperty().isEmpty()) {
70 SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; 70 SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get.";
71 } 71 }
72 auto guardPtr = QPointer<QObject>(&guard);
73 auto fetcher = [=]() {
74 SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize;
75 auto resultProvider = mResultProvider;
76 auto resultTransformation = mResultTransformation;
77 auto batchSize = mBatchSize;
78 auto resourceContext = mResourceContext;
79 auto logCtx = mLogCtx;
80 auto state = mQueryState;
81 const bool runAsync = !query.synchronousQuery();
82 //The lambda will be executed in a separate thread, so copy all arguments
83 async::run<ReplayResult>([=]() {
84 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
85 return worker.executeInitialQuery(query, *resultProvider, batchSize, state);
86 }, runAsync)
87 .then([this, query, resultProvider, guardPtr](const ReplayResult &result) {
88 if (!guardPtr) {
89 //Not an error, the query can vanish at any time.
90 return;
91 }
92 mInitialQueryComplete = true;
93 mQueryState = result.queryState;
94 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
95 if (query.liveQuery()) {
96 mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
97 }
98 resultProvider->setRevision(result.newRevision);
99 resultProvider->initialResultSetComplete(result.replayedAll);
100 })
101 .exec();
102 };
103
104 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. 72 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load.
105 mResultProvider->setFetcher(fetcher); 73 mResultProvider->setFetcher([this, query, bufferType] { fetch(query, bufferType); });
106 74
107 // In case of a live query we keep the runner for as long alive as the result provider exists 75 // In case of a live query we keep the runner for as long alive as the result provider exists
108 if (query.liveQuery()) { 76 if (query.liveQuery()) {
109 Q_ASSERT(!query.synchronousQuery()); 77 Q_ASSERT(!query.synchronousQuery());
110 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 78 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
111 setQuery([=]() -> KAsync::Job<void> { 79 setQuery([=]() { return incrementalFetch(query, bufferType); });
112 auto resultProvider = mResultProvider;
113 auto resourceContext = mResourceContext;
114 auto logCtx = mLogCtx;
115 auto state = mQueryState;
116 auto resultTransformation = mResultTransformation;
117 if (!mInitialQueryComplete) {
118 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete";
119 fetcher();
120 return KAsync::null();
121 }
122 if (mQueryInProgress) {
123 //Can happen if the revision come in quicker than we process them.
124 return KAsync::null();
125 }
126 Q_ASSERT(!mQueryInProgress);
127 return KAsync::start([&] {
128 mQueryInProgress = true;
129 })
130 .then(async::run<ReplayResult>([=]() {
131 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
132 return worker.executeIncrementalQuery(query, *resultProvider, state);
133 }))
134 .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
135 if (!guardPtr) {
136 //Not an error, the query can vanish at any time.
137 return;
138 }
139 mQueryInProgress = false;
140 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
141 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
142 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
143 });
144 });
145 // Ensure the connection is open, if it wasn't already opened 80 // Ensure the connection is open, if it wasn't already opened
146 // TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates 81 // TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
147 mResourceAccess->open(); 82 mResourceAccess->open();
@@ -158,6 +93,90 @@ QueryRunner<DomainType>::~QueryRunner()
158 SinkTraceCtx(mLogCtx) << "Stopped query"; 93 SinkTraceCtx(mLogCtx) << "Stopped query";
159} 94}
160 95
96//This function triggers the initial fetch, and then subsequent calls will simply fetch more data of mBatchSize.
97template <class DomainType>
98void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType)
99{
100 auto guardPtr = QPointer<QObject>(&guard);
101 SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize;
102 if (mQueryInProgress) {
103 SinkTraceCtx(mLogCtx) << "Query is already in progress, postponing: " << mBatchSize;
104 mRequestFetchMore = true;
105 return;
106 }
107 mQueryInProgress = true;
108 auto resultProvider = mResultProvider;
109 auto resultTransformation = mResultTransformation;
110 auto batchSize = mBatchSize;
111 auto resourceContext = mResourceContext;
112 auto logCtx = mLogCtx;
113 auto state = mQueryState;
114 const bool runAsync = !query.synchronousQuery();
115 //The lambda will be executed in a separate thread, so copy all arguments
116 async::run<ReplayResult>([=]() {
117 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
118 return worker.executeInitialQuery(query, *resultProvider, batchSize, state);
119 }, runAsync)
120 .then([=](const ReplayResult &result) {
121 if (!guardPtr) {
122 //Not an error, the query can vanish at any time.
123 return;
124 }
125 mInitialQueryComplete = true;
126 mQueryInProgress = false;
127 mQueryState = result.queryState;
128 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
129 if (query.liveQuery()) {
130 mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
131 }
132 resultProvider->setRevision(result.newRevision);
133 resultProvider->initialResultSetComplete(result.replayedAll);
134 if (mRequestFetchMore) {
135 mRequestFetchMore = false;
136 fetch(query, bufferType);
137 }
138 })
139 .exec();
140}
141
142template <class DomainType>
143KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &query, const QByteArray &bufferType)
144{
145 if (!mInitialQueryComplete) {
146 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete";
147 fetch(query, bufferType);
148 return KAsync::null();
149 }
150 if (mQueryInProgress) {
151 //Can happen if the revision come in quicker than we process them.
152 return KAsync::null();
153 }
154 auto resultProvider = mResultProvider;
155 auto resourceContext = mResourceContext;
156 auto logCtx = mLogCtx;
157 auto state = mQueryState;
158 auto resultTransformation = mResultTransformation;
159 Q_ASSERT(!mQueryInProgress);
160 auto guardPtr = QPointer<QObject>(&guard);
161 return KAsync::start([&] {
162 mQueryInProgress = true;
163 })
164 .then(async::run<ReplayResult>([=]() {
165 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
166 return worker.executeIncrementalQuery(query, *resultProvider, state);
167 }))
168 .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
169 if (!guardPtr) {
170 //Not an error, the query can vanish at any time.
171 return;
172 }
173 mQueryInProgress = false;
174 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
175 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
176 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
177 });
178}
179
161template <class DomainType> 180template <class DomainType>
162void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation) 181void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation)
163{ 182{
diff --git a/common/queryrunner.h b/common/queryrunner.h
index a567b3c..af54619 100644
--- a/common/queryrunner.h
+++ b/common/queryrunner.h
@@ -92,6 +92,9 @@ public:
92 typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr emitter(); 92 typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr emitter();
93 93
94private: 94private:
95 void fetch(const Sink::Query &query, const QByteArray &bufferType);
96 KAsync::Job<void> incrementalFetch(const Sink::Query &query, const QByteArray &bufferType);
97
95 Sink::ResourceContext mResourceContext; 98 Sink::ResourceContext mResourceContext;
96 QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess; 99 QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess;
97 QSharedPointer<Sink::ResultProvider<typename DomainType::Ptr>> mResultProvider; 100 QSharedPointer<Sink::ResultProvider<typename DomainType::Ptr>> mResultProvider;
@@ -102,4 +105,5 @@ private:
102 Sink::Log::Context mLogCtx; 105 Sink::Log::Context mLogCtx;
103 bool mInitialQueryComplete = false; 106 bool mInitialQueryComplete = false;
104 bool mQueryInProgress = false; 107 bool mQueryInProgress = false;
108 bool mRequestFetchMore = false;
105}; 109};
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp
index 8fbc2ad..020f3fd 100644
--- a/common/storage/entitystore.cpp
+++ b/common/storage/entitystore.cpp
@@ -459,6 +459,7 @@ void EntityStore::indexLookup(const QByteArray &type, const QByteArray &property
459 459
460void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback) 460void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback)
461{ 461{
462 Q_ASSERT(d);
462 auto db = DataStore::mainDatabase(d->getTransaction(), type); 463 auto db = DataStore::mainDatabase(d->getTransaction(), type);
463 db.findLatest(uid, 464 db.findLatest(uid,
464 [=](const QByteArray &key, const QByteArray &value) { 465 [=](const QByteArray &key, const QByteArray &value) {
diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h
index 00241f2..3eb0b7b 100644
--- a/common/storage/entitystore.h
+++ b/common/storage/entitystore.h
@@ -37,6 +37,7 @@ class SINK_EXPORT EntityStore
37public: 37public:
38 typedef QSharedPointer<EntityStore> Ptr; 38 typedef QSharedPointer<EntityStore> Ptr;
39 EntityStore(const ResourceContext &resourceContext, const Sink::Log::Context &); 39 EntityStore(const ResourceContext &resourceContext, const Sink::Log::Context &);
40 ~EntityStore() = default;
40 41
41 //Only the pipeline may call the following functions outside of tests 42 //Only the pipeline may call the following functions outside of tests
42 bool add(const QByteArray &type, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource); 43 bool add(const QByteArray &type, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource);
diff --git a/sinksh/syntax_modules/sink_selftest.cpp b/sinksh/syntax_modules/sink_selftest.cpp
index 21dfbff..8ad4f60 100644
--- a/sinksh/syntax_modules/sink_selftest.cpp
+++ b/sinksh/syntax_modules/sink_selftest.cpp
@@ -46,28 +46,94 @@ bool selfTest(const QStringList &args_, State &state)
46 return false; 46 return false;
47 } 47 }
48 48
49 using namespace Sink::ApplicationDomain;
49 auto options = SyntaxTree::parseOptions(args_); 50 auto options = SyntaxTree::parseOptions(args_);
50 if (options.positionalArguments.contains("stresstest")) { 51 if (options.positionalArguments.contains("stresstest")) {
51 auto resource = SinkshUtils::parseUid(options.options.value("resource").first().toUtf8()); 52 auto resource = SinkshUtils::parseUid(options.options.value("resource").first().toUtf8());
52 qWarning() << "Stresstest on resource: " << resource; 53 qWarning() << "Stresstest on resource: " << resource;
53 Sink::Query query;
54 query.resourceFilter(resource);
55 query.limit(100);
56
57 auto models = QSharedPointer<QList<QSharedPointer<QAbstractItemModel>>>::create(); 54 auto models = QSharedPointer<QList<QSharedPointer<QAbstractItemModel>>>::create();
58 for (int i = 0; i < 50; i++) { 55
56 //Simulate the maillist, where we scroll down and trigger lots of fetchMore calls
57 {
58 Sink::Query query;
59 query.resourceFilter(resource);
60 query.limit(100);
61 query.request<Mail::Subject>();
62 query.request<Mail::Sender>();
63 query.request<Mail::To>();
64 query.request<Mail::Cc>();
65 query.request<Mail::Bcc>();
66 query.request<Mail::Date>();
67 query.request<Mail::Unread>();
68 query.request<Mail::Important>();
69 query.request<Mail::Draft>();
70 query.request<Mail::Sent>();
71 query.request<Mail::Trash>();
72 query.request<Mail::Folder>();
73 query.sort<Mail::Date>();
74 query.reduce<Mail::ThreadId>(Sink::Query::Reduce::Selector::max<Mail::Date>())
75 .count("count")
76 .collect<Mail::Unread>("unreadCollected")
77 .collect<Mail::Important>("importantCollected");
78
59 auto model = Sink::Store::loadModel<Sink::ApplicationDomain::Mail>(query); 79 auto model = Sink::Store::loadModel<Sink::ApplicationDomain::Mail>(query);
60 *models << model; 80 models->append(model);
61 QObject::connect(model.data(), &QAbstractItemModel::dataChanged, [models, model, &state](const QModelIndex &start, const QModelIndex &end, const QVector<int> &roles) { 81 QObject::connect(model.data(), &QAbstractItemModel::dataChanged, [models, model, &state](const QModelIndex &start, const QModelIndex &end, const QVector<int> &roles) {
62 if (roles.contains(Sink::Store::ChildrenFetchedRole)) { 82 if (roles.contains(Sink::Store::ChildrenFetchedRole)) {
63 models->removeAll(model); 83 if (!model->canFetchMore({})) {
64 qWarning() << "Model complete: " << models->count(); 84 qWarning() << "Model complete: " << models->count();
85 models->removeAll(model);
86 } else {
87 qWarning() << "Fetching more";
88 //Simulate superfluous fetchMore calls
89 for (int i = 0; i < 10; i++) {
90 model->fetchMore({});
91 }
92 return;
93 }
65 if (models->isEmpty()) { 94 if (models->isEmpty()) {
66 state.commandFinished(); 95 state.commandFinished();
67 } 96 }
68 } 97 }
69 }); 98 });
99
70 } 100 }
101
102 //Simluate lot's of mailviewers doing a bunch of queries
103 {
104 Sink::Query query;
105 query.resourceFilter(resource);
106 query.limit(10);
107 query.request<Mail::Subject>();
108 query.request<Mail::Sender>();
109 query.request<Mail::To>();
110 query.request<Mail::Cc>();
111 query.request<Mail::Bcc>();
112 query.request<Mail::Date>();
113 query.request<Mail::Unread>();
114 query.request<Mail::Important>();
115 query.request<Mail::Draft>();
116 query.request<Mail::Sent>();
117 query.request<Mail::Trash>();
118 query.request<Mail::Folder>();
119 query.sort<Sink::ApplicationDomain::Mail::Date>();
120 query.bloom<Sink::ApplicationDomain::Mail::ThreadId>();
121
122 for (int i = 0; i < 50; i++) {
123 auto model = Sink::Store::loadModel<Sink::ApplicationDomain::Mail>(query);
124 *models << model;
125 QObject::connect(model.data(), &QAbstractItemModel::dataChanged, [models, model, &state](const QModelIndex &start, const QModelIndex &end, const QVector<int> &roles) {
126 if (roles.contains(Sink::Store::ChildrenFetchedRole)) {
127 models->removeAll(model);
128 qWarning() << "Model complete: " << models->count();
129 if (models->isEmpty()) {
130 state.commandFinished();
131 }
132 }
133 });
134 }
135 }
136
71 return true; 137 return true;
72 } 138 }
73 return false; 139 return false;