diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-09-08 21:08:54 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-09-08 21:08:54 +0200 |
commit | bbbda3fe9444eba6795a5490da0425cdf8f26361 (patch) | |
tree | d558ce110b71278df91135db5ac29c484e588ac5 /common/pipeline.cpp | |
parent | 43ae43bc74800473aadf9c5c807603cdf8516d36 (diff) | |
download | sink-bbbda3fe9444eba6795a5490da0425cdf8f26361.tar.gz sink-bbbda3fe9444eba6795a5490da0425cdf8f26361.zip |
Added support for mails to akonadi and the dummyresource.
Adding new types definitely needs to become easier.
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 64 |
1 file changed, 39 insertions, 25 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 8ef6187..33e5d5c 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -143,7 +143,7 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
143 | auto createEntity = Akonadi2::Commands::GetCreateEntity(command); | 143 | auto createEntity = Akonadi2::Commands::GetCreateEntity(command); |
144 | 144 | ||
145 | //TODO rename createEntitiy->domainType to bufferType | 145 | //TODO rename createEntitiy->domainType to bufferType |
146 | const QString entityType = QString::fromUtf8(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); | 146 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); |
147 | { | 147 | { |
148 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); | 148 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); |
149 | if (!Akonadi2::VerifyEntityBuffer(verifyer)) { | 149 | if (!Akonadi2::VerifyEntityBuffer(verifyer)) { |
@@ -152,6 +152,10 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
152 | } | 152 | } |
153 | } | 153 | } |
154 | auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); | 154 | auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); |
155 | if (!entity->resource()->size() && !entity->local()->size()) { | ||
156 | Warning() << "No local and no resource buffer while trying to create entity."; | ||
157 | return KAsync::error<void>(); | ||
158 | } | ||
155 | 159 | ||
156 | //Add metadata buffer | 160 | //Add metadata buffer |
157 | flatbuffers::FlatBufferBuilder metadataFbb; | 161 | flatbuffers::FlatBufferBuilder metadataFbb; |
@@ -165,14 +169,14 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
165 | flatbuffers::FlatBufferBuilder fbb; | 169 | flatbuffers::FlatBufferBuilder fbb; |
166 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); | 170 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); |
167 | 171 | ||
168 | d->transaction.openDatabase().write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | 172 | d->transaction.openDatabase(bufferType + ".main").write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); |
169 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); | 173 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); |
170 | Log() << "Pipeline: wrote entity: " << key << newRevision; | 174 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; |
171 | 175 | ||
172 | return KAsync::start<void>([this, key, entityType, newRevision](KAsync::Future<void> &future) { | 176 | return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) { |
173 | PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], newRevision, [&future]() { | 177 | PipelineState state(this, NewPipeline, key, d->newPipeline[bufferType], newRevision, [&future]() { |
174 | future.setFinished(); | 178 | future.setFinished(); |
175 | }); | 179 | }, bufferType); |
176 | d->activePipelines << state; | 180 | d->activePipelines << state; |
177 | state.step(); | 181 | state.step(); |
178 | }); | 182 | }); |
@@ -195,10 +199,10 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | |||
195 | Q_ASSERT(modifyEntity); | 199 | Q_ASSERT(modifyEntity); |
196 | 200 | ||
197 | //TODO rename modifyEntity->domainType to bufferType | 201 | //TODO rename modifyEntity->domainType to bufferType |
198 | const QByteArray entityType = QByteArray(reinterpret_cast<char const*>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); | 202 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); |
199 | const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); | 203 | const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); |
200 | if (entityType.isEmpty() || key.isEmpty()) { | 204 | if (bufferType.isEmpty() || key.isEmpty()) { |
201 | Warning() << "entity type or key " << entityType << key; | 205 | Warning() << "entity type or key " << bufferType << key; |
202 | return KAsync::error<void>(); | 206 | return KAsync::error<void>(); |
203 | } | 207 | } |
204 | { | 208 | { |
@@ -209,9 +213,9 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | |||
209 | } | 213 | } |
210 | } | 214 | } |
211 | 215 | ||
212 | auto adaptorFactory = d->adaptorFactory.value(entityType); | 216 | auto adaptorFactory = d->adaptorFactory.value(bufferType); |
213 | if (!adaptorFactory) { | 217 | if (!adaptorFactory) { |
214 | Warning() << "no adaptor factory for type " << entityType; | 218 | Warning() << "no adaptor factory for type " << bufferType; |
215 | return KAsync::error<void>(); | 219 | return KAsync::error<void>(); |
216 | } | 220 | } |
217 | 221 | ||
@@ -220,7 +224,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | |||
220 | auto diff = adaptorFactory->createAdaptor(*diffEntity); | 224 | auto diff = adaptorFactory->createAdaptor(*diffEntity); |
221 | 225 | ||
222 | QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; | 226 | QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; |
223 | storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase().scan(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | 227 | storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").scan(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { |
224 | Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 228 | Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
225 | if (!buffer.isValid()) { | 229 | if (!buffer.isValid()) { |
226 | Warning() << "Read invalid buffer from disk"; | 230 | Warning() << "Read invalid buffer from disk"; |
@@ -228,6 +232,9 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | |||
228 | current = adaptorFactory->createAdaptor(buffer.entity()); | 232 | current = adaptorFactory->createAdaptor(buffer.entity()); |
229 | } | 233 | } |
230 | return false; | 234 | return false; |
235 | }, | ||
236 | [](const Storage::Error &error) { | ||
237 | Warning() << "Failed to read value from storage: " << error.message; | ||
231 | }); | 238 | }); |
232 | //TODO error handler | 239 | //TODO error handler |
233 | 240 | ||
@@ -265,13 +272,13 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | |||
265 | adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | 272 | adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); |
266 | 273 | ||
267 | //TODO don't overwrite the old entry, but instead store a new revision | 274 | //TODO don't overwrite the old entry, but instead store a new revision |
268 | d->transaction.openDatabase().write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | 275 | d->transaction.openDatabase(bufferType + ".main").write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); |
269 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); | 276 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); |
270 | 277 | ||
271 | return KAsync::start<void>([this, key, entityType, newRevision](KAsync::Future<void> &future) { | 278 | return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) { |
272 | PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], newRevision, [&future]() { | 279 | PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[bufferType], newRevision, [&future]() { |
273 | future.setFinished(); | 280 | future.setFinished(); |
274 | }); | 281 | }, bufferType); |
275 | d->activePipelines << state; | 282 | d->activePipelines << state; |
276 | state.step(); | 283 | state.step(); |
277 | }); | 284 | }); |
@@ -292,18 +299,18 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size) | |||
292 | } | 299 | } |
293 | auto deleteEntity = Akonadi2::Commands::GetDeleteEntity(command); | 300 | auto deleteEntity = Akonadi2::Commands::GetDeleteEntity(command); |
294 | 301 | ||
295 | const QByteArray entityType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); | 302 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); |
296 | const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); | 303 | const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); |
297 | 304 | ||
298 | //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted | 305 | //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted |
299 | d->transaction.openDatabase().remove(key); | 306 | d->transaction.openDatabase(bufferType + ".main").remove(key); |
300 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); | 307 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); |
301 | Log() << "Pipeline: deleted entity: "<< newRevision; | 308 | Log() << "Pipeline: deleted entity: "<< newRevision; |
302 | 309 | ||
303 | return KAsync::start<void>([this, key, entityType, newRevision](KAsync::Future<void> &future) { | 310 | return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) { |
304 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType], newRevision, [&future](){ | 311 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[bufferType], newRevision, [&future](){ |
305 | future.setFinished(); | 312 | future.setFinished(); |
306 | }); | 313 | }, bufferType); |
307 | d->activePipelines << state; | 314 | d->activePipelines << state; |
308 | state.step(); | 315 | state.step(); |
309 | }); | 316 | }); |
@@ -354,14 +361,15 @@ void Pipeline::pipelineCompleted(PipelineState state) | |||
354 | class PipelineState::Private : public QSharedData | 361 | class PipelineState::Private : public QSharedData |
355 | { | 362 | { |
356 | public: | 363 | public: |
357 | Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters, const std::function<void()> &c, qint64 r) | 364 | Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters, const std::function<void()> &c, qint64 r, const QByteArray &b) |
358 | : pipeline(p), | 365 | : pipeline(p), |
359 | type(t), | 366 | type(t), |
360 | key(k), | 367 | key(k), |
361 | filterIt(filters), | 368 | filterIt(filters), |
362 | idle(true), | 369 | idle(true), |
363 | callback(c), | 370 | callback(c), |
364 | revision(r) | 371 | revision(r), |
372 | bufferType(b) | ||
365 | {} | 373 | {} |
366 | 374 | ||
367 | Private() | 375 | Private() |
@@ -378,6 +386,7 @@ public: | |||
378 | bool idle; | 386 | bool idle; |
379 | std::function<void()> callback; | 387 | std::function<void()> callback; |
380 | qint64 revision; | 388 | qint64 revision; |
389 | QByteArray bufferType; | ||
381 | }; | 390 | }; |
382 | 391 | ||
383 | PipelineState::PipelineState() | 392 | PipelineState::PipelineState() |
@@ -386,8 +395,8 @@ PipelineState::PipelineState() | |||
386 | 395 | ||
387 | } | 396 | } |
388 | 397 | ||
389 | PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, qint64 revision, const std::function<void()> &callback) | 398 | PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, qint64 revision, const std::function<void()> &callback, const QByteArray &bufferType) |
390 | : d(new Private(pipeline, type, key, filters, callback, revision)) | 399 | : d(new Private(pipeline, type, key, filters, callback, revision, bufferType)) |
391 | { | 400 | { |
392 | } | 401 | } |
393 | 402 | ||
@@ -431,6 +440,11 @@ qint64 PipelineState::revision() const | |||
431 | return d->revision; | 440 | return d->revision; |
432 | } | 441 | } |
433 | 442 | ||
443 | QByteArray PipelineState::bufferType() const | ||
444 | { | ||
445 | return d->bufferType; | ||
446 | } | ||
447 | |||
434 | void PipelineState::step() | 448 | void PipelineState::step() |
435 | { | 449 | { |
436 | if (!d->pipeline) { | 450 | if (!d->pipeline) { |