summaryrefslogtreecommitdiffstats
path: root/common/storage
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-04-11 15:41:20 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-04-11 15:41:20 +0200
commit5bf7ded65ef517fac6b088342d195392bc09be4c (patch)
tree793eac32ddc018a3d2bb42cb088f4c919f22e0ed /common/storage
parentf6c3c144e60611d2da7ba7aa5b115affe92a57a4 (diff)
downloadsink-5bf7ded65ef517fac6b088342d195392bc09be4c.tar.gz
sink-5bf7ded65ef517fac6b088342d195392bc09be4c.zip
Moved all preprocessing back into the pipeline
Diffstat (limited to 'common/storage')
-rw-r--r--common/storage/entitystore.cpp66
-rw-r--r--common/storage/entitystore.h12
2 files changed, 41 insertions, 37 deletions
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp
index 3ef8784..b7309ab 100644
--- a/common/storage/entitystore.cpp
+++ b/common/storage/entitystore.cpp
@@ -167,19 +167,15 @@ void EntityStore::copyBlobs(ApplicationDomain::ApplicationDomainType &entity, qi
167 } 167 }
168} 168}
169 169
170bool EntityStore::add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &entity_, bool replayToSource, const PreprocessCreation &preprocess) 170bool EntityStore::add(const QByteArray &type, ApplicationDomain::ApplicationDomainType entity, bool replayToSource)
171{ 171{
172 if (entity_.identifier().isEmpty()) { 172 if (entity.identifier().isEmpty()) {
173 SinkWarningCtx(d->logCtx) << "Can't write entity with an empty identifier"; 173 SinkWarningCtx(d->logCtx) << "Can't write entity with an empty identifier";
174 return false; 174 return false;
175 } 175 }
176 176
177 auto entity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(entity_, entity_.availableProperties());
178 entity.setChangedProperties(entity.availableProperties().toSet());
179
180 SinkTraceCtx(d->logCtx) << "New entity " << entity; 177 SinkTraceCtx(d->logCtx) << "New entity " << entity;
181 178
182 preprocess(entity);
183 d->typeIndex(type).add(entity.identifier(), entity, d->transaction); 179 d->typeIndex(type).add(entity.identifier(), entity, d->transaction);
184 180
185 //The maxRevision may have changed meanwhile if the entity created sub-entities 181 //The maxRevision may have changed meanwhile if the entity created sub-entities
@@ -285,36 +281,14 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic
285 return true; 281 return true;
286} 282}
287 283
288bool EntityStore::remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &preprocess) 284bool EntityStore::remove(const QByteArray &type, const Sink::ApplicationDomain::ApplicationDomainType &current, bool replayToSource)
289{ 285{
290 bool found = false; 286 const auto uid = current.identifier();
291 bool alreadyRemoved = false; 287 if (!exists(type, uid)) {
292 DataStore::mainDatabase(d->transaction, type)
293 .findLatest(uid,
294 [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool {
295 auto entity = GetEntity(data.data());
296 if (entity && entity->metadata()) {
297 auto metadata = GetMetadata(entity->metadata()->Data());
298 found = true;
299 if (metadata->operation() == Operation_Removal) {
300 alreadyRemoved = true;
301 }
302 }
303 return false;
304 },
305 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; });
306
307 if (!found) {
308 SinkWarningCtx(d->logCtx) << "Remove: Failed to find entity " << uid;
309 return false;
310 }
311 if (alreadyRemoved) {
312 SinkWarningCtx(d->logCtx) << "Remove: Entity is already removed " << uid; 288 SinkWarningCtx(d->logCtx) << "Remove: Entity is already removed " << uid;
313 return false; 289 return false;
314 } 290 }
315 291
316 const auto current = readLatest(type, uid);
317 preprocess(current);
318 d->typeIndex(type).remove(current.identifier(), current, d->transaction); 292 d->typeIndex(type).remove(current.identifier(), current, d->transaction);
319 293
320 SinkTraceCtx(d->logCtx) << "Removed entity " << current; 294 SinkTraceCtx(d->logCtx) << "Removed entity " << current;
@@ -601,6 +575,36 @@ bool EntityStore::contains(const QByteArray &type, const QByteArray &uid)
601 return DataStore::mainDatabase(d->getTransaction(), type).contains(uid); 575 return DataStore::mainDatabase(d->getTransaction(), type).contains(uid);
602} 576}
603 577
578bool EntityStore::exists(const QByteArray &type, const QByteArray &uid)
579{
580 bool found = false;
581 bool alreadyRemoved = false;
582 DataStore::mainDatabase(d->transaction, type)
583 .findLatest(uid,
584 [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool {
585 auto entity = GetEntity(data.data());
586 if (entity && entity->metadata()) {
587 auto metadata = GetMetadata(entity->metadata()->Data());
588 found = true;
589 if (metadata->operation() == Operation_Removal) {
590 alreadyRemoved = true;
591 }
592 }
593 return false;
594 },
595 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; });
596 if (!found) {
597 SinkTraceCtx(d->logCtx) << "Remove: Failed to find entity " << uid;
598 return false;
599 }
600 if (alreadyRemoved) {
601 SinkTraceCtx(d->logCtx) << "Remove: Entity is already removed " << uid;
602 return false;
603 }
604 return true;
605}
606
607
604qint64 EntityStore::maxRevision() 608qint64 EntityStore::maxRevision()
605{ 609{
606 if (!d->exists()) { 610 if (!d->exists()) {
diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h
index ddb4ef9..00241f2 100644
--- a/common/storage/entitystore.h
+++ b/common/storage/entitystore.h
@@ -38,15 +38,11 @@ public:
38 typedef QSharedPointer<EntityStore> Ptr; 38 typedef QSharedPointer<EntityStore> Ptr;
39 EntityStore(const ResourceContext &resourceContext, const Sink::Log::Context &); 39 EntityStore(const ResourceContext &resourceContext, const Sink::Log::Context &);
40 40
41 typedef std::function<void(const ApplicationDomain::ApplicationDomainType &, ApplicationDomain::ApplicationDomainType &)> PreprocessModification;
42 typedef std::function<void(ApplicationDomain::ApplicationDomainType &)> PreprocessCreation;
43 typedef std::function<void(const ApplicationDomain::ApplicationDomainType &)> PreprocessRemoval;
44
45 //Only the pipeline may call the following functions outside of tests 41 //Only the pipeline may call the following functions outside of tests
46 bool add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, bool replayToSource, const PreprocessCreation &); 42 bool add(const QByteArray &type, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource);
47 bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource); 43 bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource);
48 bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &current, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource); 44 bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &current, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource);
49 bool remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &); 45 bool remove(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &current, bool replayToSource);
50 bool cleanupRevisions(qint64 revision); 46 bool cleanupRevisions(qint64 revision);
51 ApplicationDomain::ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &current, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions) const; 47 ApplicationDomain::ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &current, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions) const;
52 48
@@ -107,8 +103,12 @@ public:
107 103
108 void readRevisions(qint64 baseRevision, const QByteArray &type, const std::function<void(const QByteArray &key)> &callback); 104 void readRevisions(qint64 baseRevision, const QByteArray &type, const std::function<void(const QByteArray &key)> &callback);
109 105
106 ///Db contains entity (but may already be marked as removed
110 bool contains(const QByteArray &type, const QByteArray &uid); 107 bool contains(const QByteArray &type, const QByteArray &uid);
111 108
109 ///Db contains entity and entity is not yet removed
110 bool exists(const QByteArray &type, const QByteArray &uid);
111
112 qint64 maxRevision(); 112 qint64 maxRevision();
113 113
114 Sink::Log::Context logContext() const; 114 Sink::Log::Context logContext() const;