summaryrefslogtreecommitdiffstats
path: root/common/pipeline.h
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-10-28 16:39:16 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-10-28 16:39:16 +0100
commit129333371d28c06d85f75ca579ce17798e615e84 (patch)
tree2ae01db9d26f6f72a74fa77e6937e03304e81a2c /common/pipeline.h
parent20f049b65c4bd8c3d0c16bbf398641675648a93f (diff)
downloadsink-129333371d28c06d85f75ca579ce17798e615e84.tar.gz
sink-129333371d28c06d85f75ca579ce17798e615e84.zip
Made pipeline preprocessing synchronous.
Instead of having the asynchronous preprocessor concept with different pipelines for new/modify/delete we have a single pipeline with synchronous preprocessors that act upon new/modify/delete. This keeps the code simpler due to lack of asynchronity and keeps the new/modify/delete operations together (which at least for the indexing makes a lot of sense). Not supporting asynchronity is ok because the tasks done in preprocessing are not cpu intensive (if they were we had a problem since they are directly involved in the round-trip time), and the main cost comes from i/o, meaning we don't gain much by doing multithreading. Costly tasks (such as full-text indexing) should rather be implemented as post-processing, since that doesn't increase the round-trip time directly, and eventually consistent is typically good enough for that.
Diffstat (limited to 'common/pipeline.h')
-rw-r--r--common/pipeline.h90
1 files changed, 6 insertions, 84 deletions
diff --git a/common/pipeline.h b/common/pipeline.h
index c8d9ddc..f11d880 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -35,7 +35,6 @@
35namespace Akonadi2 35namespace Akonadi2
36{ 36{
37 37
38class PipelineState;
39class Preprocessor; 38class Preprocessor;
40 39
41class AKONADI2COMMON_EXPORT Pipeline : public QObject 40class AKONADI2COMMON_EXPORT Pipeline : public QObject
@@ -43,19 +42,16 @@ class AKONADI2COMMON_EXPORT Pipeline : public QObject
43 Q_OBJECT 42 Q_OBJECT
44 43
45public: 44public:
46 enum Type { NullPipeline, NewPipeline, ModifiedPipeline, DeletedPipeline };
47
48 Pipeline(const QString &storagePath, QObject *parent = 0); 45 Pipeline(const QString &storagePath, QObject *parent = 0);
49 ~Pipeline(); 46 ~Pipeline();
50 47
51 Storage &storage() const; 48 Storage &storage() const;
52 49
53 void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors); 50 void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors);
54 void startTransaction(); 51 void startTransaction();
55 void commit(); 52 void commit();
56 Storage::Transaction &transaction(); 53 Storage::Transaction &transaction();
57 54
58 void null();
59 void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); 55 void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory);
60 56
61 KAsync::Job<qint64> newEntity(void const *command, size_t size); 57 KAsync::Job<qint64> newEntity(void const *command, size_t size);
@@ -75,104 +71,30 @@ public:
75 71
76Q_SIGNALS: 72Q_SIGNALS:
77 void revisionUpdated(qint64); 73 void revisionUpdated(qint64);
78 void pipelinesDrained();
79
80private Q_SLOTS:
81 void stepPipelines();
82 74
83private: 75private:
84 void pipelineStepped(const PipelineState &state);
85 //Don't use a reference here (it would invalidate itself)
86 void pipelineCompleted(PipelineState state);
87 void scheduleStep();
88 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); 76 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid);
89 77
90 friend class PipelineState;
91
92 class Private; 78 class Private;
93 Private * const d; 79 Private * const d;
94}; 80};
95 81
96class AKONADI2COMMON_EXPORT PipelineState
97{
98public:
99 PipelineState();
100 PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, qint64 revision, const std::function<void()> &callback, const QByteArray &bufferType);
101 PipelineState(const PipelineState &other);
102 ~PipelineState();
103
104 PipelineState &operator=(const PipelineState &rhs);
105 bool operator==(const PipelineState &rhs);
106
107 bool isIdle() const;
108 QByteArray key() const;
109 Pipeline::Type type() const;
110 qint64 revision() const;
111 QByteArray bufferType() const;
112
113 void step();
114 void processingCompleted(Preprocessor *filter);
115
116 void callback();
117
118private:
119 class Private;
120 QExplicitlySharedDataPointer<Private> d;
121};
122
123class AKONADI2COMMON_EXPORT Preprocessor 82class AKONADI2COMMON_EXPORT Preprocessor
124{ 83{
125public: 84public:
126 Preprocessor(); 85 Preprocessor();
127 virtual ~Preprocessor(); 86 virtual ~Preprocessor();
128 87
129 virtual void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction); 88 virtual void startBatch();
130 //TODO to record progress 89 virtual void newEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) = 0;
131 virtual QString id() const; 90 virtual void modifiedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) = 0;
132 91 virtual void deletedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) = 0;
133protected: 92 virtual void finalize();
134 void processingCompleted(PipelineState state);
135 93
136private: 94private:
137 class Private; 95 class Private;
138 Private * const d; 96 Private * const d;
139}; 97};
140 98
141/**
142 * A simple processor that takes a single function
143 */
144class AKONADI2COMMON_EXPORT SimpleProcessor : public Akonadi2::Preprocessor
145{
146public:
147 SimpleProcessor(const QString &id, const std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e, Akonadi2::Storage::Transaction &transaction)> &f)
148 : Akonadi2::Preprocessor(),
149 mFunction(f),
150 mId(id)
151 {
152 }
153
154 void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE
155 {
156 transaction.openDatabase(state.bufferType() + ".main").scan(state.key(), [this, &state, &transaction](const QByteArray &key, const QByteArray &value) -> bool {
157 auto entity = Akonadi2::GetEntity(value);
158 mFunction(state, *entity, transaction);
159 processingCompleted(state);
160 return false;
161 }, [this, state](const Akonadi2::Storage::Error &error) {
162 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
163 processingCompleted(state);
164 });
165 }
166
167 QString id() const Q_DECL_OVERRIDE
168 {
169 return mId;
170 }
171
172protected:
173 std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e, Akonadi2::Storage::Transaction &transaction)> mFunction;
174 QString mId;
175};
176
177} // namespace Akonadi2 99} // namespace Akonadi2
178 100