diff options
author | Aaron Seigo <aseigo@kde.org> | 2014-12-17 08:27:31 +0100 |
---|---|---|
committer | Aaron Seigo <aseigo@kde.org> | 2014-12-17 08:27:31 +0100 |
commit | 1c7e8fd482bb67a5487449948488bd286a3504c1 (patch) | |
tree | 38789d22037a0b2ed7550b60fd2280fa522c086d /common | |
parent | 7265d88245767960f1b551bb57f8f84942b898d2 (diff) | |
download | sink-1c7e8fd482bb67a5487449948488bd286a3504c1.tar.gz sink-1c7e8fd482bb67a5487449948488bd286a3504c1.zip |
a basically-working Pipeline implementation
still a skeleton rather than a full body with flesh and blood, but
it is getting there!
Diffstat (limited to 'common')
-rw-r--r-- | common/pipeline.cpp | 199 | ||||
-rw-r--r-- | common/pipeline.h | 80 | ||||
-rw-r--r-- | common/resource.cpp | 10 | ||||
-rw-r--r-- | common/resource.h | 7 |
4 files changed, 268 insertions, 28 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 41def7c..5606c30 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -20,7 +20,9 @@ | |||
20 | 20 | ||
21 | #include "pipeline.h" | 21 | #include "pipeline.h" |
22 | 22 | ||
23 | #include <QByteArray> | ||
23 | #include <QStandardPaths> | 24 | #include <QStandardPaths> |
25 | #include <QVector> | ||
24 | 26 | ||
25 | namespace Akonadi2 | 27 | namespace Akonadi2 |
26 | { | 28 | { |
@@ -28,47 +30,214 @@ namespace Akonadi2 | |||
28 | class Pipeline::Private | 30 | class Pipeline::Private |
29 | { | 31 | { |
30 | public: | 32 | public: |
31 | Private(const QString &storageName) | 33 | Private(const QString &resourceName) |
32 | : storage(QStandardPaths::writableLocation(QStandardPaths::QStandardPaths::GenericDataLocation) + "/akonadi2", storageName, Akonadi2::Storage::ReadWrite) | 34 | : storage(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceName), |
35 | stepScheduled(false) | ||
33 | { | 36 | { |
34 | |||
35 | } | 37 | } |
36 | 38 | ||
37 | Akonadi2::Storage storage; | 39 | Storage storage; |
40 | QVector<PipelineFilter *> nullPipeline; | ||
41 | QVector<PipelineFilter *> newPipeline; | ||
42 | QVector<PipelineFilter *> modifiedPipeline; | ||
43 | QVector<PipelineFilter *> deletedPipeline; | ||
44 | QVector<PipelineState> activePipelines; | ||
45 | bool stepScheduled; | ||
38 | }; | 46 | }; |
39 | 47 | ||
40 | Pipeline::Pipeline(const QString &storageName) | 48 | Pipeline::Pipeline(const QString &resourceName, QObject *parent) |
41 | : d(new Private(storageName)) | 49 | : QObject(parent), |
50 | d(new Private(resourceName)) | ||
42 | { | 51 | { |
43 | } | 52 | } |
44 | 53 | ||
45 | Pipeline::~Pipeline() | 54 | Pipeline::~Pipeline() |
46 | { | 55 | { |
56 | delete d; | ||
47 | } | 57 | } |
48 | 58 | ||
49 | Storage &Pipeline::storage() | 59 | Storage &Pipeline::storage() const |
50 | { | 60 | { |
51 | return d->storage; | 61 | return d->storage; |
52 | } | 62 | } |
53 | 63 | ||
54 | void Pipeline::null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) | 64 | void Pipeline::null() |
65 | { | ||
66 | //TODO: is there really any need for the null pipeline? if so, it should be doing something ;) | ||
67 | PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline); | ||
68 | d->activePipelines << state; | ||
69 | state.step(); | ||
70 | } | ||
71 | |||
72 | void Pipeline::newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity) | ||
73 | { | ||
74 | PipelineState state(this, NewPipeline, key, d->newPipeline); | ||
75 | d->activePipelines << state; | ||
76 | state.step(); | ||
77 | } | ||
78 | |||
79 | void Pipeline::modifiedEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entityDelta) | ||
80 | { | ||
81 | PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline); | ||
82 | d->activePipelines << state; | ||
83 | state.step(); | ||
84 | } | ||
85 | |||
86 | void Pipeline::deletedEntity(const QByteArray &key) | ||
87 | { | ||
88 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline); | ||
89 | d->activePipelines << state; | ||
90 | state.step(); | ||
91 | } | ||
92 | |||
93 | void Pipeline::pipelineStepped(const PipelineState &state) | ||
94 | { | ||
95 | scheduleStep(); | ||
96 | } | ||
97 | |||
98 | void Pipeline::scheduleStep() | ||
99 | { | ||
100 | if (!d->stepScheduled) { | ||
101 | d->stepScheduled = true; | ||
102 | QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); | ||
103 | } | ||
104 | } | ||
105 | |||
106 | void Pipeline::stepPipelines() | ||
107 | { | ||
108 | for (PipelineState &state: d->activePipelines) { | ||
109 | if (state.isIdle()) { | ||
110 | state.step(); | ||
111 | } | ||
112 | } | ||
113 | |||
114 | d->stepScheduled = false; | ||
115 | } | ||
116 | |||
117 | void Pipeline::pipelineCompleted(const PipelineState &state) | ||
118 | { | ||
119 | //TODO finalize the datastore, inform clients of the new rev | ||
120 | const int index = d->activePipelines.indexOf(state); | ||
121 | if (index > -1) { | ||
122 | d->activePipelines.remove(index); | ||
123 | } | ||
124 | |||
125 | if (state.type() != NullPipeline) { | ||
126 | emit revisionUpdated(); | ||
127 | } | ||
128 | scheduleStep(); | ||
129 | } | ||
130 | |||
131 | |||
132 | class PipelineState::Private : public QSharedData | ||
133 | { | ||
134 | public: | ||
135 | Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<PipelineFilter *> filters) | ||
136 | : pipeline(p), | ||
137 | type(t), | ||
138 | key(k), | ||
139 | filterIt(filters), | ||
140 | idle(true) | ||
141 | {} | ||
142 | |||
143 | Private() | ||
144 | : pipeline(0), | ||
145 | filterIt(QVector<PipelineFilter *>()), | ||
146 | idle(true) | ||
147 | {} | ||
148 | |||
149 | Pipeline *pipeline; | ||
150 | Pipeline::Type type; | ||
151 | QByteArray key; | ||
152 | QVectorIterator<PipelineFilter *> filterIt; | ||
153 | bool idle; | ||
154 | }; | ||
155 | |||
156 | PipelineState::PipelineState() | ||
157 | : d(new Private()) | ||
158 | { | ||
159 | |||
160 | } | ||
161 | |||
162 | PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<PipelineFilter *> &filters) | ||
163 | : d(new Private(pipeline, type, key, filters)) | ||
164 | { | ||
165 | } | ||
166 | |||
167 | PipelineState::PipelineState(const PipelineState &other) | ||
168 | : d(other.d) | ||
169 | { | ||
170 | } | ||
171 | |||
172 | PipelineState::~PipelineState() | ||
173 | { | ||
174 | } | ||
175 | |||
176 | PipelineState &PipelineState::operator=(const PipelineState &rhs) | ||
177 | { | ||
178 | d = rhs.d; | ||
179 | return *this; | ||
180 | } | ||
181 | |||
182 | bool PipelineState::operator==(const PipelineState &rhs) | ||
183 | { | ||
184 | return d == rhs.d; | ||
185 | } | ||
186 | |||
187 | bool PipelineState::isIdle() const | ||
188 | { | ||
189 | return d->idle; | ||
190 | } | ||
191 | |||
192 | QByteArray PipelineState::key() const | ||
193 | { | ||
194 | return d->key; | ||
195 | } | ||
196 | |||
197 | Pipeline::Type PipelineState::type() const | ||
198 | { | ||
199 | return d->type; | ||
200 | } | ||
201 | |||
202 | void PipelineState::step() | ||
203 | { | ||
204 | if (!d->pipeline) { | ||
205 | return; | ||
206 | } | ||
207 | |||
208 | d->idle = false; | ||
209 | if (d->filterIt.hasNext()) { | ||
210 | d->filterIt.next()->process(*this); | ||
211 | } else { | ||
212 | d->pipeline->pipelineCompleted(*this); | ||
213 | } | ||
214 | } | ||
215 | |||
216 | void PipelineState::processingCompleted(PipelineFilter *filter) | ||
217 | { | ||
218 | if (d->pipeline && filter == d->filterIt.peekPrevious()) { | ||
219 | d->idle = true; | ||
220 | d->pipeline->pipelineStepped(*this); | ||
221 | } | ||
222 | } | ||
223 | |||
224 | PipelineFilter::PipelineFilter() | ||
225 | : d(0) | ||
55 | { | 226 | { |
56 | d->storage.write(key, keySize, buffer, bufferSize); | ||
57 | } | 227 | } |
58 | 228 | ||
59 | void Pipeline::newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) | 229 | PipelineFilter::~PipelineFilter() |
60 | { | 230 | { |
61 | d->storage.write(key, keySize, buffer, bufferSize); | ||
62 | } | 231 | } |
63 | 232 | ||
64 | void Pipeline::modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) | 233 | void PipelineFilter::process(PipelineState state) |
65 | { | 234 | { |
66 | d->storage.write(key, keySize, buffer, bufferSize); | 235 | processingCompleted(state); |
67 | } | 236 | } |
68 | 237 | ||
69 | void Pipeline::deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) | 238 | void PipelineFilter::processingCompleted(PipelineState state) |
70 | { | 239 | { |
71 | d->storage.write(key, keySize, buffer, bufferSize); | 240 | state.processingCompleted(this); |
72 | } | 241 | } |
73 | 242 | ||
74 | } // namespace Akonadi2 | 243 | } // namespace Akonadi2 |
diff --git a/common/pipeline.h b/common/pipeline.h index 635e630..6ad78df 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -20,23 +20,89 @@ | |||
20 | 20 | ||
21 | #pragma once | 21 | #pragma once |
22 | 22 | ||
23 | #include <flatbuffers/flatbuffers.h> | ||
24 | |||
25 | #include <QSharedDataPointer> | ||
26 | #include <QObject> | ||
27 | |||
23 | #include <akonadi2common_export.h> | 28 | #include <akonadi2common_export.h> |
24 | #include <storage.h> | 29 | #include <storage.h> |
25 | 30 | ||
26 | namespace Akonadi2 | 31 | namespace Akonadi2 |
27 | { | 32 | { |
28 | 33 | ||
29 | class AKONADI2COMMON_EXPORT Pipeline | 34 | class PipelineState; |
35 | class PipelineFilter; | ||
36 | |||
37 | class AKONADI2COMMON_EXPORT Pipeline : public QObject | ||
30 | { | 38 | { |
39 | Q_OBJECT | ||
40 | |||
31 | public: | 41 | public: |
32 | Pipeline(const QString &storagePath); | 42 | enum Type { NullPipeline, NewPipeline, ModifiedPipeline, DeletedPipeline }; |
43 | |||
44 | Pipeline(const QString &storagePath, QObject *parent = 0); | ||
33 | ~Pipeline(); | 45 | ~Pipeline(); |
34 | 46 | ||
35 | Storage &storage(); | 47 | Storage &storage() const; |
36 | void null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); | 48 | |
37 | void newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); | 49 | // domain objects needed here |
38 | void modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); | 50 | void null(); |
39 | void deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); | 51 | void newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity); |
52 | void modifiedEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entityDelta); | ||
53 | void deletedEntity(const QByteArray &key); | ||
54 | |||
55 | Q_SIGNALS: | ||
56 | void revisionUpdated(); | ||
57 | |||
58 | private Q_SLOTS: | ||
59 | void stepPipelines(); | ||
60 | |||
61 | private: | ||
62 | void pipelineStepped(const PipelineState &state); | ||
63 | void pipelineCompleted(const PipelineState &state); | ||
64 | void scheduleStep(); | ||
65 | |||
66 | friend class PipelineState; | ||
67 | |||
68 | class Private; | ||
69 | Private * const d; | ||
70 | }; | ||
71 | |||
72 | class AKONADI2COMMON_EXPORT PipelineState | ||
73 | { | ||
74 | public: | ||
75 | // domain objects? | ||
76 | PipelineState(); | ||
77 | PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<PipelineFilter *> &filters); | ||
78 | PipelineState(const PipelineState &other); | ||
79 | ~PipelineState(); | ||
80 | |||
81 | PipelineState &operator=(const PipelineState &rhs); | ||
82 | bool operator==(const PipelineState &rhs); | ||
83 | |||
84 | bool isIdle() const; | ||
85 | QByteArray key() const; | ||
86 | Pipeline::Type type() const; | ||
87 | |||
88 | void step(); | ||
89 | void processingCompleted(PipelineFilter *filter); | ||
90 | |||
91 | private: | ||
92 | class Private; | ||
93 | QExplicitlySharedDataPointer<Private> d; | ||
94 | }; | ||
95 | |||
96 | class AKONADI2COMMON_EXPORT PipelineFilter | ||
97 | { | ||
98 | public: | ||
99 | PipelineFilter(); | ||
100 | virtual ~PipelineFilter(); | ||
101 | |||
102 | virtual void process(PipelineState state); | ||
103 | |||
104 | protected: | ||
105 | void processingCompleted(PipelineState state); | ||
40 | 106 | ||
41 | private: | 107 | private: |
42 | class Private; | 108 | class Private; |
diff --git a/common/resource.cpp b/common/resource.cpp index 11a03ca..ae28485 100644 --- a/common/resource.cpp +++ b/common/resource.cpp | |||
@@ -39,12 +39,18 @@ Resource::~Resource() | |||
39 | //delete d; | 39 | //delete d; |
40 | } | 40 | } |
41 | 41 | ||
42 | void Resource::processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline) | 42 | void Resource::processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) |
43 | { | 43 | { |
44 | Q_UNUSED(commandId) | ||
45 | Q_UNUSED(data) | ||
46 | Q_UNUSED(size) | ||
47 | Q_UNUSED(pipeline) | ||
48 | pipeline->null(); | ||
44 | } | 49 | } |
45 | 50 | ||
46 | void Resource::synchronizeWithSource() | 51 | void Resource::synchronizeWithSource(Pipeline *pipeline) |
47 | { | 52 | { |
53 | pipeline->null(); | ||
48 | } | 54 | } |
49 | 55 | ||
50 | class ResourceFactory::Private | 56 | class ResourceFactory::Private |
diff --git a/common/resource.h b/common/resource.h index 53c0bc1..0f65e1f 100644 --- a/common/resource.h +++ b/common/resource.h | |||
@@ -18,8 +18,8 @@ | |||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | 18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. |
19 | */ | 19 | */ |
20 | 20 | ||
21 | #include <clientapi.h> | ||
22 | #include <akonadi2common_export.h> | 21 | #include <akonadi2common_export.h> |
22 | #include <clientapi.h> | ||
23 | #include <pipeline.h> | 23 | #include <pipeline.h> |
24 | 24 | ||
25 | namespace Akonadi2 | 25 | namespace Akonadi2 |
@@ -32,9 +32,8 @@ public: | |||
32 | Resource(); | 32 | Resource(); |
33 | virtual ~Resource(); | 33 | virtual ~Resource(); |
34 | 34 | ||
35 | //TODO: this will need to be async | 35 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); |
36 | virtual void processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline); | 36 | virtual void synchronizeWithSource(Pipeline *pipeline); |
37 | virtual void synchronizeWithSource(); | ||
38 | 37 | ||
39 | private: | 38 | private: |
40 | class Private; | 39 | class Private; |