diff options
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 79 |
1 files changed, 54 insertions, 25 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 10bae54..9cc7450 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -23,10 +23,13 @@ | |||
23 | #include <QByteArray> | 23 | #include <QByteArray> |
24 | #include <QStandardPaths> | 24 | #include <QStandardPaths> |
25 | #include <QVector> | 25 | #include <QVector> |
26 | #include <QUuid> | ||
26 | #include <QDebug> | 27 | #include <QDebug> |
27 | #include "entity_generated.h" | 28 | #include "entity_generated.h" |
28 | #include "metadata_generated.h" | 29 | #include "metadata_generated.h" |
30 | #include "createentity_generated.h" | ||
29 | #include "entitybuffer.h" | 31 | #include "entitybuffer.h" |
32 | #include "async/src/async.h" | ||
30 | 33 | ||
31 | namespace Akonadi2 | 34 | namespace Akonadi2 |
32 | { | 35 | { |
@@ -90,39 +93,59 @@ void Pipeline::null() | |||
90 | // state.step(); | 93 | // state.step(); |
91 | } | 94 | } |
92 | 95 | ||
93 | void Pipeline::newEntity(const QString &entityType, const QByteArray &key, void *resourceBufferData, size_t size) | 96 | Async::Job<void> Pipeline::newEntity(void const *command, size_t size) |
94 | { | 97 | { |
95 | const qint64 newRevision = storage().maxRevision() + 1; | 98 | qDebug() << "new entity"; |
96 | 99 | Async::start<void>([&](Async::Future<void> future) { | |
97 | //Add metadata buffer | 100 | |
98 | flatbuffers::FlatBufferBuilder metadataFbb; | 101 | //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. |
99 | auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); | 102 | const auto key = QUuid::createUuid().toString().toUtf8(); |
100 | metadataBuilder.add_revision(newRevision); | 103 | |
101 | metadataBuilder.add_processed(false); | 104 | //TODO figure out if we already have created a revision for the message? |
102 | auto metadataBuffer = metadataBuilder.Finish(); | 105 | const qint64 newRevision = storage().maxRevision() + 1; |
103 | Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); | 106 | |
104 | 107 | auto createEntity = Akonadi2::Commands::GetCreateEntity(command); | |
105 | flatbuffers::FlatBufferBuilder fbb; | 108 | //TODO rename createEntitiy->domainType to bufferType |
106 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), resourceBufferData, size, 0, 0); | 109 | const QString entityType = QString::fromUtf8(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); |
107 | 110 | auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); | |
108 | storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); | 111 | // |
109 | storage().setMaxRevision(newRevision); | 112 | // const QString entityType; |
113 | // auto entity = Akonadi2::GetEntity(0); | ||
114 | |||
115 | //Add metadata buffer | ||
116 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
117 | auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); | ||
118 | metadataBuilder.add_revision(newRevision); | ||
119 | metadataBuilder.add_processed(false); | ||
120 | auto metadataBuffer = metadataBuilder.Finish(); | ||
121 | Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
122 | //TODO we should reserve some space in metadata for in-place updates | ||
123 | |||
124 | flatbuffers::FlatBufferBuilder fbb; | ||
125 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); | ||
126 | |||
127 | storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); | ||
128 | storage().setMaxRevision(newRevision); | ||
129 | |||
130 | PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { | ||
131 | future.setFinished(); | ||
132 | }); | ||
133 | d->activePipelines << state; | ||
134 | state.step(); | ||
110 | 135 | ||
111 | PipelineState state(this, NewPipeline, key, d->newPipeline[entityType]); | 136 | }); |
112 | d->activePipelines << state; | ||
113 | state.step(); | ||
114 | } | 137 | } |
115 | 138 | ||
116 | void Pipeline::modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size) | 139 | void Pipeline::modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size) |
117 | { | 140 | { |
118 | PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType]); | 141 | PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [](){}); |
119 | d->activePipelines << state; | 142 | d->activePipelines << state; |
120 | state.step(); | 143 | state.step(); |
121 | } | 144 | } |
122 | 145 | ||
123 | void Pipeline::deletedEntity(const QString &entityType, const QByteArray &key) | 146 | void Pipeline::deletedEntity(const QString &entityType, const QByteArray &key) |
124 | { | 147 | { |
125 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType]); | 148 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType], [](){}); |
126 | d->activePipelines << state; | 149 | d->activePipelines << state; |
127 | state.step(); | 150 | state.step(); |
128 | } | 151 | } |
@@ -160,6 +183,7 @@ void Pipeline::pipelineCompleted(const PipelineState &state) | |||
160 | } | 183 | } |
161 | 184 | ||
162 | if (state.type() != NullPipeline) { | 185 | if (state.type() != NullPipeline) { |
186 | //TODO what revision is finalized? | ||
163 | emit revisionUpdated(); | 187 | emit revisionUpdated(); |
164 | } | 188 | } |
165 | scheduleStep(); | 189 | scheduleStep(); |
@@ -172,12 +196,13 @@ void Pipeline::pipelineCompleted(const PipelineState &state) | |||
172 | class PipelineState::Private : public QSharedData | 196 | class PipelineState::Private : public QSharedData |
173 | { | 197 | { |
174 | public: | 198 | public: |
175 | Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters) | 199 | Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters, const std::function<void()> &c) |
176 | : pipeline(p), | 200 | : pipeline(p), |
177 | type(t), | 201 | type(t), |
178 | key(k), | 202 | key(k), |
179 | filterIt(filters), | 203 | filterIt(filters), |
180 | idle(true) | 204 | idle(true), |
205 | callback(c) | ||
181 | {} | 206 | {} |
182 | 207 | ||
183 | Private() | 208 | Private() |
@@ -191,6 +216,7 @@ public: | |||
191 | QByteArray key; | 216 | QByteArray key; |
192 | QVectorIterator<Preprocessor *> filterIt; | 217 | QVectorIterator<Preprocessor *> filterIt; |
193 | bool idle; | 218 | bool idle; |
219 | std::function<void()> callback; | ||
194 | }; | 220 | }; |
195 | 221 | ||
196 | PipelineState::PipelineState() | 222 | PipelineState::PipelineState() |
@@ -199,8 +225,8 @@ PipelineState::PipelineState() | |||
199 | 225 | ||
200 | } | 226 | } |
201 | 227 | ||
202 | PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters) | 228 | PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, const std::function<void()> &callback) |
203 | : d(new Private(pipeline, type, key, filters)) | 229 | : d(new Private(pipeline, type, key, filters, callback)) |
204 | { | 230 | { |
205 | } | 231 | } |
206 | 232 | ||
@@ -247,6 +273,7 @@ void PipelineState::step() | |||
247 | 273 | ||
248 | d->idle = false; | 274 | d->idle = false; |
249 | if (d->filterIt.hasNext()) { | 275 | if (d->filterIt.hasNext()) { |
276 | //TODO skip step if already processed | ||
250 | d->pipeline->storage().scan(d->key.toStdString(), [this](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { | 277 | d->pipeline->storage().scan(d->key.toStdString(), [this](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { |
251 | auto entity = Akonadi2::GetEntity(dataValue); | 278 | auto entity = Akonadi2::GetEntity(dataValue); |
252 | d->filterIt.next()->process(*this, *entity); | 279 | d->filterIt.next()->process(*this, *entity); |
@@ -254,11 +281,13 @@ void PipelineState::step() | |||
254 | }); | 281 | }); |
255 | } else { | 282 | } else { |
256 | d->pipeline->pipelineCompleted(*this); | 283 | d->pipeline->pipelineCompleted(*this); |
284 | d->callback(); | ||
257 | } | 285 | } |
258 | } | 286 | } |
259 | 287 | ||
260 | void PipelineState::processingCompleted(Preprocessor *filter) | 288 | void PipelineState::processingCompleted(Preprocessor *filter) |
261 | { | 289 | { |
290 | //TODO record processing progress | ||
262 | if (d->pipeline && filter == d->filterIt.peekPrevious()) { | 291 | if (d->pipeline && filter == d->filterIt.peekPrevious()) { |
263 | d->idle = true; | 292 | d->idle = true; |
264 | d->pipeline->pipelineStepped(*this); | 293 | d->pipeline->pipelineStepped(*this); |