diff options
author | Minijackson <minijackson@riseup.net> | 2018-06-26 11:44:11 +0200 |
---|---|---|
committer | Minijackson <minijackson@riseup.net> | 2018-07-04 15:37:14 +0200 |
commit | 922e0979e2c27ff8dbc765ae151d17c7815b98a0 (patch) | |
tree | cb031c5d3ccc31ea576f66b4f718c17f5bb0775c /common/storage | |
parent | 06f30d0f0d0051df97d4c34cd1a80b14857c9e9c (diff) | |
download | sink-922e0979e2c27ff8dbc765ae151d17c7815b98a0.tar.gz sink-922e0979e2c27ff8dbc765ae151d17c7815b98a0.zip |
[Storage] Implement Key API
Diffstat (limited to 'common/storage')
-rw-r--r-- | common/storage/entitystore.cpp | 66 |
1 files changed, 47 insertions, 19 deletions
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index dd6bbf0..f74d3df 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp | |||
@@ -237,8 +237,10 @@ bool EntityStore::add(const QByteArray &type, ApplicationDomainType entity, bool | |||
237 | flatbuffers::FlatBufferBuilder fbb; | 237 | flatbuffers::FlatBufferBuilder fbb; |
238 | d->resourceContext.adaptorFactory(type).createBuffer(entity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | 238 | d->resourceContext.adaptorFactory(type).createBuffer(entity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); |
239 | 239 | ||
240 | const auto key = Key(Identifier::fromDisplayByteArray(entity.identifier()), newRevision); | ||
241 | |||
240 | DataStore::mainDatabase(d->transaction, type) | 242 | DataStore::mainDatabase(d->transaction, type) |
241 | .write(DataStore::assembleKey(entity.identifier(), newRevision), BufferUtils::extractBuffer(fbb), | 243 | .write(key.toInternalByteArray(), BufferUtils::extractBuffer(fbb), |
242 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << entity.identifier() << newRevision; }); | 244 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << entity.identifier() << newRevision; }); |
243 | DataStore::setMaxRevision(d->transaction, newRevision); | 245 | DataStore::setMaxRevision(d->transaction, newRevision); |
244 | DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type); | 246 | DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type); |
@@ -311,8 +313,10 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomainType &cu | |||
311 | flatbuffers::FlatBufferBuilder fbb; | 313 | flatbuffers::FlatBufferBuilder fbb; |
312 | d->resourceContext.adaptorFactory(type).createBuffer(newEntity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | 314 | d->resourceContext.adaptorFactory(type).createBuffer(newEntity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); |
313 | 315 | ||
316 | const auto key = Key(Identifier::fromDisplayByteArray(newEntity.identifier()), newRevision); | ||
317 | |||
314 | DataStore::mainDatabase(d->transaction, type) | 318 | DataStore::mainDatabase(d->transaction, type) |
315 | .write(DataStore::assembleKey(newEntity.identifier(), newRevision), BufferUtils::extractBuffer(fbb), | 319 | .write(key.toInternalByteArray(), BufferUtils::extractBuffer(fbb), |
316 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << newEntity.identifier() << newRevision; }); | 320 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << newEntity.identifier() << newRevision; }); |
317 | DataStore::setMaxRevision(d->transaction, newRevision); | 321 | DataStore::setMaxRevision(d->transaction, newRevision); |
318 | DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type); | 322 | DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type); |
@@ -346,8 +350,10 @@ bool EntityStore::remove(const QByteArray &type, const ApplicationDomainType &cu | |||
346 | flatbuffers::FlatBufferBuilder fbb; | 350 | flatbuffers::FlatBufferBuilder fbb; |
347 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); | 351 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); |
348 | 352 | ||
353 | const auto key = Key(Identifier::fromDisplayByteArray(uid), newRevision); | ||
354 | |||
349 | DataStore::mainDatabase(d->transaction, type) | 355 | DataStore::mainDatabase(d->transaction, type) |
350 | .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), | 356 | .write(key.toInternalByteArray(), BufferUtils::extractBuffer(fbb), |
351 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << uid << newRevision; }); | 357 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << uid << newRevision; }); |
352 | DataStore::setMaxRevision(d->transaction, newRevision); | 358 | DataStore::setMaxRevision(d->transaction, newRevision); |
353 | DataStore::recordRevision(d->transaction, newRevision, uid, type); | 359 | DataStore::recordRevision(d->transaction, newRevision, uid, type); |
@@ -428,7 +434,7 @@ QVector<QByteArray> EntityStore::fullScan(const QByteArray &type) | |||
428 | DataStore::mainDatabase(d->getTransaction(), type) | 434 | DataStore::mainDatabase(d->getTransaction(), type) |
429 | .scan(QByteArray(), | 435 | .scan(QByteArray(), |
430 | [&](const QByteArray &key, const QByteArray &value) -> bool { | 436 | [&](const QByteArray &key, const QByteArray &value) -> bool { |
431 | const auto uid = DataStore::uidFromKey(key); | 437 | const auto uid = Sink::Storage::Key::fromInternalByteArray(key).identifier().toDisplayByteArray(); |
432 | if (keys.contains(uid)) { | 438 | if (keys.contains(uid)) { |
433 | //Not something that should persist if the replay works, so we keep a message for now. | 439 | //Not something that should persist if the replay works, so we keep a message for now. |
434 | SinkTraceCtx(d->logCtx) << "Multiple revisions for key: " << key; | 440 | SinkTraceCtx(d->logCtx) << "Multiple revisions for key: " << key; |
@@ -479,16 +485,24 @@ void EntityStore::indexLookup(const QByteArray &type, const QByteArray &property | |||
479 | /* }); */ | 485 | /* }); */ |
480 | } | 486 | } |
481 | 487 | ||
482 | void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback) | 488 | void EntityStore::readLatest(const QByteArray &type, const QByteArray &key, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback) |
483 | { | 489 | { |
484 | Q_ASSERT(d); | 490 | Q_ASSERT(d); |
485 | Q_ASSERT(!uid.isEmpty()); | 491 | Q_ASSERT(!key.isEmpty()); |
492 | const auto internalKey = [&key]() { | ||
493 | if(key.size() == Identifier::DISPLAY_REPR_SIZE) { | ||
494 | return Identifier::fromDisplayByteArray(key).toInternalByteArray(); | ||
495 | } else { | ||
496 | return Key::fromDisplayByteArray(key).toInternalByteArray(); | ||
497 | } | ||
498 | }(); | ||
486 | auto db = DataStore::mainDatabase(d->getTransaction(), type); | 499 | auto db = DataStore::mainDatabase(d->getTransaction(), type); |
487 | db.findLatest(uid, | 500 | db.findLatest(internalKey, |
488 | [=](const QByteArray &key, const QByteArray &value) { | 501 | [=](const QByteArray &key, const QByteArray &value) { |
489 | callback(DataStore::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size())); | 502 | const auto uid = Sink::Storage::Key::fromInternalByteArray(key).identifier().toDisplayByteArray(); |
503 | callback(uid, Sink::EntityBuffer(value.data(), value.size())); | ||
490 | }, | 504 | }, |
491 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during readLatest query: " << error.message << uid; }); | 505 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during readLatest query: " << error.message << key; }); |
492 | } | 506 | } |
493 | 507 | ||
494 | void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomainType &)> callback) | 508 | void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomainType &)> callback) |
@@ -516,12 +530,15 @@ ApplicationDomain::ApplicationDomainType EntityStore::readLatest(const QByteArra | |||
516 | return dt; | 530 | return dt; |
517 | } | 531 | } |
518 | 532 | ||
519 | void EntityStore::readEntity(const QByteArray &type, const QByteArray &key, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback) | 533 | // TODO: check every usage |
534 | void EntityStore::readEntity(const QByteArray &type, const QByteArray &displayKey, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback) | ||
520 | { | 535 | { |
536 | const auto key = Key::fromDisplayByteArray(displayKey); | ||
521 | auto db = DataStore::mainDatabase(d->getTransaction(), type); | 537 | auto db = DataStore::mainDatabase(d->getTransaction(), type); |
522 | db.scan(key, | 538 | db.scan(key.toInternalByteArray(), |
523 | [=](const QByteArray &key, const QByteArray &value) -> bool { | 539 | [=](const QByteArray &key, const QByteArray &value) -> bool { |
524 | callback(DataStore::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size())); | 540 | const auto uid = Sink::Storage::Key::fromInternalByteArray(key).identifier().toDisplayByteArray(); |
541 | callback(uid, Sink::EntityBuffer(value.data(), value.size())); | ||
525 | return false; | 542 | return false; |
526 | }, | 543 | }, |
527 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during readEntity query: " << error.message << key; }); | 544 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during readEntity query: " << error.message << key; }); |
@@ -567,9 +584,10 @@ void EntityStore::readRevisions(qint64 baseRevision, const QByteArray &expectedT | |||
567 | revisionCounter++; | 584 | revisionCounter++; |
568 | continue; | 585 | continue; |
569 | } | 586 | } |
570 | const auto key = DataStore::assembleKey(uid, revisionCounter); | 587 | const auto key = Key(Identifier::fromDisplayByteArray(uid), revisionCounter); |
588 | |||
571 | revisionCounter++; | 589 | revisionCounter++; |
572 | callback(key); | 590 | callback(key.toDisplayByteArray()); |
573 | } | 591 | } |
574 | } | 592 | } |
575 | 593 | ||
@@ -577,16 +595,19 @@ void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qi | |||
577 | { | 595 | { |
578 | auto db = DataStore::mainDatabase(d->getTransaction(), type); | 596 | auto db = DataStore::mainDatabase(d->getTransaction(), type); |
579 | qint64 latestRevision = 0; | 597 | qint64 latestRevision = 0; |
580 | db.scan(uid, | 598 | const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray(); |
599 | db.scan(internalUid, | ||
581 | [&latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { | 600 | [&latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { |
582 | const auto foundRevision = DataStore::revisionFromKey(key); | 601 | //const auto foundRevision = DataStore::revisionFromKey(key); |
602 | const auto foundRevision = Key::fromInternalByteArray(key).revision().toQint64(); | ||
583 | if (foundRevision < revision && foundRevision > latestRevision) { | 603 | if (foundRevision < revision && foundRevision > latestRevision) { |
584 | latestRevision = foundRevision; | 604 | latestRevision = foundRevision; |
585 | } | 605 | } |
586 | return true; | 606 | return true; |
587 | }, | 607 | }, |
588 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read current value from storage: " << error.message; }, true); | 608 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read current value from storage: " << error.message; }, true); |
589 | readEntity(type, DataStore::assembleKey(uid, latestRevision), callback); | 609 | const auto key = Key(Identifier::fromDisplayByteArray(uid), latestRevision); |
610 | readEntity(type, key.toDisplayByteArray(), callback); | ||
590 | } | 611 | } |
591 | 612 | ||
592 | void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function<void(const ApplicationDomainType &)> callback) | 613 | void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function<void(const ApplicationDomainType &)> callback) |
@@ -612,15 +633,22 @@ void EntityStore::readAllUids(const QByteArray &type, const std::function<void(c | |||
612 | 633 | ||
613 | bool EntityStore::contains(const QByteArray &type, const QByteArray &uid) | 634 | bool EntityStore::contains(const QByteArray &type, const QByteArray &uid) |
614 | { | 635 | { |
615 | return DataStore::mainDatabase(d->getTransaction(), type).contains(uid); | 636 | // Because of pipeline using this function with new entities |
637 | // TODO: maybe modify pipeline instead? | ||
638 | if(uid.isEmpty()) { | ||
639 | return false; | ||
640 | } | ||
641 | const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray(); | ||
642 | return DataStore::mainDatabase(d->getTransaction(), type).contains(internalUid); | ||
616 | } | 643 | } |
617 | 644 | ||
618 | bool EntityStore::exists(const QByteArray &type, const QByteArray &uid) | 645 | bool EntityStore::exists(const QByteArray &type, const QByteArray &uid) |
619 | { | 646 | { |
620 | bool found = false; | 647 | bool found = false; |
621 | bool alreadyRemoved = false; | 648 | bool alreadyRemoved = false; |
649 | const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray(); | ||
622 | DataStore::mainDatabase(d->transaction, type) | 650 | DataStore::mainDatabase(d->transaction, type) |
623 | .findLatest(uid, | 651 | .findLatest(internalUid, |
624 | [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) { | 652 | [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) { |
625 | auto entity = GetEntity(data.data()); | 653 | auto entity = GetEntity(data.data()); |
626 | if (entity && entity->metadata()) { | 654 | if (entity && entity->metadata()) { |