diff options
-rw-r--r-- | common/pipeline.cpp | 38 | ||||
-rw-r--r-- | common/pipeline.h | 9 | ||||
-rw-r--r-- | dummyresource/resourcefactory.cpp | 18 |
3 files changed, 44 insertions, 21 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index dda7671..8f15fef 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -168,32 +168,30 @@ void Pipeline::pipelineStepped(const PipelineState &state) | |||
168 | 168 | ||
169 | void Pipeline::scheduleStep() | 169 | void Pipeline::scheduleStep() |
170 | { | 170 | { |
171 | // if (!d->stepScheduled) { | 171 | if (!d->stepScheduled) { |
172 | // d->stepScheduled = true; | 172 | d->stepScheduled = true; |
173 | // QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); | 173 | QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); |
174 | // } | 174 | } |
175 | //FIXME make async again. For some reason the job in newEntity crashes if pipeline processing is async. | ||
176 | stepPipelines(); | ||
177 | } | 175 | } |
178 | 176 | ||
179 | void Pipeline::stepPipelines() | 177 | void Pipeline::stepPipelines() |
180 | { | 178 | { |
179 | d->stepScheduled = false; | ||
181 | for (PipelineState &state: d->activePipelines) { | 180 | for (PipelineState &state: d->activePipelines) { |
182 | if (state.isIdle()) { | 181 | if (state.isIdle()) { |
183 | state.step(); | 182 | state.step(); |
184 | } | 183 | } |
185 | } | 184 | } |
186 | |||
187 | d->stepScheduled = false; | ||
188 | } | 185 | } |
189 | 186 | ||
190 | void Pipeline::pipelineCompleted(const PipelineState &state) | 187 | void Pipeline::pipelineCompleted(PipelineState state) |
191 | { | 188 | { |
192 | //TODO finalize the datastore, inform clients of the new rev | 189 | //TODO finalize the datastore, inform clients of the new rev |
193 | const int index = d->activePipelines.indexOf(state); | 190 | const int index = d->activePipelines.indexOf(state); |
194 | if (index > -1) { | 191 | if (index > -1) { |
195 | d->activePipelines.remove(index); | 192 | d->activePipelines.remove(index); |
196 | } | 193 | } |
194 | state.callback(); | ||
197 | 195 | ||
198 | if (state.type() != NullPipeline) { | 196 | if (state.type() != NullPipeline) { |
199 | //TODO what revision is finalized? | 197 | //TODO what revision is finalized? |
@@ -281,20 +279,23 @@ Pipeline::Type PipelineState::type() const | |||
281 | void PipelineState::step() | 279 | void PipelineState::step() |
282 | { | 280 | { |
283 | if (!d->pipeline) { | 281 | if (!d->pipeline) { |
282 | Q_ASSERT(false); | ||
284 | return; | 283 | return; |
285 | } | 284 | } |
286 | 285 | ||
287 | d->idle = false; | 286 | d->idle = false; |
288 | if (d->filterIt.hasNext()) { | 287 | if (d->filterIt.hasNext()) { |
289 | //TODO skip step if already processed | 288 | //TODO skip step if already processed |
290 | d->pipeline->storage().scan(d->key.toStdString(), [this](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { | 289 | //FIXME error handling if no result is found |
290 | auto preprocessor = d->filterIt.next(); | ||
291 | d->pipeline->storage().scan(d->key.toStdString(), [this, preprocessor](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { | ||
291 | auto entity = Akonadi2::GetEntity(dataValue); | 292 | auto entity = Akonadi2::GetEntity(dataValue); |
292 | d->filterIt.next()->process(*this, *entity); | 293 | preprocessor->process(*this, *entity); |
293 | return false; | 294 | return false; |
294 | }); | 295 | }); |
295 | } else { | 296 | } else { |
297 | //This object becomes invalid after this call | ||
296 | d->pipeline->pipelineCompleted(*this); | 298 | d->pipeline->pipelineCompleted(*this); |
297 | d->callback(); | ||
298 | } | 299 | } |
299 | } | 300 | } |
300 | 301 | ||
@@ -307,6 +308,12 @@ void PipelineState::processingCompleted(Preprocessor *filter) | |||
307 | } | 308 | } |
308 | } | 309 | } |
309 | 310 | ||
311 | void PipelineState::callback() | ||
312 | { | ||
313 | d->callback(); | ||
314 | } | ||
315 | |||
316 | |||
310 | Preprocessor::Preprocessor() | 317 | Preprocessor::Preprocessor() |
311 | : d(0) | 318 | : d(0) |
312 | { | 319 | { |
@@ -316,7 +323,7 @@ Preprocessor::~Preprocessor() | |||
316 | { | 323 | { |
317 | } | 324 | } |
318 | 325 | ||
319 | void Preprocessor::process(PipelineState state, const Akonadi2::Entity &) | 326 | void Preprocessor::process(const PipelineState &state, const Akonadi2::Entity &) |
320 | { | 327 | { |
321 | processingCompleted(state); | 328 | processingCompleted(state); |
322 | } | 329 | } |
@@ -326,5 +333,10 @@ void Preprocessor::processingCompleted(PipelineState state) | |||
326 | state.processingCompleted(this); | 333 | state.processingCompleted(this); |
327 | } | 334 | } |
328 | 335 | ||
336 | QString Preprocessor::id() const | ||
337 | { | ||
338 | return QLatin1String("unknown processor"); | ||
339 | } | ||
340 | |||
329 | } // namespace Akonadi2 | 341 | } // namespace Akonadi2 |
330 | 342 | ||
diff --git a/common/pipeline.h b/common/pipeline.h index 918d21e..a574d27 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -66,7 +66,8 @@ private Q_SLOTS: | |||
66 | 66 | ||
67 | private: | 67 | private: |
68 | void pipelineStepped(const PipelineState &state); | 68 | void pipelineStepped(const PipelineState &state); |
69 | void pipelineCompleted(const PipelineState &state); | 69 | //Don't use a reference here (it would invalidate itself) |
70 | void pipelineCompleted(PipelineState state); | ||
70 | void scheduleStep(); | 71 | void scheduleStep(); |
71 | 72 | ||
72 | friend class PipelineState; | 73 | friend class PipelineState; |
@@ -94,6 +95,8 @@ public: | |||
94 | void step(); | 95 | void step(); |
95 | void processingCompleted(Preprocessor *filter); | 96 | void processingCompleted(Preprocessor *filter); |
96 | 97 | ||
98 | void callback(); | ||
99 | |||
97 | private: | 100 | private: |
98 | class Private; | 101 | class Private; |
99 | QExplicitlySharedDataPointer<Private> d; | 102 | QExplicitlySharedDataPointer<Private> d; |
@@ -106,9 +109,9 @@ public: | |||
106 | virtual ~Preprocessor(); | 109 | virtual ~Preprocessor(); |
107 | 110 | ||
108 | //TODO pass actual command as well, for changerecording | 111 | //TODO pass actual command as well, for changerecording |
109 | virtual void process(PipelineState state, const Akonadi2::Entity &); | 112 | virtual void process(const PipelineState &state, const Akonadi2::Entity &); |
110 | //TODO to record progress | 113 | //TODO to record progress |
111 | // virtual QString id(); | 114 | virtual QString id() const; |
112 | 115 | ||
113 | protected: | 116 | protected: |
114 | void processingCompleted(PipelineState state); | 117 | void processingCompleted(PipelineState state); |
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index dc716ef..6fe10ec 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp | |||
@@ -43,19 +43,27 @@ | |||
43 | class SimpleProcessor : public Akonadi2::Preprocessor | 43 | class SimpleProcessor : public Akonadi2::Preprocessor |
44 | { | 44 | { |
45 | public: | 45 | public: |
46 | SimpleProcessor(const std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e)> &f) | 46 | SimpleProcessor(const QString &id, const std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e)> &f) |
47 | : Akonadi2::Preprocessor(), | 47 | : Akonadi2::Preprocessor(), |
48 | mFunction(f) | 48 | mFunction(f), |
49 | mId(id) | ||
49 | { | 50 | { |
50 | } | 51 | } |
51 | 52 | ||
52 | void process(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e) { | 53 | void process(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e) Q_DECL_OVERRIDE |
54 | { | ||
53 | mFunction(state, e); | 55 | mFunction(state, e); |
54 | processingCompleted(state); | 56 | processingCompleted(state); |
55 | } | 57 | } |
56 | 58 | ||
59 | QString id() const | ||
60 | { | ||
61 | return mId; | ||
62 | } | ||
63 | |||
57 | protected: | 64 | protected: |
58 | std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e)> mFunction; | 65 | std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e)> mFunction; |
66 | QString mId; | ||
59 | }; | 67 | }; |
60 | 68 | ||
61 | // template <typename DomainType> | 69 | // template <typename DomainType> |
@@ -166,7 +174,7 @@ private slots: | |||
166 | break; | 174 | break; |
167 | case Akonadi2::Commands::CreateEntityCommand: { | 175 | case Akonadi2::Commands::CreateEntityCommand: { |
168 | //TODO job lifetime management | 176 | //TODO job lifetime management |
169 | mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<void>([&messageQueueCallback, whileCallback](Async::Future<void> &future) { | 177 | mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<void>([messageQueueCallback, whileCallback](Async::Future<void> &future) { |
170 | messageQueueCallback(true); | 178 | messageQueueCallback(true); |
171 | whileCallback(false); | 179 | whileCallback(false); |
172 | future.setFinished(); | 180 | future.setFinished(); |
@@ -239,7 +247,7 @@ void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) | |||
239 | //i.e. If a resource stores tags as part of each message it needs to update the tag index | 247 | //i.e. If a resource stores tags as part of each message it needs to update the tag index |
240 | //TODO setup preprocessors for each domain type and pipeline type allowing full customization | 248 | //TODO setup preprocessors for each domain type and pipeline type allowing full customization |
241 | //Eventually the order should be self configuring, for now it's hardcoded. | 249 | //Eventually the order should be self configuring, for now it's hardcoded. |
242 | auto eventIndexer = new SimpleProcessor([eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { | 250 | auto eventIndexer = new SimpleProcessor("summaryprocessor", [eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { |
243 | auto adaptor = eventFactory->createAdaptor(entity); | 251 | auto adaptor = eventFactory->createAdaptor(entity); |
244 | qDebug() << "Summary preprocessor: " << adaptor->getProperty("summary").toString(); | 252 | qDebug() << "Summary preprocessor: " << adaptor->getProperty("summary").toString(); |
245 | }); | 253 | }); |