author     Christian Mollekopf <chrigi_1@fastmail.fm>   2015-09-11 09:37:38 +0200
committer  Christian Mollekopf <chrigi_1@fastmail.fm>   2015-09-11 09:37:38 +0200
commit     179183cc388e3e8677ecdb82dac89f4d49570993 (patch)
tree       e0fa8f888fe5479c203b0ef4c16b1ad59bd9bace /common/pipeline.cpp
parent     50f737b8549fb1b380c753d36be3fafe0ec4a768 (diff)
download   sink-179183cc388e3e8677ecdb82dac89f4d49570993.tar.gz
           sink-179183cc388e3e8677ecdb82dac89f4d49570993.zip
Store entities with revisions
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--   common/pipeline.cpp | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 33e5d5c..14450aa 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -169,12 +169,12 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
     flatbuffers::FlatBufferBuilder fbb;
     EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
 
-    d->transaction.openDatabase(bufferType + ".main").write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
+    d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
     Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
     Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
 
     return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) {
-        PipelineState state(this, NewPipeline, key, d->newPipeline[bufferType], newRevision, [&future]() {
+        PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future]() {
             future.setFinished();
         }, bufferType);
         d->activePipelines << state;
@@ -224,7 +224,8 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
     auto diff = adaptorFactory->createAdaptor(*diffEntity);
 
     QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current;
-    storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").scan(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
+    //FIXME: read the revision that this modification is based on, not just the latest one
+    storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
         Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
         if (!buffer.isValid()) {
             Warning() << "Read invalid buffer from disk";
@@ -272,11 +273,11 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
     adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
 
     //TODO don't overwrite the old entry, but instead store a new revision
-    d->transaction.openDatabase(bufferType + ".main").write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
+    d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
     Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
 
     return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) {
-        PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[bufferType], newRevision, [&future]() {
+        PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future]() {
             future.setFinished();
         }, bufferType);
         d->activePipelines << state;
@@ -303,6 +304,7 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
     const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
 
     //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted
+    //TODO remove all revisions?
     d->transaction.openDatabase(bufferType + ".main").remove(key);
     Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
     Log() << "Pipeline: deleted entity: "<< newRevision;