-rw-r--r--   common/entitystorage.cpp                   | 29
-rw-r--r--   common/pipeline.cpp                        | 12
-rw-r--r--   examples/dummyresource/resourcefactory.cpp |  4
3 files changed, 27 insertions(+), 18 deletions(-)
diff --git a/common/entitystorage.cpp b/common/entitystorage.cpp
index bcc3562..0eb2763 100644
--- a/common/entitystorage.cpp
+++ b/common/entitystorage.cpp
@@ -21,12 +21,8 @@
 
 static void scan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, std::function<bool(const QByteArray &key, const Akonadi2::Entity &entity)> callback, const QByteArray &bufferType)
 {
-    transaction.openDatabase(bufferType + ".main").scan(key, [=](const QByteArray &key, const QByteArray &value) -> bool {
-        //Skip internals
-        if (Akonadi2::Storage::isInternalKey(key)) {
-            return true;
-        }
 
+    transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool {
         //Extract buffers
         Akonadi2::EntityBuffer buffer(value.data(), value.size());
 
@@ -39,7 +35,9 @@ static void scan(const Akonadi2::Storage::Transaction &transaction, const QByteA
         // qWarning() << "invalid buffer " << QByteArray::fromRawData(static_cast<char*>(keyValue), keySize);
         // return true;
         // }
-        return callback(key, buffer.entity());
+        //
+        //We're cutting the revision off the key
+        return callback(Akonadi2::Storage::uidFromKey(key), buffer.entity());
     },
     [](const Akonadi2::Storage::Error &error) {
         qWarning() << "Error during query: " << error.message;
@@ -54,10 +52,11 @@ void EntityStorageBase::readValue(const Akonadi2::Storage::Transact
         //This only works for a 1:1 mapping of resource to domain types.
         //Not i.e. for tags that are stored as flags in each entity of an imap store.
         //additional properties that don't have a 1:1 mapping (such as separately stored tags),
-        //could be added to the adaptor
+        //could be added to the adaptor.
+
         auto domainObject = create(key, revision, mDomainTypeAdaptorFactory->createAdaptor(entity));
         resultCallback(domainObject);
-        return true;
+        return false;
     }, mBufferType);
 }
 
@@ -65,10 +64,18 @@ static ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, con
 {
     //TODO use a result set with an iterator, to read values on demand
     QVector<QByteArray> keys;
-    scan(transaction, QByteArray(), [=, &keys](const QByteArray &key, const Akonadi2::Entity &) {
-        keys << key;
+    transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool {
+        //Skip internals
+        if (Akonadi2::Storage::isInternalKey(key)) {
+            return true;
+        }
+        keys << Akonadi2::Storage::uidFromKey(key);
         return true;
-    }, bufferType);
+    },
+    [](const Akonadi2::Storage::Error &error) {
+        qWarning() << "Error during query: " << error.message;
+    });
+
     Trace() << "Full scan found " << keys.size() << " results";
     return ResultSet(keys);
 }
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 33e5d5c..14450aa 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -169,12 +169,12 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
     flatbuffers::FlatBufferBuilder fbb;
     EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
 
-    d->transaction.openDatabase(bufferType + ".main").write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
+    d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
     Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
     Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
 
     return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) {
-        PipelineState state(this, NewPipeline, key, d->newPipeline[bufferType], newRevision, [&future]() {
+        PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future]() {
             future.setFinished();
         }, bufferType);
         d->activePipelines << state;
@@ -224,7 +224,8 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
     auto diff = adaptorFactory->createAdaptor(*diffEntity);
 
     QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current;
-    storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").scan(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
+    //FIXME: read the revision that this modification is based on, not just the latest one
+    storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
         Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
         if (!buffer.isValid()) {
             Warning() << "Read invalid buffer from disk";
@@ -272,11 +273,11 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
     adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
 
     //TODO don't overwrite the old entry, but instead store a new revision
-    d->transaction.openDatabase(bufferType + ".main").write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
+    d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
     Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
 
     return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) {
-        PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[bufferType], newRevision, [&future]() {
+        PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future]() {
             future.setFinished();
         }, bufferType);
         d->activePipelines << state;
@@ -303,6 +304,7 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
     const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
 
     //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted
+    //TODO remove all revisions?
     d->transaction.openDatabase(bufferType + ".main").remove(key);
     Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
     Log() << "Pipeline: deleted entity: "<< newRevision;
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp
index cfbcdae..8d84cf8 100644
--- a/examples/dummyresource/resourcefactory.cpp
+++ b/examples/dummyresource/resourcefactory.cpp
@@ -55,7 +55,7 @@ DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QShared
     const auto resourceIdentifier = mResourceInstanceIdentifier;
 
     auto eventIndexer = new Akonadi2::SimpleProcessor("eventIndexer", [eventFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) {
-        Akonadi2::ApplicationDomain::Event event(resourceIdentifier, state.key(), -1, eventFactory->createAdaptor(entity));
+        Akonadi2::ApplicationDomain::Event event(resourceIdentifier, Akonadi2::Storage::uidFromKey(state.key()), -1, eventFactory->createAdaptor(entity));
         Akonadi2::ApplicationDomain::TypeImplementation<Akonadi2::ApplicationDomain::Event>::index(event, transaction);
         index("event.index.rid", event.getProperty("remoteId"), event.identifier(), transaction);
     });
@@ -67,7 +67,7 @@ DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QShared
 {
     auto mailFactory = QSharedPointer<DummyMailAdaptorFactory>::create();
     auto mailIndexer = new Akonadi2::SimpleProcessor("mailIndexer", [mailFactory, resourceIdentifier](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity, Akonadi2::Storage::Transaction &transaction) {
-        Akonadi2::ApplicationDomain::Mail mail(resourceIdentifier, state.key(), -1, mailFactory->createAdaptor(entity));
+        Akonadi2::ApplicationDomain::Mail mail(resourceIdentifier, Akonadi2::Storage::uidFromKey(state.key()), -1, mailFactory->createAdaptor(entity));
         Akonadi2::ApplicationDomain::TypeImplementation<Akonadi2::ApplicationDomain::Mail>::index(mail, transaction);
         index("mail.index.rid", mail.getProperty("remoteId"), mail.identifier(), transaction);
     });
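
Note on the key helpers used above: Akonadi2::Storage::assembleKey(), Akonadi2::Storage::uidFromKey() and findLatest() are defined in the Akonadi2::Storage layer and are not part of this diff. As a rough sketch of the assumed scheme — a fixed-length uid followed by a zero-padded revision, so that lexicographic key ordering matches revision ordering and the newest revision of an entity can be located by uid prefix — the helpers could look roughly like this (widths and formats are illustrative assumptions, not the actual implementation):

    #include <QByteArray>

    // Assumption: keys are "<uid><zero-padded revision>". Padding the revision
    // keeps lexicographic order equal to revision order, which is what a
    // findLatest()-style lookup relies on when returning the newest entry
    // matching a uid prefix.
    static const int s_revisionWidth = 10; // illustrative width

    QByteArray assembleKey(const QByteArray &uid, qint64 revision)
    {
        return uid + QByteArray::number(revision).rightJustified(s_revisionWidth, '0');
    }

    QByteArray uidFromKey(const QByteArray &key)
    {
        // Cut the revision suffix off again to recover the plain uid.
        return key.left(key.size() - s_revisionWidth);
    }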