summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/pipeline.cpp25
-rw-r--r--common/storage/entitystore.cpp66
-rw-r--r--common/storage/entitystore.h12
-rw-r--r--tests/entitystoretest.cpp10
-rw-r--r--tests/mailquerybenchmark.cpp2
5 files changed, 60 insertions, 55 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 15ed5fc..91437d4 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -175,13 +175,14 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
175 auto o = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, revision, memoryAdaptor}; 175 auto o = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, revision, memoryAdaptor};
176 o.setChangedProperties(o.availableProperties().toSet()); 176 o.setChangedProperties(o.availableProperties().toSet());
177 177
178 auto preprocess = [&, this](ApplicationDomain::ApplicationDomainType &newEntity) { 178 auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(o, o.availableProperties());
179 foreach (const auto &processor, d->processors[bufferType]) { 179 newEntity.setChangedProperties(newEntity.availableProperties().toSet());
180 processor->newEntity(newEntity); 180
181 } 181 foreach (const auto &processor, d->processors[bufferType]) {
182 }; 182 processor->newEntity(newEntity);
183 }
183 184
184 if (!d->entityStore.add(bufferType, o, replayToSource, preprocess)) { 185 if (!d->entityStore.add(bufferType, o, replayToSource)) {
185 return KAsync::error<qint64>(0); 186 return KAsync::error<qint64>(0);
186 } 187 }
187 188
@@ -323,14 +324,14 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
323 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 324 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
324 SinkTraceCtx(d->logCtx) << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 325 SinkTraceCtx(d->logCtx) << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
325 326
326 auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity) { 327 const auto current = d->entityStore.readLatest(bufferType, key);
327 foreach (const auto &processor, d->processors[bufferType]) { 328
328 processor->deletedEntity(oldEntity); 329 foreach (const auto &processor, d->processors[bufferType]) {
329 } 330 processor->deletedEntity(current);
330 }; 331 }
331 332
332 d->revisionChanged = true; 333 d->revisionChanged = true;
333 if (!d->entityStore.remove(bufferType, key, replayToSource, preprocess)) { 334 if (!d->entityStore.remove(bufferType, current, replayToSource)) {
334 return KAsync::error<qint64>(0); 335 return KAsync::error<qint64>(0);
335 } 336 }
336 337
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp
index 3ef8784..b7309ab 100644
--- a/common/storage/entitystore.cpp
+++ b/common/storage/entitystore.cpp
@@ -167,19 +167,15 @@ void EntityStore::copyBlobs(ApplicationDomain::ApplicationDomainType &entity, qi
167 } 167 }
168} 168}
169 169
170bool EntityStore::add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &entity_, bool replayToSource, const PreprocessCreation &preprocess) 170bool EntityStore::add(const QByteArray &type, ApplicationDomain::ApplicationDomainType entity, bool replayToSource)
171{ 171{
172 if (entity_.identifier().isEmpty()) { 172 if (entity.identifier().isEmpty()) {
173 SinkWarningCtx(d->logCtx) << "Can't write entity with an empty identifier"; 173 SinkWarningCtx(d->logCtx) << "Can't write entity with an empty identifier";
174 return false; 174 return false;
175 } 175 }
176 176
177 auto entity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(entity_, entity_.availableProperties());
178 entity.setChangedProperties(entity.availableProperties().toSet());
179
180 SinkTraceCtx(d->logCtx) << "New entity " << entity; 177 SinkTraceCtx(d->logCtx) << "New entity " << entity;
181 178
182 preprocess(entity);
183 d->typeIndex(type).add(entity.identifier(), entity, d->transaction); 179 d->typeIndex(type).add(entity.identifier(), entity, d->transaction);
184 180
185 //The maxRevision may have changed meanwhile if the entity created sub-entities 181 //The maxRevision may have changed meanwhile if the entity created sub-entities
@@ -285,36 +281,14 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic
285 return true; 281 return true;
286} 282}
287 283
288bool EntityStore::remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &preprocess) 284bool EntityStore::remove(const QByteArray &type, const Sink::ApplicationDomain::ApplicationDomainType &current, bool replayToSource)
289{ 285{
290 bool found = false; 286 const auto uid = current.identifier();
291 bool alreadyRemoved = false; 287 if (!exists(type, uid)) {
292 DataStore::mainDatabase(d->transaction, type)
293 .findLatest(uid,
294 [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool {
295 auto entity = GetEntity(data.data());
296 if (entity && entity->metadata()) {
297 auto metadata = GetMetadata(entity->metadata()->Data());
298 found = true;
299 if (metadata->operation() == Operation_Removal) {
300 alreadyRemoved = true;
301 }
302 }
303 return false;
304 },
305 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; });
306
307 if (!found) {
308 SinkWarningCtx(d->logCtx) << "Remove: Failed to find entity " << uid;
309 return false;
310 }
311 if (alreadyRemoved) {
312 SinkWarningCtx(d->logCtx) << "Remove: Entity is already removed " << uid; 288 SinkWarningCtx(d->logCtx) << "Remove: Entity is already removed " << uid;
313 return false; 289 return false;
314 } 290 }
315 291
316 const auto current = readLatest(type, uid);
317 preprocess(current);
318 d->typeIndex(type).remove(current.identifier(), current, d->transaction); 292 d->typeIndex(type).remove(current.identifier(), current, d->transaction);
319 293
320 SinkTraceCtx(d->logCtx) << "Removed entity " << current; 294 SinkTraceCtx(d->logCtx) << "Removed entity " << current;
@@ -601,6 +575,36 @@ bool EntityStore::contains(const QByteArray &type, const QByteArray &uid)
601 return DataStore::mainDatabase(d->getTransaction(), type).contains(uid); 575 return DataStore::mainDatabase(d->getTransaction(), type).contains(uid);
602} 576}
603 577
578bool EntityStore::exists(const QByteArray &type, const QByteArray &uid)
579{
580 bool found = false;
581 bool alreadyRemoved = false;
582 DataStore::mainDatabase(d->transaction, type)
583 .findLatest(uid,
584 [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool {
585 auto entity = GetEntity(data.data());
586 if (entity && entity->metadata()) {
587 auto metadata = GetMetadata(entity->metadata()->Data());
588 found = true;
589 if (metadata->operation() == Operation_Removal) {
590 alreadyRemoved = true;
591 }
592 }
593 return false;
594 },
595 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; });
596 if (!found) {
597 SinkTraceCtx(d->logCtx) << "Remove: Failed to find entity " << uid;
598 return false;
599 }
600 if (alreadyRemoved) {
601 SinkTraceCtx(d->logCtx) << "Remove: Entity is already removed " << uid;
602 return false;
603 }
604 return true;
605}
606
607
604qint64 EntityStore::maxRevision() 608qint64 EntityStore::maxRevision()
605{ 609{
606 if (!d->exists()) { 610 if (!d->exists()) {
diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h
index ddb4ef9..00241f2 100644
--- a/common/storage/entitystore.h
+++ b/common/storage/entitystore.h
@@ -38,15 +38,11 @@ public:
38 typedef QSharedPointer<EntityStore> Ptr; 38 typedef QSharedPointer<EntityStore> Ptr;
39 EntityStore(const ResourceContext &resourceContext, const Sink::Log::Context &); 39 EntityStore(const ResourceContext &resourceContext, const Sink::Log::Context &);
40 40
41 typedef std::function<void(const ApplicationDomain::ApplicationDomainType &, ApplicationDomain::ApplicationDomainType &)> PreprocessModification;
42 typedef std::function<void(ApplicationDomain::ApplicationDomainType &)> PreprocessCreation;
43 typedef std::function<void(const ApplicationDomain::ApplicationDomainType &)> PreprocessRemoval;
44
45 //Only the pipeline may call the following functions outside of tests 41 //Only the pipeline may call the following functions outside of tests
46 bool add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, bool replayToSource, const PreprocessCreation &); 42 bool add(const QByteArray &type, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource);
47 bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource); 43 bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource);
48 bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &current, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource); 44 bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &current, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource);
49 bool remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &); 45 bool remove(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &current, bool replayToSource);
50 bool cleanupRevisions(qint64 revision); 46 bool cleanupRevisions(qint64 revision);
51 ApplicationDomain::ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &current, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions) const; 47 ApplicationDomain::ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &current, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions) const;
52 48
@@ -107,8 +103,12 @@ public:
107 103
108 void readRevisions(qint64 baseRevision, const QByteArray &type, const std::function<void(const QByteArray &key)> &callback); 104 void readRevisions(qint64 baseRevision, const QByteArray &type, const std::function<void(const QByteArray &key)> &callback);
109 105
106 ///Db contains entity (but may already be marked as removed
110 bool contains(const QByteArray &type, const QByteArray &uid); 107 bool contains(const QByteArray &type, const QByteArray &uid);
111 108
109 ///Db contains entity and entity is not yet removed
110 bool exists(const QByteArray &type, const QByteArray &uid);
111
112 qint64 maxRevision(); 112 qint64 maxRevision();
113 113
114 Sink::Log::Context logContext() const; 114 Sink::Log::Context logContext() const;
diff --git a/tests/entitystoretest.cpp b/tests/entitystoretest.cpp
index 925ca7e..90575a5 100644
--- a/tests/entitystoretest.cpp
+++ b/tests/entitystoretest.cpp
@@ -49,14 +49,14 @@ private slots:
49 mail3.setExtractedSubject("foo"); 49 mail3.setExtractedSubject("foo");
50 50
51 store.startTransaction(Storage::DataStore::ReadWrite); 51 store.startTransaction(Storage::DataStore::ReadWrite);
52 store.add("mail", mail, false, [] (const ApplicationDomain::ApplicationDomainType &) {}); 52 store.add("mail", mail, false);
53 store.add("mail", mail2, false, [] (const ApplicationDomain::ApplicationDomainType &) {}); 53 store.add("mail", mail2, false);
54 store.add("mail", mail3, false, [] (const ApplicationDomain::ApplicationDomainType &) {}); 54 store.add("mail", mail3, false);
55 55
56 mail.setExtractedSubject("foo"); 56 mail.setExtractedSubject("foo");
57 57
58 store.modify("mail", mail, {}, false, [] (const ApplicationDomain::ApplicationDomainType &, ApplicationDomain::ApplicationDomainType &) {}); 58 store.modify("mail", mail, QByteArrayList{}, false);
59 store.remove("mail", mail3.identifier(), false, [] (const ApplicationDomain::ApplicationDomainType &) {}); 59 store.remove("mail", mail3, false);
60 store.commitTransaction(); 60 store.commitTransaction();
61 61
62 store.startTransaction(Storage::DataStore::ReadOnly); 62 store.startTransaction(Storage::DataStore::ReadOnly);
diff --git a/tests/mailquerybenchmark.cpp b/tests/mailquerybenchmark.cpp
index 3ecfa19..b15c8d6 100644
--- a/tests/mailquerybenchmark.cpp
+++ b/tests/mailquerybenchmark.cpp
@@ -72,7 +72,7 @@ class MailQueryBenchmark : public QObject
72 domainObject.setFolder(QByteArray("folder") + QByteArray::number(i - (i % folderSpreadFactor))); 72 domainObject.setFolder(QByteArray("folder") + QByteArray::number(i - (i % folderSpreadFactor)));
73 } 73 }
74 74
75 entityStore.add("mail", domainObject, false, [] (const Mail &) {}); 75 entityStore.add("mail", domainObject, false);
76 } 76 }
77 77
78 entityStore.commitTransaction(); 78 entityStore.commitTransaction();