summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/pipeline.cpp14
-rw-r--r--common/pipeline.h3
-rw-r--r--dummyresource/resourcefactory.cpp18
-rw-r--r--synchronizer/listener.cpp3
4 files changed, 17 insertions, 21 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 18b6d51..10bae54 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -189,7 +189,6 @@ public:
189 Pipeline *pipeline; 189 Pipeline *pipeline;
190 Pipeline::Type type; 190 Pipeline::Type type;
191 QByteArray key; 191 QByteArray key;
192 Akonadi2::Entity *entity;
193 QVectorIterator<Preprocessor *> filterIt; 192 QVectorIterator<Preprocessor *> filterIt;
194 bool idle; 193 bool idle;
195}; 194};
@@ -240,11 +239,6 @@ Pipeline::Type PipelineState::type() const
240 return d->type; 239 return d->type;
241} 240}
242 241
243const Akonadi2::Entity &PipelineState::entity() const
244{
245 return *d->entity;
246}
247
248void PipelineState::step() 242void PipelineState::step()
249{ 243{
250 if (!d->pipeline) { 244 if (!d->pipeline) {
@@ -253,7 +247,11 @@ void PipelineState::step()
253 247
254 d->idle = false; 248 d->idle = false;
255 if (d->filterIt.hasNext()) { 249 if (d->filterIt.hasNext()) {
256 d->filterIt.next()->process(*this); 250 d->pipeline->storage().scan(d->key.toStdString(), [this](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool {
251 auto entity = Akonadi2::GetEntity(dataValue);
252 d->filterIt.next()->process(*this, *entity);
253 return false;
254 });
257 } else { 255 } else {
258 d->pipeline->pipelineCompleted(*this); 256 d->pipeline->pipelineCompleted(*this);
259 } 257 }
@@ -276,7 +274,7 @@ Preprocessor::~Preprocessor()
276{ 274{
277} 275}
278 276
279void Preprocessor::process(PipelineState state) 277void Preprocessor::process(PipelineState state, const Akonadi2::Entity &)
280{ 278{
281 processingCompleted(state); 279 processingCompleted(state);
282} 280}
diff --git a/common/pipeline.h b/common/pipeline.h
index 6005331..6b847f5 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -114,7 +114,6 @@ public:
114 bool isIdle() const; 114 bool isIdle() const;
115 QByteArray key() const; 115 QByteArray key() const;
116 Pipeline::Type type() const; 116 Pipeline::Type type() const;
117 const Akonadi2::Entity &entity() const;
118 117
119 void step(); 118 void step();
120 void processingCompleted(Preprocessor *filter); 119 void processingCompleted(Preprocessor *filter);
@@ -130,7 +129,7 @@ public:
130 Preprocessor(); 129 Preprocessor();
131 virtual ~Preprocessor(); 130 virtual ~Preprocessor();
132 131
133 virtual void process(PipelineState state); 132 virtual void process(PipelineState state, const Akonadi2::Entity &);
134 133
135protected: 134protected:
136 void processingCompleted(PipelineState state); 135 void processingCompleted(PipelineState state);
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp
index da6969a..87a6048 100644
--- a/dummyresource/resourcefactory.cpp
+++ b/dummyresource/resourcefactory.cpp
@@ -38,18 +38,19 @@
38class SimpleProcessor : public Akonadi2::Preprocessor 38class SimpleProcessor : public Akonadi2::Preprocessor
39{ 39{
40public: 40public:
41 SimpleProcessor(const std::function<void(const Akonadi2::PipelineState &state)> &f) 41 SimpleProcessor(const std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e)> &f)
42 : Akonadi2::Preprocessor(), 42 : Akonadi2::Preprocessor(),
43 mFunction(f) 43 mFunction(f)
44 { 44 {
45 } 45 }
46 46
47 void process(const Akonadi2::PipelineState &state) { 47 void process(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e) {
48 mFunction(state); 48 mFunction(state, e);
49 processingCompleted(state);
49 } 50 }
50 51
51protected: 52protected:
52 std::function<void(const Akonadi2::PipelineState &state)> mFunction; 53 std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e)> mFunction;
53}; 54};
54 55
55// template <typename DomainType> 56// template <typename DomainType>
@@ -102,13 +103,12 @@ DummyResource::DummyResource()
102 103
103void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) 104void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline)
104{ 105{
105 auto factory = QSharedPointer<DummyEventAdaptorFactory>::create(); 106 auto eventFactory = QSharedPointer<DummyEventAdaptorFactory>::create();
106 //TODO setup preprocessors for each domain type and pipeline type allowing full customization 107 //TODO setup preprocessors for each domain type and pipeline type allowing full customization
107 //Eventually the order should be self configuring, for now it's hardcoded. 108 //Eventually the order should be self configuring, for now it's hardcoded.
108 auto eventIndexer = new SimpleProcessor([factory](const Akonadi2::PipelineState &state) { 109 auto eventIndexer = new SimpleProcessor([eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) {
109 auto adaptor = factory->createAdaptor(state.entity()); 110 auto adaptor = eventFactory->createAdaptor(entity);
110 //Here we can plug in generic preprocessors 111 qDebug() << "Summary preprocessor: " << adaptor->getProperty("summary").toString();
111 qDebug() << adaptor->getProperty("summary").toString();
112 }); 112 });
113 pipeline->setPreprocessors<Akonadi2::Domain::Event>(Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer); 113 pipeline->setPreprocessors<Akonadi2::Domain::Event>(Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer);
114} 114}
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
index 8b5a19a..dc0d9dd 100644
--- a/synchronizer/listener.cpp
+++ b/synchronizer/listener.cpp
@@ -37,12 +37,10 @@ Listener::Listener(const QString &resourceName, QObject *parent)
37 m_server(new QLocalServer(this)), 37 m_server(new QLocalServer(this)),
38 m_resourceName(resourceName), 38 m_resourceName(resourceName),
39 m_resource(0), 39 m_resource(0),
40 //TODO move pipeline(s) to resource
41 m_pipeline(new Akonadi2::Pipeline(resourceName, parent)), 40 m_pipeline(new Akonadi2::Pipeline(resourceName, parent)),
42 m_clientBufferProcessesTimer(new QTimer(this)), 41 m_clientBufferProcessesTimer(new QTimer(this)),
43 m_messageId(0) 42 m_messageId(0)
44{ 43{
45 m_resource->configurePipeline(m_pipeline);
46 connect(m_pipeline, &Akonadi2::Pipeline::revisionUpdated, 44 connect(m_pipeline, &Akonadi2::Pipeline::revisionUpdated,
47 this, &Listener::refreshRevision); 45 this, &Listener::refreshRevision);
48 connect(m_server, &QLocalServer::newConnection, 46 connect(m_server, &QLocalServer::newConnection,
@@ -313,6 +311,7 @@ void Listener::loadResource()
313 log(QString("\tResource: %1").arg((qlonglong)m_resource)); 311 log(QString("\tResource: %1").arg((qlonglong)m_resource));
314 //TODO: this doesn't really list all the facades .. fix 312 //TODO: this doesn't really list all the facades .. fix
315 log(QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade<Akonadi2::Domain::Event>(m_resourceName)->type())); 313 log(QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade<Akonadi2::Domain::Event>(m_resourceName)->type()));
314 m_resource->configurePipeline(m_pipeline);
316 } else { 315 } else {
317 log(QString("Failed to load resource %1").arg(m_resourceName)); 316 log(QString("Failed to load resource %1").arg(m_resourceName));
318 } 317 }