From 179183cc388e3e8677ecdb82dac89f4d49570993 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 11 Sep 2015 09:37:38 +0200 Subject: Store entities with revisions --- common/entitystorage.cpp | 29 ++++++++++++++++++----------- common/pipeline.cpp | 12 +++++++----- 2 files changed, 25 insertions(+), 16 deletions(-) (limited to 'common') diff --git a/common/entitystorage.cpp b/common/entitystorage.cpp index bcc3562..0eb2763 100644 --- a/common/entitystorage.cpp +++ b/common/entitystorage.cpp @@ -21,12 +21,8 @@ static void scan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, std::function callback, const QByteArray &bufferType) { - transaction.openDatabase(bufferType + ".main").scan(key, [=](const QByteArray &key, const QByteArray &value) -> bool { - //Skip internals - if (Akonadi2::Storage::isInternalKey(key)) { - return true; - } + transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { //Extract buffers Akonadi2::EntityBuffer buffer(value.data(), value.size()); @@ -39,7 +35,9 @@ static void scan(const Akonadi2::Storage::Transaction &transaction, const QByteA // qWarning() << "invalid buffer " << QByteArray::fromRawData(static_cast(keyValue), keySize); // return true; // } - return callback(key, buffer.entity()); + // + //We're cutting the revision off the key + return callback(Akonadi2::Storage::uidFromKey(key), buffer.entity()); }, [](const Akonadi2::Storage::Error &error) { qWarning() << "Error during query: " << error.message; @@ -54,10 +52,11 @@ void EntityStorageBase::readValue(const Akonadi2::Storage::Transaction &transact //This only works for a 1:1 mapping of resource to domain types. //Not i.e. for tags that are stored as flags in each entity of an imap store. //additional properties that don't have a 1:1 mapping (such as separately stored tags), - //could be added to the adaptor + //could be added to the adaptor. 
+ auto domainObject = create(key, revision, mDomainTypeAdaptorFactory->createAdaptor(entity)); resultCallback(domainObject); - return true; + return false; }, mBufferType); } @@ -65,10 +64,18 @@ static ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, con { //TODO use a result set with an iterator, to read values on demand QVector keys; - scan(transaction, QByteArray(), [=, &keys](const QByteArray &key, const Akonadi2::Entity &) { - keys << key; + transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { + //Skip internals + if (Akonadi2::Storage::isInternalKey(key)) { + return true; + } + keys << Akonadi2::Storage::uidFromKey(key); return true; - }, bufferType); + }, + [](const Akonadi2::Storage::Error &error) { + qWarning() << "Error during query: " << error.message; + }); + Trace() << "Full scan found " << keys.size() << " results"; return ResultSet(keys); } diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 33e5d5c..14450aa 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -169,12 +169,12 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) flatbuffers::FlatBufferBuilder fbb; EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); - d->transaction.openDatabase(bufferType + ".main").write(key, QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize())); + d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize())); Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; return KAsync::start([this, key, bufferType, newRevision](KAsync::Future &future) { - 
PipelineState state(this, NewPipeline, key, d->newPipeline[bufferType], newRevision, [&future]() { + PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future]() { future.setFinished(); }, bufferType); d->activePipelines << state; @@ -224,7 +224,8 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) auto diff = adaptorFactory->createAdaptor(*diffEntity); QSharedPointer current; - storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").scan(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { + //FIXME: read the revision that this modification is based on, not just the latest one + storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { Akonadi2::EntityBuffer buffer(const_cast(data.data()), data.size()); if (!buffer.isValid()) { Warning() << "Read invalid buffer from disk"; @@ -272,11 +273,11 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); //TODO don't overwrite the old entry, but instead store a new revision - d->transaction.openDatabase(bufferType + ".main").write(key, QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize())); + d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize())); Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); return KAsync::start([this, key, bufferType, newRevision](KAsync::Future &future) { - PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[bufferType], newRevision, [&future]() { + PipelineState state(this, ModifiedPipeline, 
Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future]() { future.setFinished(); }, bufferType); d->activePipelines << state; @@ -303,6 +304,7 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) const QByteArray key = QByteArray(reinterpret_cast(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted + //TODO remove all revisions? d->transaction.openDatabase(bufferType + ".main").remove(key); Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); Log() << "Pipeline: deleted entity: "<< newRevision; -- cgit v1.2.3