diff options
Diffstat (limited to 'common')
-rw-r--r-- | common/domain/event.cpp | 6 | ||||
-rw-r--r-- | common/domain/event.h | 2 | ||||
-rw-r--r-- | common/domain/mail.cpp | 7 | ||||
-rw-r--r-- | common/domain/mail.h | 2 | ||||
-rw-r--r-- | common/genericresource.cpp | 13 | ||||
-rw-r--r-- | common/genericresource.h | 1 | ||||
-rw-r--r-- | common/pipeline.cpp | 295 | ||||
-rw-r--r-- | common/pipeline.h | 90 |
8 files changed, 97 insertions, 319 deletions
diff --git a/common/domain/event.cpp b/common/domain/event.cpp index 9759fc3..83a6906 100644 --- a/common/domain/event.cpp +++ b/common/domain/event.cpp | |||
@@ -50,11 +50,11 @@ ResultSet TypeImplementation<Event>::queryIndexes(const Akonadi2::Query &query, | |||
50 | return ResultSet(keys); | 50 | return ResultSet(keys); |
51 | } | 51 | } |
52 | 52 | ||
53 | void TypeImplementation<Event>::index(const Event &type, Akonadi2::Storage::Transaction &transaction) | 53 | void TypeImplementation<Event>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction) |
54 | { | 54 | { |
55 | const auto uid = type.getProperty("uid"); | 55 | const auto uid = bufferAdaptor.getProperty("uid"); |
56 | if (uid.isValid()) { | 56 | if (uid.isValid()) { |
57 | Index("event.index.uid", transaction).add(uid.toByteArray(), type.identifier()); | 57 | Index("event.index.uid", transaction).add(uid.toByteArray(), identifier); |
58 | } | 58 | } |
59 | } | 59 | } |
60 | 60 | ||
diff --git a/common/domain/event.h b/common/domain/event.h index f21cd34..e9ba52a 100644 --- a/common/domain/event.h +++ b/common/domain/event.h | |||
@@ -56,7 +56,7 @@ public: | |||
56 | * An empty result set indicates that a full scan is required. | 56 | * An empty result set indicates that a full scan is required. |
57 | */ | 57 | */ |
58 | static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction); | 58 | static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction); |
59 | static void index(const Event &type, Akonadi2::Storage::Transaction &transaction); | 59 | static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction); |
60 | static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); | 60 | static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); |
61 | static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); | 61 | static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); |
62 | }; | 62 | }; |
diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp index d40dde9..ffe322e 100644 --- a/common/domain/mail.cpp +++ b/common/domain/mail.cpp | |||
@@ -50,12 +50,11 @@ ResultSet TypeImplementation<Mail>::queryIndexes(const Akonadi2::Query &query, c | |||
50 | return ResultSet(keys); | 50 | return ResultSet(keys); |
51 | } | 51 | } |
52 | 52 | ||
53 | void TypeImplementation<Mail>::index(const Mail &type, Akonadi2::Storage::Transaction &transaction) | 53 | void TypeImplementation<Mail>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction) |
54 | { | 54 | { |
55 | const auto uid = type.getProperty("uid"); | 55 | const auto uid = bufferAdaptor.getProperty("uid"); |
56 | if (uid.isValid()) { | 56 | if (uid.isValid()) { |
57 | Index uidIndex("mail.index.uid", transaction); | 57 | Index("mail.index.uid", transaction).add(uid.toByteArray(), identifier); |
58 | uidIndex.add(uid.toByteArray(), type.identifier()); | ||
59 | } | 58 | } |
60 | } | 59 | } |
61 | 60 | ||
diff --git a/common/domain/mail.h b/common/domain/mail.h index b58ce44..38f1d03 100644 --- a/common/domain/mail.h +++ b/common/domain/mail.h | |||
@@ -51,7 +51,7 @@ public: | |||
51 | * An empty result set indicates that a full scan is required. | 51 | * An empty result set indicates that a full scan is required. |
52 | */ | 52 | */ |
53 | static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction); | 53 | static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction); |
54 | static void index(const Mail &type, Akonadi2::Storage::Transaction &transaction); | 54 | static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction); |
55 | static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); | 55 | static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); |
56 | static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); | 56 | static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); |
57 | }; | 57 | }; |
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index dcae43d..652154d 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -54,7 +54,7 @@ public Q_SLOTS: | |||
54 | { | 54 | { |
55 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly); | 55 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly); |
56 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite); | 56 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite); |
57 | qint64 lastReplayedRevision = 0; | 57 | qint64 lastReplayedRevision = 1; |
58 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | 58 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { |
59 | lastReplayedRevision = value.toLongLong(); | 59 | lastReplayedRevision = value.toLongLong(); |
60 | return false; | 60 | return false; |
@@ -285,10 +285,13 @@ GenericResource::~GenericResource() | |||
285 | delete mSourceChangeReplay; | 285 | delete mSourceChangeReplay; |
286 | } | 286 | } |
287 | 287 | ||
288 | // void GenericResource::revisionChanged() | 288 | void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) |
289 | // { | 289 | { |
290 | // //TODO replay revision | 290 | Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier, Akonadi2::Storage::ReadWrite).removeFromDisk(); |
291 | // } | 291 | Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".userqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); |
292 | Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".synchronizerqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); | ||
293 | Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".changereplay", Akonadi2::Storage::ReadWrite).removeFromDisk(); | ||
294 | } | ||
292 | 295 | ||
293 | void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) | 296 | void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) |
294 | { | 297 | { |
diff --git a/common/genericresource.h b/common/genericresource.h index cfc6653..33de0e7 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -48,6 +48,7 @@ public: | |||
48 | 48 | ||
49 | int error() const; | 49 | int error() const; |
50 | 50 | ||
51 | static void removeFromDisk(const QByteArray &instanceIdentifier); | ||
51 | private Q_SLOTS: | 52 | private Q_SLOTS: |
52 | void updateLowerBoundRevision(); | 53 | void updateLowerBoundRevision(); |
53 | 54 | ||
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 15d2401..0ce478b 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -1,5 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org> | 2 | * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org> |
3 | * Copyright (C) 2015 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | 4 | * |
4 | * This library is free software; you can redistribute it and/or | 5 | * This library is free software; you can redistribute it and/or |
5 | * modify it under the terms of the GNU Lesser General Public | 6 | * modify it under the terms of the GNU Lesser General Public |
@@ -41,19 +42,13 @@ class Pipeline::Private | |||
41 | { | 42 | { |
42 | public: | 43 | public: |
43 | Private(const QString &resourceName) | 44 | Private(const QString &resourceName) |
44 | : storage(Akonadi2::storageLocation(), resourceName, Storage::ReadWrite), | 45 | : storage(Akonadi2::storageLocation(), resourceName, Storage::ReadWrite) |
45 | stepScheduled(false) | ||
46 | { | 46 | { |
47 | } | 47 | } |
48 | 48 | ||
49 | Storage storage; | 49 | Storage storage; |
50 | Storage::Transaction transaction; | 50 | Storage::Transaction transaction; |
51 | QHash<QString, QVector<Preprocessor *> > nullPipeline; | 51 | QHash<QString, QVector<Preprocessor *> > processors; |
52 | QHash<QString, QVector<Preprocessor *> > newPipeline; | ||
53 | QHash<QString, QVector<Preprocessor *> > modifiedPipeline; | ||
54 | QHash<QString, QVector<Preprocessor *> > deletedPipeline; | ||
55 | QVector<PipelineState> activePipelines; | ||
56 | bool stepScheduled; | ||
57 | QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; | 52 | QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; |
58 | }; | 53 | }; |
59 | 54 | ||
@@ -68,21 +63,9 @@ Pipeline::~Pipeline() | |||
68 | delete d; | 63 | delete d; |
69 | } | 64 | } |
70 | 65 | ||
71 | void Pipeline::setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors) | 66 | void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) |
72 | { | 67 | { |
73 | switch (pipelineType) { | 68 | d->processors[entityType] = processors; |
74 | case NewPipeline: | ||
75 | d->newPipeline[entityType] = preprocessors; | ||
76 | break; | ||
77 | case ModifiedPipeline: | ||
78 | d->modifiedPipeline[entityType] = preprocessors; | ||
79 | break; | ||
80 | case DeletedPipeline: | ||
81 | d->deletedPipeline[entityType] = preprocessors; | ||
82 | break; | ||
83 | default: | ||
84 | break; | ||
85 | }; | ||
86 | } | 69 | } |
87 | 70 | ||
88 | void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) | 71 | void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) |
@@ -92,6 +75,11 @@ void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFac | |||
92 | 75 | ||
93 | void Pipeline::startTransaction() | 76 | void Pipeline::startTransaction() |
94 | { | 77 | { |
78 | //TODO call for all types | ||
79 | //But avoid doing it during cleanup | ||
80 | // for (auto processor : d->processors[bufferType]) { | ||
81 | // processor->startBatch(); | ||
82 | // } | ||
95 | if (d->transaction) { | 83 | if (d->transaction) { |
96 | return; | 84 | return; |
97 | } | 85 | } |
@@ -100,7 +88,13 @@ void Pipeline::startTransaction() | |||
100 | 88 | ||
101 | void Pipeline::commit() | 89 | void Pipeline::commit() |
102 | { | 90 | { |
91 | //TODO call for all types | ||
92 | //But avoid doing it during cleanup | ||
93 | // for (auto processor : d->processors[bufferType]) { | ||
94 | // processor->finalize(); | ||
95 | // } | ||
103 | const auto revision = Akonadi2::Storage::maxRevision(d->transaction); | 96 | const auto revision = Akonadi2::Storage::maxRevision(d->transaction); |
97 | Trace() << "Committing " << revision; | ||
104 | if (d->transaction) { | 98 | if (d->transaction) { |
105 | d->transaction.commit(); | 99 | d->transaction.commit(); |
106 | } | 100 | } |
@@ -118,14 +112,6 @@ Storage &Pipeline::storage() const | |||
118 | return d->storage; | 112 | return d->storage; |
119 | } | 113 | } |
120 | 114 | ||
121 | void Pipeline::null() | ||
122 | { | ||
123 | //TODO: is there really any need for the null pipeline? if so, it should be doing something ;) | ||
124 | // PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline); | ||
125 | // d->activePipelines << state; | ||
126 | // state.step(); | ||
127 | } | ||
128 | |||
129 | void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | 115 | void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) |
130 | { | 116 | { |
131 | d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(uid, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()), | 117 | d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(uid, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()), |
@@ -181,15 +167,25 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
181 | 167 | ||
182 | storeNewRevision(newRevision, fbb, bufferType, key); | 168 | storeNewRevision(newRevision, fbb, bufferType, key); |
183 | 169 | ||
184 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; | 170 | auto adaptorFactory = d->adaptorFactory.value(bufferType); |
171 | if (!adaptorFactory) { | ||
172 | Warning() << "no adaptor factory for type " << bufferType; | ||
173 | return KAsync::error<qint64>(0); | ||
174 | } | ||
185 | 175 | ||
186 | return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { | 176 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; |
187 | PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future, newRevision]() { | 177 | d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { |
188 | future.setValue(newRevision); | 178 | auto entity = Akonadi2::GetEntity(value); |
189 | future.setFinished(); | 179 | auto adaptor = adaptorFactory->createAdaptor(*entity); |
190 | }, bufferType); | 180 | for (auto processor : d->processors[bufferType]) { |
191 | d->activePipelines << state; | 181 | processor->newEntity(key, newRevision, *adaptor, d->transaction); |
192 | state.step(); | 182 | } |
183 | return false; | ||
184 | }, [this](const Akonadi2::Storage::Error &error) { | ||
185 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | ||
186 | }); | ||
187 | return KAsync::start<qint64>([newRevision](){ | ||
188 | return newRevision; | ||
193 | }); | 189 | }); |
194 | } | 190 | } |
195 | 191 | ||
@@ -237,7 +233,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
237 | auto diff = adaptorFactory->createAdaptor(*diffEntity); | 233 | auto diff = adaptorFactory->createAdaptor(*diffEntity); |
238 | 234 | ||
239 | QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; | 235 | QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; |
240 | storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, baseRevision), [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | 236 | d->transaction.openDatabase(bufferType + ".main").findLatest(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { |
241 | Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 237 | Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
242 | if (!buffer.isValid()) { | 238 | if (!buffer.isValid()) { |
243 | Warning() << "Read invalid buffer from disk"; | 239 | Warning() << "Read invalid buffer from disk"; |
@@ -287,14 +283,18 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
287 | 283 | ||
288 | storeNewRevision(newRevision, fbb, bufferType, key); | 284 | storeNewRevision(newRevision, fbb, bufferType, key); |
289 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; | 285 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; |
290 | 286 | d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &, const QByteArray &value) -> bool { | |
291 | return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { | 287 | auto entity = Akonadi2::GetEntity(value); |
292 | PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future, newRevision]() { | 288 | auto newEntity = adaptorFactory->createAdaptor(*entity); |
293 | future.setValue(newRevision); | 289 | for (auto processor : d->processors[bufferType]) { |
294 | future.setFinished(); | 290 | processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); |
295 | }, bufferType); | 291 | } |
296 | d->activePipelines << state; | 292 | return false; |
297 | state.step(); | 293 | }, [this](const Akonadi2::Storage::Error &error) { |
294 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | ||
295 | }); | ||
296 | return KAsync::start<qint64>([newRevision](){ | ||
297 | return newRevision; | ||
298 | }); | 298 | }); |
299 | } | 299 | } |
300 | 300 | ||
@@ -328,16 +328,34 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
328 | flatbuffers::FlatBufferBuilder fbb; | 328 | flatbuffers::FlatBufferBuilder fbb; |
329 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); | 329 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); |
330 | 330 | ||
331 | auto adaptorFactory = d->adaptorFactory.value(bufferType); | ||
332 | if (!adaptorFactory) { | ||
333 | Warning() << "no adaptor factory for type " << bufferType; | ||
334 | return KAsync::error<qint64>(0); | ||
335 | } | ||
336 | |||
337 | QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; | ||
338 | d->transaction.openDatabase(bufferType + ".main").findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { | ||
339 | Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
340 | if (!buffer.isValid()) { | ||
341 | Warning() << "Read invalid buffer from disk"; | ||
342 | } else { | ||
343 | current = adaptorFactory->createAdaptor(buffer.entity()); | ||
344 | } | ||
345 | return false; | ||
346 | }, [this](const Akonadi2::Storage::Error &error) { | ||
347 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | ||
348 | }); | ||
349 | |||
331 | storeNewRevision(newRevision, fbb, bufferType, key); | 350 | storeNewRevision(newRevision, fbb, bufferType, key); |
332 | Log() << "Pipeline: deleted entity: "<< newRevision; | 351 | Log() << "Pipeline: deleted entity: "<< newRevision; |
333 | 352 | ||
334 | return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { | 353 | for (auto processor : d->processors[bufferType]) { |
335 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[bufferType], newRevision, [&future, newRevision](){ | 354 | processor->deletedEntity(key, newRevision, *current, d->transaction); |
336 | future.setValue(newRevision); | 355 | } |
337 | future.setFinished(); | 356 | |
338 | }, bufferType); | 357 | return KAsync::start<qint64>([newRevision](){ |
339 | d->activePipelines << state; | 358 | return newRevision; |
340 | state.step(); | ||
341 | }); | 359 | }); |
342 | } | 360 | } |
343 | 361 | ||
@@ -372,164 +390,6 @@ qint64 Pipeline::cleanedUpRevision() | |||
372 | return Akonadi2::Storage::cleanedUpRevision(d->transaction); | 390 | return Akonadi2::Storage::cleanedUpRevision(d->transaction); |
373 | } | 391 | } |
374 | 392 | ||
375 | void Pipeline::pipelineStepped(const PipelineState &state) | ||
376 | { | ||
377 | scheduleStep(); | ||
378 | } | ||
379 | |||
380 | void Pipeline::scheduleStep() | ||
381 | { | ||
382 | if (!d->stepScheduled) { | ||
383 | d->stepScheduled = true; | ||
384 | QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); | ||
385 | } | ||
386 | } | ||
387 | |||
388 | void Pipeline::stepPipelines() | ||
389 | { | ||
390 | d->stepScheduled = false; | ||
391 | for (PipelineState &state: d->activePipelines) { | ||
392 | if (state.isIdle()) { | ||
393 | state.step(); | ||
394 | } | ||
395 | } | ||
396 | } | ||
397 | |||
398 | void Pipeline::pipelineCompleted(PipelineState state) | ||
399 | { | ||
400 | //TODO finalize the datastore, inform clients of the new rev | ||
401 | const int index = d->activePipelines.indexOf(state); | ||
402 | if (index > -1) { | ||
403 | d->activePipelines.remove(index); | ||
404 | } | ||
405 | state.callback(); | ||
406 | |||
407 | scheduleStep(); | ||
408 | if (d->activePipelines.isEmpty()) { | ||
409 | emit pipelinesDrained(); | ||
410 | } | ||
411 | } | ||
412 | |||
413 | |||
414 | class PipelineState::Private : public QSharedData | ||
415 | { | ||
416 | public: | ||
417 | Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters, const std::function<void()> &c, qint64 r, const QByteArray &b) | ||
418 | : pipeline(p), | ||
419 | type(t), | ||
420 | key(k), | ||
421 | filterIt(filters), | ||
422 | idle(true), | ||
423 | callback(c), | ||
424 | revision(r), | ||
425 | bufferType(b) | ||
426 | {} | ||
427 | |||
428 | Private() | ||
429 | : pipeline(0), | ||
430 | filterIt(QVector<Preprocessor *>()), | ||
431 | idle(true), | ||
432 | revision(-1) | ||
433 | {} | ||
434 | |||
435 | Pipeline *pipeline; | ||
436 | Pipeline::Type type; | ||
437 | QByteArray key; | ||
438 | QVectorIterator<Preprocessor *> filterIt; | ||
439 | bool idle; | ||
440 | std::function<void()> callback; | ||
441 | qint64 revision; | ||
442 | QByteArray bufferType; | ||
443 | }; | ||
444 | |||
445 | PipelineState::PipelineState() | ||
446 | : d(new Private()) | ||
447 | { | ||
448 | |||
449 | } | ||
450 | |||
451 | PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, qint64 revision, const std::function<void()> &callback, const QByteArray &bufferType) | ||
452 | : d(new Private(pipeline, type, key, filters, callback, revision, bufferType)) | ||
453 | { | ||
454 | } | ||
455 | |||
456 | PipelineState::PipelineState(const PipelineState &other) | ||
457 | : d(other.d) | ||
458 | { | ||
459 | } | ||
460 | |||
461 | PipelineState::~PipelineState() | ||
462 | { | ||
463 | } | ||
464 | |||
465 | PipelineState &PipelineState::operator=(const PipelineState &rhs) | ||
466 | { | ||
467 | d = rhs.d; | ||
468 | return *this; | ||
469 | } | ||
470 | |||
471 | bool PipelineState::operator==(const PipelineState &rhs) | ||
472 | { | ||
473 | return d == rhs.d; | ||
474 | } | ||
475 | |||
476 | bool PipelineState::isIdle() const | ||
477 | { | ||
478 | return d->idle; | ||
479 | } | ||
480 | |||
481 | QByteArray PipelineState::key() const | ||
482 | { | ||
483 | return d->key; | ||
484 | } | ||
485 | |||
486 | Pipeline::Type PipelineState::type() const | ||
487 | { | ||
488 | return d->type; | ||
489 | } | ||
490 | |||
491 | qint64 PipelineState::revision() const | ||
492 | { | ||
493 | return d->revision; | ||
494 | } | ||
495 | |||
496 | QByteArray PipelineState::bufferType() const | ||
497 | { | ||
498 | return d->bufferType; | ||
499 | } | ||
500 | |||
501 | void PipelineState::step() | ||
502 | { | ||
503 | if (!d->pipeline) { | ||
504 | Q_ASSERT(false); | ||
505 | return; | ||
506 | } | ||
507 | |||
508 | d->idle = false; | ||
509 | if (d->filterIt.hasNext()) { | ||
510 | //TODO skip step if already processed | ||
511 | auto preprocessor = d->filterIt.next(); | ||
512 | preprocessor->process(*this, d->pipeline->transaction()); | ||
513 | } else { | ||
514 | //This object becomes invalid after this call | ||
515 | d->pipeline->pipelineCompleted(*this); | ||
516 | } | ||
517 | } | ||
518 | |||
519 | void PipelineState::processingCompleted(Preprocessor *filter) | ||
520 | { | ||
521 | if (d->pipeline && filter == d->filterIt.peekPrevious()) { | ||
522 | d->idle = true; | ||
523 | d->pipeline->pipelineStepped(*this); | ||
524 | } | ||
525 | } | ||
526 | |||
527 | void PipelineState::callback() | ||
528 | { | ||
529 | d->callback(); | ||
530 | } | ||
531 | |||
532 | |||
533 | Preprocessor::Preprocessor() | 393 | Preprocessor::Preprocessor() |
534 | : d(0) | 394 | : d(0) |
535 | { | 395 | { |
@@ -539,19 +399,12 @@ Preprocessor::~Preprocessor() | |||
539 | { | 399 | { |
540 | } | 400 | } |
541 | 401 | ||
542 | void Preprocessor::process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) | 402 | void Preprocessor::startBatch() |
543 | { | ||
544 | processingCompleted(state); | ||
545 | } | ||
546 | |||
547 | void Preprocessor::processingCompleted(PipelineState state) | ||
548 | { | 403 | { |
549 | state.processingCompleted(this); | ||
550 | } | 404 | } |
551 | 405 | ||
552 | QString Preprocessor::id() const | 406 | void Preprocessor::finalize() |
553 | { | 407 | { |
554 | return QLatin1String("unknown processor"); | ||
555 | } | 408 | } |
556 | 409 | ||
557 | } // namespace Akonadi2 | 410 | } // namespace Akonadi2 |
diff --git a/common/pipeline.h b/common/pipeline.h index c8d9ddc..f11d880 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -35,7 +35,6 @@ | |||
35 | namespace Akonadi2 | 35 | namespace Akonadi2 |
36 | { | 36 | { |
37 | 37 | ||
38 | class PipelineState; | ||
39 | class Preprocessor; | 38 | class Preprocessor; |
40 | 39 | ||
41 | class AKONADI2COMMON_EXPORT Pipeline : public QObject | 40 | class AKONADI2COMMON_EXPORT Pipeline : public QObject |
@@ -43,19 +42,16 @@ class AKONADI2COMMON_EXPORT Pipeline : public QObject | |||
43 | Q_OBJECT | 42 | Q_OBJECT |
44 | 43 | ||
45 | public: | 44 | public: |
46 | enum Type { NullPipeline, NewPipeline, ModifiedPipeline, DeletedPipeline }; | ||
47 | |||
48 | Pipeline(const QString &storagePath, QObject *parent = 0); | 45 | Pipeline(const QString &storagePath, QObject *parent = 0); |
49 | ~Pipeline(); | 46 | ~Pipeline(); |
50 | 47 | ||
51 | Storage &storage() const; | 48 | Storage &storage() const; |
52 | 49 | ||
53 | void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors); | 50 | void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors); |
54 | void startTransaction(); | 51 | void startTransaction(); |
55 | void commit(); | 52 | void commit(); |
56 | Storage::Transaction &transaction(); | 53 | Storage::Transaction &transaction(); |
57 | 54 | ||
58 | void null(); | ||
59 | void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); | 55 | void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); |
60 | 56 | ||
61 | KAsync::Job<qint64> newEntity(void const *command, size_t size); | 57 | KAsync::Job<qint64> newEntity(void const *command, size_t size); |
@@ -75,104 +71,30 @@ public: | |||
75 | 71 | ||
76 | Q_SIGNALS: | 72 | Q_SIGNALS: |
77 | void revisionUpdated(qint64); | 73 | void revisionUpdated(qint64); |
78 | void pipelinesDrained(); | ||
79 | |||
80 | private Q_SLOTS: | ||
81 | void stepPipelines(); | ||
82 | 74 | ||
83 | private: | 75 | private: |
84 | void pipelineStepped(const PipelineState &state); | ||
85 | //Don't use a reference here (it would invalidate itself) | ||
86 | void pipelineCompleted(PipelineState state); | ||
87 | void scheduleStep(); | ||
88 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); | 76 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); |
89 | 77 | ||
90 | friend class PipelineState; | ||
91 | |||
92 | class Private; | 78 | class Private; |
93 | Private * const d; | 79 | Private * const d; |
94 | }; | 80 | }; |
95 | 81 | ||
96 | class AKONADI2COMMON_EXPORT PipelineState | ||
97 | { | ||
98 | public: | ||
99 | PipelineState(); | ||
100 | PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, qint64 revision, const std::function<void()> &callback, const QByteArray &bufferType); | ||
101 | PipelineState(const PipelineState &other); | ||
102 | ~PipelineState(); | ||
103 | |||
104 | PipelineState &operator=(const PipelineState &rhs); | ||
105 | bool operator==(const PipelineState &rhs); | ||
106 | |||
107 | bool isIdle() const; | ||
108 | QByteArray key() const; | ||
109 | Pipeline::Type type() const; | ||
110 | qint64 revision() const; | ||
111 | QByteArray bufferType() const; | ||
112 | |||
113 | void step(); | ||
114 | void processingCompleted(Preprocessor *filter); | ||
115 | |||
116 | void callback(); | ||
117 | |||
118 | private: | ||
119 | class Private; | ||
120 | QExplicitlySharedDataPointer<Private> d; | ||
121 | }; | ||
122 | |||
123 | class AKONADI2COMMON_EXPORT Preprocessor | 82 | class AKONADI2COMMON_EXPORT Preprocessor |
124 | { | 83 | { |
125 | public: | 84 | public: |
126 | Preprocessor(); | 85 | Preprocessor(); |
127 | virtual ~Preprocessor(); | 86 | virtual ~Preprocessor(); |
128 | 87 | ||
129 | virtual void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction); | 88 | virtual void startBatch(); |
130 | //TODO to record progress | 89 | virtual void newEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) = 0; |
131 | virtual QString id() const; | 90 | virtual void modifiedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) = 0; |
132 | 91 | virtual void deletedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) = 0; | |
133 | protected: | 92 | virtual void finalize(); |
134 | void processingCompleted(PipelineState state); | ||
135 | 93 | ||
136 | private: | 94 | private: |
137 | class Private; | 95 | class Private; |
138 | Private * const d; | 96 | Private * const d; |
139 | }; | 97 | }; |
140 | 98 | ||
141 | /** | ||
142 | * A simple processor that takes a single function | ||
143 | */ | ||
144 | class AKONADI2COMMON_EXPORT SimpleProcessor : public Akonadi2::Preprocessor | ||
145 | { | ||
146 | public: | ||
147 | SimpleProcessor(const QString &id, const std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e, Akonadi2::Storage::Transaction &transaction)> &f) | ||
148 | : Akonadi2::Preprocessor(), | ||
149 | mFunction(f), | ||
150 | mId(id) | ||
151 | { | ||
152 | } | ||
153 | |||
154 | void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE | ||
155 | { | ||
156 | transaction.openDatabase(state.bufferType() + ".main").scan(state.key(), [this, &state, &transaction](const QByteArray &key, const QByteArray &value) -> bool { | ||
157 | auto entity = Akonadi2::GetEntity(value); | ||
158 | mFunction(state, *entity, transaction); | ||
159 | processingCompleted(state); | ||
160 | return false; | ||
161 | }, [this, state](const Akonadi2::Storage::Error &error) { | ||
162 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | ||
163 | processingCompleted(state); | ||
164 | }); | ||
165 | } | ||
166 | |||
167 | QString id() const Q_DECL_OVERRIDE | ||
168 | { | ||
169 | return mId; | ||
170 | } | ||
171 | |||
172 | protected: | ||
173 | std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e, Akonadi2::Storage::Transaction &transaction)> mFunction; | ||
174 | QString mId; | ||
175 | }; | ||
176 | |||
177 | } // namespace Akonadi2 | 99 | } // namespace Akonadi2 |
178 | 100 | ||