summaryrefslogtreecommitdiffstats
path: root/common/storage/entitystore.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/storage/entitystore.cpp')
-rw-r--r--common/storage/entitystore.cpp127
1 files changed, 64 insertions, 63 deletions
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp
index 909f1b2..b7309ab 100644
--- a/common/storage/entitystore.cpp
+++ b/common/storage/entitystore.cpp
@@ -31,8 +31,7 @@
31#include "bufferutils.h" 31#include "bufferutils.h"
32#include "entity_generated.h" 32#include "entity_generated.h"
33#include "applicationdomaintype_p.h" 33#include "applicationdomaintype_p.h"
34 34#include "typeimplementations.h"
35#include "domaintypes.h"
36 35
37using namespace Sink; 36using namespace Sink;
38using namespace Sink::Storage; 37using namespace Sink::Storage;
@@ -168,19 +167,15 @@ void EntityStore::copyBlobs(ApplicationDomain::ApplicationDomainType &entity, qi
168 } 167 }
169} 168}
170 169
171bool EntityStore::add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &entity_, bool replayToSource, const PreprocessCreation &preprocess) 170bool EntityStore::add(const QByteArray &type, ApplicationDomain::ApplicationDomainType entity, bool replayToSource)
172{ 171{
173 if (entity_.identifier().isEmpty()) { 172 if (entity.identifier().isEmpty()) {
174 SinkWarningCtx(d->logCtx) << "Can't write entity with an empty identifier"; 173 SinkWarningCtx(d->logCtx) << "Can't write entity with an empty identifier";
175 return false; 174 return false;
176 } 175 }
177 176
178 auto entity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(entity_, entity_.availableProperties());
179 entity.setChangedProperties(entity.availableProperties().toSet());
180
181 SinkTraceCtx(d->logCtx) << "New entity " << entity; 177 SinkTraceCtx(d->logCtx) << "New entity " << entity;
182 178
183 preprocess(entity);
184 d->typeIndex(type).add(entity.identifier(), entity, d->transaction); 179 d->typeIndex(type).add(entity.identifier(), entity, d->transaction);
185 180
186 //The maxRevision may have changed meanwhile if the entity created sub-entities 181 //The maxRevision may have changed meanwhile if the entity created sub-entities
@@ -205,26 +200,20 @@ bool EntityStore::add(const QByteArray &type, const ApplicationDomain::Applicati
205 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << entity.identifier() << newRevision; }); 200 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << entity.identifier() << newRevision; });
206 DataStore::setMaxRevision(d->transaction, newRevision); 201 DataStore::setMaxRevision(d->transaction, newRevision);
207 DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type); 202 DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type);
203 DataStore::recordUid(d->transaction, entity.identifier());
208 SinkTraceCtx(d->logCtx) << "Wrote entity: " << entity.identifier() << type << newRevision; 204 SinkTraceCtx(d->logCtx) << "Wrote entity: " << entity.identifier() << type << newRevision;
209 return true; 205 return true;
210} 206}
211 207
212bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &preprocess) 208ApplicationDomain::ApplicationDomainType EntityStore::applyDiff(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &current, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions) const
213{ 209{
214 auto changeset = diff.changedProperties();
215 const auto current = readLatest(type, diff.identifier());
216 if (current.identifier().isEmpty()) {
217 SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier();
218 return false;
219 }
220
221 auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties()); 210 auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties());
222 211
223 SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity; 212 SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity;
224 213
225 // Apply diff 214 // Apply diff
226 //SinkTrace() << "Applying changed properties: " << changeset; 215 //SinkTrace() << "Applying changed properties: " << changeset;
227 for (const auto &property : changeset) { 216 for (const auto &property : diff.changedProperties()) {
228 const auto value = diff.getProperty(property); 217 const auto value = diff.getProperty(property);
229 if (value.isValid()) { 218 if (value.isValid()) {
230 //SinkTrace() << "Setting property: " << property; 219 //SinkTrace() << "Setting property: " << property;
@@ -237,8 +226,25 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic
237 //SinkTrace() << "Removing property: " << property; 226 //SinkTrace() << "Removing property: " << property;
238 newEntity.setProperty(property, QVariant()); 227 newEntity.setProperty(property, QVariant());
239 } 228 }
229 return newEntity;
230}
231
232bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource)
233{
234 const auto current = readLatest(type, diff.identifier());
235 if (current.identifier().isEmpty()) {
236 SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier();
237 return false;
238 }
239
240 auto newEntity = applyDiff(type, current, diff, deletions);
241 return modify(type, current, newEntity, replayToSource);
242}
243
244bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &current, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource)
245{
246 SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity;
240 247
241 preprocess(current, newEntity);
242 d->typeIndex(type).remove(current.identifier(), current, d->transaction); 248 d->typeIndex(type).remove(current.identifier(), current, d->transaction);
243 d->typeIndex(type).add(newEntity.identifier(), newEntity, d->transaction); 249 d->typeIndex(type).add(newEntity.identifier(), newEntity, d->transaction);
244 250
@@ -250,7 +256,7 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic
250 flatbuffers::FlatBufferBuilder metadataFbb; 256 flatbuffers::FlatBufferBuilder metadataFbb;
251 { 257 {
252 //We add availableProperties to account for the properties that have been changed by the preprocessors 258 //We add availableProperties to account for the properties that have been changed by the preprocessors
253 auto modifiedProperties = BufferUtils::toVector(metadataFbb, changeset + newEntity.changedProperties()); 259 auto modifiedProperties = BufferUtils::toVector(metadataFbb, newEntity.changedProperties());
254 auto metadataBuilder = MetadataBuilder(metadataFbb); 260 auto metadataBuilder = MetadataBuilder(metadataFbb);
255 metadataBuilder.add_revision(newRevision); 261 metadataBuilder.add_revision(newRevision);
256 metadataBuilder.add_operation(Operation_Modification); 262 metadataBuilder.add_operation(Operation_Modification);
@@ -259,7 +265,7 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic
259 auto metadataBuffer = metadataBuilder.Finish(); 265 auto metadataBuffer = metadataBuilder.Finish();
260 FinishMetadataBuffer(metadataFbb, metadataBuffer); 266 FinishMetadataBuffer(metadataFbb, metadataBuffer);
261 } 267 }
262 SinkTraceCtx(d->logCtx) << "Changed properties: " << changeset + newEntity.changedProperties(); 268 SinkTraceCtx(d->logCtx) << "Changed properties: " << newEntity.changedProperties();
263 269
264 newEntity.setChangedProperties(newEntity.availableProperties().toSet()); 270 newEntity.setChangedProperties(newEntity.availableProperties().toSet());
265 271
@@ -275,36 +281,14 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic
275 return true; 281 return true;
276} 282}
277 283
278bool EntityStore::remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &preprocess) 284bool EntityStore::remove(const QByteArray &type, const Sink::ApplicationDomain::ApplicationDomainType &current, bool replayToSource)
279{ 285{
280 bool found = false; 286 const auto uid = current.identifier();
281 bool alreadyRemoved = false; 287 if (!exists(type, uid)) {
282 DataStore::mainDatabase(d->transaction, type)
283 .findLatest(uid,
284 [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool {
285 auto entity = GetEntity(data.data());
286 if (entity && entity->metadata()) {
287 auto metadata = GetMetadata(entity->metadata()->Data());
288 found = true;
289 if (metadata->operation() == Operation_Removal) {
290 alreadyRemoved = true;
291 }
292 }
293 return false;
294 },
295 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; });
296
297 if (!found) {
298 SinkWarningCtx(d->logCtx) << "Remove: Failed to find entity " << uid;
299 return false;
300 }
301 if (alreadyRemoved) {
302 SinkWarningCtx(d->logCtx) << "Remove: Entity is already removed " << uid; 288 SinkWarningCtx(d->logCtx) << "Remove: Entity is already removed " << uid;
303 return false; 289 return false;
304 } 290 }
305 291
306 const auto current = readLatest(type, uid);
307 preprocess(current);
308 d->typeIndex(type).remove(current.identifier(), current, d->transaction); 292 d->typeIndex(type).remove(current.identifier(), current, d->transaction);
309 293
310 SinkTraceCtx(d->logCtx) << "Removed entity " << current; 294 SinkTraceCtx(d->logCtx) << "Removed entity " << current;
@@ -328,6 +312,7 @@ bool EntityStore::remove(const QByteArray &type, const QByteArray &uid, bool rep
328 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << uid << newRevision; }); 312 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << uid << newRevision; });
329 DataStore::setMaxRevision(d->transaction, newRevision); 313 DataStore::setMaxRevision(d->transaction, newRevision);
330 DataStore::recordRevision(d->transaction, newRevision, uid, type); 314 DataStore::recordRevision(d->transaction, newRevision, uid, type);
315 DataStore::removeUid(d->transaction, uid);
331 return true; 316 return true;
332} 317}
333 318
@@ -521,15 +506,9 @@ ApplicationDomain::ApplicationDomainType EntityStore::readEntity(const QByteArra
521 506
522void EntityStore::readAll(const QByteArray &type, const std::function<void(const ApplicationDomain::ApplicationDomainType &entity)> &callback) 507void EntityStore::readAll(const QByteArray &type, const std::function<void(const ApplicationDomain::ApplicationDomainType &entity)> &callback)
523{ 508{
524 auto db = DataStore::mainDatabase(d->getTransaction(), type); 509 readAllUids(type, [&] (const QByteArray &uid) {
525 db.scan("", 510 readLatest(type, uid, callback);
526 [=](const QByteArray &key, const QByteArray &value) -> bool { 511 });
527 auto uid = DataStore::uidFromKey(key);
528 auto buffer = Sink::EntityBuffer{value.data(), value.size()};
529 callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer));
530 return true;
531 },
532 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during query: " << error.message; });
533} 512}
534 513
535void EntityStore::readRevisions(qint64 baseRevision, const QByteArray &expectedType, const std::function<void(const QByteArray &key)> &callback) 514void EntityStore::readRevisions(qint64 baseRevision, const QByteArray &expectedType, const std::function<void(const QByteArray &key)> &callback)
@@ -588,15 +567,7 @@ ApplicationDomain::ApplicationDomainType EntityStore::readPrevious(const QByteAr
588 567
589void EntityStore::readAllUids(const QByteArray &type, const std::function<void(const QByteArray &uid)> callback) 568void EntityStore::readAllUids(const QByteArray &type, const std::function<void(const QByteArray &uid)> callback)
590{ 569{
591 //TODO use uid index instead 570 DataStore::getUids(d->getTransaction(), callback);
592 //FIXME we currently report each uid for every revision with the same uid
593 auto db = DataStore::mainDatabase(d->getTransaction(), type);
594 db.scan("",
595 [callback](const QByteArray &key, const QByteArray &) -> bool {
596 callback(Sink::Storage::DataStore::uidFromKey(key));
597 return true;
598 },
599 [&](const Sink::Storage::DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read current value from storage: " << error.message; });
600} 571}
601 572
602bool EntityStore::contains(const QByteArray &type, const QByteArray &uid) 573bool EntityStore::contains(const QByteArray &type, const QByteArray &uid)
@@ -604,6 +575,36 @@ bool EntityStore::contains(const QByteArray &type, const QByteArray &uid)
604 return DataStore::mainDatabase(d->getTransaction(), type).contains(uid); 575 return DataStore::mainDatabase(d->getTransaction(), type).contains(uid);
605} 576}
606 577
578bool EntityStore::exists(const QByteArray &type, const QByteArray &uid)
579{
580 bool found = false;
581 bool alreadyRemoved = false;
582 DataStore::mainDatabase(d->transaction, type)
583 .findLatest(uid,
584 [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool {
585 auto entity = GetEntity(data.data());
586 if (entity && entity->metadata()) {
587 auto metadata = GetMetadata(entity->metadata()->Data());
588 found = true;
589 if (metadata->operation() == Operation_Removal) {
590 alreadyRemoved = true;
591 }
592 }
593 return false;
594 },
595 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; });
596 if (!found) {
597 SinkTraceCtx(d->logCtx) << "Remove: Failed to find entity " << uid;
598 return false;
599 }
600 if (alreadyRemoved) {
601 SinkTraceCtx(d->logCtx) << "Remove: Entity is already removed " << uid;
602 return false;
603 }
604 return true;
605}
606
607
607qint64 EntityStore::maxRevision() 608qint64 EntityStore::maxRevision()
608{ 609{
609 if (!d->exists()) { 610 if (!d->exists()) {