summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp49
1 files changed, 32 insertions, 17 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 25d69b1..22682d3 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -26,6 +26,9 @@
26#include "domainadaptor.h" 26#include "domainadaptor.h"
27#include "asyncutils.h" 27#include "asyncutils.h"
28 28
29#undef DEBUG_AREA
30#define DEBUG_AREA "client.queryrunner"
31
29using namespace Sink; 32using namespace Sink;
30 33
31/* 34/*
@@ -38,14 +41,14 @@ template<typename DomainType>
38class QueryWorker : public QObject 41class QueryWorker : public QObject
39{ 42{
40public: 43public:
41 QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType); 44 QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation);
42 virtual ~QueryWorker(); 45 virtual ~QueryWorker();
43 46
44 qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 47 qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
45 qint64 executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 48 qint64 executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
46 49
47private: 50private:
48 static void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties); 51 void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties);
49 52
50 void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback); 53 void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback);
51 54
@@ -57,6 +60,7 @@ private:
57 qint64 load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery); 60 qint64 load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery);
58 61
59private: 62private:
63 QueryRunnerBase::ResultTransformation mResultTransformation;
60 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; 64 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory;
61 QByteArray mResourceInstanceIdentifier; 65 QByteArray mResourceInstanceIdentifier;
62 QByteArray mBufferType; 66 QByteArray mBufferType;
@@ -72,11 +76,11 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
72{ 76{
73 Trace() << "Starting query"; 77 Trace() << "Starting query";
74 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. 78 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
75 mResultProvider->setFetcher([this, query, instanceIdentifier, factory, bufferType](const typename DomainType::Ptr &parent) { 79 mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) {
76 Trace() << "Running fetcher"; 80 Trace() << "Running fetcher";
77 auto resultProvider = mResultProvider; 81 auto resultProvider = mResultProvider;
78 async::run<qint64>([query, instanceIdentifier, factory, bufferType, parent, resultProvider]() -> qint64 { 82 async::run<qint64>([=]() -> qint64 {
79 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType); 83 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation);
80 const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider); 84 const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider);
81 return newRevision; 85 return newRevision;
82 }) 86 })
@@ -91,18 +95,17 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
91 // In case of a live query we keep the runner for as long alive as the result provider exists 95 // In case of a live query we keep the runner for as long alive as the result provider exists
92 if (query.liveQuery) { 96 if (query.liveQuery) {
93 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 97 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
94 setQuery([this, query, instanceIdentifier, factory, bufferType] () -> KAsync::Job<void> { 98 setQuery([=] () -> KAsync::Job<void> {
95 auto resultProvider = mResultProvider; 99 auto resultProvider = mResultProvider;
96 return async::run<qint64>([query, instanceIdentifier, factory, bufferType, resultProvider]() -> qint64 { 100 return async::run<qint64>([=]() -> qint64 {
97 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType); 101 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation);
98 const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); 102 const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider);
99 return newRevision; 103 return newRevision;
100 }) 104 })
101 .template then<void, qint64>([query, this](qint64 newRevision) { 105 .template then<void, qint64>([query, this](qint64 newRevision) {
102 //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 106 //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
103 mResourceAccess->sendRevisionReplayedCommand(newRevision); 107 mResourceAccess->sendRevisionReplayedCommand(newRevision);
104 }) 108 });
105 .template then<void>([](){});
106 }); 109 });
107 //Ensure the connection is open, if it wasn't already opened 110 //Ensure the connection is open, if it wasn't already opened
108 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates 111 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
@@ -118,6 +121,12 @@ QueryRunner<DomainType>::~QueryRunner()
118} 121}
119 122
120template<class DomainType> 123template<class DomainType>
124void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation)
125{
126 mResultTransformation = transformation;
127}
128
129template<class DomainType>
121typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainType>::emitter() 130typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainType>::emitter()
122{ 131{
123 return mResultProvider->emitter(); 132 return mResultProvider->emitter();
@@ -129,7 +138,7 @@ static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction,
129{ 138{
130 //TODO use a result set with an iterator, to read values on demand 139 //TODO use a result set with an iterator, to read values on demand
131 QVector<QByteArray> keys; 140 QVector<QByteArray> keys;
132 transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { 141 Storage::mainDatabase(transaction, bufferType).scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool {
133 //Skip internals 142 //Skip internals
134 if (Sink::Storage::isInternalKey(key)) { 143 if (Sink::Storage::isInternalKey(key)) {
135 return true; 144 return true;
@@ -147,8 +156,9 @@ static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction,
147 156
148 157
149template<class DomainType> 158template<class DomainType>
150QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) 159QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation)
151 : QObject(), 160 : QObject(),
161 mResultTransformation(transformation),
152 mDomainTypeAdaptorFactory(factory), 162 mDomainTypeAdaptorFactory(factory),
153 mResourceInstanceIdentifier(instanceIdentifier), 163 mResourceInstanceIdentifier(instanceIdentifier),
154 mBufferType(bufferType), 164 mBufferType(bufferType),
@@ -167,20 +177,25 @@ template<class DomainType>
167void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties) 177void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties)
168{ 178{
169 int counter = 0; 179 int counter = 0;
170 while (resultSet.next([&resultProvider, &counter, &properties](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { 180 while (resultSet.next([this, &resultProvider, &counter, &properties](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool {
181 //FIXME allow maildir resource to set the mimeMessage property
182 auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>();
183 if (mResultTransformation) {
184 mResultTransformation(*valueCopy);
185 }
171 counter++; 186 counter++;
172 switch (operation) { 187 switch (operation) {
173 case Sink::Operation_Creation: 188 case Sink::Operation_Creation:
174 // Trace() << "Got creation"; 189 // Trace() << "Got creation";
175 resultProvider.add(Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>()); 190 resultProvider.add(valueCopy);
176 break; 191 break;
177 case Sink::Operation_Modification: 192 case Sink::Operation_Modification:
178 // Trace() << "Got modification"; 193 // Trace() << "Got modification";
179 resultProvider.modify(Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>()); 194 resultProvider.modify(valueCopy);
180 break; 195 break;
181 case Sink::Operation_Removal: 196 case Sink::Operation_Removal:
182 // Trace() << "Got removal"; 197 // Trace() << "Got removal";
183 resultProvider.remove(Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>()); 198 resultProvider.remove(valueCopy);
184 break; 199 break;
185 } 200 }
186 return true; 201 return true;
@@ -319,7 +334,7 @@ qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::functi
319 Warning() << "Error during query: " << error.store << error.message; 334 Warning() << "Error during query: " << error.store << error.message;
320 }); 335 });
321 auto transaction = storage.createTransaction(Sink::Storage::ReadOnly); 336 auto transaction = storage.createTransaction(Sink::Storage::ReadOnly);
322 auto db = transaction.openDatabase(mBufferType + ".main"); 337 auto db = Storage::mainDatabase(transaction, mBufferType);
323 338
324 QSet<QByteArray> remainingFilters; 339 QSet<QByteArray> remainingFilters;
325 auto resultSet = baseSetRetriever(transaction, remainingFilters); 340 auto resultSet = baseSetRetriever(transaction, remainingFilters);