diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-03-03 09:01:05 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-03-03 09:01:05 +0100 |
commit | 4d9746c828558c9f872e0aed52442863affb25d5 (patch) | |
tree | 507d7c2ba67f47d3cbbcf01a722236ff1b48426b /common/queryrunner.cpp | |
parent | 9cea920b7dd51867a0be0fed2f461b6be73c103e (diff) | |
download | sink-4d9746c828558c9f872e0aed52442863affb25d5.tar.gz sink-4d9746c828558c9f872e0aed52442863affb25d5.zip |
Fromatted the whole codebase with clang-format.
clang-format -i */**{.cpp,.h}
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r-- | common/queryrunner.cpp | 292 |
1 files changed, 147 insertions, 145 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 5ac1344..c150159 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -39,11 +39,12 @@ using namespace Sink; | |||
39 | * This is a worker object that can be moved to a thread to execute the query. | 39 | * This is a worker object that can be moved to a thread to execute the query. |
40 | * The only interaction point is the ResultProvider, which handles the threadsafe reporting of the result. | 40 | * The only interaction point is the ResultProvider, which handles the threadsafe reporting of the result. |
41 | */ | 41 | */ |
42 | template<typename DomainType> | 42 | template <typename DomainType> |
43 | class QueryWorker : public QObject | 43 | class QueryWorker : public QObject |
44 | { | 44 | { |
45 | public: | 45 | public: |
46 | QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation); | 46 | QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType, |
47 | const QueryRunnerBase::ResultTransformation &transformation); | ||
47 | virtual ~QueryWorker(); | 48 | virtual ~QueryWorker(); |
48 | 49 | ||
49 | qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | 50 | qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); |
@@ -52,14 +53,17 @@ public: | |||
52 | private: | 53 | private: |
53 | void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize); | 54 | void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize); |
54 | 55 | ||
55 | void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback); | 56 | void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, |
57 | const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback); | ||
56 | 58 | ||
57 | ResultSet loadInitialResultSet(const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting); | 59 | ResultSet loadInitialResultSet(const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting); |
58 | ResultSet loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters); | 60 | ResultSet loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters); |
59 | 61 | ||
60 | ResultSet filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty); | 62 | ResultSet filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, |
63 | const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty); | ||
61 | std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query); | 64 | std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query); |
62 | qint64 load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize); | 65 | qint64 load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, |
66 | Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize); | ||
63 | 67 | ||
64 | private: | 68 | private: |
65 | QueryRunnerBase::ResultTransformation mResultTransformation; | 69 | QueryRunnerBase::ResultTransformation mResultTransformation; |
@@ -70,176 +74,171 @@ private: | |||
70 | }; | 74 | }; |
71 | 75 | ||
72 | 76 | ||
73 | template<class DomainType> | 77 | template <class DomainType> |
74 | QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) | 78 | QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, |
75 | : QueryRunnerBase(), | 79 | const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) |
76 | mResourceAccess(resourceAccess), | 80 | : QueryRunnerBase(), mResourceAccess(resourceAccess), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mOffset(0), mBatchSize(query.limit) |
77 | mResultProvider(new ResultProvider<typename DomainType::Ptr>), | ||
78 | mOffset(0), | ||
79 | mBatchSize(query.limit) | ||
80 | { | 81 | { |
81 | Trace() << "Starting query"; | 82 | Trace() << "Starting query"; |
82 | if (query.limit && query.sortProperty.isEmpty()) { | 83 | if (query.limit && query.sortProperty.isEmpty()) { |
83 | Warning() << "A limited query without sorting is typically a bad idea."; | 84 | Warning() << "A limited query without sorting is typically a bad idea."; |
84 | } | 85 | } |
85 | //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. | 86 | // We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. |
86 | mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) { | 87 | mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) { |
87 | Trace() << "Running fetcher. Offset: " << mOffset << " Batchsize: " << mBatchSize; | 88 | Trace() << "Running fetcher. Offset: " << mOffset << " Batchsize: " << mBatchSize; |
88 | auto resultProvider = mResultProvider; | 89 | auto resultProvider = mResultProvider; |
89 | async::run<qint64>([=]() -> qint64 { | 90 | async::run<qint64>([=]() -> qint64 { |
90 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); | 91 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); |
91 | const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider, mOffset, mBatchSize); | 92 | const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider, mOffset, mBatchSize); |
92 | return newRevision; | 93 | return newRevision; |
93 | }) | 94 | }) |
94 | .template then<void, qint64>([query, this](qint64 newRevision) { | 95 | .template then<void, qint64>([query, this](qint64 newRevision) { |
95 | mOffset += mBatchSize; | 96 | mOffset += mBatchSize; |
96 | //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 97 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
97 | if (query.liveQuery) { | 98 | if (query.liveQuery) { |
98 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | 99 | mResourceAccess->sendRevisionReplayedCommand(newRevision); |
99 | } | 100 | } |
100 | }).exec(); | 101 | }) |
102 | .exec(); | ||
101 | }); | 103 | }); |
102 | 104 | ||
103 | // In case of a live query we keep the runner for as long alive as the result provider exists | 105 | // In case of a live query we keep the runner for as long alive as the result provider exists |
104 | if (query.liveQuery) { | 106 | if (query.liveQuery) { |
105 | //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | 107 | // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting |
106 | setQuery([=] () -> KAsync::Job<void> { | 108 | setQuery([=]() -> KAsync::Job<void> { |
107 | auto resultProvider = mResultProvider; | 109 | auto resultProvider = mResultProvider; |
108 | return async::run<qint64>([=]() -> qint64 { | 110 | return async::run<qint64>([=]() -> qint64 { |
109 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); | 111 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); |
110 | const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); | 112 | const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); |
111 | return newRevision; | 113 | return newRevision; |
112 | }) | 114 | }) |
113 | .template then<void, qint64>([query, this](qint64 newRevision) { | 115 | .template then<void, qint64>([query, this](qint64 newRevision) { |
114 | //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 116 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
115 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | 117 | mResourceAccess->sendRevisionReplayedCommand(newRevision); |
116 | }); | 118 | }); |
117 | }); | 119 | }); |
118 | //Ensure the connection is open, if it wasn't already opened | 120 | // Ensure the connection is open, if it wasn't already opened |
119 | //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates | 121 | // TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates |
120 | mResourceAccess->open(); | 122 | mResourceAccess->open(); |
121 | QObject::connect(mResourceAccess.data(), &Sink::ResourceAccess::revisionChanged, this, &QueryRunner::revisionChanged); | 123 | QObject::connect(mResourceAccess.data(), &Sink::ResourceAccess::revisionChanged, this, &QueryRunner::revisionChanged); |
122 | } | 124 | } |
123 | } | 125 | } |
124 | 126 | ||
125 | template<class DomainType> | 127 | template <class DomainType> |
126 | QueryRunner<DomainType>::~QueryRunner() | 128 | QueryRunner<DomainType>::~QueryRunner() |
127 | { | 129 | { |
128 | Trace() << "Stopped query"; | 130 | Trace() << "Stopped query"; |
129 | } | 131 | } |
130 | 132 | ||
131 | template<class DomainType> | 133 | template <class DomainType> |
132 | void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation) | 134 | void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation) |
133 | { | 135 | { |
134 | mResultTransformation = transformation; | 136 | mResultTransformation = transformation; |
135 | } | 137 | } |
136 | 138 | ||
137 | template<class DomainType> | 139 | template <class DomainType> |
138 | typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainType>::emitter() | 140 | typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainType>::emitter() |
139 | { | 141 | { |
140 | return mResultProvider->emitter(); | 142 | return mResultProvider->emitter(); |
141 | } | 143 | } |
142 | 144 | ||
143 | 145 | ||
144 | |||
145 | static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) | 146 | static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) |
146 | { | 147 | { |
147 | //TODO use a result set with an iterator, to read values on demand | 148 | // TODO use a result set with an iterator, to read values on demand |
148 | QVector<QByteArray> keys; | 149 | QVector<QByteArray> keys; |
149 | Storage::mainDatabase(transaction, bufferType).scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { | 150 | Storage::mainDatabase(transaction, bufferType) |
150 | //Skip internals | 151 | .scan(QByteArray(), |
151 | if (Sink::Storage::isInternalKey(key)) { | 152 | [&](const QByteArray &key, const QByteArray &value) -> bool { |
152 | return true; | 153 | // Skip internals |
153 | } | 154 | if (Sink::Storage::isInternalKey(key)) { |
154 | keys << Sink::Storage::uidFromKey(key); | 155 | return true; |
155 | return true; | 156 | } |
156 | }, | 157 | keys << Sink::Storage::uidFromKey(key); |
157 | [](const Sink::Storage::Error &error) { | 158 | return true; |
158 | Warning() << "Error during query: " << error.message; | 159 | }, |
159 | }); | 160 | [](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message; }); |
160 | 161 | ||
161 | Trace() << "Full scan retrieved " << keys.size() << " results."; | 162 | Trace() << "Full scan retrieved " << keys.size() << " results."; |
162 | return ResultSet(keys); | 163 | return ResultSet(keys); |
163 | } | 164 | } |
164 | 165 | ||
165 | 166 | ||
166 | template<class DomainType> | 167 | template <class DomainType> |
167 | QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) | 168 | QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, |
168 | : QObject(), | 169 | const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) |
169 | mResultTransformation(transformation), | 170 | : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mBufferType(bufferType), mQuery(query) |
170 | mDomainTypeAdaptorFactory(factory), | ||
171 | mResourceInstanceIdentifier(instanceIdentifier), | ||
172 | mBufferType(bufferType), | ||
173 | mQuery(query) | ||
174 | { | 171 | { |
175 | Trace() << "Starting query worker"; | 172 | Trace() << "Starting query worker"; |
176 | } | 173 | } |
177 | 174 | ||
178 | template<class DomainType> | 175 | template <class DomainType> |
179 | QueryWorker<DomainType>::~QueryWorker() | 176 | QueryWorker<DomainType>::~QueryWorker() |
180 | { | 177 | { |
181 | Trace() << "Stopped query worker"; | 178 | Trace() << "Stopped query worker"; |
182 | } | 179 | } |
183 | 180 | ||
184 | template<class DomainType> | 181 | template <class DomainType> |
185 | void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize) | 182 | void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize) |
186 | { | 183 | { |
187 | Trace() << "Skipping over " << offset << " results"; | 184 | Trace() << "Skipping over " << offset << " results"; |
188 | resultSet.skip(offset); | 185 | resultSet.skip(offset); |
189 | int counter; | 186 | int counter; |
190 | for (counter = 0; !batchSize || (counter < batchSize); counter++) { | 187 | for (counter = 0; !batchSize || (counter < batchSize); counter++) { |
191 | const bool ret = resultSet.next([this, &resultProvider, &counter, &properties, batchSize](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { | 188 | const bool ret = |
192 | //FIXME allow maildir resource to set the mimeMessage property | 189 | resultSet.next([this, &resultProvider, &counter, &properties, batchSize](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { |
193 | auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>(); | 190 | // FIXME allow maildir resource to set the mimeMessage property |
194 | if (mResultTransformation) { | 191 | auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>(); |
195 | mResultTransformation(*valueCopy); | 192 | if (mResultTransformation) { |
196 | } | 193 | mResultTransformation(*valueCopy); |
197 | switch (operation) { | 194 | } |
198 | case Sink::Operation_Creation: | 195 | switch (operation) { |
199 | // Trace() << "Got creation"; | 196 | case Sink::Operation_Creation: |
200 | resultProvider.add(valueCopy); | 197 | // Trace() << "Got creation"; |
201 | break; | 198 | resultProvider.add(valueCopy); |
202 | case Sink::Operation_Modification: | 199 | break; |
203 | // Trace() << "Got modification"; | 200 | case Sink::Operation_Modification: |
204 | resultProvider.modify(valueCopy); | 201 | // Trace() << "Got modification"; |
205 | break; | 202 | resultProvider.modify(valueCopy); |
206 | case Sink::Operation_Removal: | 203 | break; |
207 | // Trace() << "Got removal"; | 204 | case Sink::Operation_Removal: |
208 | resultProvider.remove(valueCopy); | 205 | // Trace() << "Got removal"; |
209 | break; | 206 | resultProvider.remove(valueCopy); |
210 | } | 207 | break; |
211 | return true; | 208 | } |
212 | }); | 209 | return true; |
210 | }); | ||
213 | if (!ret) { | 211 | if (!ret) { |
214 | break; | 212 | break; |
215 | } | 213 | } |
216 | }; | 214 | }; |
217 | Trace() << "Replayed " << counter << " results." << "Limit " << batchSize; | 215 | Trace() << "Replayed " << counter << " results." |
216 | << "Limit " << batchSize; | ||
218 | } | 217 | } |
219 | 218 | ||
220 | template<class DomainType> | 219 | template <class DomainType> |
221 | void QueryWorker<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) | 220 | void QueryWorker<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, |
221 | const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) | ||
222 | { | 222 | { |
223 | //This only works for a 1:1 mapping of resource to domain types. | 223 | // This only works for a 1:1 mapping of resource to domain types. |
224 | //Not i.e. for tags that are stored as flags in each entity of an imap store. | 224 | // Not i.e. for tags that are stored as flags in each entity of an imap store. |
225 | //additional properties that don't have a 1:1 mapping (such as separately stored tags), | 225 | // additional properties that don't have a 1:1 mapping (such as separately stored tags), |
226 | //could be added to the adaptor. | 226 | // could be added to the adaptor. |
227 | db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { | 227 | db.findLatest(key, |
228 | Sink::EntityBuffer buffer(value.data(), value.size()); | 228 | [=](const QByteArray &key, const QByteArray &value) -> bool { |
229 | const Sink::Entity &entity = buffer.entity(); | 229 | Sink::EntityBuffer buffer(value.data(), value.size()); |
230 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | 230 | const Sink::Entity &entity = buffer.entity(); |
231 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | 231 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); |
232 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | 232 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; |
233 | auto adaptor = mDomainTypeAdaptorFactory->createAdaptor(entity); | 233 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; |
234 | resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); | 234 | auto adaptor = mDomainTypeAdaptorFactory->createAdaptor(entity); |
235 | return false; | 235 | resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); |
236 | }, | 236 | return false; |
237 | [](const Sink::Storage::Error &error) { | 237 | }, |
238 | Warning() << "Error during query: " << error.message; | 238 | [](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message; }); |
239 | }); | ||
240 | } | 239 | } |
241 | 240 | ||
242 | template<class DomainType> | 241 | template <class DomainType> |
243 | ResultSet QueryWorker<DomainType>::loadInitialResultSet(const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) | 242 | ResultSet QueryWorker<DomainType>::loadInitialResultSet(const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) |
244 | { | 243 | { |
245 | if (!query.ids.isEmpty()) { | 244 | if (!query.ids.isEmpty()) { |
@@ -253,15 +252,15 @@ ResultSet QueryWorker<DomainType>::loadInitialResultSet(const Sink::Query &query | |||
253 | remainingSorting = query.sortProperty; | 252 | remainingSorting = query.sortProperty; |
254 | } | 253 | } |
255 | 254 | ||
256 | //We do a full scan if there were no indexes available to create the initial set. | 255 | // We do a full scan if there were no indexes available to create the initial set. |
257 | if (appliedFilters.isEmpty()) { | 256 | if (appliedFilters.isEmpty()) { |
258 | //TODO this should be replaced by an index lookup as well | 257 | // TODO this should be replaced by an index lookup as well |
259 | resultSet = fullScan(transaction, mBufferType); | 258 | resultSet = fullScan(transaction, mBufferType); |
260 | } | 259 | } |
261 | return resultSet; | 260 | return resultSet; |
262 | } | 261 | } |
263 | 262 | ||
264 | template<class DomainType> | 263 | template <class DomainType> |
265 | ResultSet QueryWorker<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) | 264 | ResultSet QueryWorker<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) |
266 | { | 265 | { |
267 | const auto bufferType = mBufferType; | 266 | const auto bufferType = mBufferType; |
@@ -269,13 +268,13 @@ ResultSet QueryWorker<DomainType>::loadIncrementalResultSet(qint64 baseRevision, | |||
269 | remainingFilters = query.propertyFilter.keys().toSet(); | 268 | remainingFilters = query.propertyFilter.keys().toSet(); |
270 | return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { | 269 | return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { |
271 | const qint64 topRevision = Sink::Storage::maxRevision(transaction); | 270 | const qint64 topRevision = Sink::Storage::maxRevision(transaction); |
272 | //Spit out the revision keys one by one. | 271 | // Spit out the revision keys one by one. |
273 | while (*revisionCounter <= topRevision) { | 272 | while (*revisionCounter <= topRevision) { |
274 | const auto uid = Sink::Storage::getUidFromRevision(transaction, *revisionCounter); | 273 | const auto uid = Sink::Storage::getUidFromRevision(transaction, *revisionCounter); |
275 | const auto type = Sink::Storage::getTypeFromRevision(transaction, *revisionCounter); | 274 | const auto type = Sink::Storage::getTypeFromRevision(transaction, *revisionCounter); |
276 | // Trace() << "Revision" << *revisionCounter << type << uid; | 275 | // Trace() << "Revision" << *revisionCounter << type << uid; |
277 | if (type != bufferType) { | 276 | if (type != bufferType) { |
278 | //Skip revision | 277 | // Skip revision |
279 | *revisionCounter += 1; | 278 | *revisionCounter += 1; |
280 | continue; | 279 | continue; |
281 | } | 280 | } |
@@ -284,45 +283,47 @@ ResultSet QueryWorker<DomainType>::loadIncrementalResultSet(qint64 baseRevision, | |||
284 | return key; | 283 | return key; |
285 | } | 284 | } |
286 | Trace() << "Finished reading incremental result set:" << *revisionCounter; | 285 | Trace() << "Finished reading incremental result set:" << *revisionCounter; |
287 | //We're done | 286 | // We're done |
288 | return QByteArray(); | 287 | return QByteArray(); |
289 | }); | 288 | }); |
290 | } | 289 | } |
291 | 290 | ||
292 | template<class DomainType> | 291 | template <class DomainType> |
293 | ResultSet QueryWorker<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty) | 292 | ResultSet QueryWorker<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, |
293 | const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty) | ||
294 | { | 294 | { |
295 | const bool sortingRequired = !sortProperty.isEmpty(); | 295 | const bool sortingRequired = !sortProperty.isEmpty(); |
296 | if (initialQuery && sortingRequired) { | 296 | if (initialQuery && sortingRequired) { |
297 | Trace() << "Sorting the resultset in memory according to property: " << sortProperty; | 297 | Trace() << "Sorting the resultset in memory according to property: " << sortProperty; |
298 | //Sort the complete set by reading the sort property and filling into a sorted map | 298 | // Sort the complete set by reading the sort property and filling into a sorted map |
299 | auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); | 299 | auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); |
300 | while (resultSet.next()) { | 300 | while (resultSet.next()) { |
301 | //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | 301 | // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) |
302 | readEntity(db, resultSet.id(), [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { | 302 | readEntity(db, resultSet.id(), |
303 | //We're not interested in removals during the initial query | 303 | [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { |
304 | if ((operation != Sink::Operation_Removal) && filter(domainObject)) { | 304 | // We're not interested in removals during the initial query |
305 | if (!sortProperty.isEmpty()) { | 305 | if ((operation != Sink::Operation_Removal) && filter(domainObject)) { |
306 | const auto sortValue = domainObject->getProperty(sortProperty); | 306 | if (!sortProperty.isEmpty()) { |
307 | if (sortValue.type() == QVariant::DateTime) { | 307 | const auto sortValue = domainObject->getProperty(sortProperty); |
308 | sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier()); | 308 | if (sortValue.type() == QVariant::DateTime) { |
309 | sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier()); | ||
310 | } else { | ||
311 | sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier()); | ||
312 | } | ||
309 | } else { | 313 | } else { |
310 | sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier()); | 314 | sortedMap->insert(domainObject->identifier(), domainObject->identifier()); |
311 | } | 315 | } |
312 | } else { | ||
313 | sortedMap->insert(domainObject->identifier(), domainObject->identifier()); | ||
314 | } | 316 | } |
315 | } | 317 | }); |
316 | }); | ||
317 | } | 318 | } |
318 | 319 | ||
319 | Trace() << "Sorted " << sortedMap->size() << " values."; | 320 | Trace() << "Sorted " << sortedMap->size() << " values."; |
320 | auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray> >::create(*sortedMap); | 321 | auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap); |
321 | ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery](std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { | 322 | ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery]( |
323 | std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { | ||
322 | if (iterator->hasNext()) { | 324 | if (iterator->hasNext()) { |
323 | readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { | 325 | readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, |
324 | callback(domainObject, Sink::Operation_Creation); | 326 | Sink::Operation operation) { callback(domainObject, Sink::Operation_Creation); }); |
325 | }); | ||
326 | return true; | 327 | return true; |
327 | } | 328 | } |
328 | return false; | 329 | return false; |
@@ -336,19 +337,21 @@ ResultSet QueryWorker<DomainType>::filterAndSortSet(ResultSet &resultSet, const | |||
336 | return ResultSet(generator, skip); | 337 | return ResultSet(generator, skip); |
337 | } else { | 338 | } else { |
338 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | 339 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); |
339 | ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery](std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { | 340 | ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery]( |
341 | std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { | ||
340 | if (resultSetPtr->next()) { | 342 | if (resultSetPtr->next()) { |
341 | //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | 343 | // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) |
342 | readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { | 344 | readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { |
343 | if (initialQuery) { | 345 | if (initialQuery) { |
344 | //We're not interested in removals during the initial query | 346 | // We're not interested in removals during the initial query |
345 | if ((operation != Sink::Operation_Removal) && filter(domainObject)) { | 347 | if ((operation != Sink::Operation_Removal) && filter(domainObject)) { |
346 | //In the initial set every entity is new | 348 | // In the initial set every entity is new |
347 | callback(domainObject, Sink::Operation_Creation); | 349 | callback(domainObject, Sink::Operation_Creation); |
348 | } } else { | 350 | } |
349 | //Always remove removals, they probably don't match due to non-available properties | 351 | } else { |
352 | // Always remove removals, they probably don't match due to non-available properties | ||
350 | if ((operation == Sink::Operation_Removal) || filter(domainObject)) { | 353 | if ((operation == Sink::Operation_Removal) || filter(domainObject)) { |
351 | //TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) | 354 | // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) |
352 | callback(domainObject, operation); | 355 | callback(domainObject, operation); |
353 | } | 356 | } |
354 | } | 357 | } |
@@ -357,15 +360,14 @@ ResultSet QueryWorker<DomainType>::filterAndSortSet(ResultSet &resultSet, const | |||
357 | } | 360 | } |
358 | return false; | 361 | return false; |
359 | }; | 362 | }; |
360 | auto skip = [resultSetPtr]() { | 363 | auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; |
361 | resultSetPtr->skip(1); | ||
362 | }; | ||
363 | return ResultSet(generator, skip); | 364 | return ResultSet(generator, skip); |
364 | } | 365 | } |
365 | } | 366 | } |
366 | 367 | ||
367 | template<class DomainType> | 368 | template <class DomainType> |
368 | std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> QueryWorker<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query) | 369 | std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> |
370 | QueryWorker<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query) | ||
369 | { | 371 | { |
370 | return [remainingFilters, query](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { | 372 | return [remainingFilters, query](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { |
371 | if (!query.ids.isEmpty()) { | 373 | if (!query.ids.isEmpty()) { |
@@ -376,7 +378,7 @@ std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &do | |||
376 | for (const auto &filterProperty : remainingFilters) { | 378 | for (const auto &filterProperty : remainingFilters) { |
377 | const auto property = domainObject->getProperty(filterProperty); | 379 | const auto property = domainObject->getProperty(filterProperty); |
378 | if (property.isValid()) { | 380 | if (property.isValid()) { |
379 | //TODO implement other comparison operators than equality | 381 | // TODO implement other comparison operators than equality |
380 | if (property != query.propertyFilter.value(filterProperty)) { | 382 | if (property != query.propertyFilter.value(filterProperty)) { |
381 | Trace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << query.propertyFilter.value(filterProperty); | 383 | Trace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << query.propertyFilter.value(filterProperty); |
382 | return false; | 384 | return false; |
@@ -389,16 +391,15 @@ std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &do | |||
389 | }; | 391 | }; |
390 | } | 392 | } |
391 | 393 | ||
392 | template<class DomainType> | 394 | template <class DomainType> |
393 | qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize) | 395 | qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, |
396 | Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize) | ||
394 | { | 397 | { |
395 | QTime time; | 398 | QTime time; |
396 | time.start(); | 399 | time.start(); |
397 | 400 | ||
398 | Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); | 401 | Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); |
399 | storage.setDefaultErrorHandler([](const Sink::Storage::Error &error) { | 402 | storage.setDefaultErrorHandler([](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.store << error.message; }); |
400 | Warning() << "Error during query: " << error.store << error.message; | ||
401 | }); | ||
402 | auto transaction = storage.createTransaction(Sink::Storage::ReadOnly); | 403 | auto transaction = storage.createTransaction(Sink::Storage::ReadOnly); |
403 | auto db = Storage::mainDatabase(transaction, mBufferType); | 404 | auto db = Storage::mainDatabase(transaction, mBufferType); |
404 | 405 | ||
@@ -414,7 +415,7 @@ qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::functi | |||
414 | return Sink::Storage::maxRevision(transaction); | 415 | return Sink::Storage::maxRevision(transaction); |
415 | } | 416 | } |
416 | 417 | ||
417 | template<class DomainType> | 418 | template <class DomainType> |
418 | qint64 QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | 419 | qint64 QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) |
419 | { | 420 | { |
420 | QTime time; | 421 | QTime time; |
@@ -429,8 +430,9 @@ qint64 QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query | |||
429 | return revision; | 430 | return revision; |
430 | } | 431 | } |
431 | 432 | ||
432 | template<class DomainType> | 433 | template <class DomainType> |
433 | qint64 QueryWorker<DomainType>::executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize) | 434 | qint64 QueryWorker<DomainType>::executeInitialQuery( |
435 | const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize) | ||
434 | { | 436 | { |
435 | QTime time; | 437 | QTime time; |
436 | time.start(); | 438 | time.start(); |