summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-03-03 09:01:05 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-03-03 09:01:05 +0100
commit4d9746c828558c9f872e0aed52442863affb25d5 (patch)
tree507d7c2ba67f47d3cbbcf01a722236ff1b48426b /common/queryrunner.cpp
parent9cea920b7dd51867a0be0fed2f461b6be73c103e (diff)
downloadsink-4d9746c828558c9f872e0aed52442863affb25d5.tar.gz
sink-4d9746c828558c9f872e0aed52442863affb25d5.zip
Fromatted the whole codebase with clang-format.
clang-format -i */**{.cpp,.h}
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp292
1 files changed, 147 insertions, 145 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 5ac1344..c150159 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -39,11 +39,12 @@ using namespace Sink;
39 * This is a worker object that can be moved to a thread to execute the query. 39 * This is a worker object that can be moved to a thread to execute the query.
40 * The only interaction point is the ResultProvider, which handles the threadsafe reporting of the result. 40 * The only interaction point is the ResultProvider, which handles the threadsafe reporting of the result.
41 */ 41 */
42template<typename DomainType> 42template <typename DomainType>
43class QueryWorker : public QObject 43class QueryWorker : public QObject
44{ 44{
45public: 45public:
46 QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation); 46 QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType,
47 const QueryRunnerBase::ResultTransformation &transformation);
47 virtual ~QueryWorker(); 48 virtual ~QueryWorker();
48 49
49 qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 50 qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
@@ -52,14 +53,17 @@ public:
52private: 53private:
53 void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize); 54 void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize);
54 55
55 void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback); 56 void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key,
57 const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback);
56 58
57 ResultSet loadInitialResultSet(const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting); 59 ResultSet loadInitialResultSet(const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting);
58 ResultSet loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters); 60 ResultSet loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters);
59 61
60 ResultSet filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty); 62 ResultSet filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter,
63 const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty);
61 std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query); 64 std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query);
62 qint64 load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize); 65 qint64 load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever,
66 Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize);
63 67
64private: 68private:
65 QueryRunnerBase::ResultTransformation mResultTransformation; 69 QueryRunnerBase::ResultTransformation mResultTransformation;
@@ -70,176 +74,171 @@ private:
70}; 74};
71 75
72 76
73template<class DomainType> 77template <class DomainType>
74QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) 78QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier,
75 : QueryRunnerBase(), 79 const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType)
76 mResourceAccess(resourceAccess), 80 : QueryRunnerBase(), mResourceAccess(resourceAccess), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mOffset(0), mBatchSize(query.limit)
77 mResultProvider(new ResultProvider<typename DomainType::Ptr>),
78 mOffset(0),
79 mBatchSize(query.limit)
80{ 81{
81 Trace() << "Starting query"; 82 Trace() << "Starting query";
82 if (query.limit && query.sortProperty.isEmpty()) { 83 if (query.limit && query.sortProperty.isEmpty()) {
83 Warning() << "A limited query without sorting is typically a bad idea."; 84 Warning() << "A limited query without sorting is typically a bad idea.";
84 } 85 }
85 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. 86 // We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
86 mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) { 87 mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) {
87 Trace() << "Running fetcher. Offset: " << mOffset << " Batchsize: " << mBatchSize; 88 Trace() << "Running fetcher. Offset: " << mOffset << " Batchsize: " << mBatchSize;
88 auto resultProvider = mResultProvider; 89 auto resultProvider = mResultProvider;
89 async::run<qint64>([=]() -> qint64 { 90 async::run<qint64>([=]() -> qint64 {
90 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); 91 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation);
91 const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider, mOffset, mBatchSize); 92 const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider, mOffset, mBatchSize);
92 return newRevision; 93 return newRevision;
93 }) 94 })
94 .template then<void, qint64>([query, this](qint64 newRevision) { 95 .template then<void, qint64>([query, this](qint64 newRevision) {
95 mOffset += mBatchSize; 96 mOffset += mBatchSize;
96 //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 97 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
97 if (query.liveQuery) { 98 if (query.liveQuery) {
98 mResourceAccess->sendRevisionReplayedCommand(newRevision); 99 mResourceAccess->sendRevisionReplayedCommand(newRevision);
99 } 100 }
100 }).exec(); 101 })
102 .exec();
101 }); 103 });
102 104
103 // In case of a live query we keep the runner for as long alive as the result provider exists 105 // In case of a live query we keep the runner for as long alive as the result provider exists
104 if (query.liveQuery) { 106 if (query.liveQuery) {
105 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 107 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
106 setQuery([=] () -> KAsync::Job<void> { 108 setQuery([=]() -> KAsync::Job<void> {
107 auto resultProvider = mResultProvider; 109 auto resultProvider = mResultProvider;
108 return async::run<qint64>([=]() -> qint64 { 110 return async::run<qint64>([=]() -> qint64 {
109 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); 111 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation);
110 const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); 112 const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider);
111 return newRevision; 113 return newRevision;
112 }) 114 })
113 .template then<void, qint64>([query, this](qint64 newRevision) { 115 .template then<void, qint64>([query, this](qint64 newRevision) {
114 //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 116 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
115 mResourceAccess->sendRevisionReplayedCommand(newRevision); 117 mResourceAccess->sendRevisionReplayedCommand(newRevision);
116 }); 118 });
117 }); 119 });
118 //Ensure the connection is open, if it wasn't already opened 120 // Ensure the connection is open, if it wasn't already opened
119 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates 121 // TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
120 mResourceAccess->open(); 122 mResourceAccess->open();
121 QObject::connect(mResourceAccess.data(), &Sink::ResourceAccess::revisionChanged, this, &QueryRunner::revisionChanged); 123 QObject::connect(mResourceAccess.data(), &Sink::ResourceAccess::revisionChanged, this, &QueryRunner::revisionChanged);
122 } 124 }
123} 125}
124 126
125template<class DomainType> 127template <class DomainType>
126QueryRunner<DomainType>::~QueryRunner() 128QueryRunner<DomainType>::~QueryRunner()
127{ 129{
128 Trace() << "Stopped query"; 130 Trace() << "Stopped query";
129} 131}
130 132
131template<class DomainType> 133template <class DomainType>
132void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation) 134void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation)
133{ 135{
134 mResultTransformation = transformation; 136 mResultTransformation = transformation;
135} 137}
136 138
137template<class DomainType> 139template <class DomainType>
138typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainType>::emitter() 140typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainType>::emitter()
139{ 141{
140 return mResultProvider->emitter(); 142 return mResultProvider->emitter();
141} 143}
142 144
143 145
144
145static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) 146static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType)
146{ 147{
147 //TODO use a result set with an iterator, to read values on demand 148 // TODO use a result set with an iterator, to read values on demand
148 QVector<QByteArray> keys; 149 QVector<QByteArray> keys;
149 Storage::mainDatabase(transaction, bufferType).scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { 150 Storage::mainDatabase(transaction, bufferType)
150 //Skip internals 151 .scan(QByteArray(),
151 if (Sink::Storage::isInternalKey(key)) { 152 [&](const QByteArray &key, const QByteArray &value) -> bool {
152 return true; 153 // Skip internals
153 } 154 if (Sink::Storage::isInternalKey(key)) {
154 keys << Sink::Storage::uidFromKey(key); 155 return true;
155 return true; 156 }
156 }, 157 keys << Sink::Storage::uidFromKey(key);
157 [](const Sink::Storage::Error &error) { 158 return true;
158 Warning() << "Error during query: " << error.message; 159 },
159 }); 160 [](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message; });
160 161
161 Trace() << "Full scan retrieved " << keys.size() << " results."; 162 Trace() << "Full scan retrieved " << keys.size() << " results.";
162 return ResultSet(keys); 163 return ResultSet(keys);
163} 164}
164 165
165 166
166template<class DomainType> 167template <class DomainType>
167QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) 168QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory,
168 : QObject(), 169 const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation)
169 mResultTransformation(transformation), 170 : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mBufferType(bufferType), mQuery(query)
170 mDomainTypeAdaptorFactory(factory),
171 mResourceInstanceIdentifier(instanceIdentifier),
172 mBufferType(bufferType),
173 mQuery(query)
174{ 171{
175 Trace() << "Starting query worker"; 172 Trace() << "Starting query worker";
176} 173}
177 174
178template<class DomainType> 175template <class DomainType>
179QueryWorker<DomainType>::~QueryWorker() 176QueryWorker<DomainType>::~QueryWorker()
180{ 177{
181 Trace() << "Stopped query worker"; 178 Trace() << "Stopped query worker";
182} 179}
183 180
184template<class DomainType> 181template <class DomainType>
185void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize) 182void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize)
186{ 183{
187 Trace() << "Skipping over " << offset << " results"; 184 Trace() << "Skipping over " << offset << " results";
188 resultSet.skip(offset); 185 resultSet.skip(offset);
189 int counter; 186 int counter;
190 for (counter = 0; !batchSize || (counter < batchSize); counter++) { 187 for (counter = 0; !batchSize || (counter < batchSize); counter++) {
191 const bool ret = resultSet.next([this, &resultProvider, &counter, &properties, batchSize](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { 188 const bool ret =
192 //FIXME allow maildir resource to set the mimeMessage property 189 resultSet.next([this, &resultProvider, &counter, &properties, batchSize](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool {
193 auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>(); 190 // FIXME allow maildir resource to set the mimeMessage property
194 if (mResultTransformation) { 191 auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>();
195 mResultTransformation(*valueCopy); 192 if (mResultTransformation) {
196 } 193 mResultTransformation(*valueCopy);
197 switch (operation) { 194 }
198 case Sink::Operation_Creation: 195 switch (operation) {
199 // Trace() << "Got creation"; 196 case Sink::Operation_Creation:
200 resultProvider.add(valueCopy); 197 // Trace() << "Got creation";
201 break; 198 resultProvider.add(valueCopy);
202 case Sink::Operation_Modification: 199 break;
203 // Trace() << "Got modification"; 200 case Sink::Operation_Modification:
204 resultProvider.modify(valueCopy); 201 // Trace() << "Got modification";
205 break; 202 resultProvider.modify(valueCopy);
206 case Sink::Operation_Removal: 203 break;
207 // Trace() << "Got removal"; 204 case Sink::Operation_Removal:
208 resultProvider.remove(valueCopy); 205 // Trace() << "Got removal";
209 break; 206 resultProvider.remove(valueCopy);
210 } 207 break;
211 return true; 208 }
212 }); 209 return true;
210 });
213 if (!ret) { 211 if (!ret) {
214 break; 212 break;
215 } 213 }
216 }; 214 };
217 Trace() << "Replayed " << counter << " results." << "Limit " << batchSize; 215 Trace() << "Replayed " << counter << " results."
216 << "Limit " << batchSize;
218} 217}
219 218
220template<class DomainType> 219template <class DomainType>
221void QueryWorker<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) 220void QueryWorker<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key,
221 const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback)
222{ 222{
223 //This only works for a 1:1 mapping of resource to domain types. 223 // This only works for a 1:1 mapping of resource to domain types.
224 //Not i.e. for tags that are stored as flags in each entity of an imap store. 224 // Not i.e. for tags that are stored as flags in each entity of an imap store.
225 //additional properties that don't have a 1:1 mapping (such as separately stored tags), 225 // additional properties that don't have a 1:1 mapping (such as separately stored tags),
226 //could be added to the adaptor. 226 // could be added to the adaptor.
227 db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { 227 db.findLatest(key,
228 Sink::EntityBuffer buffer(value.data(), value.size()); 228 [=](const QByteArray &key, const QByteArray &value) -> bool {
229 const Sink::Entity &entity = buffer.entity(); 229 Sink::EntityBuffer buffer(value.data(), value.size());
230 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); 230 const Sink::Entity &entity = buffer.entity();
231 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; 231 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
232 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; 232 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
233 auto adaptor = mDomainTypeAdaptorFactory->createAdaptor(entity); 233 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
234 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); 234 auto adaptor = mDomainTypeAdaptorFactory->createAdaptor(entity);
235 return false; 235 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation);
236 }, 236 return false;
237 [](const Sink::Storage::Error &error) { 237 },
238 Warning() << "Error during query: " << error.message; 238 [](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message; });
239 });
240} 239}
241 240
242template<class DomainType> 241template <class DomainType>
243ResultSet QueryWorker<DomainType>::loadInitialResultSet(const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) 242ResultSet QueryWorker<DomainType>::loadInitialResultSet(const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting)
244{ 243{
245 if (!query.ids.isEmpty()) { 244 if (!query.ids.isEmpty()) {
@@ -253,15 +252,15 @@ ResultSet QueryWorker<DomainType>::loadInitialResultSet(const Sink::Query &query
253 remainingSorting = query.sortProperty; 252 remainingSorting = query.sortProperty;
254 } 253 }
255 254
256 //We do a full scan if there were no indexes available to create the initial set. 255 // We do a full scan if there were no indexes available to create the initial set.
257 if (appliedFilters.isEmpty()) { 256 if (appliedFilters.isEmpty()) {
258 //TODO this should be replaced by an index lookup as well 257 // TODO this should be replaced by an index lookup as well
259 resultSet = fullScan(transaction, mBufferType); 258 resultSet = fullScan(transaction, mBufferType);
260 } 259 }
261 return resultSet; 260 return resultSet;
262} 261}
263 262
264template<class DomainType> 263template <class DomainType>
265ResultSet QueryWorker<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) 264ResultSet QueryWorker<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
266{ 265{
267 const auto bufferType = mBufferType; 266 const auto bufferType = mBufferType;
@@ -269,13 +268,13 @@ ResultSet QueryWorker<DomainType>::loadIncrementalResultSet(qint64 baseRevision,
269 remainingFilters = query.propertyFilter.keys().toSet(); 268 remainingFilters = query.propertyFilter.keys().toSet();
270 return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { 269 return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray {
271 const qint64 topRevision = Sink::Storage::maxRevision(transaction); 270 const qint64 topRevision = Sink::Storage::maxRevision(transaction);
272 //Spit out the revision keys one by one. 271 // Spit out the revision keys one by one.
273 while (*revisionCounter <= topRevision) { 272 while (*revisionCounter <= topRevision) {
274 const auto uid = Sink::Storage::getUidFromRevision(transaction, *revisionCounter); 273 const auto uid = Sink::Storage::getUidFromRevision(transaction, *revisionCounter);
275 const auto type = Sink::Storage::getTypeFromRevision(transaction, *revisionCounter); 274 const auto type = Sink::Storage::getTypeFromRevision(transaction, *revisionCounter);
276 // Trace() << "Revision" << *revisionCounter << type << uid; 275 // Trace() << "Revision" << *revisionCounter << type << uid;
277 if (type != bufferType) { 276 if (type != bufferType) {
278 //Skip revision 277 // Skip revision
279 *revisionCounter += 1; 278 *revisionCounter += 1;
280 continue; 279 continue;
281 } 280 }
@@ -284,45 +283,47 @@ ResultSet QueryWorker<DomainType>::loadIncrementalResultSet(qint64 baseRevision,
284 return key; 283 return key;
285 } 284 }
286 Trace() << "Finished reading incremental result set:" << *revisionCounter; 285 Trace() << "Finished reading incremental result set:" << *revisionCounter;
287 //We're done 286 // We're done
288 return QByteArray(); 287 return QByteArray();
289 }); 288 });
290} 289}
291 290
292template<class DomainType> 291template <class DomainType>
293ResultSet QueryWorker<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty) 292ResultSet QueryWorker<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter,
293 const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty)
294{ 294{
295 const bool sortingRequired = !sortProperty.isEmpty(); 295 const bool sortingRequired = !sortProperty.isEmpty();
296 if (initialQuery && sortingRequired) { 296 if (initialQuery && sortingRequired) {
297 Trace() << "Sorting the resultset in memory according to property: " << sortProperty; 297 Trace() << "Sorting the resultset in memory according to property: " << sortProperty;
298 //Sort the complete set by reading the sort property and filling into a sorted map 298 // Sort the complete set by reading the sort property and filling into a sorted map
299 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); 299 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create();
300 while (resultSet.next()) { 300 while (resultSet.next()) {
301 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) 301 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
302 readEntity(db, resultSet.id(), [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { 302 readEntity(db, resultSet.id(),
303 //We're not interested in removals during the initial query 303 [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
304 if ((operation != Sink::Operation_Removal) && filter(domainObject)) { 304 // We're not interested in removals during the initial query
305 if (!sortProperty.isEmpty()) { 305 if ((operation != Sink::Operation_Removal) && filter(domainObject)) {
306 const auto sortValue = domainObject->getProperty(sortProperty); 306 if (!sortProperty.isEmpty()) {
307 if (sortValue.type() == QVariant::DateTime) { 307 const auto sortValue = domainObject->getProperty(sortProperty);
308 sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier()); 308 if (sortValue.type() == QVariant::DateTime) {
309 sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier());
310 } else {
311 sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier());
312 }
309 } else { 313 } else {
310 sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier()); 314 sortedMap->insert(domainObject->identifier(), domainObject->identifier());
311 } 315 }
312 } else {
313 sortedMap->insert(domainObject->identifier(), domainObject->identifier());
314 } 316 }
315 } 317 });
316 });
317 } 318 }
318 319
319 Trace() << "Sorted " << sortedMap->size() << " values."; 320 Trace() << "Sorted " << sortedMap->size() << " values.";
320 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray> >::create(*sortedMap); 321 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap);
321 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery](std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { 322 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery](
323 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
322 if (iterator->hasNext()) { 324 if (iterator->hasNext()) {
323 readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { 325 readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject,
324 callback(domainObject, Sink::Operation_Creation); 326 Sink::Operation operation) { callback(domainObject, Sink::Operation_Creation); });
325 });
326 return true; 327 return true;
327 } 328 }
328 return false; 329 return false;
@@ -336,19 +337,21 @@ ResultSet QueryWorker<DomainType>::filterAndSortSet(ResultSet &resultSet, const
336 return ResultSet(generator, skip); 337 return ResultSet(generator, skip);
337 } else { 338 } else {
338 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); 339 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
339 ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery](std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { 340 ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery](
341 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
340 if (resultSetPtr->next()) { 342 if (resultSetPtr->next()) {
341 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) 343 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
342 readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { 344 readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
343 if (initialQuery) { 345 if (initialQuery) {
344 //We're not interested in removals during the initial query 346 // We're not interested in removals during the initial query
345 if ((operation != Sink::Operation_Removal) && filter(domainObject)) { 347 if ((operation != Sink::Operation_Removal) && filter(domainObject)) {
346 //In the initial set every entity is new 348 // In the initial set every entity is new
347 callback(domainObject, Sink::Operation_Creation); 349 callback(domainObject, Sink::Operation_Creation);
348 } } else { 350 }
349 //Always remove removals, they probably don't match due to non-available properties 351 } else {
352 // Always remove removals, they probably don't match due to non-available properties
350 if ((operation == Sink::Operation_Removal) || filter(domainObject)) { 353 if ((operation == Sink::Operation_Removal) || filter(domainObject)) {
351 //TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) 354 // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results)
352 callback(domainObject, operation); 355 callback(domainObject, operation);
353 } 356 }
354 } 357 }
@@ -357,15 +360,14 @@ ResultSet QueryWorker<DomainType>::filterAndSortSet(ResultSet &resultSet, const
357 } 360 }
358 return false; 361 return false;
359 }; 362 };
360 auto skip = [resultSetPtr]() { 363 auto skip = [resultSetPtr]() { resultSetPtr->skip(1); };
361 resultSetPtr->skip(1);
362 };
363 return ResultSet(generator, skip); 364 return ResultSet(generator, skip);
364 } 365 }
365} 366}
366 367
367template<class DomainType> 368template <class DomainType>
368std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> QueryWorker<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query) 369std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)>
370QueryWorker<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query)
369{ 371{
370 return [remainingFilters, query](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { 372 return [remainingFilters, query](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
371 if (!query.ids.isEmpty()) { 373 if (!query.ids.isEmpty()) {
@@ -376,7 +378,7 @@ std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &do
376 for (const auto &filterProperty : remainingFilters) { 378 for (const auto &filterProperty : remainingFilters) {
377 const auto property = domainObject->getProperty(filterProperty); 379 const auto property = domainObject->getProperty(filterProperty);
378 if (property.isValid()) { 380 if (property.isValid()) {
379 //TODO implement other comparison operators than equality 381 // TODO implement other comparison operators than equality
380 if (property != query.propertyFilter.value(filterProperty)) { 382 if (property != query.propertyFilter.value(filterProperty)) {
381 Trace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << query.propertyFilter.value(filterProperty); 383 Trace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << query.propertyFilter.value(filterProperty);
382 return false; 384 return false;
@@ -389,16 +391,15 @@ std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &do
389 }; 391 };
390} 392}
391 393
392template<class DomainType> 394template <class DomainType>
393qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize) 395qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever,
396 Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize)
394{ 397{
395 QTime time; 398 QTime time;
396 time.start(); 399 time.start();
397 400
398 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); 401 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier);
399 storage.setDefaultErrorHandler([](const Sink::Storage::Error &error) { 402 storage.setDefaultErrorHandler([](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.store << error.message; });
400 Warning() << "Error during query: " << error.store << error.message;
401 });
402 auto transaction = storage.createTransaction(Sink::Storage::ReadOnly); 403 auto transaction = storage.createTransaction(Sink::Storage::ReadOnly);
403 auto db = Storage::mainDatabase(transaction, mBufferType); 404 auto db = Storage::mainDatabase(transaction, mBufferType);
404 405
@@ -414,7 +415,7 @@ qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::functi
414 return Sink::Storage::maxRevision(transaction); 415 return Sink::Storage::maxRevision(transaction);
415} 416}
416 417
417template<class DomainType> 418template <class DomainType>
418qint64 QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 419qint64 QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
419{ 420{
420 QTime time; 421 QTime time;
@@ -429,8 +430,9 @@ qint64 QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query
429 return revision; 430 return revision;
430} 431}
431 432
432template<class DomainType> 433template <class DomainType>
433qint64 QueryWorker<DomainType>::executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize) 434qint64 QueryWorker<DomainType>::executeInitialQuery(
435 const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize)
434{ 436{
435 QTime time; 437 QTime time;
436 time.start(); 438 time.start();