summaryrefslogtreecommitdiffstats
path: root/common/storage/entitystore.cpp
diff options
context:
space:
mode:
authorMinijackson <minijackson@riseup.net>2018-08-21 12:03:40 +0200
committerMinijackson <minijackson@riseup.net>2018-08-21 12:03:40 +0200
commit6ef0a29d8e468de50c9dcf260db45957d028a083 (patch)
tree3c4f90c992595cb440e7a8a007d2f643c595777a /common/storage/entitystore.cpp
parent91a86f8664f4c8ddec6546bd1faeb793b8cd70e3 (diff)
downloadsink-6ef0a29d8e468de50c9dcf260db45957d028a083.tar.gz
sink-6ef0a29d8e468de50c9dcf260db45957d028a083.zip
Separate UIDs and revisions
Diffstat (limited to 'common/storage/entitystore.cpp')
-rw-r--r--common/storage/entitystore.cpp106
1 files changed, 62 insertions, 44 deletions
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp
index bb0967a..6231479 100644
--- a/common/storage/entitystore.cpp
+++ b/common/storage/entitystore.cpp
@@ -38,8 +38,9 @@ using namespace Sink::Storage;
38 38
39static QMap<QByteArray, int> baseDbs() 39static QMap<QByteArray, int> baseDbs()
40{ 40{
41 return {{"revisionType", 0}, 41 return {{"revisionType", Storage::IntegerKeys},
42 {"revisions", 0}, 42 {"revisions", Storage::IntegerKeys},
43 {"uidsToRevisions", Storage::AllowDuplicates | Storage::IntegerValues},
43 {"uids", 0}, 44 {"uids", 0},
44 {"default", 0}, 45 {"default", 0},
45 {"__flagtable", 0}}; 46 {"__flagtable", 0}};
@@ -242,12 +243,13 @@ bool EntityStore::add(const QByteArray &type, ApplicationDomainType entity, bool
242 const auto key = Key(identifier, newRevision); 243 const auto key = Key(identifier, newRevision);
243 244
244 DataStore::mainDatabase(d->transaction, type) 245 DataStore::mainDatabase(d->transaction, type)
245 .write(key.toInternalByteArray(), BufferUtils::extractBuffer(fbb), 246 .write(newRevision, BufferUtils::extractBuffer(fbb),
246 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << entity.identifier() << newRevision; }); 247 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << entity.identifier() << newRevision; });
248
247 DataStore::setMaxRevision(d->transaction, newRevision); 249 DataStore::setMaxRevision(d->transaction, newRevision);
248 DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type); 250 DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type);
249 DataStore::recordUid(d->transaction, entity.identifier(), type); 251 DataStore::recordUid(d->transaction, entity.identifier(), type);
250 SinkTraceCtx(d->logCtx) << "Wrote entity: " << entity.identifier() << type << newRevision; 252 SinkTraceCtx(d->logCtx) << "Wrote entity: " << entity.identifier() << type << newRevision << "key:" << key.toInternalByteArray();
251 return true; 253 return true;
252} 254}
253 255
@@ -319,8 +321,9 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomainType &cu
319 const auto key = Key(identifier, newRevision); 321 const auto key = Key(identifier, newRevision);
320 322
321 DataStore::mainDatabase(d->transaction, type) 323 DataStore::mainDatabase(d->transaction, type)
322 .write(key.toInternalByteArray(), BufferUtils::extractBuffer(fbb), 324 .write(newRevision, BufferUtils::extractBuffer(fbb),
323 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << newEntity.identifier() << newRevision; }); 325 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << newEntity.identifier() << newRevision; });
326
324 DataStore::setMaxRevision(d->transaction, newRevision); 327 DataStore::setMaxRevision(d->transaction, newRevision);
325 DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type); 328 DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type);
326 SinkTraceCtx(d->logCtx) << "Wrote modified entity: " << newEntity.identifier() << type << newRevision; 329 SinkTraceCtx(d->logCtx) << "Wrote modified entity: " << newEntity.identifier() << type << newRevision;
@@ -356,8 +359,9 @@ bool EntityStore::remove(const QByteArray &type, const ApplicationDomainType &cu
356 const auto key = Key(identifier, newRevision); 359 const auto key = Key(identifier, newRevision);
357 360
358 DataStore::mainDatabase(d->transaction, type) 361 DataStore::mainDatabase(d->transaction, type)
359 .write(key.toInternalByteArray(), BufferUtils::extractBuffer(fbb), 362 .write(newRevision, BufferUtils::extractBuffer(fbb),
360 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << uid << newRevision; }); 363 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << uid << newRevision; });
364
361 DataStore::setMaxRevision(d->transaction, newRevision); 365 DataStore::setMaxRevision(d->transaction, newRevision);
362 DataStore::recordRevision(d->transaction, newRevision, uid, type); 366 DataStore::recordRevision(d->transaction, newRevision, uid, type);
363 DataStore::removeUid(d->transaction, uid, type); 367 DataStore::removeUid(d->transaction, uid, type);
@@ -375,30 +379,33 @@ void EntityStore::cleanupEntityRevisionsUntil(qint64 revision)
375 } 379 }
376 SinkTraceCtx(d->logCtx) << "Cleaning up revision " << revision << uid << bufferType; 380 SinkTraceCtx(d->logCtx) << "Cleaning up revision " << revision << uid << bufferType;
377 const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray(); 381 const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray();
378 DataStore::mainDatabase(d->transaction, bufferType)
379 .scan(internalUid,
380 [&](const QByteArray &key, const QByteArray &data) -> bool {
381 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
382 if (!buffer.isValid()) {
383 SinkWarningCtx(d->logCtx) << "Read invalid buffer from disk";
384 } else {
385 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer());
386 const qint64 rev = metadata->revision();
387 const auto isRemoval = metadata->operation() == Operation_Removal;
388 // Remove old revisions, and the current if the entity has already been removed
389 if (rev < revision || isRemoval) {
390 DataStore::removeRevision(d->transaction, rev);
391 DataStore::mainDatabase(d->transaction, bufferType).remove(key);
392 }
393 //Don't cleanup more than specified
394 if (rev >= revision) {
395 return false;
396 }
397 }
398 382
399 return true; 383 // Remove old revisions
400 }, 384 const auto revisionsToRemove = DataStore::getRevisionsUntilFromUid(d->transaction, uid, revision);
401 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message; }, true); 385
386 for (const auto &revisionToRemove : revisionsToRemove) {
387 DataStore::removeRevision(d->transaction, revisionToRemove);
388 DataStore::mainDatabase(d->transaction, bufferType).remove(revisionToRemove);
389 }
390
391 // And remove the specified revision only if marked for removal
392 DataStore::mainDatabase(d->transaction, bufferType).scan(revision, [&](size_t, const QByteArray &data) {
393 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
394 if (!buffer.isValid()) {
395 SinkWarningCtx(d->logCtx) << "Read invalid buffer from disk";
396 return false;
397 }
398
399 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer());
400 const qint64 rev = metadata->revision();
401 if (metadata->operation() == Operation_Removal) {
402 DataStore::removeRevision(d->transaction, revision);
403 DataStore::mainDatabase(d->transaction, bufferType).remove(revision);
404 }
405
406 return false;
407 });
408
402 DataStore::setCleanedUpRevision(d->transaction, revision); 409 DataStore::setCleanedUpRevision(d->transaction, revision);
403} 410}
404 411
@@ -437,13 +444,23 @@ QVector<Identifier> EntityStore::fullScan(const QByteArray &type)
437 QSet<Identifier> keys; 444 QSet<Identifier> keys;
438 DataStore::mainDatabase(d->getTransaction(), type) 445 DataStore::mainDatabase(d->getTransaction(), type)
439 .scan(QByteArray(), 446 .scan(QByteArray(),
440 [&](const QByteArray &key, const QByteArray &value) -> bool { 447 [&](const QByteArray &key, const QByteArray &data) -> bool {
441 const auto uid = Sink::Storage::Key::fromInternalByteArray(key).identifier(); 448 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
442 if (keys.contains(uid)) { 449 if (!buffer.isValid()) {
450 SinkWarningCtx(d->logCtx) << "Read invalid buffer from disk";
451 return true;
452 }
453
454 size_t revision = *reinterpret_cast<const size_t*>(key.constData());
455
456 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer());
457 const QByteArray uid = DataStore::getUidFromRevision(d->getTransaction(), revision);
458 const auto identifier = Sink::Storage::Identifier::fromDisplayByteArray(uid);
459 if (keys.contains(identifier)) {
443 //Not something that should persist if the replay works, so we keep a message for now. 460 //Not something that should persist if the replay works, so we keep a message for now.
444 SinkTraceCtx(d->logCtx) << "Multiple revisions for key: " << key; 461 SinkTraceCtx(d->logCtx) << "Multiple revisions for key: " << key;
445 } 462 }
446 keys << uid; 463 keys << identifier;
447 return true; 464 return true;
448 }, 465 },
449 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during fullScan query: " << error.message; }); 466 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during fullScan query: " << error.message; });
@@ -492,12 +509,12 @@ void EntityStore::indexLookup(const QByteArray &type, const QByteArray &property
492void EntityStore::readLatest(const QByteArray &type, const Identifier &id, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback) 509void EntityStore::readLatest(const QByteArray &type, const Identifier &id, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback)
493{ 510{
494 Q_ASSERT(d); 511 Q_ASSERT(d);
495 const auto internalKey = id.toInternalByteArray(); 512 const size_t revision = DataStore::getLatestRevisionFromUid(d->getTransaction(), id.toDisplayByteArray());
496 auto db = DataStore::mainDatabase(d->getTransaction(), type); 513 auto db = DataStore::mainDatabase(d->getTransaction(), type);
497 db.findLatest(internalKey, 514 db.scan(revision,
498 [=](const QByteArray &key, const QByteArray &value) { 515 [=](size_t, const QByteArray &value) {
499 const auto uid = Sink::Storage::Key::fromInternalByteArray(key).identifier().toDisplayByteArray(); 516 callback(id.toDisplayByteArray(), Sink::EntityBuffer(value.data(), value.size()));
500 callback(uid, Sink::EntityBuffer(value.data(), value.size())); 517 return false;
501 }, 518 },
502 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during readLatest query: " << error.message << id; }); 519 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during readLatest query: " << error.message << id; });
503} 520}
@@ -546,9 +563,9 @@ void EntityStore::readEntity(const QByteArray &type, const QByteArray &displayKe
546{ 563{
547 const auto key = Key::fromDisplayByteArray(displayKey); 564 const auto key = Key::fromDisplayByteArray(displayKey);
548 auto db = DataStore::mainDatabase(d->getTransaction(), type); 565 auto db = DataStore::mainDatabase(d->getTransaction(), type);
549 db.scan(key.toInternalByteArray(), 566 db.scan(key.revision().toSizeT(),
550 [=](const QByteArray &key, const QByteArray &value) -> bool { 567 [=](size_t rev, const QByteArray &value) -> bool {
551 const auto uid = Sink::Storage::Key::fromInternalByteArray(key).identifier().toDisplayByteArray(); 568 const auto uid = DataStore::getUidFromRevision(d->transaction, rev);
552 callback(uid, Sink::EntityBuffer(value.data(), value.size())); 569 callback(uid, Sink::EntityBuffer(value.data(), value.size()));
553 return false; 570 return false;
554 }, 571 },
@@ -652,10 +669,10 @@ bool EntityStore::exists(const QByteArray &type, const QByteArray &uid)
652{ 669{
653 bool found = false; 670 bool found = false;
654 bool alreadyRemoved = false; 671 bool alreadyRemoved = false;
655 const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray(); 672 const size_t revision = DataStore::getLatestRevisionFromUid(d->getTransaction(), uid);
656 DataStore::mainDatabase(d->transaction, type) 673 DataStore::mainDatabase(d->transaction, type)
657 .findLatest(internalUid, 674 .scan(revision,
658 [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) { 675 [&found, &alreadyRemoved](size_t, const QByteArray &data) {
659 auto entity = GetEntity(data.data()); 676 auto entity = GetEntity(data.data());
660 if (entity && entity->metadata()) { 677 if (entity && entity->metadata()) {
661 auto metadata = GetMetadata(entity->metadata()->Data()); 678 auto metadata = GetMetadata(entity->metadata()->Data());
@@ -664,6 +681,7 @@ bool EntityStore::exists(const QByteArray &type, const QByteArray &uid)
664 alreadyRemoved = true; 681 alreadyRemoved = true;
665 } 682 }
666 } 683 }
684 return true;
667 }, 685 },
668 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; }); 686 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; });
669 if (!found) { 687 if (!found) {