diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-04-11 15:41:20 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-04-11 15:41:20 +0200 |
commit | 5bf7ded65ef517fac6b088342d195392bc09be4c (patch) | |
tree | 793eac32ddc018a3d2bb42cb088f4c919f22e0ed | |
parent | f6c3c144e60611d2da7ba7aa5b115affe92a57a4 (diff) | |
download | sink-5bf7ded65ef517fac6b088342d195392bc09be4c.tar.gz sink-5bf7ded65ef517fac6b088342d195392bc09be4c.zip |
Moved all preprocessing back into the pipeline
-rw-r--r-- | common/pipeline.cpp | 25 | ||||
-rw-r--r-- | common/storage/entitystore.cpp | 66 | ||||
-rw-r--r-- | common/storage/entitystore.h | 12 | ||||
-rw-r--r-- | tests/entitystoretest.cpp | 10 | ||||
-rw-r--r-- | tests/mailquerybenchmark.cpp | 2 |
5 files changed, 60 insertions, 55 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 15ed5fc..91437d4 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -175,13 +175,14 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
175 | auto o = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, revision, memoryAdaptor}; | 175 | auto o = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, revision, memoryAdaptor}; |
176 | o.setChangedProperties(o.availableProperties().toSet()); | 176 | o.setChangedProperties(o.availableProperties().toSet()); |
177 | 177 | ||
178 | auto preprocess = [&, this](ApplicationDomain::ApplicationDomainType &newEntity) { | 178 | auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(o, o.availableProperties()); |
179 | foreach (const auto &processor, d->processors[bufferType]) { | 179 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); |
180 | processor->newEntity(newEntity); | 180 | |
181 | } | 181 | foreach (const auto &processor, d->processors[bufferType]) { |
182 | }; | 182 | processor->newEntity(newEntity); |
183 | } | ||
183 | 184 | ||
184 | if (!d->entityStore.add(bufferType, o, replayToSource, preprocess)) { | 185 | if (!d->entityStore.add(bufferType, o, replayToSource)) { |
185 | return KAsync::error<qint64>(0); | 186 | return KAsync::error<qint64>(0); |
186 | } | 187 | } |
187 | 188 | ||
@@ -323,14 +324,14 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
323 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); | 324 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); |
324 | SinkTraceCtx(d->logCtx) << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 325 | SinkTraceCtx(d->logCtx) << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
325 | 326 | ||
326 | auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity) { | 327 | const auto current = d->entityStore.readLatest(bufferType, key); |
327 | foreach (const auto &processor, d->processors[bufferType]) { | 328 | |
328 | processor->deletedEntity(oldEntity); | 329 | foreach (const auto &processor, d->processors[bufferType]) { |
329 | } | 330 | processor->deletedEntity(current); |
330 | }; | 331 | } |
331 | 332 | ||
332 | d->revisionChanged = true; | 333 | d->revisionChanged = true; |
333 | if (!d->entityStore.remove(bufferType, key, replayToSource, preprocess)) { | 334 | if (!d->entityStore.remove(bufferType, current, replayToSource)) { |
334 | return KAsync::error<qint64>(0); | 335 | return KAsync::error<qint64>(0); |
335 | } | 336 | } |
336 | 337 | ||
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index 3ef8784..b7309ab 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp | |||
@@ -167,19 +167,15 @@ void EntityStore::copyBlobs(ApplicationDomain::ApplicationDomainType &entity, qi | |||
167 | } | 167 | } |
168 | } | 168 | } |
169 | 169 | ||
170 | bool EntityStore::add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &entity_, bool replayToSource, const PreprocessCreation &preprocess) | 170 | bool EntityStore::add(const QByteArray &type, ApplicationDomain::ApplicationDomainType entity, bool replayToSource) |
171 | { | 171 | { |
172 | if (entity_.identifier().isEmpty()) { | 172 | if (entity.identifier().isEmpty()) { |
173 | SinkWarningCtx(d->logCtx) << "Can't write entity with an empty identifier"; | 173 | SinkWarningCtx(d->logCtx) << "Can't write entity with an empty identifier"; |
174 | return false; | 174 | return false; |
175 | } | 175 | } |
176 | 176 | ||
177 | auto entity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(entity_, entity_.availableProperties()); | ||
178 | entity.setChangedProperties(entity.availableProperties().toSet()); | ||
179 | |||
180 | SinkTraceCtx(d->logCtx) << "New entity " << entity; | 177 | SinkTraceCtx(d->logCtx) << "New entity " << entity; |
181 | 178 | ||
182 | preprocess(entity); | ||
183 | d->typeIndex(type).add(entity.identifier(), entity, d->transaction); | 179 | d->typeIndex(type).add(entity.identifier(), entity, d->transaction); |
184 | 180 | ||
185 | //The maxRevision may have changed meanwhile if the entity created sub-entities | 181 | //The maxRevision may have changed meanwhile if the entity created sub-entities |
@@ -285,36 +281,14 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic | |||
285 | return true; | 281 | return true; |
286 | } | 282 | } |
287 | 283 | ||
288 | bool EntityStore::remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &preprocess) | 284 | bool EntityStore::remove(const QByteArray &type, const Sink::ApplicationDomain::ApplicationDomainType ¤t, bool replayToSource) |
289 | { | 285 | { |
290 | bool found = false; | 286 | const auto uid = current.identifier(); |
291 | bool alreadyRemoved = false; | 287 | if (!exists(type, uid)) { |
292 | DataStore::mainDatabase(d->transaction, type) | ||
293 | .findLatest(uid, | ||
294 | [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { | ||
295 | auto entity = GetEntity(data.data()); | ||
296 | if (entity && entity->metadata()) { | ||
297 | auto metadata = GetMetadata(entity->metadata()->Data()); | ||
298 | found = true; | ||
299 | if (metadata->operation() == Operation_Removal) { | ||
300 | alreadyRemoved = true; | ||
301 | } | ||
302 | } | ||
303 | return false; | ||
304 | }, | ||
305 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; }); | ||
306 | |||
307 | if (!found) { | ||
308 | SinkWarningCtx(d->logCtx) << "Remove: Failed to find entity " << uid; | ||
309 | return false; | ||
310 | } | ||
311 | if (alreadyRemoved) { | ||
312 | SinkWarningCtx(d->logCtx) << "Remove: Entity is already removed " << uid; | 288 | SinkWarningCtx(d->logCtx) << "Remove: Entity is already removed " << uid; |
313 | return false; | 289 | return false; |
314 | } | 290 | } |
315 | 291 | ||
316 | const auto current = readLatest(type, uid); | ||
317 | preprocess(current); | ||
318 | d->typeIndex(type).remove(current.identifier(), current, d->transaction); | 292 | d->typeIndex(type).remove(current.identifier(), current, d->transaction); |
319 | 293 | ||
320 | SinkTraceCtx(d->logCtx) << "Removed entity " << current; | 294 | SinkTraceCtx(d->logCtx) << "Removed entity " << current; |
@@ -601,6 +575,36 @@ bool EntityStore::contains(const QByteArray &type, const QByteArray &uid) | |||
601 | return DataStore::mainDatabase(d->getTransaction(), type).contains(uid); | 575 | return DataStore::mainDatabase(d->getTransaction(), type).contains(uid); |
602 | } | 576 | } |
603 | 577 | ||
578 | bool EntityStore::exists(const QByteArray &type, const QByteArray &uid) | ||
579 | { | ||
580 | bool found = false; | ||
581 | bool alreadyRemoved = false; | ||
582 | DataStore::mainDatabase(d->transaction, type) | ||
583 | .findLatest(uid, | ||
584 | [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { | ||
585 | auto entity = GetEntity(data.data()); | ||
586 | if (entity && entity->metadata()) { | ||
587 | auto metadata = GetMetadata(entity->metadata()->Data()); | ||
588 | found = true; | ||
589 | if (metadata->operation() == Operation_Removal) { | ||
590 | alreadyRemoved = true; | ||
591 | } | ||
592 | } | ||
593 | return false; | ||
594 | }, | ||
595 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; }); | ||
596 | if (!found) { | ||
597 | SinkTraceCtx(d->logCtx) << "Remove: Failed to find entity " << uid; | ||
598 | return false; | ||
599 | } | ||
600 | if (alreadyRemoved) { | ||
601 | SinkTraceCtx(d->logCtx) << "Remove: Entity is already removed " << uid; | ||
602 | return false; | ||
603 | } | ||
604 | return true; | ||
605 | } | ||
606 | |||
607 | |||
604 | qint64 EntityStore::maxRevision() | 608 | qint64 EntityStore::maxRevision() |
605 | { | 609 | { |
606 | if (!d->exists()) { | 610 | if (!d->exists()) { |
diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h index ddb4ef9..00241f2 100644 --- a/common/storage/entitystore.h +++ b/common/storage/entitystore.h | |||
@@ -38,15 +38,11 @@ public: | |||
38 | typedef QSharedPointer<EntityStore> Ptr; | 38 | typedef QSharedPointer<EntityStore> Ptr; |
39 | EntityStore(const ResourceContext &resourceContext, const Sink::Log::Context &); | 39 | EntityStore(const ResourceContext &resourceContext, const Sink::Log::Context &); |
40 | 40 | ||
41 | typedef std::function<void(const ApplicationDomain::ApplicationDomainType &, ApplicationDomain::ApplicationDomainType &)> PreprocessModification; | ||
42 | typedef std::function<void(ApplicationDomain::ApplicationDomainType &)> PreprocessCreation; | ||
43 | typedef std::function<void(const ApplicationDomain::ApplicationDomainType &)> PreprocessRemoval; | ||
44 | |||
45 | //Only the pipeline may call the following functions outside of tests | 41 | //Only the pipeline may call the following functions outside of tests |
46 | bool add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, bool replayToSource, const PreprocessCreation &); | 42 | bool add(const QByteArray &type, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource); |
47 | bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource); | 43 | bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource); |
48 | bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType ¤t, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource); | 44 | bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType ¤t, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource); |
49 | bool remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &); | 45 | bool remove(const QByteArray &type, const ApplicationDomain::ApplicationDomainType ¤t, bool replayToSource); |
50 | bool cleanupRevisions(qint64 revision); | 46 | bool cleanupRevisions(qint64 revision); |
51 | ApplicationDomain::ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomain::ApplicationDomainType ¤t, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions) const; | 47 | ApplicationDomain::ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomain::ApplicationDomainType ¤t, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions) const; |
52 | 48 | ||
@@ -107,8 +103,12 @@ public: | |||
107 | 103 | ||
108 | void readRevisions(qint64 baseRevision, const QByteArray &type, const std::function<void(const QByteArray &key)> &callback); | 104 | void readRevisions(qint64 baseRevision, const QByteArray &type, const std::function<void(const QByteArray &key)> &callback); |
109 | 105 | ||
106 | ///Db contains entity (but may already be marked as removed | ||
110 | bool contains(const QByteArray &type, const QByteArray &uid); | 107 | bool contains(const QByteArray &type, const QByteArray &uid); |
111 | 108 | ||
109 | ///Db contains entity and entity is not yet removed | ||
110 | bool exists(const QByteArray &type, const QByteArray &uid); | ||
111 | |||
112 | qint64 maxRevision(); | 112 | qint64 maxRevision(); |
113 | 113 | ||
114 | Sink::Log::Context logContext() const; | 114 | Sink::Log::Context logContext() const; |
diff --git a/tests/entitystoretest.cpp b/tests/entitystoretest.cpp index 925ca7e..90575a5 100644 --- a/tests/entitystoretest.cpp +++ b/tests/entitystoretest.cpp | |||
@@ -49,14 +49,14 @@ private slots: | |||
49 | mail3.setExtractedSubject("foo"); | 49 | mail3.setExtractedSubject("foo"); |
50 | 50 | ||
51 | store.startTransaction(Storage::DataStore::ReadWrite); | 51 | store.startTransaction(Storage::DataStore::ReadWrite); |
52 | store.add("mail", mail, false, [] (const ApplicationDomain::ApplicationDomainType &) {}); | 52 | store.add("mail", mail, false); |
53 | store.add("mail", mail2, false, [] (const ApplicationDomain::ApplicationDomainType &) {}); | 53 | store.add("mail", mail2, false); |
54 | store.add("mail", mail3, false, [] (const ApplicationDomain::ApplicationDomainType &) {}); | 54 | store.add("mail", mail3, false); |
55 | 55 | ||
56 | mail.setExtractedSubject("foo"); | 56 | mail.setExtractedSubject("foo"); |
57 | 57 | ||
58 | store.modify("mail", mail, {}, false, [] (const ApplicationDomain::ApplicationDomainType &, ApplicationDomain::ApplicationDomainType &) {}); | 58 | store.modify("mail", mail, QByteArrayList{}, false); |
59 | store.remove("mail", mail3.identifier(), false, [] (const ApplicationDomain::ApplicationDomainType &) {}); | 59 | store.remove("mail", mail3, false); |
60 | store.commitTransaction(); | 60 | store.commitTransaction(); |
61 | 61 | ||
62 | store.startTransaction(Storage::DataStore::ReadOnly); | 62 | store.startTransaction(Storage::DataStore::ReadOnly); |
diff --git a/tests/mailquerybenchmark.cpp b/tests/mailquerybenchmark.cpp index 3ecfa19..b15c8d6 100644 --- a/tests/mailquerybenchmark.cpp +++ b/tests/mailquerybenchmark.cpp | |||
@@ -72,7 +72,7 @@ class MailQueryBenchmark : public QObject | |||
72 | domainObject.setFolder(QByteArray("folder") + QByteArray::number(i - (i % folderSpreadFactor))); | 72 | domainObject.setFolder(QByteArray("folder") + QByteArray::number(i - (i % folderSpreadFactor))); |
73 | } | 73 | } |
74 | 74 | ||
75 | entityStore.add("mail", domainObject, false, [] (const Mail &) {}); | 75 | entityStore.add("mail", domainObject, false); |
76 | } | 76 | } |
77 | 77 | ||
78 | entityStore.commitTransaction(); | 78 | entityStore.commitTransaction(); |