diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-10-28 16:39:16 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-10-28 16:39:16 +0100 |
commit | 129333371d28c06d85f75ca579ce17798e615e84 (patch) | |
tree | 2ae01db9d26f6f72a74fa77e6937e03304e81a2c /common/pipeline.h | |
parent | 20f049b65c4bd8c3d0c16bbf398641675648a93f (diff) | |
download | sink-129333371d28c06d85f75ca579ce17798e615e84.tar.gz sink-129333371d28c06d85f75ca579ce17798e615e84.zip |
Made pipeline preprocessing synchronous.
Instead of having the asynchronous preprocessor concept with different
pipelines for new/modify/delete we have a single pipeline with
synchronous preprocessors that act upon new/modify/delete.
This keeps the code simpler due to lack of asynchronity and keeps the
new/modify/delete operations together (which at least for the indexing
makes a lot of sense).
Not supporting asynchronity is ok because the tasks done in
preprocessing are not cpu intensive (if they were we had a problem
since they are directly involved in the round-trip time), and the main
cost comes from i/o, meaning we don't gain much by doing multithreading.
Costly tasks (such as full-text indexing) should rather be implemented
as post-processing, since that doesn't increase the round-trip time directly,
and eventually consistent is typically good enough for that.
Diffstat (limited to 'common/pipeline.h')
-rw-r--r-- | common/pipeline.h | 90 |
1 files changed, 6 insertions, 84 deletions
diff --git a/common/pipeline.h b/common/pipeline.h index c8d9ddc..f11d880 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -35,7 +35,6 @@ | |||
35 | namespace Akonadi2 | 35 | namespace Akonadi2 |
36 | { | 36 | { |
37 | 37 | ||
38 | class PipelineState; | ||
39 | class Preprocessor; | 38 | class Preprocessor; |
40 | 39 | ||
41 | class AKONADI2COMMON_EXPORT Pipeline : public QObject | 40 | class AKONADI2COMMON_EXPORT Pipeline : public QObject |
@@ -43,19 +42,16 @@ class AKONADI2COMMON_EXPORT Pipeline : public QObject | |||
43 | Q_OBJECT | 42 | Q_OBJECT |
44 | 43 | ||
45 | public: | 44 | public: |
46 | enum Type { NullPipeline, NewPipeline, ModifiedPipeline, DeletedPipeline }; | ||
47 | |||
48 | Pipeline(const QString &storagePath, QObject *parent = 0); | 45 | Pipeline(const QString &storagePath, QObject *parent = 0); |
49 | ~Pipeline(); | 46 | ~Pipeline(); |
50 | 47 | ||
51 | Storage &storage() const; | 48 | Storage &storage() const; |
52 | 49 | ||
53 | void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors); | 50 | void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors); |
54 | void startTransaction(); | 51 | void startTransaction(); |
55 | void commit(); | 52 | void commit(); |
56 | Storage::Transaction &transaction(); | 53 | Storage::Transaction &transaction(); |
57 | 54 | ||
58 | void null(); | ||
59 | void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); | 55 | void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); |
60 | 56 | ||
61 | KAsync::Job<qint64> newEntity(void const *command, size_t size); | 57 | KAsync::Job<qint64> newEntity(void const *command, size_t size); |
@@ -75,104 +71,30 @@ public: | |||
75 | 71 | ||
76 | Q_SIGNALS: | 72 | Q_SIGNALS: |
77 | void revisionUpdated(qint64); | 73 | void revisionUpdated(qint64); |
78 | void pipelinesDrained(); | ||
79 | |||
80 | private Q_SLOTS: | ||
81 | void stepPipelines(); | ||
82 | 74 | ||
83 | private: | 75 | private: |
84 | void pipelineStepped(const PipelineState &state); | ||
85 | //Don't use a reference here (it would invalidate itself) | ||
86 | void pipelineCompleted(PipelineState state); | ||
87 | void scheduleStep(); | ||
88 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); | 76 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); |
89 | 77 | ||
90 | friend class PipelineState; | ||
91 | |||
92 | class Private; | 78 | class Private; |
93 | Private * const d; | 79 | Private * const d; |
94 | }; | 80 | }; |
95 | 81 | ||
96 | class AKONADI2COMMON_EXPORT PipelineState | ||
97 | { | ||
98 | public: | ||
99 | PipelineState(); | ||
100 | PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, qint64 revision, const std::function<void()> &callback, const QByteArray &bufferType); | ||
101 | PipelineState(const PipelineState &other); | ||
102 | ~PipelineState(); | ||
103 | |||
104 | PipelineState &operator=(const PipelineState &rhs); | ||
105 | bool operator==(const PipelineState &rhs); | ||
106 | |||
107 | bool isIdle() const; | ||
108 | QByteArray key() const; | ||
109 | Pipeline::Type type() const; | ||
110 | qint64 revision() const; | ||
111 | QByteArray bufferType() const; | ||
112 | |||
113 | void step(); | ||
114 | void processingCompleted(Preprocessor *filter); | ||
115 | |||
116 | void callback(); | ||
117 | |||
118 | private: | ||
119 | class Private; | ||
120 | QExplicitlySharedDataPointer<Private> d; | ||
121 | }; | ||
122 | |||
123 | class AKONADI2COMMON_EXPORT Preprocessor | 82 | class AKONADI2COMMON_EXPORT Preprocessor |
124 | { | 83 | { |
125 | public: | 84 | public: |
126 | Preprocessor(); | 85 | Preprocessor(); |
127 | virtual ~Preprocessor(); | 86 | virtual ~Preprocessor(); |
128 | 87 | ||
129 | virtual void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction); | 88 | virtual void startBatch(); |
130 | //TODO to record progress | 89 | virtual void newEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) = 0; |
131 | virtual QString id() const; | 90 | virtual void modifiedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) = 0; |
132 | 91 | virtual void deletedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) = 0; | |
133 | protected: | 92 | virtual void finalize(); |
134 | void processingCompleted(PipelineState state); | ||
135 | 93 | ||
136 | private: | 94 | private: |
137 | class Private; | 95 | class Private; |
138 | Private * const d; | 96 | Private * const d; |
139 | }; | 97 | }; |
140 | 98 | ||
141 | /** | ||
142 | * A simple processor that takes a single function | ||
143 | */ | ||
144 | class AKONADI2COMMON_EXPORT SimpleProcessor : public Akonadi2::Preprocessor | ||
145 | { | ||
146 | public: | ||
147 | SimpleProcessor(const QString &id, const std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e, Akonadi2::Storage::Transaction &transaction)> &f) | ||
148 | : Akonadi2::Preprocessor(), | ||
149 | mFunction(f), | ||
150 | mId(id) | ||
151 | { | ||
152 | } | ||
153 | |||
154 | void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE | ||
155 | { | ||
156 | transaction.openDatabase(state.bufferType() + ".main").scan(state.key(), [this, &state, &transaction](const QByteArray &key, const QByteArray &value) -> bool { | ||
157 | auto entity = Akonadi2::GetEntity(value); | ||
158 | mFunction(state, *entity, transaction); | ||
159 | processingCompleted(state); | ||
160 | return false; | ||
161 | }, [this, state](const Akonadi2::Storage::Error &error) { | ||
162 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | ||
163 | processingCompleted(state); | ||
164 | }); | ||
165 | } | ||
166 | |||
167 | QString id() const Q_DECL_OVERRIDE | ||
168 | { | ||
169 | return mId; | ||
170 | } | ||
171 | |||
172 | protected: | ||
173 | std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e, Akonadi2::Storage::Transaction &transaction)> mFunction; | ||
174 | QString mId; | ||
175 | }; | ||
176 | |||
177 | } // namespace Akonadi2 | 99 | } // namespace Akonadi2 |
178 | 100 | ||