summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/commands/createentity.fbs2
-rw-r--r--common/entitybuffer.cpp9
-rw-r--r--common/entitybuffer.h2
-rw-r--r--common/metadata.fbs1
-rw-r--r--common/pipeline.cpp79
-rw-r--r--common/pipeline.h41
-rw-r--r--common/queuedcommand.fbs11
-rw-r--r--common/resourceaccess.h2
9 files changed, 85 insertions, 63 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 671d1cd..6f8fee3 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -10,6 +10,7 @@ generate_flatbuffers(
10 domain/event 10 domain/event
11 entity 11 entity
12 metadata 12 metadata
13 queuedcommand
13) 14)
14 15
15if (STORAGE_unqlite) 16if (STORAGE_unqlite)
diff --git a/common/commands/createentity.fbs b/common/commands/createentity.fbs
index 564c231..23eeff9 100644
--- a/common/commands/createentity.fbs
+++ b/common/commands/createentity.fbs
@@ -1,4 +1,4 @@
1namespace Akonadi2; 1namespace Akonadi2.Commands;
2 2
3table CreateEntity { 3table CreateEntity {
4 domainType: string; 4 domainType: string;
diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp
index aa5847c..4af84ef 100644
--- a/common/entitybuffer.cpp
+++ b/common/entitybuffer.cpp
@@ -56,11 +56,12 @@ void EntityBuffer::extractResourceBuffer(void *dataValue, int dataSize, const st
56 } 56 }
57} 57}
58 58
59void EntityBuffer::assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void *metadataData, size_t metadataSize, void *resourceData, size_t resourceSize, void *localData, size_t localSize) 59void EntityBuffer::assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void const *metadataData, size_t metadataSize, void const *resourceData, size_t resourceSize, void const *localData, size_t localSize)
60{ 60{
61 auto metadata = fbb.CreateVector<uint8_t>(static_cast<uint8_t*>(metadataData), metadataSize); 61 qDebug() << "res size: " << resourceSize;
62 auto resource = fbb.CreateVector<uint8_t>(static_cast<uint8_t*>(resourceData), resourceSize); 62 auto metadata = fbb.CreateVector<uint8_t>(static_cast<uint8_t const*>(metadataData), metadataSize);
63 auto local = fbb.CreateVector<uint8_t>(static_cast<uint8_t*>(localData), localSize); 63 auto resource = fbb.CreateVector<uint8_t>(static_cast<uint8_t const*>(resourceData), resourceSize);
64 auto local = fbb.CreateVector<uint8_t>(static_cast<uint8_t const*>(localData), localSize);
64 auto builder = Akonadi2::EntityBuilder(fbb); 65 auto builder = Akonadi2::EntityBuilder(fbb);
65 builder.add_metadata(metadata); 66 builder.add_metadata(metadata);
66 builder.add_resource(resource); 67 builder.add_resource(resource);
diff --git a/common/entitybuffer.h b/common/entitybuffer.h
index 600b04d..097b450 100644
--- a/common/entitybuffer.h
+++ b/common/entitybuffer.h
@@ -15,7 +15,7 @@ public:
15 const Entity &entity(); 15 const Entity &entity();
16 16
17 static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const uint8_t *, size_t size)> &handler); 17 static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const uint8_t *, size_t size)> &handler);
18 static void assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void *metadataData, size_t metadataSize, void *resourceData, size_t resourceSize, void *localData, size_t localSize); 18 static void assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void const *metadataData, size_t metadataSize, void const *resourceData, size_t resourceSize, void const *localData, size_t localSize);
19 19
20private: 20private:
21 const Entity *mEntity; 21 const Entity *mEntity;
diff --git a/common/metadata.fbs b/common/metadata.fbs
index 34a8df2..bb1163d 100644
--- a/common/metadata.fbs
+++ b/common/metadata.fbs
@@ -3,6 +3,7 @@ namespace Akonadi2;
3table Metadata { 3table Metadata {
4 revision: ulong; 4 revision: ulong;
5 processed: bool = true; 5 processed: bool = true;
6 processingProgress: [string];
6} 7}
7 8
8root_type Metadata; 9root_type Metadata;
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 10bae54..9cc7450 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -23,10 +23,13 @@
23#include <QByteArray> 23#include <QByteArray>
24#include <QStandardPaths> 24#include <QStandardPaths>
25#include <QVector> 25#include <QVector>
26#include <QUuid>
26#include <QDebug> 27#include <QDebug>
27#include "entity_generated.h" 28#include "entity_generated.h"
28#include "metadata_generated.h" 29#include "metadata_generated.h"
30#include "createentity_generated.h"
29#include "entitybuffer.h" 31#include "entitybuffer.h"
32#include "async/src/async.h"
30 33
31namespace Akonadi2 34namespace Akonadi2
32{ 35{
@@ -90,39 +93,59 @@ void Pipeline::null()
90 // state.step(); 93 // state.step();
91} 94}
92 95
93void Pipeline::newEntity(const QString &entityType, const QByteArray &key, void *resourceBufferData, size_t size) 96Async::Job<void> Pipeline::newEntity(void const *command, size_t size)
94{ 97{
95 const qint64 newRevision = storage().maxRevision() + 1; 98 qDebug() << "new entity";
96 99 Async::start<void>([&](Async::Future<void> future) {
97 //Add metadata buffer 100
98 flatbuffers::FlatBufferBuilder metadataFbb; 101 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys.
99 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); 102 const auto key = QUuid::createUuid().toString().toUtf8();
100 metadataBuilder.add_revision(newRevision); 103
101 metadataBuilder.add_processed(false); 104 //TODO figure out if we already have created a revision for the message?
102 auto metadataBuffer = metadataBuilder.Finish(); 105 const qint64 newRevision = storage().maxRevision() + 1;
103 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); 106
104 107 auto createEntity = Akonadi2::Commands::GetCreateEntity(command);
105 flatbuffers::FlatBufferBuilder fbb; 108 //TODO rename createEntitiy->domainType to bufferType
106 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), resourceBufferData, size, 0, 0); 109 const QString entityType = QString::fromUtf8(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size());
107 110 auto entity = Akonadi2::GetEntity(createEntity->delta()->Data());
108 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); 111 //
109 storage().setMaxRevision(newRevision); 112 // const QString entityType;
113 // auto entity = Akonadi2::GetEntity(0);
114
115 //Add metadata buffer
116 flatbuffers::FlatBufferBuilder metadataFbb;
117 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb);
118 metadataBuilder.add_revision(newRevision);
119 metadataBuilder.add_processed(false);
120 auto metadataBuffer = metadataBuilder.Finish();
121 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer);
122 //TODO we should reserve some space in metadata for in-place updates
123
124 flatbuffers::FlatBufferBuilder fbb;
125 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
126
127 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize());
128 storage().setMaxRevision(newRevision);
129
130 PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() {
131 future.setFinished();
132 });
133 d->activePipelines << state;
134 state.step();
110 135
111 PipelineState state(this, NewPipeline, key, d->newPipeline[entityType]); 136 });
112 d->activePipelines << state;
113 state.step();
114} 137}
115 138
116void Pipeline::modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size) 139void Pipeline::modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size)
117{ 140{
118 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType]); 141 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [](){});
119 d->activePipelines << state; 142 d->activePipelines << state;
120 state.step(); 143 state.step();
121} 144}
122 145
123void Pipeline::deletedEntity(const QString &entityType, const QByteArray &key) 146void Pipeline::deletedEntity(const QString &entityType, const QByteArray &key)
124{ 147{
125 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType]); 148 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType], [](){});
126 d->activePipelines << state; 149 d->activePipelines << state;
127 state.step(); 150 state.step();
128} 151}
@@ -160,6 +183,7 @@ void Pipeline::pipelineCompleted(const PipelineState &state)
160 } 183 }
161 184
162 if (state.type() != NullPipeline) { 185 if (state.type() != NullPipeline) {
186 //TODO what revision is finalized?
163 emit revisionUpdated(); 187 emit revisionUpdated();
164 } 188 }
165 scheduleStep(); 189 scheduleStep();
@@ -172,12 +196,13 @@ void Pipeline::pipelineCompleted(const PipelineState &state)
172class PipelineState::Private : public QSharedData 196class PipelineState::Private : public QSharedData
173{ 197{
174public: 198public:
175 Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters) 199 Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters, const std::function<void()> &c)
176 : pipeline(p), 200 : pipeline(p),
177 type(t), 201 type(t),
178 key(k), 202 key(k),
179 filterIt(filters), 203 filterIt(filters),
180 idle(true) 204 idle(true),
205 callback(c)
181 {} 206 {}
182 207
183 Private() 208 Private()
@@ -191,6 +216,7 @@ public:
191 QByteArray key; 216 QByteArray key;
192 QVectorIterator<Preprocessor *> filterIt; 217 QVectorIterator<Preprocessor *> filterIt;
193 bool idle; 218 bool idle;
219 std::function<void()> callback;
194}; 220};
195 221
196PipelineState::PipelineState() 222PipelineState::PipelineState()
@@ -199,8 +225,8 @@ PipelineState::PipelineState()
199 225
200} 226}
201 227
202PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters) 228PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, const std::function<void()> &callback)
203 : d(new Private(pipeline, type, key, filters)) 229 : d(new Private(pipeline, type, key, filters, callback))
204{ 230{
205} 231}
206 232
@@ -247,6 +273,7 @@ void PipelineState::step()
247 273
248 d->idle = false; 274 d->idle = false;
249 if (d->filterIt.hasNext()) { 275 if (d->filterIt.hasNext()) {
276 //TODO skip step if already processed
250 d->pipeline->storage().scan(d->key.toStdString(), [this](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { 277 d->pipeline->storage().scan(d->key.toStdString(), [this](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool {
251 auto entity = Akonadi2::GetEntity(dataValue); 278 auto entity = Akonadi2::GetEntity(dataValue);
252 d->filterIt.next()->process(*this, *entity); 279 d->filterIt.next()->process(*this, *entity);
@@ -254,11 +281,13 @@ void PipelineState::step()
254 }); 281 });
255 } else { 282 } else {
256 d->pipeline->pipelineCompleted(*this); 283 d->pipeline->pipelineCompleted(*this);
284 d->callback();
257 } 285 }
258} 286}
259 287
260void PipelineState::processingCompleted(Preprocessor *filter) 288void PipelineState::processingCompleted(Preprocessor *filter)
261{ 289{
290 //TODO record processing progress
262 if (d->pipeline && filter == d->filterIt.peekPrevious()) { 291 if (d->pipeline && filter == d->filterIt.peekPrevious()) {
263 d->idle = true; 292 d->idle = true;
264 d->pipeline->pipelineStepped(*this); 293 d->pipeline->pipelineStepped(*this);
diff --git a/common/pipeline.h b/common/pipeline.h
index 6b847f5..918d21e 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -27,7 +27,7 @@
27 27
28#include <akonadi2common_export.h> 28#include <akonadi2common_export.h>
29#include <storage.h> 29#include <storage.h>
30#include <clientapi.h> //For domain types 30#include "async/src/async.h"
31 31
32#include "entity_generated.h" 32#include "entity_generated.h"
33 33
@@ -49,34 +49,13 @@ public:
49 49
50 Storage &storage() const; 50 Storage &storage() const;
51 51
52 // template <typename T> 52 void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors);
53 // Storage &storage() const;
54
55 template <typename T>
56 void setPreprocessors(Type type, const QVector<Preprocessor *> &preprocessors)
57 {
58 setPreprocessors(Akonadi2::Domain::getTypeName<T>(), type, preprocessors);
59 }
60 53
61 void null(); 54 void null();
62 55
63 template <typename T> 56 Async::Job<void> newEntity(void const *command, size_t size);
64 void newEntity(const QByteArray &key, void *resourceBufferData, size_t size) 57 void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size);
65 { 58 void deletedEntity(const QString &entityType, const QByteArray &key);
66 newEntity(Akonadi2::Domain::getTypeName<T>(), key, resourceBufferData, size);
67 }
68
69 template <typename T>
70 void modifiedEntity(const QByteArray &key, void *data, size_t size)
71 {
72 modifiedEntity(Akonadi2::Domain::getTypeName<T>(), key, data, size);
73 }
74
75 template <typename T>
76 void deletedEntity(const QByteArray &key)
77 {
78 deletedEntity(Akonadi2::Domain::getTypeName<T>(), key);
79 }
80 59
81Q_SIGNALS: 60Q_SIGNALS:
82 void revisionUpdated(); 61 void revisionUpdated();
@@ -86,10 +65,6 @@ private Q_SLOTS:
86 void stepPipelines(); 65 void stepPipelines();
87 66
88private: 67private:
89 void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors);
90 void newEntity(const QString &entityType, const QByteArray &key, void *resourceBufferData, size_t size);
91 void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size);
92 void deletedEntity(const QString &entityType, const QByteArray &key);
93 void pipelineStepped(const PipelineState &state); 68 void pipelineStepped(const PipelineState &state);
94 void pipelineCompleted(const PipelineState &state); 69 void pipelineCompleted(const PipelineState &state);
95 void scheduleStep(); 70 void scheduleStep();
@@ -104,7 +79,7 @@ class AKONADI2COMMON_EXPORT PipelineState
104{ 79{
105public: 80public:
106 PipelineState(); 81 PipelineState();
107 PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters); 82 PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, const std::function<void()> &callback);
108 PipelineState(const PipelineState &other); 83 PipelineState(const PipelineState &other);
109 ~PipelineState(); 84 ~PipelineState();
110 85
@@ -114,6 +89,7 @@ public:
114 bool isIdle() const; 89 bool isIdle() const;
115 QByteArray key() const; 90 QByteArray key() const;
116 Pipeline::Type type() const; 91 Pipeline::Type type() const;
92 //TODO expose command
117 93
118 void step(); 94 void step();
119 void processingCompleted(Preprocessor *filter); 95 void processingCompleted(Preprocessor *filter);
@@ -129,7 +105,10 @@ public:
129 Preprocessor(); 105 Preprocessor();
130 virtual ~Preprocessor(); 106 virtual ~Preprocessor();
131 107
108 //TODO pass actual command as well, for changerecording
132 virtual void process(PipelineState state, const Akonadi2::Entity &); 109 virtual void process(PipelineState state, const Akonadi2::Entity &);
110 //TODO to record progress
111 // virtual QString id();
133 112
134protected: 113protected:
135 void processingCompleted(PipelineState state); 114 void processingCompleted(PipelineState state);
diff --git a/common/queuedcommand.fbs b/common/queuedcommand.fbs
new file mode 100644
index 0000000..0ca899f
--- /dev/null
+++ b/common/queuedcommand.fbs
@@ -0,0 +1,11 @@
1namespace Akonadi2;
2
3table QueuedCommand {
4 commandId: int;
5 command: [ubyte];
6 // entityId: string;
7 // sourceRevision: ulong;
8 // targetRevision: [ubyte];
9}
10
11root_type QueuedCommand;
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index d45ebde..0a333f6 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -43,7 +43,7 @@ public:
43 43
44 //TODO use jobs 44 //TODO use jobs
45 void sendCommand(int commandId, const std::function<void()> &callback = std::function<void()>()); 45 void sendCommand(int commandId, const std::function<void()> &callback = std::function<void()>());
46 void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback); 46 void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback = std::function<void()>());
47 Async::Job<void> synchronizeResource(); 47 Async::Job<void> synchronizeResource();
48 48
49public Q_SLOTS: 49public Q_SLOTS: