summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-09-11 09:37:38 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-09-11 09:37:38 +0200
commit179183cc388e3e8677ecdb82dac89f4d49570993 (patch)
treee0fa8f888fe5479c203b0ef4c16b1ad59bd9bace
parent50f737b8549fb1b380c753d36be3fafe0ec4a768 (diff)
downloadsink-179183cc388e3e8677ecdb82dac89f4d49570993.tar.gz
sink-179183cc388e3e8677ecdb82dac89f4d49570993.zip
Store entities with revisions
-rw-r--r--common/entitystorage.cpp29
-rw-r--r--common/pipeline.cpp12
-rw-r--r--examples/dummyresource/resourcefactory.cpp4
3 files changed, 27 insertions, 18 deletions
diff --git a/common/entitystorage.cpp b/common/entitystorage.cpp
index bcc3562..0eb2763 100644
--- a/common/entitystorage.cpp
+++ b/common/entitystorage.cpp
@@ -21,12 +21,8 @@
21 21
22static void scan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, std::function<bool(const QByteArray &key, const Akonadi2::Entity &entity)> callback, const QByteArray &bufferType) 22static void scan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, std::function<bool(const QByteArray &key, const Akonadi2::Entity &entity)> callback, const QByteArray &bufferType)
23{ 23{
24 transaction.openDatabase(bufferType + ".main").scan(key, [=](const QByteArray &key, const QByteArray &value) -> bool {
25 //Skip internals
26 if (Akonadi2::Storage::isInternalKey(key)) {
27 return true;
28 }
29 24
25 transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool {
30 //Extract buffers 26 //Extract buffers
31 Akonadi2::EntityBuffer buffer(value.data(), value.size()); 27 Akonadi2::EntityBuffer buffer(value.data(), value.size());
32 28
@@ -39,7 +35,9 @@ static void scan(const Akonadi2::Storage::Transaction &transaction, const QByteA
39 // qWarning() << "invalid buffer " << QByteArray::fromRawData(static_cast<char*>(keyValue), keySize); 35 // qWarning() << "invalid buffer " << QByteArray::fromRawData(static_cast<char*>(keyValue), keySize);
40 // return true; 36 // return true;
41 // } 37 // }
42 return callback(key, buffer.entity()); 38 //
39 //We're cutting the revision off the key
40 return callback(Akonadi2::Storage::uidFromKey(key), buffer.entity());
43 }, 41 },
44 [](const Akonadi2::Storage::Error &error) { 42 [](const Akonadi2::Storage::Error &error) {
45 qWarning() << "Error during query: " << error.message; 43 qWarning() << "Error during query: " << error.message;
@@ -54,10 +52,11 @@ void EntityStorageBase::readValue(const Akonadi2::Storage::Transaction &transact
54 //This only works for a 1:1 mapping of resource to domain types. 52 //This only works for a 1:1 mapping of resource to domain types.
55 //Not i.e. for tags that are stored as flags in each entity of an imap store. 53 //Not i.e. for tags that are stored as flags in each entity of an imap store.
56 //additional properties that don't have a 1:1 mapping (such as separately stored tags), 54 //additional properties that don't have a 1:1 mapping (such as separately stored tags),
57 //could be added to the adaptor 55 //could be added to the adaptor.
56
58 auto domainObject = create(key, revision, mDomainTypeAdaptorFactory->createAdaptor(entity)); 57 auto domainObject = create(key, revision, mDomainTypeAdaptorFactory->createAdaptor(entity));
59 resultCallback(domainObject); 58 resultCallback(domainObject);
60 return true; 59 return false;
61 }, mBufferType); 60 }, mBufferType);
62} 61}
63 62
@@ -65,10 +64,18 @@ static ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, con
65{ 64{
66 //TODO use a result set with an iterator, to read values on demand 65 //TODO use a result set with an iterator, to read values on demand
67 QVector<QByteArray> keys; 66 QVector<QByteArray> keys;
68 scan(transaction, QByteArray(), [=, &keys](const QByteArray &key, const Akonadi2::Entity &) { 67 transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool {
69 keys << key; 68 //Skip internals
69 if (Akonadi2::Storage::isInternalKey(key)) {
70 return true;
71 }
72 keys << Akonadi2::Storage::uidFromKey(key);
70 return true; 73 return true;
71 }, bufferType); 74 },
75 [](const Akonadi2::Storage::Error &error) {
76 qWarning() << "Error during query: " << error.message;
77 });
78
72 Trace() << "Full scan found " << keys.size() << " results"; 79 Trace() << "Full scan found " << keys.size() << " results";
73 return ResultSet(keys); 80 return ResultSet(keys);
74} 81}
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 33e5d5c..14450aa 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -169,12 +169,12 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
169 flatbuffers::FlatBufferBuilder fbb; 169 flatbuffers::FlatBufferBuilder fbb;
170 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); 170 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
171 171
172 d->transaction.openDatabase(bufferType + ".main").write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 172 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
173 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 173 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
174 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; 174 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
175 175
176 return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) { 176 return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) {
177 PipelineState state(this, NewPipeline, key, d->newPipeline[bufferType], newRevision, [&future]() { 177 PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future]() {
178 future.setFinished(); 178 future.setFinished();
179 }, bufferType); 179 }, bufferType);
180 d->activePipelines << state; 180 d->activePipelines << state;
@@ -224,7 +224,8 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
224 auto diff = adaptorFactory->createAdaptor(*diffEntity); 224 auto diff = adaptorFactory->createAdaptor(*diffEntity);
225 225
226 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; 226 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current;
227 storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").scan(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { 227 //FIXME: read the revision that this modification is based on, not just the latest one
228 storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
228 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 229 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
229 if (!buffer.isValid()) { 230 if (!buffer.isValid()) {
230 Warning() << "Read invalid buffer from disk"; 231 Warning() << "Read invalid buffer from disk";
@@ -272,11 +273,11 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
272 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 273 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
273 274
274 //TODO don't overwrite the old entry, but instead store a new revision 275 //TODO don't overwrite the old entry, but instead store a new revision
275 d->transaction.openDatabase(bufferType + ".main").write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 276 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
276 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 277 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
277 278
278 return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) { 279 return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) {
279 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[bufferType], newRevision, [&future]() { 280 PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future]() {
280 future.setFinished(); 281 future.setFinished();
281 }, bufferType); 282 }, bufferType);
282 d->activePipelines << state; 283 d->activePipelines << state;
@@ -303,6 +304,7 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
303 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 304 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
304 305
305 //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted 306 //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted
307 //TODO remove all revisions?
306 d->transaction.openDatabase(bufferType + ".main").remove(key); 308 d->transaction.openDatabase(bufferType + ".main").remove(key);
307 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 309 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
308 Log() << "Pipeline: deleted entity: "<< newRevision; 310 Log() << "Pipeline: deleted entity: "<< newRevision;
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp
index cfbcdae..8d84cf8 100644
--- a/examples/dummyresource/resourcefactory.cpp
+++ b/examples/dummyresource/resourcefactory.cpp
@@ -55,7 +55,7 @@ DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QShared
55 const auto resourceIdentifier = mResourceInstanceIdentifier; 55 const auto resourceIdentifier = mResourceInstanceIdentifier;
56 56
57 auto eventIndexer = new Akonadi2::SimpleProcessor("eventIndexer", [eventFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) { 57 auto eventIndexer = new Akonadi2::SimpleProcessor("eventIndexer", [eventFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) {
58 Akonadi2::ApplicationDomain::Event event(resourceIdentifier, state.key(), -1, eventFactory->createAdaptor(entity)); 58 Akonadi2::ApplicationDomain::Event event(resourceIdentifier, Akonadi2::Storage::uidFromKey(state.key()), -1, eventFactory->createAdaptor(entity));
59 Akonadi2::ApplicationDomain::TypeImplementation<Akonadi2::ApplicationDomain::Event>::index(event, transaction); 59 Akonadi2::ApplicationDomain::TypeImplementation<Akonadi2::ApplicationDomain::Event>::index(event, transaction);
60 index("event.index.rid", event.getProperty("remoteId"), event.identifier(), transaction); 60 index("event.index.rid", event.getProperty("remoteId"), event.identifier(), transaction);
61 }); 61 });
@@ -67,7 +67,7 @@ DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QShared
67 { 67 {
68 auto mailFactory = QSharedPointer<DummyMailAdaptorFactory>::create(); 68 auto mailFactory = QSharedPointer<DummyMailAdaptorFactory>::create();
69 auto mailIndexer = new Akonadi2::SimpleProcessor("mailIndexer", [mailFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) { 69 auto mailIndexer = new Akonadi2::SimpleProcessor("mailIndexer", [mailFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) {
70 Akonadi2::ApplicationDomain::Mail mail(resourceIdentifier, state.key(), -1, mailFactory->createAdaptor(entity)); 70 Akonadi2::ApplicationDomain::Mail mail(resourceIdentifier, Akonadi2::Storage::uidFromKey(state.key()), -1, mailFactory->createAdaptor(entity));
71 Akonadi2::ApplicationDomain::TypeImplementation<Akonadi2::ApplicationDomain::Mail>::index(mail, transaction); 71 Akonadi2::ApplicationDomain::TypeImplementation<Akonadi2::ApplicationDomain::Mail>::index(mail, transaction);
72 index("mail.index.rid", mail.getProperty("remoteId"), mail.identifier(), transaction); 72 index("mail.index.rid", mail.getProperty("remoteId"), mail.identifier(), transaction);
73 }); 73 });