summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-01-19 03:16:29 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-01-19 03:16:29 +0100
commitdb477b2f35411a55c051d59588b6fabd153e4013 (patch)
treecaf886e7fa8333168208db736da6b2dfcb85ee67 /common/pipeline.cpp
parent58385e4a48ff85fa847b4264d3f5216f62af24c1 (diff)
downloadsink-db477b2f35411a55c051d59588b6fabd153e4013.tar.gz
sink-db477b2f35411a55c051d59588b6fabd153e4013.zip
Fixed pipeline.
Steps are now finally processed as they should be and a job tracks the processing progress.
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp38
1 files changed, 25 insertions, 13 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index dda7671..8f15fef 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -168,32 +168,30 @@ void Pipeline::pipelineStepped(const PipelineState &state)
168 168
169void Pipeline::scheduleStep() 169void Pipeline::scheduleStep()
170{ 170{
171 // if (!d->stepScheduled) { 171 if (!d->stepScheduled) {
172 // d->stepScheduled = true; 172 d->stepScheduled = true;
173 // QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); 173 QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection);
174 // } 174 }
175 //FIXME make async again. For some reason the job in newEntity crashes if pipeline processing is async.
176 stepPipelines();
177} 175}
178 176
179void Pipeline::stepPipelines() 177void Pipeline::stepPipelines()
180{ 178{
179 d->stepScheduled = false;
181 for (PipelineState &state: d->activePipelines) { 180 for (PipelineState &state: d->activePipelines) {
182 if (state.isIdle()) { 181 if (state.isIdle()) {
183 state.step(); 182 state.step();
184 } 183 }
185 } 184 }
186
187 d->stepScheduled = false;
188} 185}
189 186
190void Pipeline::pipelineCompleted(const PipelineState &state) 187void Pipeline::pipelineCompleted(PipelineState state)
191{ 188{
192 //TODO finalize the datastore, inform clients of the new rev 189 //TODO finalize the datastore, inform clients of the new rev
193 const int index = d->activePipelines.indexOf(state); 190 const int index = d->activePipelines.indexOf(state);
194 if (index > -1) { 191 if (index > -1) {
195 d->activePipelines.remove(index); 192 d->activePipelines.remove(index);
196 } 193 }
194 state.callback();
197 195
198 if (state.type() != NullPipeline) { 196 if (state.type() != NullPipeline) {
199 //TODO what revision is finalized? 197 //TODO what revision is finalized?
@@ -281,20 +279,23 @@ Pipeline::Type PipelineState::type() const
281void PipelineState::step() 279void PipelineState::step()
282{ 280{
283 if (!d->pipeline) { 281 if (!d->pipeline) {
282 Q_ASSERT(false);
284 return; 283 return;
285 } 284 }
286 285
287 d->idle = false; 286 d->idle = false;
288 if (d->filterIt.hasNext()) { 287 if (d->filterIt.hasNext()) {
289 //TODO skip step if already processed 288 //TODO skip step if already processed
290 d->pipeline->storage().scan(d->key.toStdString(), [this](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { 289 //FIXME error handling if no result is found
290 auto preprocessor = d->filterIt.next();
291 d->pipeline->storage().scan(d->key.toStdString(), [this, preprocessor](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool {
291 auto entity = Akonadi2::GetEntity(dataValue); 292 auto entity = Akonadi2::GetEntity(dataValue);
292 d->filterIt.next()->process(*this, *entity); 293 preprocessor->process(*this, *entity);
293 return false; 294 return false;
294 }); 295 });
295 } else { 296 } else {
297 //This object becomes invalid after this call
296 d->pipeline->pipelineCompleted(*this); 298 d->pipeline->pipelineCompleted(*this);
297 d->callback();
298 } 299 }
299} 300}
300 301
@@ -307,6 +308,12 @@ void PipelineState::processingCompleted(Preprocessor *filter)
307 } 308 }
308} 309}
309 310
311void PipelineState::callback()
312{
313 d->callback();
314}
315
316
310Preprocessor::Preprocessor() 317Preprocessor::Preprocessor()
311 : d(0) 318 : d(0)
312{ 319{
@@ -316,7 +323,7 @@ Preprocessor::~Preprocessor()
316{ 323{
317} 324}
318 325
319void Preprocessor::process(PipelineState state, const Akonadi2::Entity &) 326void Preprocessor::process(const PipelineState &state, const Akonadi2::Entity &)
320{ 327{
321 processingCompleted(state); 328 processingCompleted(state);
322} 329}
@@ -326,5 +333,10 @@ void Preprocessor::processingCompleted(PipelineState state)
326 state.processingCompleted(this); 333 state.processingCompleted(this);
327} 334}
328 335
336QString Preprocessor::id() const
337{
338 return QLatin1String("unknown processor");
339}
340
329} // namespace Akonadi2 341} // namespace Akonadi2
330 342