diff options
Diffstat (limited to 'common')
-rw-r--r-- | common/CMakeLists.txt | 1 | ||||
-rw-r--r-- | common/commands/createentity.fbs | 2 | ||||
-rw-r--r-- | common/entitybuffer.cpp | 9 | ||||
-rw-r--r-- | common/entitybuffer.h | 2 | ||||
-rw-r--r-- | common/metadata.fbs | 1 | ||||
-rw-r--r-- | common/pipeline.cpp | 79 | ||||
-rw-r--r-- | common/pipeline.h | 41 | ||||
-rw-r--r-- | common/queuedcommand.fbs | 11 | ||||
-rw-r--r-- | common/resourceaccess.h | 2 |
9 files changed, 85 insertions, 63 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 671d1cd..6f8fee3 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -10,6 +10,7 @@ generate_flatbuffers( | |||
10 | domain/event | 10 | domain/event |
11 | entity | 11 | entity |
12 | metadata | 12 | metadata |
13 | queuedcommand | ||
13 | ) | 14 | ) |
14 | 15 | ||
15 | if (STORAGE_unqlite) | 16 | if (STORAGE_unqlite) |
diff --git a/common/commands/createentity.fbs b/common/commands/createentity.fbs index 564c231..23eeff9 100644 --- a/common/commands/createentity.fbs +++ b/common/commands/createentity.fbs | |||
@@ -1,4 +1,4 @@ | |||
1 | namespace Akonadi2; | 1 | namespace Akonadi2.Commands; |
2 | 2 | ||
3 | table CreateEntity { | 3 | table CreateEntity { |
4 | domainType: string; | 4 | domainType: string; |
diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp index aa5847c..4af84ef 100644 --- a/common/entitybuffer.cpp +++ b/common/entitybuffer.cpp | |||
@@ -56,11 +56,12 @@ void EntityBuffer::extractResourceBuffer(void *dataValue, int dataSize, const st | |||
56 | } | 56 | } |
57 | } | 57 | } |
58 | 58 | ||
59 | void EntityBuffer::assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void *metadataData, size_t metadataSize, void *resourceData, size_t resourceSize, void *localData, size_t localSize) | 59 | void EntityBuffer::assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void const *metadataData, size_t metadataSize, void const *resourceData, size_t resourceSize, void const *localData, size_t localSize) |
60 | { | 60 | { |
61 | auto metadata = fbb.CreateVector<uint8_t>(static_cast<uint8_t*>(metadataData), metadataSize); | 61 | qDebug() << "res size: " << resourceSize; |
62 | auto resource = fbb.CreateVector<uint8_t>(static_cast<uint8_t*>(resourceData), resourceSize); | 62 | auto metadata = fbb.CreateVector<uint8_t>(static_cast<uint8_t const*>(metadataData), metadataSize); |
63 | auto local = fbb.CreateVector<uint8_t>(static_cast<uint8_t*>(localData), localSize); | 63 | auto resource = fbb.CreateVector<uint8_t>(static_cast<uint8_t const*>(resourceData), resourceSize); |
64 | auto local = fbb.CreateVector<uint8_t>(static_cast<uint8_t const*>(localData), localSize); | ||
64 | auto builder = Akonadi2::EntityBuilder(fbb); | 65 | auto builder = Akonadi2::EntityBuilder(fbb); |
65 | builder.add_metadata(metadata); | 66 | builder.add_metadata(metadata); |
66 | builder.add_resource(resource); | 67 | builder.add_resource(resource); |
diff --git a/common/entitybuffer.h b/common/entitybuffer.h index 600b04d..097b450 100644 --- a/common/entitybuffer.h +++ b/common/entitybuffer.h | |||
@@ -15,7 +15,7 @@ public: | |||
15 | const Entity &entity(); | 15 | const Entity &entity(); |
16 | 16 | ||
17 | static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const uint8_t *, size_t size)> &handler); | 17 | static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const uint8_t *, size_t size)> &handler); |
18 | static void assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void *metadataData, size_t metadataSize, void *resourceData, size_t resourceSize, void *localData, size_t localSize); | 18 | static void assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void const *metadataData, size_t metadataSize, void const *resourceData, size_t resourceSize, void const *localData, size_t localSize); |
19 | 19 | ||
20 | private: | 20 | private: |
21 | const Entity *mEntity; | 21 | const Entity *mEntity; |
diff --git a/common/metadata.fbs b/common/metadata.fbs index 34a8df2..bb1163d 100644 --- a/common/metadata.fbs +++ b/common/metadata.fbs | |||
@@ -3,6 +3,7 @@ namespace Akonadi2; | |||
3 | table Metadata { | 3 | table Metadata { |
4 | revision: ulong; | 4 | revision: ulong; |
5 | processed: bool = true; | 5 | processed: bool = true; |
6 | processingProgress: [string]; | ||
6 | } | 7 | } |
7 | 8 | ||
8 | root_type Metadata; | 9 | root_type Metadata; |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 10bae54..9cc7450 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -23,10 +23,13 @@ | |||
23 | #include <QByteArray> | 23 | #include <QByteArray> |
24 | #include <QStandardPaths> | 24 | #include <QStandardPaths> |
25 | #include <QVector> | 25 | #include <QVector> |
26 | #include <QUuid> | ||
26 | #include <QDebug> | 27 | #include <QDebug> |
27 | #include "entity_generated.h" | 28 | #include "entity_generated.h" |
28 | #include "metadata_generated.h" | 29 | #include "metadata_generated.h" |
30 | #include "createentity_generated.h" | ||
29 | #include "entitybuffer.h" | 31 | #include "entitybuffer.h" |
32 | #include "async/src/async.h" | ||
30 | 33 | ||
31 | namespace Akonadi2 | 34 | namespace Akonadi2 |
32 | { | 35 | { |
@@ -90,39 +93,59 @@ void Pipeline::null() | |||
90 | // state.step(); | 93 | // state.step(); |
91 | } | 94 | } |
92 | 95 | ||
93 | void Pipeline::newEntity(const QString &entityType, const QByteArray &key, void *resourceBufferData, size_t size) | 96 | Async::Job<void> Pipeline::newEntity(void const *command, size_t size) |
94 | { | 97 | { |
95 | const qint64 newRevision = storage().maxRevision() + 1; | 98 | qDebug() << "new entity"; |
96 | 99 | Async::start<void>([&](Async::Future<void> future) { | |
97 | //Add metadata buffer | 100 | |
98 | flatbuffers::FlatBufferBuilder metadataFbb; | 101 | //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. |
99 | auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); | 102 | const auto key = QUuid::createUuid().toString().toUtf8(); |
100 | metadataBuilder.add_revision(newRevision); | 103 | |
101 | metadataBuilder.add_processed(false); | 104 | //TODO figure out if we already have created a revision for the message? |
102 | auto metadataBuffer = metadataBuilder.Finish(); | 105 | const qint64 newRevision = storage().maxRevision() + 1; |
103 | Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); | 106 | |
104 | 107 | auto createEntity = Akonadi2::Commands::GetCreateEntity(command); | |
105 | flatbuffers::FlatBufferBuilder fbb; | 108 | //TODO rename createEntitiy->domainType to bufferType |
106 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), resourceBufferData, size, 0, 0); | 109 | const QString entityType = QString::fromUtf8(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); |
107 | 110 | auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); | |
108 | storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); | 111 | // |
109 | storage().setMaxRevision(newRevision); | 112 | // const QString entityType; |
113 | // auto entity = Akonadi2::GetEntity(0); | ||
114 | |||
115 | //Add metadata buffer | ||
116 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
117 | auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); | ||
118 | metadataBuilder.add_revision(newRevision); | ||
119 | metadataBuilder.add_processed(false); | ||
120 | auto metadataBuffer = metadataBuilder.Finish(); | ||
121 | Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
122 | //TODO we should reserve some space in metadata for in-place updates | ||
123 | |||
124 | flatbuffers::FlatBufferBuilder fbb; | ||
125 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); | ||
126 | |||
127 | storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); | ||
128 | storage().setMaxRevision(newRevision); | ||
129 | |||
130 | PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { | ||
131 | future.setFinished(); | ||
132 | }); | ||
133 | d->activePipelines << state; | ||
134 | state.step(); | ||
110 | 135 | ||
111 | PipelineState state(this, NewPipeline, key, d->newPipeline[entityType]); | 136 | }); |
112 | d->activePipelines << state; | ||
113 | state.step(); | ||
114 | } | 137 | } |
115 | 138 | ||
116 | void Pipeline::modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size) | 139 | void Pipeline::modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size) |
117 | { | 140 | { |
118 | PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType]); | 141 | PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [](){}); |
119 | d->activePipelines << state; | 142 | d->activePipelines << state; |
120 | state.step(); | 143 | state.step(); |
121 | } | 144 | } |
122 | 145 | ||
123 | void Pipeline::deletedEntity(const QString &entityType, const QByteArray &key) | 146 | void Pipeline::deletedEntity(const QString &entityType, const QByteArray &key) |
124 | { | 147 | { |
125 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType]); | 148 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType], [](){}); |
126 | d->activePipelines << state; | 149 | d->activePipelines << state; |
127 | state.step(); | 150 | state.step(); |
128 | } | 151 | } |
@@ -160,6 +183,7 @@ void Pipeline::pipelineCompleted(const PipelineState &state) | |||
160 | } | 183 | } |
161 | 184 | ||
162 | if (state.type() != NullPipeline) { | 185 | if (state.type() != NullPipeline) { |
186 | //TODO what revision is finalized? | ||
163 | emit revisionUpdated(); | 187 | emit revisionUpdated(); |
164 | } | 188 | } |
165 | scheduleStep(); | 189 | scheduleStep(); |
@@ -172,12 +196,13 @@ void Pipeline::pipelineCompleted(const PipelineState &state) | |||
172 | class PipelineState::Private : public QSharedData | 196 | class PipelineState::Private : public QSharedData |
173 | { | 197 | { |
174 | public: | 198 | public: |
175 | Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters) | 199 | Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters, const std::function<void()> &c) |
176 | : pipeline(p), | 200 | : pipeline(p), |
177 | type(t), | 201 | type(t), |
178 | key(k), | 202 | key(k), |
179 | filterIt(filters), | 203 | filterIt(filters), |
180 | idle(true) | 204 | idle(true), |
205 | callback(c) | ||
181 | {} | 206 | {} |
182 | 207 | ||
183 | Private() | 208 | Private() |
@@ -191,6 +216,7 @@ public: | |||
191 | QByteArray key; | 216 | QByteArray key; |
192 | QVectorIterator<Preprocessor *> filterIt; | 217 | QVectorIterator<Preprocessor *> filterIt; |
193 | bool idle; | 218 | bool idle; |
219 | std::function<void()> callback; | ||
194 | }; | 220 | }; |
195 | 221 | ||
196 | PipelineState::PipelineState() | 222 | PipelineState::PipelineState() |
@@ -199,8 +225,8 @@ PipelineState::PipelineState() | |||
199 | 225 | ||
200 | } | 226 | } |
201 | 227 | ||
202 | PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters) | 228 | PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, const std::function<void()> &callback) |
203 | : d(new Private(pipeline, type, key, filters)) | 229 | : d(new Private(pipeline, type, key, filters, callback)) |
204 | { | 230 | { |
205 | } | 231 | } |
206 | 232 | ||
@@ -247,6 +273,7 @@ void PipelineState::step() | |||
247 | 273 | ||
248 | d->idle = false; | 274 | d->idle = false; |
249 | if (d->filterIt.hasNext()) { | 275 | if (d->filterIt.hasNext()) { |
276 | //TODO skip step if already processed | ||
250 | d->pipeline->storage().scan(d->key.toStdString(), [this](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { | 277 | d->pipeline->storage().scan(d->key.toStdString(), [this](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { |
251 | auto entity = Akonadi2::GetEntity(dataValue); | 278 | auto entity = Akonadi2::GetEntity(dataValue); |
252 | d->filterIt.next()->process(*this, *entity); | 279 | d->filterIt.next()->process(*this, *entity); |
@@ -254,11 +281,13 @@ void PipelineState::step() | |||
254 | }); | 281 | }); |
255 | } else { | 282 | } else { |
256 | d->pipeline->pipelineCompleted(*this); | 283 | d->pipeline->pipelineCompleted(*this); |
284 | d->callback(); | ||
257 | } | 285 | } |
258 | } | 286 | } |
259 | 287 | ||
260 | void PipelineState::processingCompleted(Preprocessor *filter) | 288 | void PipelineState::processingCompleted(Preprocessor *filter) |
261 | { | 289 | { |
290 | //TODO record processing progress | ||
262 | if (d->pipeline && filter == d->filterIt.peekPrevious()) { | 291 | if (d->pipeline && filter == d->filterIt.peekPrevious()) { |
263 | d->idle = true; | 292 | d->idle = true; |
264 | d->pipeline->pipelineStepped(*this); | 293 | d->pipeline->pipelineStepped(*this); |
diff --git a/common/pipeline.h b/common/pipeline.h index 6b847f5..918d21e 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -27,7 +27,7 @@ | |||
27 | 27 | ||
28 | #include <akonadi2common_export.h> | 28 | #include <akonadi2common_export.h> |
29 | #include <storage.h> | 29 | #include <storage.h> |
30 | #include <clientapi.h> //For domain types | 30 | #include "async/src/async.h" |
31 | 31 | ||
32 | #include "entity_generated.h" | 32 | #include "entity_generated.h" |
33 | 33 | ||
@@ -49,34 +49,13 @@ public: | |||
49 | 49 | ||
50 | Storage &storage() const; | 50 | Storage &storage() const; |
51 | 51 | ||
52 | // template <typename T> | 52 | void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors); |
53 | // Storage &storage() const; | ||
54 | |||
55 | template <typename T> | ||
56 | void setPreprocessors(Type type, const QVector<Preprocessor *> &preprocessors) | ||
57 | { | ||
58 | setPreprocessors(Akonadi2::Domain::getTypeName<T>(), type, preprocessors); | ||
59 | } | ||
60 | 53 | ||
61 | void null(); | 54 | void null(); |
62 | 55 | ||
63 | template <typename T> | 56 | Async::Job<void> newEntity(void const *command, size_t size); |
64 | void newEntity(const QByteArray &key, void *resourceBufferData, size_t size) | 57 | void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size); |
65 | { | 58 | void deletedEntity(const QString &entityType, const QByteArray &key); |
66 | newEntity(Akonadi2::Domain::getTypeName<T>(), key, resourceBufferData, size); | ||
67 | } | ||
68 | |||
69 | template <typename T> | ||
70 | void modifiedEntity(const QByteArray &key, void *data, size_t size) | ||
71 | { | ||
72 | modifiedEntity(Akonadi2::Domain::getTypeName<T>(), key, data, size); | ||
73 | } | ||
74 | |||
75 | template <typename T> | ||
76 | void deletedEntity(const QByteArray &key) | ||
77 | { | ||
78 | deletedEntity(Akonadi2::Domain::getTypeName<T>(), key); | ||
79 | } | ||
80 | 59 | ||
81 | Q_SIGNALS: | 60 | Q_SIGNALS: |
82 | void revisionUpdated(); | 61 | void revisionUpdated(); |
@@ -86,10 +65,6 @@ private Q_SLOTS: | |||
86 | void stepPipelines(); | 65 | void stepPipelines(); |
87 | 66 | ||
88 | private: | 67 | private: |
89 | void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors); | ||
90 | void newEntity(const QString &entityType, const QByteArray &key, void *resourceBufferData, size_t size); | ||
91 | void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size); | ||
92 | void deletedEntity(const QString &entityType, const QByteArray &key); | ||
93 | void pipelineStepped(const PipelineState &state); | 68 | void pipelineStepped(const PipelineState &state); |
94 | void pipelineCompleted(const PipelineState &state); | 69 | void pipelineCompleted(const PipelineState &state); |
95 | void scheduleStep(); | 70 | void scheduleStep(); |
@@ -104,7 +79,7 @@ class AKONADI2COMMON_EXPORT PipelineState | |||
104 | { | 79 | { |
105 | public: | 80 | public: |
106 | PipelineState(); | 81 | PipelineState(); |
107 | PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters); | 82 | PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, const std::function<void()> &callback); |
108 | PipelineState(const PipelineState &other); | 83 | PipelineState(const PipelineState &other); |
109 | ~PipelineState(); | 84 | ~PipelineState(); |
110 | 85 | ||
@@ -114,6 +89,7 @@ public: | |||
114 | bool isIdle() const; | 89 | bool isIdle() const; |
115 | QByteArray key() const; | 90 | QByteArray key() const; |
116 | Pipeline::Type type() const; | 91 | Pipeline::Type type() const; |
92 | //TODO expose command | ||
117 | 93 | ||
118 | void step(); | 94 | void step(); |
119 | void processingCompleted(Preprocessor *filter); | 95 | void processingCompleted(Preprocessor *filter); |
@@ -129,7 +105,10 @@ public: | |||
129 | Preprocessor(); | 105 | Preprocessor(); |
130 | virtual ~Preprocessor(); | 106 | virtual ~Preprocessor(); |
131 | 107 | ||
108 | //TODO pass actual command as well, for changerecording | ||
132 | virtual void process(PipelineState state, const Akonadi2::Entity &); | 109 | virtual void process(PipelineState state, const Akonadi2::Entity &); |
110 | //TODO to record progress | ||
111 | // virtual QString id(); | ||
133 | 112 | ||
134 | protected: | 113 | protected: |
135 | void processingCompleted(PipelineState state); | 114 | void processingCompleted(PipelineState state); |
diff --git a/common/queuedcommand.fbs b/common/queuedcommand.fbs new file mode 100644 index 0000000..0ca899f --- /dev/null +++ b/common/queuedcommand.fbs | |||
@@ -0,0 +1,11 @@ | |||
1 | namespace Akonadi2; | ||
2 | |||
3 | table QueuedCommand { | ||
4 | commandId: int; | ||
5 | command: [ubyte]; | ||
6 | // entityId: string; | ||
7 | // sourceRevision: ulong; | ||
8 | // targetRevision: [ubyte]; | ||
9 | } | ||
10 | |||
11 | root_type QueuedCommand; | ||
diff --git a/common/resourceaccess.h b/common/resourceaccess.h index d45ebde..0a333f6 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h | |||
@@ -43,7 +43,7 @@ public: | |||
43 | 43 | ||
44 | //TODO use jobs | 44 | //TODO use jobs |
45 | void sendCommand(int commandId, const std::function<void()> &callback = std::function<void()>()); | 45 | void sendCommand(int commandId, const std::function<void()> &callback = std::function<void()>()); |
46 | void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback); | 46 | void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback = std::function<void()>()); |
47 | Async::Job<void> synchronizeResource(); | 47 | Async::Job<void> synchronizeResource(); |
48 | 48 | ||
49 | public Q_SLOTS: | 49 | public Q_SLOTS: |