summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-09-11 09:37:38 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-09-11 09:37:38 +0200
commit179183cc388e3e8677ecdb82dac89f4d49570993 (patch)
treee0fa8f888fe5479c203b0ef4c16b1ad59bd9bace /common/pipeline.cpp
parent50f737b8549fb1b380c753d36be3fafe0ec4a768 (diff)
downloadsink-179183cc388e3e8677ecdb82dac89f4d49570993.tar.gz
sink-179183cc388e3e8677ecdb82dac89f4d49570993.zip
Store entities with revisions
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp12
1 files changed, 7 insertions, 5 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 33e5d5c..14450aa 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -169,12 +169,12 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
169 flatbuffers::FlatBufferBuilder fbb; 169 flatbuffers::FlatBufferBuilder fbb;
170 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); 170 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
171 171
172 d->transaction.openDatabase(bufferType + ".main").write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 172 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
173 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 173 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
174 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; 174 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
175 175
176 return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) { 176 return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) {
177 PipelineState state(this, NewPipeline, key, d->newPipeline[bufferType], newRevision, [&future]() { 177 PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future]() {
178 future.setFinished(); 178 future.setFinished();
179 }, bufferType); 179 }, bufferType);
180 d->activePipelines << state; 180 d->activePipelines << state;
@@ -224,7 +224,8 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
224 auto diff = adaptorFactory->createAdaptor(*diffEntity); 224 auto diff = adaptorFactory->createAdaptor(*diffEntity);
225 225
226 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; 226 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current;
227 storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").scan(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { 227 //FIXME: read the revision that this modification is based on, not just the latest one
228 storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
228 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 229 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
229 if (!buffer.isValid()) { 230 if (!buffer.isValid()) {
230 Warning() << "Read invalid buffer from disk"; 231 Warning() << "Read invalid buffer from disk";
@@ -272,11 +273,11 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
272 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 273 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
273 274
274 //TODO don't overwrite the old entry, but instead store a new revision 275 //TODO don't overwrite the old entry, but instead store a new revision
275 d->transaction.openDatabase(bufferType + ".main").write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 276 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
276 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 277 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
277 278
278 return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) { 279 return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) {
279 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[bufferType], newRevision, [&future]() { 280 PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future]() {
280 future.setFinished(); 281 future.setFinished();
281 }, bufferType); 282 }, bufferType);
282 d->activePipelines << state; 283 d->activePipelines << state;
@@ -303,6 +304,7 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
303 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 304 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
304 305
305 //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted 306 //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted
307 //TODO remove all revisions?
306 d->transaction.openDatabase(bufferType + ".main").remove(key); 308 d->transaction.openDatabase(bufferType + ".main").remove(key);
307 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 309 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
308 Log() << "Pipeline: deleted entity: "<< newRevision; 310 Log() << "Pipeline: deleted entity: "<< newRevision;