diff options
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 199 |
1 files changed, 184 insertions, 15 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 41def7c..5606c30 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -20,7 +20,9 @@ | |||
20 | 20 | ||
21 | #include "pipeline.h" | 21 | #include "pipeline.h" |
22 | 22 | ||
23 | #include <QByteArray> | ||
23 | #include <QStandardPaths> | 24 | #include <QStandardPaths> |
25 | #include <QVector> | ||
24 | 26 | ||
25 | namespace Akonadi2 | 27 | namespace Akonadi2 |
26 | { | 28 | { |
@@ -28,47 +30,214 @@ namespace Akonadi2 | |||
28 | class Pipeline::Private | 30 | class Pipeline::Private |
29 | { | 31 | { |
30 | public: | 32 | public: |
31 | Private(const QString &storageName) | 33 | Private(const QString &resourceName) |
32 | : storage(QStandardPaths::writableLocation(QStandardPaths::QStandardPaths::GenericDataLocation) + "/akonadi2", storageName, Akonadi2::Storage::ReadWrite) | 34 | : storage(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceName), |
35 | stepScheduled(false) | ||
33 | { | 36 | { |
34 | |||
35 | } | 37 | } |
36 | 38 | ||
37 | Akonadi2::Storage storage; | 39 | Storage storage; |
40 | QVector<PipelineFilter *> nullPipeline; | ||
41 | QVector<PipelineFilter *> newPipeline; | ||
42 | QVector<PipelineFilter *> modifiedPipeline; | ||
43 | QVector<PipelineFilter *> deletedPipeline; | ||
44 | QVector<PipelineState> activePipelines; | ||
45 | bool stepScheduled; | ||
38 | }; | 46 | }; |
39 | 47 | ||
40 | Pipeline::Pipeline(const QString &storageName) | 48 | Pipeline::Pipeline(const QString &resourceName, QObject *parent) |
41 | : d(new Private(storageName)) | 49 | : QObject(parent), |
50 | d(new Private(resourceName)) | ||
42 | { | 51 | { |
43 | } | 52 | } |
44 | 53 | ||
45 | Pipeline::~Pipeline() | 54 | Pipeline::~Pipeline() |
46 | { | 55 | { |
56 | delete d; | ||
47 | } | 57 | } |
48 | 58 | ||
49 | Storage &Pipeline::storage() | 59 | Storage &Pipeline::storage() const |
50 | { | 60 | { |
51 | return d->storage; | 61 | return d->storage; |
52 | } | 62 | } |
53 | 63 | ||
54 | void Pipeline::null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) | 64 | void Pipeline::null() |
65 | { | ||
66 | //TODO: is there really any need for the null pipeline? if so, it should be doing something ;) | ||
67 | PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline); | ||
68 | d->activePipelines << state; | ||
69 | state.step(); | ||
70 | } | ||
71 | |||
72 | void Pipeline::newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity) | ||
73 | { | ||
74 | PipelineState state(this, NewPipeline, key, d->newPipeline); | ||
75 | d->activePipelines << state; | ||
76 | state.step(); | ||
77 | } | ||
78 | |||
79 | void Pipeline::modifiedEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entityDelta) | ||
80 | { | ||
81 | PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline); | ||
82 | d->activePipelines << state; | ||
83 | state.step(); | ||
84 | } | ||
85 | |||
86 | void Pipeline::deletedEntity(const QByteArray &key) | ||
87 | { | ||
88 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline); | ||
89 | d->activePipelines << state; | ||
90 | state.step(); | ||
91 | } | ||
92 | |||
93 | void Pipeline::pipelineStepped(const PipelineState &state) | ||
94 | { | ||
95 | scheduleStep(); | ||
96 | } | ||
97 | |||
98 | void Pipeline::scheduleStep() | ||
99 | { | ||
100 | if (!d->stepScheduled) { | ||
101 | d->stepScheduled = true; | ||
102 | QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); | ||
103 | } | ||
104 | } | ||
105 | |||
106 | void Pipeline::stepPipelines() | ||
107 | { | ||
108 | for (PipelineState &state: d->activePipelines) { | ||
109 | if (state.isIdle()) { | ||
110 | state.step(); | ||
111 | } | ||
112 | } | ||
113 | |||
114 | d->stepScheduled = false; | ||
115 | } | ||
116 | |||
117 | void Pipeline::pipelineCompleted(const PipelineState &state) | ||
118 | { | ||
119 | //TODO finalize the datastore, inform clients of the new rev | ||
120 | const int index = d->activePipelines.indexOf(state); | ||
121 | if (index > -1) { | ||
122 | d->activePipelines.remove(index); | ||
123 | } | ||
124 | |||
125 | if (state.type() != NullPipeline) { | ||
126 | emit revisionUpdated(); | ||
127 | } | ||
128 | scheduleStep(); | ||
129 | } | ||
130 | |||
131 | |||
132 | class PipelineState::Private : public QSharedData | ||
133 | { | ||
134 | public: | ||
135 | Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<PipelineFilter *> filters) | ||
136 | : pipeline(p), | ||
137 | type(t), | ||
138 | key(k), | ||
139 | filterIt(filters), | ||
140 | idle(true) | ||
141 | {} | ||
142 | |||
143 | Private() | ||
144 | : pipeline(0), | ||
145 | filterIt(QVector<PipelineFilter *>()), | ||
146 | idle(true) | ||
147 | {} | ||
148 | |||
149 | Pipeline *pipeline; | ||
150 | Pipeline::Type type; | ||
151 | QByteArray key; | ||
152 | QVectorIterator<PipelineFilter *> filterIt; | ||
153 | bool idle; | ||
154 | }; | ||
155 | |||
156 | PipelineState::PipelineState() | ||
157 | : d(new Private()) | ||
158 | { | ||
159 | |||
160 | } | ||
161 | |||
162 | PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<PipelineFilter *> &filters) | ||
163 | : d(new Private(pipeline, type, key, filters)) | ||
164 | { | ||
165 | } | ||
166 | |||
167 | PipelineState::PipelineState(const PipelineState &other) | ||
168 | : d(other.d) | ||
169 | { | ||
170 | } | ||
171 | |||
172 | PipelineState::~PipelineState() | ||
173 | { | ||
174 | } | ||
175 | |||
176 | PipelineState &PipelineState::operator=(const PipelineState &rhs) | ||
177 | { | ||
178 | d = rhs.d; | ||
179 | return *this; | ||
180 | } | ||
181 | |||
182 | bool PipelineState::operator==(const PipelineState &rhs) | ||
183 | { | ||
184 | return d == rhs.d; | ||
185 | } | ||
186 | |||
187 | bool PipelineState::isIdle() const | ||
188 | { | ||
189 | return d->idle; | ||
190 | } | ||
191 | |||
192 | QByteArray PipelineState::key() const | ||
193 | { | ||
194 | return d->key; | ||
195 | } | ||
196 | |||
197 | Pipeline::Type PipelineState::type() const | ||
198 | { | ||
199 | return d->type; | ||
200 | } | ||
201 | |||
202 | void PipelineState::step() | ||
203 | { | ||
204 | if (!d->pipeline) { | ||
205 | return; | ||
206 | } | ||
207 | |||
208 | d->idle = false; | ||
209 | if (d->filterIt.hasNext()) { | ||
210 | d->filterIt.next()->process(*this); | ||
211 | } else { | ||
212 | d->pipeline->pipelineCompleted(*this); | ||
213 | } | ||
214 | } | ||
215 | |||
216 | void PipelineState::processingCompleted(PipelineFilter *filter) | ||
217 | { | ||
218 | if (d->pipeline && filter == d->filterIt.peekPrevious()) { | ||
219 | d->idle = true; | ||
220 | d->pipeline->pipelineStepped(*this); | ||
221 | } | ||
222 | } | ||
223 | |||
224 | PipelineFilter::PipelineFilter() | ||
225 | : d(0) | ||
55 | { | 226 | { |
56 | d->storage.write(key, keySize, buffer, bufferSize); | ||
57 | } | 227 | } |
58 | 228 | ||
59 | void Pipeline::newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) | 229 | PipelineFilter::~PipelineFilter() |
60 | { | 230 | { |
61 | d->storage.write(key, keySize, buffer, bufferSize); | ||
62 | } | 231 | } |
63 | 232 | ||
64 | void Pipeline::modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) | 233 | void PipelineFilter::process(PipelineState state) |
65 | { | 234 | { |
66 | d->storage.write(key, keySize, buffer, bufferSize); | 235 | processingCompleted(state); |
67 | } | 236 | } |
68 | 237 | ||
69 | void Pipeline::deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) | 238 | void PipelineFilter::processingCompleted(PipelineState state) |
70 | { | 239 | { |
71 | d->storage.write(key, keySize, buffer, bufferSize); | 240 | state.processingCompleted(this); |
72 | } | 241 | } |
73 | 242 | ||
74 | } // namespace Akonadi2 | 243 | } // namespace Akonadi2 |