summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
authorAaron Seigo <aseigo@kde.org>2014-12-17 08:27:31 +0100
committerAaron Seigo <aseigo@kde.org>2014-12-17 08:27:31 +0100
commit1c7e8fd482bb67a5487449948488bd286a3504c1 (patch)
tree38789d22037a0b2ed7550b60fd2280fa522c086d /common
parent7265d88245767960f1b551bb57f8f84942b898d2 (diff)
downloadsink-1c7e8fd482bb67a5487449948488bd286a3504c1.tar.gz
sink-1c7e8fd482bb67a5487449948488bd286a3504c1.zip
a basically-working Pipeline implementation
still a skeleton rather than a full body with flesh and blood, but it is getting there!
Diffstat (limited to 'common')
-rw-r--r--common/pipeline.cpp199
-rw-r--r--common/pipeline.h80
-rw-r--r--common/resource.cpp10
-rw-r--r--common/resource.h7
4 files changed, 268 insertions, 28 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 41def7c..5606c30 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -20,7 +20,9 @@
20 20
21#include "pipeline.h" 21#include "pipeline.h"
22 22
23#include <QByteArray>
23#include <QStandardPaths> 24#include <QStandardPaths>
25#include <QVector>
24 26
25namespace Akonadi2 27namespace Akonadi2
26{ 28{
@@ -28,47 +30,214 @@ namespace Akonadi2
28class Pipeline::Private 30class Pipeline::Private
29{ 31{
30public: 32public:
31 Private(const QString &storageName) 33 Private(const QString &resourceName)
32 : storage(QStandardPaths::writableLocation(QStandardPaths::QStandardPaths::GenericDataLocation) + "/akonadi2", storageName, Akonadi2::Storage::ReadWrite) 34 : storage(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceName),
35 stepScheduled(false)
33 { 36 {
34
35 } 37 }
36 38
37 Akonadi2::Storage storage; 39 Storage storage;
40 QVector<PipelineFilter *> nullPipeline;
41 QVector<PipelineFilter *> newPipeline;
42 QVector<PipelineFilter *> modifiedPipeline;
43 QVector<PipelineFilter *> deletedPipeline;
44 QVector<PipelineState> activePipelines;
45 bool stepScheduled;
38}; 46};
39 47
40Pipeline::Pipeline(const QString &storageName) 48Pipeline::Pipeline(const QString &resourceName, QObject *parent)
41 : d(new Private(storageName)) 49 : QObject(parent),
50 d(new Private(resourceName))
42{ 51{
43} 52}
44 53
45Pipeline::~Pipeline() 54Pipeline::~Pipeline()
46{ 55{
56 delete d;
47} 57}
48 58
49Storage &Pipeline::storage() 59Storage &Pipeline::storage() const
50{ 60{
51 return d->storage; 61 return d->storage;
52} 62}
53 63
54void Pipeline::null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) 64void Pipeline::null()
65{
66 //TODO: is there really any need for the null pipeline? if so, it should be doing something ;)
67 PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline);
68 d->activePipelines << state;
69 state.step();
70}
71
72void Pipeline::newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity)
73{
74 PipelineState state(this, NewPipeline, key, d->newPipeline);
75 d->activePipelines << state;
76 state.step();
77}
78
79void Pipeline::modifiedEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entityDelta)
80{
81 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline);
82 d->activePipelines << state;
83 state.step();
84}
85
86void Pipeline::deletedEntity(const QByteArray &key)
87{
88 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline);
89 d->activePipelines << state;
90 state.step();
91}
92
93void Pipeline::pipelineStepped(const PipelineState &state)
94{
95 scheduleStep();
96}
97
98void Pipeline::scheduleStep()
99{
100 if (!d->stepScheduled) {
101 d->stepScheduled = true;
102 QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection);
103 }
104}
105
106void Pipeline::stepPipelines()
107{
108 for (PipelineState &state: d->activePipelines) {
109 if (state.isIdle()) {
110 state.step();
111 }
112 }
113
114 d->stepScheduled = false;
115}
116
117void Pipeline::pipelineCompleted(const PipelineState &state)
118{
119 //TODO finalize the datastore, inform clients of the new rev
120 const int index = d->activePipelines.indexOf(state);
121 if (index > -1) {
122 d->activePipelines.remove(index);
123 }
124
125 if (state.type() != NullPipeline) {
126 emit revisionUpdated();
127 }
128 scheduleStep();
129}
130
131
132class PipelineState::Private : public QSharedData
133{
134public:
135 Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<PipelineFilter *> filters)
136 : pipeline(p),
137 type(t),
138 key(k),
139 filterIt(filters),
140 idle(true)
141 {}
142
143 Private()
144 : pipeline(0),
145 filterIt(QVector<PipelineFilter *>()),
146 idle(true)
147 {}
148
149 Pipeline *pipeline;
150 Pipeline::Type type;
151 QByteArray key;
152 QVectorIterator<PipelineFilter *> filterIt;
153 bool idle;
154};
155
156PipelineState::PipelineState()
157 : d(new Private())
158{
159
160}
161
162PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<PipelineFilter *> &filters)
163 : d(new Private(pipeline, type, key, filters))
164{
165}
166
167PipelineState::PipelineState(const PipelineState &other)
168 : d(other.d)
169{
170}
171
172PipelineState::~PipelineState()
173{
174}
175
176PipelineState &PipelineState::operator=(const PipelineState &rhs)
177{
178 d = rhs.d;
179 return *this;
180}
181
182bool PipelineState::operator==(const PipelineState &rhs)
183{
184 return d == rhs.d;
185}
186
187bool PipelineState::isIdle() const
188{
189 return d->idle;
190}
191
192QByteArray PipelineState::key() const
193{
194 return d->key;
195}
196
197Pipeline::Type PipelineState::type() const
198{
199 return d->type;
200}
201
202void PipelineState::step()
203{
204 if (!d->pipeline) {
205 return;
206 }
207
208 d->idle = false;
209 if (d->filterIt.hasNext()) {
210 d->filterIt.next()->process(*this);
211 } else {
212 d->pipeline->pipelineCompleted(*this);
213 }
214}
215
216void PipelineState::processingCompleted(PipelineFilter *filter)
217{
218 if (d->pipeline && filter == d->filterIt.peekPrevious()) {
219 d->idle = true;
220 d->pipeline->pipelineStepped(*this);
221 }
222}
223
224PipelineFilter::PipelineFilter()
225 : d(0)
55{ 226{
56 d->storage.write(key, keySize, buffer, bufferSize);
57} 227}
58 228
59void Pipeline::newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) 229PipelineFilter::~PipelineFilter()
60{ 230{
61 d->storage.write(key, keySize, buffer, bufferSize);
62} 231}
63 232
64void Pipeline::modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) 233void PipelineFilter::process(PipelineState state)
65{ 234{
66 d->storage.write(key, keySize, buffer, bufferSize); 235 processingCompleted(state);
67} 236}
68 237
69void Pipeline::deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) 238void PipelineFilter::processingCompleted(PipelineState state)
70{ 239{
71 d->storage.write(key, keySize, buffer, bufferSize); 240 state.processingCompleted(this);
72} 241}
73 242
74} // namespace Akonadi2 243} // namespace Akonadi2
diff --git a/common/pipeline.h b/common/pipeline.h
index 635e630..6ad78df 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -20,23 +20,89 @@
20 20
21#pragma once 21#pragma once
22 22
23#include <flatbuffers/flatbuffers.h>
24
25#include <QSharedDataPointer>
26#include <QObject>
27
23#include <akonadi2common_export.h> 28#include <akonadi2common_export.h>
24#include <storage.h> 29#include <storage.h>
25 30
26namespace Akonadi2 31namespace Akonadi2
27{ 32{
28 33
29class AKONADI2COMMON_EXPORT Pipeline 34class PipelineState;
35class PipelineFilter;
36
37class AKONADI2COMMON_EXPORT Pipeline : public QObject
30{ 38{
39 Q_OBJECT
40
31public: 41public:
32 Pipeline(const QString &storagePath); 42 enum Type { NullPipeline, NewPipeline, ModifiedPipeline, DeletedPipeline };
43
44 Pipeline(const QString &storagePath, QObject *parent = 0);
33 ~Pipeline(); 45 ~Pipeline();
34 46
35 Storage &storage(); 47 Storage &storage() const;
36 void null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); 48
37 void newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); 49 // domain objects needed here
38 void modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); 50 void null();
39 void deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); 51 void newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity);
52 void modifiedEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entityDelta);
53 void deletedEntity(const QByteArray &key);
54
55Q_SIGNALS:
56 void revisionUpdated();
57
58private Q_SLOTS:
59 void stepPipelines();
60
61private:
62 void pipelineStepped(const PipelineState &state);
63 void pipelineCompleted(const PipelineState &state);
64 void scheduleStep();
65
66 friend class PipelineState;
67
68 class Private;
69 Private * const d;
70};
71
72class AKONADI2COMMON_EXPORT PipelineState
73{
74public:
75 // domain objects?
76 PipelineState();
77 PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<PipelineFilter *> &filters);
78 PipelineState(const PipelineState &other);
79 ~PipelineState();
80
81 PipelineState &operator=(const PipelineState &rhs);
82 bool operator==(const PipelineState &rhs);
83
84 bool isIdle() const;
85 QByteArray key() const;
86 Pipeline::Type type() const;
87
88 void step();
89 void processingCompleted(PipelineFilter *filter);
90
91private:
92 class Private;
93 QExplicitlySharedDataPointer<Private> d;
94};
95
96class AKONADI2COMMON_EXPORT PipelineFilter
97{
98public:
99 PipelineFilter();
100 virtual ~PipelineFilter();
101
102 virtual void process(PipelineState state);
103
104protected:
105 void processingCompleted(PipelineState state);
40 106
41private: 107private:
42 class Private; 108 class Private;
diff --git a/common/resource.cpp b/common/resource.cpp
index 11a03ca..ae28485 100644
--- a/common/resource.cpp
+++ b/common/resource.cpp
@@ -39,12 +39,18 @@ Resource::~Resource()
39 //delete d; 39 //delete d;
40} 40}
41 41
42void Resource::processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline) 42void Resource::processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline)
43{ 43{
44 Q_UNUSED(commandId)
45 Q_UNUSED(data)
46 Q_UNUSED(size)
47 Q_UNUSED(pipeline)
48 pipeline->null();
44} 49}
45 50
46void Resource::synchronizeWithSource() 51void Resource::synchronizeWithSource(Pipeline *pipeline)
47{ 52{
53 pipeline->null();
48} 54}
49 55
50class ResourceFactory::Private 56class ResourceFactory::Private
diff --git a/common/resource.h b/common/resource.h
index 53c0bc1..0f65e1f 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -18,8 +18,8 @@
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>. 18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */ 19 */
20 20
21#include <clientapi.h>
22#include <akonadi2common_export.h> 21#include <akonadi2common_export.h>
22#include <clientapi.h>
23#include <pipeline.h> 23#include <pipeline.h>
24 24
25namespace Akonadi2 25namespace Akonadi2
@@ -32,9 +32,8 @@ public:
32 Resource(); 32 Resource();
33 virtual ~Resource(); 33 virtual ~Resource();
34 34
35 //TODO: this will need to be async 35 virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline);
36 virtual void processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline); 36 virtual void synchronizeWithSource(Pipeline *pipeline);
37 virtual void synchronizeWithSource();
38 37
39private: 38private:
40 class Private; 39 class Private;