summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
authorAaron Seigo <aseigo@kde.org>2014-12-17 08:27:31 +0100
committerAaron Seigo <aseigo@kde.org>2014-12-17 08:27:31 +0100
commit1c7e8fd482bb67a5487449948488bd286a3504c1 (patch)
tree38789d22037a0b2ed7550b60fd2280fa522c086d /common/pipeline.cpp
parent7265d88245767960f1b551bb57f8f84942b898d2 (diff)
downloadsink-1c7e8fd482bb67a5487449948488bd286a3504c1.tar.gz
sink-1c7e8fd482bb67a5487449948488bd286a3504c1.zip
a basically-working Pipeline implementation
still a skeleton rather than a full body with flesh and blood, but it is getting there!
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp199
1 files changed, 184 insertions, 15 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 41def7c..5606c30 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -20,7 +20,9 @@
20 20
21#include "pipeline.h" 21#include "pipeline.h"
22 22
23#include <QByteArray>
23#include <QStandardPaths> 24#include <QStandardPaths>
25#include <QVector>
24 26
25namespace Akonadi2 27namespace Akonadi2
26{ 28{
@@ -28,47 +30,214 @@ namespace Akonadi2
28class Pipeline::Private 30class Pipeline::Private
29{ 31{
30public: 32public:
31 Private(const QString &storageName) 33 Private(const QString &resourceName)
32 : storage(QStandardPaths::writableLocation(QStandardPaths::QStandardPaths::GenericDataLocation) + "/akonadi2", storageName, Akonadi2::Storage::ReadWrite) 34 : storage(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceName),
35 stepScheduled(false)
33 { 36 {
34
35 } 37 }
36 38
37 Akonadi2::Storage storage; 39 Storage storage;
40 QVector<PipelineFilter *> nullPipeline;
41 QVector<PipelineFilter *> newPipeline;
42 QVector<PipelineFilter *> modifiedPipeline;
43 QVector<PipelineFilter *> deletedPipeline;
44 QVector<PipelineState> activePipelines;
45 bool stepScheduled;
38}; 46};
39 47
40Pipeline::Pipeline(const QString &storageName) 48Pipeline::Pipeline(const QString &resourceName, QObject *parent)
41 : d(new Private(storageName)) 49 : QObject(parent),
50 d(new Private(resourceName))
42{ 51{
43} 52}
44 53
45Pipeline::~Pipeline() 54Pipeline::~Pipeline()
46{ 55{
56 delete d;
47} 57}
48 58
49Storage &Pipeline::storage() 59Storage &Pipeline::storage() const
50{ 60{
51 return d->storage; 61 return d->storage;
52} 62}
53 63
54void Pipeline::null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) 64void Pipeline::null()
65{
66 //TODO: is there really any need for the null pipeline? if so, it should be doing something ;)
67 PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline);
68 d->activePipelines << state;
69 state.step();
70}
71
72void Pipeline::newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity)
73{
74 PipelineState state(this, NewPipeline, key, d->newPipeline);
75 d->activePipelines << state;
76 state.step();
77}
78
79void Pipeline::modifiedEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entityDelta)
80{
81 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline);
82 d->activePipelines << state;
83 state.step();
84}
85
86void Pipeline::deletedEntity(const QByteArray &key)
87{
88 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline);
89 d->activePipelines << state;
90 state.step();
91}
92
93void Pipeline::pipelineStepped(const PipelineState &state)
94{
95 scheduleStep();
96}
97
98void Pipeline::scheduleStep()
99{
100 if (!d->stepScheduled) {
101 d->stepScheduled = true;
102 QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection);
103 }
104}
105
106void Pipeline::stepPipelines()
107{
108 for (PipelineState &state: d->activePipelines) {
109 if (state.isIdle()) {
110 state.step();
111 }
112 }
113
114 d->stepScheduled = false;
115}
116
117void Pipeline::pipelineCompleted(const PipelineState &state)
118{
119 //TODO finalize the datastore, inform clients of the new rev
120 const int index = d->activePipelines.indexOf(state);
121 if (index > -1) {
122 d->activePipelines.remove(index);
123 }
124
125 if (state.type() != NullPipeline) {
126 emit revisionUpdated();
127 }
128 scheduleStep();
129}
130
131
132class PipelineState::Private : public QSharedData
133{
134public:
135 Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<PipelineFilter *> filters)
136 : pipeline(p),
137 type(t),
138 key(k),
139 filterIt(filters),
140 idle(true)
141 {}
142
143 Private()
144 : pipeline(0),
145 filterIt(QVector<PipelineFilter *>()),
146 idle(true)
147 {}
148
149 Pipeline *pipeline;
150 Pipeline::Type type;
151 QByteArray key;
152 QVectorIterator<PipelineFilter *> filterIt;
153 bool idle;
154};
155
156PipelineState::PipelineState()
157 : d(new Private())
158{
159
160}
161
162PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<PipelineFilter *> &filters)
163 : d(new Private(pipeline, type, key, filters))
164{
165}
166
167PipelineState::PipelineState(const PipelineState &other)
168 : d(other.d)
169{
170}
171
172PipelineState::~PipelineState()
173{
174}
175
176PipelineState &PipelineState::operator=(const PipelineState &rhs)
177{
178 d = rhs.d;
179 return *this;
180}
181
182bool PipelineState::operator==(const PipelineState &rhs)
183{
184 return d == rhs.d;
185}
186
187bool PipelineState::isIdle() const
188{
189 return d->idle;
190}
191
192QByteArray PipelineState::key() const
193{
194 return d->key;
195}
196
197Pipeline::Type PipelineState::type() const
198{
199 return d->type;
200}
201
202void PipelineState::step()
203{
204 if (!d->pipeline) {
205 return;
206 }
207
208 d->idle = false;
209 if (d->filterIt.hasNext()) {
210 d->filterIt.next()->process(*this);
211 } else {
212 d->pipeline->pipelineCompleted(*this);
213 }
214}
215
216void PipelineState::processingCompleted(PipelineFilter *filter)
217{
218 if (d->pipeline && filter == d->filterIt.peekPrevious()) {
219 d->idle = true;
220 d->pipeline->pipelineStepped(*this);
221 }
222}
223
224PipelineFilter::PipelineFilter()
225 : d(0)
55{ 226{
56 d->storage.write(key, keySize, buffer, bufferSize);
57} 227}
58 228
59void Pipeline::newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) 229PipelineFilter::~PipelineFilter()
60{ 230{
61 d->storage.write(key, keySize, buffer, bufferSize);
62} 231}
63 232
64void Pipeline::modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) 233void PipelineFilter::process(PipelineState state)
65{ 234{
66 d->storage.write(key, keySize, buffer, bufferSize); 235 processingCompleted(state);
67} 236}
68 237
69void Pipeline::deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) 238void PipelineFilter::processingCompleted(PipelineState state)
70{ 239{
71 d->storage.write(key, keySize, buffer, bufferSize); 240 state.processingCompleted(this);
72} 241}
73 242
74} // namespace Akonadi2 243} // namespace Akonadi2