From b43c422a2b1b899ce5ac27a0bc381e8a49f05d86 Mon Sep 17 00:00:00 2001
From: Christian Mollekopf
Date: Thu, 24 Sep 2015 16:45:06 +0200
Subject: Work with revisions in store + pipelinetest

Cleanup of revisions and a revision for the removed entity are still missing.
---
 common/pipeline.cpp     | 19 ++++++++++++-------
 common/storage_lmdb.cpp |  8 ++++++--
 2 files changed, 18 insertions(+), 9 deletions(-)

(limited to 'common')

diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 14450aa..4fed41f 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -169,7 +169,11 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size)
     flatbuffers::FlatBufferBuilder fbb;
     EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
 
-    d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()));
+    d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()),
+        [](const Akonadi2::Storage::Error &error) {
+            Warning() << "Failed to write entity";
+        }
+    );
     Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
     Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
 
@@ -198,6 +202,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size)
 
     auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command);
     Q_ASSERT(modifyEntity);
+    const qint64 baseRevision = modifyEntity->revision();
     //TODO rename modifyEntity->domainType to bufferType
     const QByteArray bufferType = QByteArray(reinterpret_cast(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size());
     const QByteArray key = QByteArray(reinterpret_cast(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
@@ -224,8 +229,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size)
     auto diff = adaptorFactory->createAdaptor(*diffEntity);
     QSharedPointer current;
-    //FIXME: read the revision that this modification is based on, not just the latest one
-    storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
+    storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, baseRevision), [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
         Akonadi2::EntityBuffer buffer(const_cast(data.data()), data.size());
         if (!buffer.isValid()) {
             Warning() << "Read invalid buffer from disk";
         }
         return false;
     },
-    [](const Storage::Error &error) {
-        Warning() << "Failed to read value from storage: " << error.message;
+    [baseRevision](const Storage::Error &error) {
+        Warning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision;
     });
-    //TODO error handler
 
     if (!current) {
         Warning() << "Failed to read local value " << key;
@@ -275,6 +278,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size)
     //TODO don't overwrite the old entry, but instead store a new revision
     d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()));
     Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
+    Log() << "Pipeline: modified entity: " << key << newRevision << bufferType;
 
     return KAsync::start([this, key, bufferType, newRevision](KAsync::Future &future) {
         PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future]() {
@@ -302,10 +306,11 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size)
 
     const QByteArray bufferType = QByteArray(reinterpret_cast(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
     const QByteArray key = QByteArray(reinterpret_cast(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
+    const qint64 baseRevision = deleteEntity->revision();
 
     //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted
     //TODO remove all revisions?
-    d->transaction.openDatabase(bufferType + ".main").remove(key);
+    d->transaction.openDatabase(bufferType + ".main").remove(Akonadi2::Storage::assembleKey(key, baseRevision));
     Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
     Log() << "Pipeline: deleted entity: "<< newRevision;
 
diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp
index 3073d37..be5a9da 100644
--- a/common/storage_lmdb.cpp
+++ b/common/storage_lmdb.cpp
@@ -114,7 +114,9 @@ bool Storage::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sVa
 {
     if (!d || !d->transaction) {
         Error error("", ErrorCodes::GenericError, "Not open");
-        errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
+        if (d) {
+            errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
+        }
         return false;
     }
     const void *keyPtr = sKey.data();
@@ -149,7 +151,9 @@ void Storage::NamedDatabase::remove(const QByteArray &k,
 {
     if (!d || !d->transaction) {
         Error error(d->name.toLatin1(), ErrorCodes::GenericError, "Not open");
-        errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
+        if (d) {
+            errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
+        }
         return;
     }
 
--
cgit v1.2.3