summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/pipeline.cpp199
-rw-r--r--common/pipeline.h80
-rw-r--r--common/resource.cpp10
-rw-r--r--common/resource.h7
-rw-r--r--dummyresource/resourcefactory.cpp25
-rw-r--r--dummyresource/resourcefactory.h8
-rw-r--r--synchronizer/listener.cpp23
-rw-r--r--synchronizer/listener.h1
8 files changed, 315 insertions, 38 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 41def7c..5606c30 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -20,7 +20,9 @@
20 20
21#include "pipeline.h" 21#include "pipeline.h"
22 22
23#include <QByteArray>
23#include <QStandardPaths> 24#include <QStandardPaths>
25#include <QVector>
24 26
25namespace Akonadi2 27namespace Akonadi2
26{ 28{
@@ -28,47 +30,214 @@ namespace Akonadi2
28class Pipeline::Private 30class Pipeline::Private
29{ 31{
30public: 32public:
31 Private(const QString &storageName) 33 Private(const QString &resourceName)
32 : storage(QStandardPaths::writableLocation(QStandardPaths::QStandardPaths::GenericDataLocation) + "/akonadi2", storageName, Akonadi2::Storage::ReadWrite) 34 : storage(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceName),
35 stepScheduled(false)
33 { 36 {
34
35 } 37 }
36 38
37 Akonadi2::Storage storage; 39 Storage storage;
40 QVector<PipelineFilter *> nullPipeline;
41 QVector<PipelineFilter *> newPipeline;
42 QVector<PipelineFilter *> modifiedPipeline;
43 QVector<PipelineFilter *> deletedPipeline;
44 QVector<PipelineState> activePipelines;
45 bool stepScheduled;
38}; 46};
39 47
40Pipeline::Pipeline(const QString &storageName) 48Pipeline::Pipeline(const QString &resourceName, QObject *parent)
41 : d(new Private(storageName)) 49 : QObject(parent),
50 d(new Private(resourceName))
42{ 51{
43} 52}
44 53
45Pipeline::~Pipeline() 54Pipeline::~Pipeline()
46{ 55{
56 delete d;
47} 57}
48 58
49Storage &Pipeline::storage() 59Storage &Pipeline::storage() const
50{ 60{
51 return d->storage; 61 return d->storage;
52} 62}
53 63
54void Pipeline::null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) 64void Pipeline::null()
65{
66 //TODO: is there really any need for the null pipeline? if so, it should be doing something ;)
67 PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline);
68 d->activePipelines << state;
69 state.step();
70}
71
72void Pipeline::newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity)
73{
74 PipelineState state(this, NewPipeline, key, d->newPipeline);
75 d->activePipelines << state;
76 state.step();
77}
78
79void Pipeline::modifiedEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entityDelta)
80{
81 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline);
82 d->activePipelines << state;
83 state.step();
84}
85
86void Pipeline::deletedEntity(const QByteArray &key)
87{
88 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline);
89 d->activePipelines << state;
90 state.step();
91}
92
93void Pipeline::pipelineStepped(const PipelineState &state)
94{
95 scheduleStep();
96}
97
98void Pipeline::scheduleStep()
99{
100 if (!d->stepScheduled) {
101 d->stepScheduled = true;
102 QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection);
103 }
104}
105
106void Pipeline::stepPipelines()
107{
108 for (PipelineState &state: d->activePipelines) {
109 if (state.isIdle()) {
110 state.step();
111 }
112 }
113
114 d->stepScheduled = false;
115}
116
117void Pipeline::pipelineCompleted(const PipelineState &state)
118{
119 //TODO finalize the datastore, inform clients of the new rev
120 const int index = d->activePipelines.indexOf(state);
121 if (index > -1) {
122 d->activePipelines.remove(index);
123 }
124
125 if (state.type() != NullPipeline) {
126 emit revisionUpdated();
127 }
128 scheduleStep();
129}
130
131
132class PipelineState::Private : public QSharedData
133{
134public:
135 Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<PipelineFilter *> filters)
136 : pipeline(p),
137 type(t),
138 key(k),
139 filterIt(filters),
140 idle(true)
141 {}
142
143 Private()
144 : pipeline(0),
145 filterIt(QVector<PipelineFilter *>()),
146 idle(true)
147 {}
148
149 Pipeline *pipeline;
150 Pipeline::Type type;
151 QByteArray key;
152 QVectorIterator<PipelineFilter *> filterIt;
153 bool idle;
154};
155
156PipelineState::PipelineState()
157 : d(new Private())
158{
159
160}
161
162PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<PipelineFilter *> &filters)
163 : d(new Private(pipeline, type, key, filters))
164{
165}
166
167PipelineState::PipelineState(const PipelineState &other)
168 : d(other.d)
169{
170}
171
172PipelineState::~PipelineState()
173{
174}
175
176PipelineState &PipelineState::operator=(const PipelineState &rhs)
177{
178 d = rhs.d;
179 return *this;
180}
181
182bool PipelineState::operator==(const PipelineState &rhs)
183{
184 return d == rhs.d;
185}
186
187bool PipelineState::isIdle() const
188{
189 return d->idle;
190}
191
192QByteArray PipelineState::key() const
193{
194 return d->key;
195}
196
197Pipeline::Type PipelineState::type() const
198{
199 return d->type;
200}
201
202void PipelineState::step()
203{
204 if (!d->pipeline) {
205 return;
206 }
207
208 d->idle = false;
209 if (d->filterIt.hasNext()) {
210 d->filterIt.next()->process(*this);
211 } else {
212 d->pipeline->pipelineCompleted(*this);
213 }
214}
215
216void PipelineState::processingCompleted(PipelineFilter *filter)
217{
218 if (d->pipeline && filter == d->filterIt.peekPrevious()) {
219 d->idle = true;
220 d->pipeline->pipelineStepped(*this);
221 }
222}
223
224PipelineFilter::PipelineFilter()
225 : d(0)
55{ 226{
56 d->storage.write(key, keySize, buffer, bufferSize);
57} 227}
58 228
59void Pipeline::newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) 229PipelineFilter::~PipelineFilter()
60{ 230{
61 d->storage.write(key, keySize, buffer, bufferSize);
62} 231}
63 232
64void Pipeline::modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) 233void PipelineFilter::process(PipelineState state)
65{ 234{
66 d->storage.write(key, keySize, buffer, bufferSize); 235 processingCompleted(state);
67} 236}
68 237
69void Pipeline::deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) 238void PipelineFilter::processingCompleted(PipelineState state)
70{ 239{
71 d->storage.write(key, keySize, buffer, bufferSize); 240 state.processingCompleted(this);
72} 241}
73 242
74} // namespace Akonadi2 243} // namespace Akonadi2
diff --git a/common/pipeline.h b/common/pipeline.h
index 635e630..6ad78df 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -20,23 +20,89 @@
20 20
21#pragma once 21#pragma once
22 22
23#include <flatbuffers/flatbuffers.h>
24
25#include <QSharedDataPointer>
26#include <QObject>
27
23#include <akonadi2common_export.h> 28#include <akonadi2common_export.h>
24#include <storage.h> 29#include <storage.h>
25 30
26namespace Akonadi2 31namespace Akonadi2
27{ 32{
28 33
29class AKONADI2COMMON_EXPORT Pipeline 34class PipelineState;
35class PipelineFilter;
36
37class AKONADI2COMMON_EXPORT Pipeline : public QObject
30{ 38{
39 Q_OBJECT
40
31public: 41public:
32 Pipeline(const QString &storagePath); 42 enum Type { NullPipeline, NewPipeline, ModifiedPipeline, DeletedPipeline };
43
44 Pipeline(const QString &storagePath, QObject *parent = 0);
33 ~Pipeline(); 45 ~Pipeline();
34 46
35 Storage &storage(); 47 Storage &storage() const;
36 void null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); 48
37 void newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); 49 // domain objects needed here
38 void modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); 50 void null();
39 void deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); 51 void newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity);
52 void modifiedEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entityDelta);
53 void deletedEntity(const QByteArray &key);
54
55Q_SIGNALS:
56 void revisionUpdated();
57
58private Q_SLOTS:
59 void stepPipelines();
60
61private:
62 void pipelineStepped(const PipelineState &state);
63 void pipelineCompleted(const PipelineState &state);
64 void scheduleStep();
65
66 friend class PipelineState;
67
68 class Private;
69 Private * const d;
70};
71
72class AKONADI2COMMON_EXPORT PipelineState
73{
74public:
75 // domain objects?
76 PipelineState();
77 PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<PipelineFilter *> &filters);
78 PipelineState(const PipelineState &other);
79 ~PipelineState();
80
81 PipelineState &operator=(const PipelineState &rhs);
82 bool operator==(const PipelineState &rhs);
83
84 bool isIdle() const;
85 QByteArray key() const;
86 Pipeline::Type type() const;
87
88 void step();
89 void processingCompleted(PipelineFilter *filter);
90
91private:
92 class Private;
93 QExplicitlySharedDataPointer<Private> d;
94};
95
96class AKONADI2COMMON_EXPORT PipelineFilter
97{
98public:
99 PipelineFilter();
100 virtual ~PipelineFilter();
101
102 virtual void process(PipelineState state);
103
104protected:
105 void processingCompleted(PipelineState state);
40 106
41private: 107private:
42 class Private; 108 class Private;
diff --git a/common/resource.cpp b/common/resource.cpp
index 11a03ca..ae28485 100644
--- a/common/resource.cpp
+++ b/common/resource.cpp
@@ -39,12 +39,18 @@ Resource::~Resource()
39 //delete d; 39 //delete d;
40} 40}
41 41
42void Resource::processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline) 42void Resource::processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline)
43{ 43{
44 Q_UNUSED(commandId)
45 Q_UNUSED(data)
46 Q_UNUSED(size)
47 Q_UNUSED(pipeline)
48 pipeline->null();
44} 49}
45 50
46void Resource::synchronizeWithSource() 51void Resource::synchronizeWithSource(Pipeline *pipeline)
47{ 52{
53 pipeline->null();
48} 54}
49 55
50class ResourceFactory::Private 56class ResourceFactory::Private
diff --git a/common/resource.h b/common/resource.h
index 53c0bc1..0f65e1f 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -18,8 +18,8 @@
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>. 18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */ 19 */
20 20
21#include <clientapi.h>
22#include <akonadi2common_export.h> 21#include <akonadi2common_export.h>
22#include <clientapi.h>
23#include <pipeline.h> 23#include <pipeline.h>
24 24
25namespace Akonadi2 25namespace Akonadi2
@@ -32,9 +32,8 @@ public:
32 Resource(); 32 Resource();
33 virtual ~Resource(); 33 virtual ~Resource();
34 34
35 //TODO: this will need to be async 35 virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline);
36 virtual void processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline); 36 virtual void synchronizeWithSource(Pipeline *pipeline);
37 virtual void synchronizeWithSource();
38 37
39private: 38private:
40 class Private; 39 class Private;
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp
index 37bfdac..bd85b4f 100644
--- a/dummyresource/resourcefactory.cpp
+++ b/dummyresource/resourcefactory.cpp
@@ -19,6 +19,7 @@
19 19
20#include "resourcefactory.h" 20#include "resourcefactory.h"
21#include "facade.h" 21#include "facade.h"
22#include "dummycalendar_generated.h"
22 23
23DummyResource::DummyResource() 24DummyResource::DummyResource()
24 : Akonadi2::Resource() 25 : Akonadi2::Resource()
@@ -26,9 +27,29 @@ DummyResource::DummyResource()
26 27
27} 28}
28 29
29void DummyResource::synchronizeWithSource() 30void DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline)
30{ 31{
31 // TODO populate the storage 32 // TODO actually populate the storage with new items
33 auto builder = DummyCalendar::DummyEventBuilder(m_fbb);
34 builder .add_summary(m_fbb.CreateString("summary summary!"));
35 auto buffer = builder.Finish();
36 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer);
37 pipeline->newEntity("fakekey", m_fbb);
38 m_fbb.Clear();
39}
40
41void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline)
42{
43 Q_UNUSED(commandId)
44 Q_UNUSED(data)
45 Q_UNUSED(size)
46 //TODO reallly process the commands :)
47 auto builder = DummyCalendar::DummyEventBuilder(m_fbb);
48 builder .add_summary(m_fbb.CreateString("summary summary!"));
49 auto buffer = builder.Finish();
50 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer);
51 pipeline->newEntity("fakekey", m_fbb);
52 m_fbb.Clear();
32} 53}
33 54
34DummyResourceFactory::DummyResourceFactory(QObject *parent) 55DummyResourceFactory::DummyResourceFactory(QObject *parent)
diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h
index 7c40084..807a654 100644
--- a/dummyresource/resourcefactory.h
+++ b/dummyresource/resourcefactory.h
@@ -21,6 +21,8 @@
21 21
22#include "common/resource.h" 22#include "common/resource.h"
23 23
24#include <flatbuffers/flatbuffers.h>
25
24//TODO: a little ugly to have this in two places, once here and once in Q_PLUGIN_METADATA 26//TODO: a little ugly to have this in two places, once here and once in Q_PLUGIN_METADATA
25#define PLUGIN_NAME "org.kde.dummy" 27#define PLUGIN_NAME "org.kde.dummy"
26 28
@@ -28,7 +30,11 @@ class DummyResource : public Akonadi2::Resource
28{ 30{
29public: 31public:
30 DummyResource(); 32 DummyResource();
31 void synchronizeWithSource(); 33 void synchronizeWithSource(Akonadi2::Pipeline *pipeline);
34 void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline);
35
36private:
37 flatbuffers::FlatBufferBuilder m_fbb;
32}; 38};
33 39
34class DummyResourceFactory : public Akonadi2::ResourceFactory 40class DummyResourceFactory : public Akonadi2::ResourceFactory
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
index 18442e7..328d4d6 100644
--- a/synchronizer/listener.cpp
+++ b/synchronizer/listener.cpp
@@ -38,10 +38,12 @@ Listener::Listener(const QString &resourceName, QObject *parent)
38 m_revision(0), 38 m_revision(0),
39 m_resourceName(resourceName), 39 m_resourceName(resourceName),
40 m_resource(0), 40 m_resource(0),
41 m_pipeline(new Akonadi2::Pipeline(resourceName)), 41 m_pipeline(new Akonadi2::Pipeline(resourceName, parent)),
42 m_clientBufferProcessesTimer(new QTimer(this)), 42 m_clientBufferProcessesTimer(new QTimer(this)),
43 m_messageId(0) 43 m_messageId(0)
44{ 44{
45 connect(m_pipeline, &Akonadi2::Pipeline::revisionUpdated,
46 this, &Listener::refreshRevision);
45 connect(m_server, &QLocalServer::newConnection, 47 connect(m_server, &QLocalServer::newConnection,
46 this, &Listener::acceptConnection); 48 this, &Listener::acceptConnection);
47 Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName)); 49 Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName));
@@ -69,7 +71,6 @@ Listener::Listener(const QString &resourceName, QObject *parent)
69 71
70Listener::~Listener() 72Listener::~Listener()
71{ 73{
72 delete m_pipeline;
73} 74}
74 75
75void Listener::setRevision(unsigned long long revision) 76void Listener::setRevision(unsigned long long revision)
@@ -219,7 +220,7 @@ bool Listener::processClientBuffer(Client &client)
219 Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); 220 Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name));
220 loadResource(); 221 loadResource();
221 if (m_resource) { 222 if (m_resource) {
222 m_resource->synchronizeWithSource(); 223 m_resource->synchronizeWithSource(m_pipeline);
223 } 224 }
224 break; 225 break;
225 } 226 }
@@ -227,14 +228,14 @@ bool Listener::processClientBuffer(Client &client)
227 case Akonadi2::Commands::DeleteEntityCommand: 228 case Akonadi2::Commands::DeleteEntityCommand:
228 case Akonadi2::Commands::ModifyEntityCommand: 229 case Akonadi2::Commands::ModifyEntityCommand:
229 case Akonadi2::Commands::CreateEntityCommand: 230 case Akonadi2::Commands::CreateEntityCommand:
230 Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %1").arg(messageId).arg(commandId).arg(client.name)); 231 Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name));
231 m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); 232 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline);
232 break; 233 break;
233 default: 234 default:
234 if (commandId > Akonadi2::Commands::CustomCommand) { 235 if (commandId > Akonadi2::Commands::CustomCommand) {
235 loadResource(); 236 loadResource();
236 if (m_resource) { 237 if (m_resource) {
237 m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); 238 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline);
238 } 239 }
239 } else { 240 } else {
240 //TODO: handle error: we don't know wtf this command is 241 //TODO: handle error: we don't know wtf this command is
@@ -243,8 +244,9 @@ bool Listener::processClientBuffer(Client &client)
243 } 244 }
244 245
245 //TODO: async commands == async sendCommandCompleted 246 //TODO: async commands == async sendCommandCompleted
246 sendCommandCompleted(client, messageId); 247 Akonadi2::Console::main()->log(QString("\tCompleted command id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name));
247 client.commandBuffer.remove(0, size); 248 client.commandBuffer.remove(0, size);
249 sendCommandCompleted(client, messageId);
248 return client.commandBuffer.size() >= headerSize; 250 return client.commandBuffer.size() >= headerSize;
249 } 251 }
250 252
@@ -275,6 +277,13 @@ void Listener::sendCommandCompleted(Client &client, uint messageId)
275 m_fbb.Clear(); 277 m_fbb.Clear();
276} 278}
277 279
280void Listener::refreshRevision()
281{
282 //TODO this should be coming out of m_pipeline->storage()
283 ++m_revision;
284 updateClientsWithRevision();
285}
286
278void Listener::updateClientsWithRevision() 287void Listener::updateClientsWithRevision()
279{ 288{
280 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); 289 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision);
diff --git a/synchronizer/listener.h b/synchronizer/listener.h
index b294277..357ae37 100644
--- a/synchronizer/listener.h
+++ b/synchronizer/listener.h
@@ -76,6 +76,7 @@ private Q_SLOTS:
76 void checkConnections(); 76 void checkConnections();
77 void readFromSocket(); 77 void readFromSocket();
78 void processClientBuffers(); 78 void processClientBuffers();
79 void refreshRevision();
79 80
80private: 81private:
81 bool processClientBuffer(Client &client); 82 bool processClientBuffer(Client &client);