Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--  common/pipeline.cpp | 19 ++++++++++++-------
1 file changed, 12 insertions(+), 7 deletions(-)
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 14450aa..4fed41f 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -169,7 +169,11 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
     flatbuffers::FlatBufferBuilder fbb;
     EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
 
-    d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
+    d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()),
+        [](const Akonadi2::Storage::Error &error) {
+            Warning() << "Failed to write entity";
+        }
+    );
     Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
     Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
 
@@ -198,6 +202,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
     auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command);
     Q_ASSERT(modifyEntity);
 
+    const qint64 baseRevision = modifyEntity->revision();
     //TODO rename modifyEntity->domainType to bufferType
     const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size());
     const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
@@ -224,8 +229,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
     auto diff = adaptorFactory->createAdaptor(*diffEntity);
 
     QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current;
-    //FIXME: read the revision that this modification is based on, not just the latest one
-    storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
+    storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, baseRevision), [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
         Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
         if (!buffer.isValid()) {
             Warning() << "Read invalid buffer from disk";
@@ -234,10 +238,9 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
         }
         return false;
     },
-    [](const Storage::Error &error) {
-        Warning() << "Failed to read value from storage: " << error.message;
+    [baseRevision](const Storage::Error &error) {
+        Warning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision;
     });
-    //TODO error handler
 
     if (!current) {
         Warning() << "Failed to read local value " << key;
@@ -275,6 +278,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
     //TODO don't overwrite the old entry, but instead store a new revision
     d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
     Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
+    Log() << "Pipeline: modified entity: " << key << newRevision << bufferType;
 
     return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) {
         PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future]() {
@@ -302,10 +306,11 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
 
     const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
     const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
+    const qint64 baseRevision = deleteEntity->revision();
 
     //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted
     //TODO remove all revisions?
-    d->transaction.openDatabase(bufferType + ".main").remove(key);
+    d->transaction.openDatabase(bufferType + ".main").remove(Akonadi2::Storage::assembleKey(key, baseRevision));
     Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
     Log() << "Pipeline: deleted entity: "<< newRevision;
 
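The modify and delete paths above now address exactly the revision a command is based on, by combining the entity key with that revision through Akonadi2::Storage::assembleKey() and using the result as the database key for scan() and remove(). A minimal sketch of that read pattern follows, using only the openDatabase()/scan()/assembleKey() calls visible in this diff; the helper name readEntityRevision(), the Akonadi2::Storage::Transaction type name, the header names, and the deep copy of the scanned value are assumptions, not taken from pipeline.cpp.

#include <QByteArray>
#include "storage.h" // assumed project header providing Akonadi2::Storage
#include "log.h"     // assumed project header providing Warning()

// Sketch only, not part of pipeline.cpp: reads the entity buffer stored under a
// specific revision, mirroring the scan(assembleKey(key, baseRevision), ...) call above.
static QByteArray readEntityRevision(Akonadi2::Storage::Transaction &transaction,
                                     const QByteArray &bufferType,
                                     const QByteArray &key,
                                     qint64 revision)
{
    QByteArray result;
    transaction.openDatabase(bufferType + ".main").scan(
        Akonadi2::Storage::assembleKey(key, revision),
        [&result](const QByteArray &, const QByteArray &data) -> bool {
            // Deep-copy the value; the scanned data may only be valid inside the callback.
            result = QByteArray(data.constData(), data.size());
            return false; // stop after the first (and only) match for this key + revision
        },
        [revision](const Akonadi2::Storage::Error &error) {
            Warning() << "Failed to read revision from storage: " << error.message << "Revision: " << revision;
        });
    return result;
}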