summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-09-24 16:45:06 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-09-24 16:45:06 +0200
commitb43c422a2b1b899ce5ac27a0bc381e8a49f05d86 (patch)
tree38e3b945f30e5fb7dd50d3c55d9819bb873cba29 /common/pipeline.cpp
parent179183cc388e3e8677ecdb82dac89f4d49570993 (diff)
downloadsink-b43c422a2b1b899ce5ac27a0bc381e8a49f05d86.tar.gz
sink-b43c422a2b1b899ce5ac27a0bc381e8a49f05d86.zip
Work with revisions in store + pipelinetest
Cleanup of revisions, and revision for removed entity is yet missing.
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp19
1 files changed, 12 insertions, 7 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 14450aa..4fed41f 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -169,7 +169,11 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
169 flatbuffers::FlatBufferBuilder fbb; 169 flatbuffers::FlatBufferBuilder fbb;
170 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); 170 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
171 171
172 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 172 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()),
173 [](const Akonadi2::Storage::Error &error) {
174 Warning() << "Failed to write entity";
175 }
176 );
173 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 177 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
174 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; 178 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
175 179
@@ -198,6 +202,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
198 auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command); 202 auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command);
199 Q_ASSERT(modifyEntity); 203 Q_ASSERT(modifyEntity);
200 204
205 const qint64 baseRevision = modifyEntity->revision();
201 //TODO rename modifyEntity->domainType to bufferType 206 //TODO rename modifyEntity->domainType to bufferType
202 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); 207 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size());
203 const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); 208 const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
@@ -224,8 +229,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
224 auto diff = adaptorFactory->createAdaptor(*diffEntity); 229 auto diff = adaptorFactory->createAdaptor(*diffEntity);
225 230
226 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; 231 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current;
227 //FIXME: read the revision that this modification is based on, not just the latest one 232 storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, baseRevision), [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
228 storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
229 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 233 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
230 if (!buffer.isValid()) { 234 if (!buffer.isValid()) {
231 Warning() << "Read invalid buffer from disk"; 235 Warning() << "Read invalid buffer from disk";
@@ -234,10 +238,9 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
234 } 238 }
235 return false; 239 return false;
236 }, 240 },
237 [](const Storage::Error &error) { 241 [baseRevision](const Storage::Error &error) {
238 Warning() << "Failed to read value from storage: " << error.message; 242 Warning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision;
239 }); 243 });
240 //TODO error handler
241 244
242 if (!current) { 245 if (!current) {
243 Warning() << "Failed to read local value " << key; 246 Warning() << "Failed to read local value " << key;
@@ -275,6 +278,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
275 //TODO don't overwrite the old entry, but instead store a new revision 278 //TODO don't overwrite the old entry, but instead store a new revision
276 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 279 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
277 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 280 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
281 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType;
278 282
279 return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) { 283 return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) {
280 PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future]() { 284 PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future]() {
@@ -302,10 +306,11 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
302 306
303 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); 307 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
304 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 308 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
309 const qint64 baseRevision = deleteEntity->revision();
305 310
306 //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted 311 //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted
307 //TODO remove all revisions? 312 //TODO remove all revisions?
308 d->transaction.openDatabase(bufferType + ".main").remove(key); 313 d->transaction.openDatabase(bufferType + ".main").remove(Akonadi2::Storage::assembleKey(key, baseRevision));
309 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 314 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
310 Log() << "Pipeline: deleted entity: "<< newRevision; 315 Log() << "Pipeline: deleted entity: "<< newRevision;
311 316