diff options
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r-- | common/queryrunner.cpp | 49 |
1 files changed, 32 insertions, 17 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 25d69b1..22682d3 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -26,6 +26,9 @@ | |||
26 | #include "domainadaptor.h" | 26 | #include "domainadaptor.h" |
27 | #include "asyncutils.h" | 27 | #include "asyncutils.h" |
28 | 28 | ||
29 | #undef DEBUG_AREA | ||
30 | #define DEBUG_AREA "client.queryrunner" | ||
31 | |||
29 | using namespace Sink; | 32 | using namespace Sink; |
30 | 33 | ||
31 | /* | 34 | /* |
@@ -38,14 +41,14 @@ template<typename DomainType> | |||
38 | class QueryWorker : public QObject | 41 | class QueryWorker : public QObject |
39 | { | 42 | { |
40 | public: | 43 | public: |
41 | QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType); | 44 | QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation); |
42 | virtual ~QueryWorker(); | 45 | virtual ~QueryWorker(); |
43 | 46 | ||
44 | qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | 47 | qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); |
45 | qint64 executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | 48 | qint64 executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); |
46 | 49 | ||
47 | private: | 50 | private: |
48 | static void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties); | 51 | void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties); |
49 | 52 | ||
50 | void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback); | 53 | void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback); |
51 | 54 | ||
@@ -57,6 +60,7 @@ private: | |||
57 | qint64 load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery); | 60 | qint64 load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery); |
58 | 61 | ||
59 | private: | 62 | private: |
63 | QueryRunnerBase::ResultTransformation mResultTransformation; | ||
60 | DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; | 64 | DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; |
61 | QByteArray mResourceInstanceIdentifier; | 65 | QByteArray mResourceInstanceIdentifier; |
62 | QByteArray mBufferType; | 66 | QByteArray mBufferType; |
@@ -72,11 +76,11 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
72 | { | 76 | { |
73 | Trace() << "Starting query"; | 77 | Trace() << "Starting query"; |
74 | //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. | 78 | //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. |
75 | mResultProvider->setFetcher([this, query, instanceIdentifier, factory, bufferType](const typename DomainType::Ptr &parent) { | 79 | mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) { |
76 | Trace() << "Running fetcher"; | 80 | Trace() << "Running fetcher"; |
77 | auto resultProvider = mResultProvider; | 81 | auto resultProvider = mResultProvider; |
78 | async::run<qint64>([query, instanceIdentifier, factory, bufferType, parent, resultProvider]() -> qint64 { | 82 | async::run<qint64>([=]() -> qint64 { |
79 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType); | 83 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); |
80 | const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider); | 84 | const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider); |
81 | return newRevision; | 85 | return newRevision; |
82 | }) | 86 | }) |
@@ -91,18 +95,17 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
91 | // In case of a live query we keep the runner for as long alive as the result provider exists | 95 | // In case of a live query we keep the runner for as long alive as the result provider exists |
92 | if (query.liveQuery) { | 96 | if (query.liveQuery) { |
93 | //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | 97 | //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting |
94 | setQuery([this, query, instanceIdentifier, factory, bufferType] () -> KAsync::Job<void> { | 98 | setQuery([=] () -> KAsync::Job<void> { |
95 | auto resultProvider = mResultProvider; | 99 | auto resultProvider = mResultProvider; |
96 | return async::run<qint64>([query, instanceIdentifier, factory, bufferType, resultProvider]() -> qint64 { | 100 | return async::run<qint64>([=]() -> qint64 { |
97 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType); | 101 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); |
98 | const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); | 102 | const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); |
99 | return newRevision; | 103 | return newRevision; |
100 | }) | 104 | }) |
101 | .template then<void, qint64>([query, this](qint64 newRevision) { | 105 | .template then<void, qint64>([query, this](qint64 newRevision) { |
102 | //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 106 | //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
103 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | 107 | mResourceAccess->sendRevisionReplayedCommand(newRevision); |
104 | }) | 108 | }); |
105 | .template then<void>([](){}); | ||
106 | }); | 109 | }); |
107 | //Ensure the connection is open, if it wasn't already opened | 110 | //Ensure the connection is open, if it wasn't already opened |
108 | //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates | 111 | //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates |
@@ -118,6 +121,12 @@ QueryRunner<DomainType>::~QueryRunner() | |||
118 | } | 121 | } |
119 | 122 | ||
120 | template<class DomainType> | 123 | template<class DomainType> |
124 | void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation) | ||
125 | { | ||
126 | mResultTransformation = transformation; | ||
127 | } | ||
128 | |||
129 | template<class DomainType> | ||
121 | typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainType>::emitter() | 130 | typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainType>::emitter() |
122 | { | 131 | { |
123 | return mResultProvider->emitter(); | 132 | return mResultProvider->emitter(); |
@@ -129,7 +138,7 @@ static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, | |||
129 | { | 138 | { |
130 | //TODO use a result set with an iterator, to read values on demand | 139 | //TODO use a result set with an iterator, to read values on demand |
131 | QVector<QByteArray> keys; | 140 | QVector<QByteArray> keys; |
132 | transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { | 141 | Storage::mainDatabase(transaction, bufferType).scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { |
133 | //Skip internals | 142 | //Skip internals |
134 | if (Sink::Storage::isInternalKey(key)) { | 143 | if (Sink::Storage::isInternalKey(key)) { |
135 | return true; | 144 | return true; |
@@ -147,8 +156,9 @@ static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, | |||
147 | 156 | ||
148 | 157 | ||
149 | template<class DomainType> | 158 | template<class DomainType> |
150 | QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) | 159 | QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) |
151 | : QObject(), | 160 | : QObject(), |
161 | mResultTransformation(transformation), | ||
152 | mDomainTypeAdaptorFactory(factory), | 162 | mDomainTypeAdaptorFactory(factory), |
153 | mResourceInstanceIdentifier(instanceIdentifier), | 163 | mResourceInstanceIdentifier(instanceIdentifier), |
154 | mBufferType(bufferType), | 164 | mBufferType(bufferType), |
@@ -167,20 +177,25 @@ template<class DomainType> | |||
167 | void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties) | 177 | void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties) |
168 | { | 178 | { |
169 | int counter = 0; | 179 | int counter = 0; |
170 | while (resultSet.next([&resultProvider, &counter, &properties](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { | 180 | while (resultSet.next([this, &resultProvider, &counter, &properties](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { |
181 | //FIXME allow maildir resource to set the mimeMessage property | ||
182 | auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>(); | ||
183 | if (mResultTransformation) { | ||
184 | mResultTransformation(*valueCopy); | ||
185 | } | ||
171 | counter++; | 186 | counter++; |
172 | switch (operation) { | 187 | switch (operation) { |
173 | case Sink::Operation_Creation: | 188 | case Sink::Operation_Creation: |
174 | // Trace() << "Got creation"; | 189 | // Trace() << "Got creation"; |
175 | resultProvider.add(Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>()); | 190 | resultProvider.add(valueCopy); |
176 | break; | 191 | break; |
177 | case Sink::Operation_Modification: | 192 | case Sink::Operation_Modification: |
178 | // Trace() << "Got modification"; | 193 | // Trace() << "Got modification"; |
179 | resultProvider.modify(Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>()); | 194 | resultProvider.modify(valueCopy); |
180 | break; | 195 | break; |
181 | case Sink::Operation_Removal: | 196 | case Sink::Operation_Removal: |
182 | // Trace() << "Got removal"; | 197 | // Trace() << "Got removal"; |
183 | resultProvider.remove(Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>()); | 198 | resultProvider.remove(valueCopy); |
184 | break; | 199 | break; |
185 | } | 200 | } |
186 | return true; | 201 | return true; |
@@ -319,7 +334,7 @@ qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::functi | |||
319 | Warning() << "Error during query: " << error.store << error.message; | 334 | Warning() << "Error during query: " << error.store << error.message; |
320 | }); | 335 | }); |
321 | auto transaction = storage.createTransaction(Sink::Storage::ReadOnly); | 336 | auto transaction = storage.createTransaction(Sink::Storage::ReadOnly); |
322 | auto db = transaction.openDatabase(mBufferType + ".main"); | 337 | auto db = Storage::mainDatabase(transaction, mBufferType); |
323 | 338 | ||
324 | QSet<QByteArray> remainingFilters; | 339 | QSet<QByteArray> remainingFilters; |
325 | auto resultSet = baseSetRetriever(transaction, remainingFilters); | 340 | auto resultSet = baseSetRetriever(transaction, remainingFilters); |