summaryrefslogtreecommitdiffstats
path: root/common/storage_lmdb.cpp
diff options
context:
space:
mode:
authorRémi Nicole <nicole@kolabsystems.com>2018-08-22 14:16:59 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2018-08-22 14:28:51 +0200
commit46313049ac01a3007ef60bdc937442945355a38d (patch)
tree56ce0cd679367a60ba3a706ac4d207bc9cc82230 /common/storage_lmdb.cpp
parentaf91a18748b91f4a4fc0d83247561371d376bec5 (diff)
downloadsink-46313049ac01a3007ef60bdc937442945355a38d.tar.gz
sink-46313049ac01a3007ef60bdc937442945355a38d.zip
Separate UIDs and Revisions in main databases
Summary: - Change revision type from `qint64` to `size_t` for LMDB in a couple of places (LMDB supports `unsigned int` or `size_t` which are `long unsigned int` on my machine) - Better support for database flags (duplicate, integer keys, integer values for now but is extensible) - Main databases' keys are now revisions - Some databases switched to integer keys databases: - Main databases - the revision to uid mapping database - the revision to entity type mapping database - Refactor the entity type's `typeDatabases` method (if in the future we need to change the main databases' flags again) - New uid to revision mapping database (`uidsToRevisions`): - Stores all revisions (not uid to latest revision) because we need it for cleaning old revisions - Flags are: duplicates + integer values (so findLatest finds the latest revision for the given uid) ~~Problems to fix before merging:~~ All Fixed! - ~~Sometimes Sink can't read what has just been written to the database (maybe because of transactions race conditions)~~ - ~~Most of the times, this results in Sink not able to find the uid for a given revision by reading the `revisions` database~~ - ~~`pipelinetest`'s `testModifyWithConflict` fails because the local changes are overridden~~ ~~The first problem prevents me from running benchmarks~~ Reviewers: cmollekopf Tags: #sink Differential Revision: https://phabricator.kde.org/D14974
Diffstat (limited to 'common/storage_lmdb.cpp')
-rw-r--r--common/storage_lmdb.cpp136
1 files changed, 86 insertions, 50 deletions
diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp
index a007405..0458dae 100644
--- a/common/storage_lmdb.cpp
+++ b/common/storage_lmdb.cpp
@@ -48,6 +48,10 @@ static QMutex sCreateDbiLock;
48static QHash<QString, MDB_env *> sEnvironments; 48static QHash<QString, MDB_env *> sEnvironments;
49static QHash<QString, MDB_dbi> sDbis; 49static QHash<QString, MDB_dbi> sDbis;
50 50
51int AllowDuplicates = MDB_DUPSORT;
52int IntegerKeys = MDB_INTEGERKEY;
53int IntegerValues = MDB_INTEGERDUP;
54
51int getErrorCode(int e) 55int getErrorCode(int e)
52{ 56{
53 switch (e) { 57 switch (e) {
@@ -101,14 +105,8 @@ static QList<QByteArray> getDatabaseNames(MDB_txn *transaction)
101 * and we always need to commit the transaction ASAP 105 * and we always need to commit the transaction ASAP
102 * We can only ever enter from one point per process. 106 * We can only ever enter from one point per process.
103 */ 107 */
104static bool createDbi(MDB_txn *transaction, const QByteArray &db, bool readOnly, bool allowDuplicates, MDB_dbi &dbi) 108static bool createDbi(MDB_txn *transaction, const QByteArray &db, bool readOnly, int flags, MDB_dbi &dbi)
105{ 109{
106
107 unsigned int flags = 0;
108 if (allowDuplicates) {
109 flags |= MDB_DUPSORT;
110 }
111
112 MDB_dbi flagtableDbi; 110 MDB_dbi flagtableDbi;
113 if (const int rc = mdb_dbi_open(transaction, "__flagtable", readOnly ? 0 : MDB_CREATE, &flagtableDbi)) { 111 if (const int rc = mdb_dbi_open(transaction, "__flagtable", readOnly ? 0 : MDB_CREATE, &flagtableDbi)) {
114 if (!readOnly) { 112 if (!readOnly) {
@@ -130,6 +128,10 @@ static bool createDbi(MDB_txn *transaction, const QByteArray &db, bool readOnly,
130 } 128 }
131 } 129 }
132 130
131 if (flags & IntegerValues && !(flags & AllowDuplicates)) {
132 SinkWarning() << "Opening a database with integer values, but not duplicate keys";
133 }
134
133 if (const int rc = mdb_dbi_open(transaction, db.constData(), flags, &dbi)) { 135 if (const int rc = mdb_dbi_open(transaction, db.constData(), flags, &dbi)) {
134 //Create the db if it is not existing already 136 //Create the db if it is not existing already
135 if (rc == MDB_NOTFOUND && !readOnly) { 137 if (rc == MDB_NOTFOUND && !readOnly) {
@@ -165,7 +167,7 @@ static bool createDbi(MDB_txn *transaction, const QByteArray &db, bool readOnly,
165 //Store the flags without the create option 167 //Store the flags without the create option
166 const auto ba = QByteArray::number(flags); 168 const auto ba = QByteArray::number(flags);
167 value.mv_data = const_cast<void*>(static_cast<const void*>(ba.constData())); 169 value.mv_data = const_cast<void*>(static_cast<const void*>(ba.constData()));
168 value.mv_size = db.size(); 170 value.mv_size = ba.size();
169 if (const int rc = mdb_put(transaction, flagtableDbi, &key, &value, MDB_NOOVERWRITE)) { 171 if (const int rc = mdb_put(transaction, flagtableDbi, &key, &value, MDB_NOOVERWRITE)) {
170 //We expect this to fail if we're only creating the dbi but not the db 172 //We expect this to fail if we're only creating the dbi but not the db
171 if (rc != MDB_KEYEXIST) { 173 if (rc != MDB_KEYEXIST) {
@@ -175,7 +177,7 @@ static bool createDbi(MDB_txn *transaction, const QByteArray &db, bool readOnly,
175 } else { 177 } else {
176 //It's not an error if we only want to read 178 //It's not an error if we only want to read
177 if (!readOnly) { 179 if (!readOnly) {
178 SinkWarning() << "Failed to open db " << QByteArray(mdb_strerror(rc)); 180 SinkWarning() << "Failed to open db " << db << "error:" << QByteArray(mdb_strerror(rc));
179 return true; 181 return true;
180 } 182 }
181 return false; 183 return false;
@@ -187,8 +189,14 @@ static bool createDbi(MDB_txn *transaction, const QByteArray &db, bool readOnly,
187class DataStore::NamedDatabase::Private 189class DataStore::NamedDatabase::Private
188{ 190{
189public: 191public:
190 Private(const QByteArray &_db, bool _allowDuplicates, const std::function<void(const DataStore::Error &error)> &_defaultErrorHandler, const QString &_name, MDB_txn *_txn) 192 Private(const QByteArray &_db, int _flags,
191 : db(_db), transaction(_txn), allowDuplicates(_allowDuplicates), defaultErrorHandler(_defaultErrorHandler), name(_name) 193 const std::function<void(const DataStore::Error &error)> &_defaultErrorHandler,
194 const QString &_name, MDB_txn *_txn)
195 : db(_db),
196 transaction(_txn),
197 flags(_flags),
198 defaultErrorHandler(_defaultErrorHandler),
199 name(_name)
192 { 200 {
193 } 201 }
194 202
@@ -199,7 +207,7 @@ public:
199 QByteArray db; 207 QByteArray db;
200 MDB_txn *transaction; 208 MDB_txn *transaction;
201 MDB_dbi dbi; 209 MDB_dbi dbi;
202 bool allowDuplicates; 210 int flags;
203 std::function<void(const DataStore::Error &error)> defaultErrorHandler; 211 std::function<void(const DataStore::Error &error)> defaultErrorHandler;
204 QString name; 212 QString name;
205 bool createdNewDbi = false; 213 bool createdNewDbi = false;
@@ -313,7 +321,7 @@ public:
313 } else { 321 } else {
314 dbiTransaction = transaction; 322 dbiTransaction = transaction;
315 } 323 }
316 if (createDbi(dbiTransaction, db, readOnly, allowDuplicates, dbi)) { 324 if (createDbi(dbiTransaction, db, readOnly, flags, dbi)) {
317 if (readOnly) { 325 if (readOnly) {
318 mdb_txn_commit(dbiTransaction); 326 mdb_txn_commit(dbiTransaction);
319 Q_ASSERT(!sDbis.contains(dbiName)); 327 Q_ASSERT(!sDbis.contains(dbiName));
@@ -371,6 +379,12 @@ DataStore::NamedDatabase::~NamedDatabase()
371 delete d; 379 delete d;
372} 380}
373 381
382bool DataStore::NamedDatabase::write(const size_t key, const QByteArray &value,
383 const std::function<void(const DataStore::Error &error)> &errorHandler)
384{
385 return write(sizeTToByteArray(key), value, errorHandler);
386}
387
374bool DataStore::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sValue, const std::function<void(const DataStore::Error &error)> &errorHandler) 388bool DataStore::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sValue, const std::function<void(const DataStore::Error &error)> &errorHandler)
375{ 389{
376 if (!d || !d->transaction) { 390 if (!d || !d->transaction) {
@@ -407,11 +421,23 @@ bool DataStore::NamedDatabase::write(const QByteArray &sKey, const QByteArray &s
407 return !rc; 421 return !rc;
408} 422}
409 423
424void DataStore::NamedDatabase::remove(
425 const size_t key, const std::function<void(const DataStore::Error &error)> &errorHandler)
426{
427 return remove(sizeTToByteArray(key), errorHandler);
428}
429
410void DataStore::NamedDatabase::remove(const QByteArray &k, const std::function<void(const DataStore::Error &error)> &errorHandler) 430void DataStore::NamedDatabase::remove(const QByteArray &k, const std::function<void(const DataStore::Error &error)> &errorHandler)
411{ 431{
412 remove(k, QByteArray(), errorHandler); 432 remove(k, QByteArray(), errorHandler);
413} 433}
414 434
435void DataStore::NamedDatabase::remove(const size_t key, const QByteArray &value,
436 const std::function<void(const DataStore::Error &error)> &errorHandler)
437{
438 return remove(sizeTToByteArray(key), value, errorHandler);
439}
440
415void DataStore::NamedDatabase::remove(const QByteArray &k, const QByteArray &value, const std::function<void(const DataStore::Error &error)> &errorHandler) 441void DataStore::NamedDatabase::remove(const QByteArray &k, const QByteArray &value, const std::function<void(const DataStore::Error &error)> &errorHandler)
416{ 442{
417 if (!d || !d->transaction) { 443 if (!d || !d->transaction) {
@@ -445,6 +471,17 @@ void DataStore::NamedDatabase::remove(const QByteArray &k, const QByteArray &val
445 } 471 }
446} 472}
447 473
474int DataStore::NamedDatabase::scan(const size_t key,
475 const std::function<bool(size_t key, const QByteArray &value)> &resultHandler,
476 const std::function<void(const DataStore::Error &error)> &errorHandler, bool skipInternalKeys) const
477{
478 return scan(sizeTToByteArray(key),
479 [&resultHandler](const QByteArray &key, const QByteArray &value) {
480 return resultHandler(byteArrayToSizeT(key), value);
481 },
482 errorHandler, /* findSubstringKeys = */ false, skipInternalKeys);
483}
484
448int DataStore::NamedDatabase::scan(const QByteArray &k, const std::function<bool(const QByteArray &key, const QByteArray &value)> &resultHandler, 485int DataStore::NamedDatabase::scan(const QByteArray &k, const std::function<bool(const QByteArray &key, const QByteArray &value)> &resultHandler,
449 const std::function<void(const DataStore::Error &error)> &errorHandler, bool findSubstringKeys, bool skipInternalKeys) const 486 const std::function<void(const DataStore::Error &error)> &errorHandler, bool findSubstringKeys, bool skipInternalKeys) const
450{ 487{
@@ -471,8 +508,10 @@ int DataStore::NamedDatabase::scan(const QByteArray &k, const std::function<bool
471 508
472 int numberOfRetrievedValues = 0; 509 int numberOfRetrievedValues = 0;
473 510
474 if (k.isEmpty() || d->allowDuplicates || findSubstringKeys) { 511 bool allowDuplicates = d->flags & AllowDuplicates;
475 MDB_cursor_op op = d->allowDuplicates ? MDB_SET : MDB_FIRST; 512
513 if (k.isEmpty() || allowDuplicates || findSubstringKeys) {
514 MDB_cursor_op op = allowDuplicates ? MDB_SET : MDB_FIRST;
476 if (findSubstringKeys) { 515 if (findSubstringKeys) {
477 op = MDB_SET_RANGE; 516 op = MDB_SET_RANGE;
478 } 517 }
@@ -490,7 +529,7 @@ int DataStore::NamedDatabase::scan(const QByteArray &k, const std::function<bool
490 key.mv_data = (void *)k.constData(); 529 key.mv_data = (void *)k.constData();
491 key.mv_size = k.size(); 530 key.mv_size = k.size();
492 } 531 }
493 MDB_cursor_op nextOp = (d->allowDuplicates && !findSubstringKeys) ? MDB_NEXT_DUP : MDB_NEXT; 532 MDB_cursor_op nextOp = (allowDuplicates && !findSubstringKeys) ? MDB_NEXT_DUP : MDB_NEXT;
494 while ((rc = mdb_cursor_get(cursor, &key, &data, nextOp)) == 0) { 533 while ((rc = mdb_cursor_get(cursor, &key, &data, nextOp)) == 0) {
495 const auto current = QByteArray::fromRawData((char *)key.mv_data, key.mv_size); 534 const auto current = QByteArray::fromRawData((char *)key.mv_data, key.mv_size);
496 // Every consequitive lookup simply iterates through the list 535 // Every consequitive lookup simply iterates through the list
@@ -529,6 +568,18 @@ int DataStore::NamedDatabase::scan(const QByteArray &k, const std::function<bool
529 return numberOfRetrievedValues; 568 return numberOfRetrievedValues;
530} 569}
531 570
571
572void DataStore::NamedDatabase::findLatest(size_t key,
573 const std::function<void(size_t key, const QByteArray &value)> &resultHandler,
574 const std::function<void(const DataStore::Error &error)> &errorHandler) const
575{
576 return findLatest(sizeTToByteArray(key),
577 [&resultHandler](const QByteArray &key, const QByteArray &value) {
578 resultHandler(byteArrayToSizeT(value), value);
579 },
580 errorHandler);
581}
582
532void DataStore::NamedDatabase::findLatest(const QByteArray &k, const std::function<void(const QByteArray &key, const QByteArray &value)> &resultHandler, 583void DataStore::NamedDatabase::findLatest(const QByteArray &k, const std::function<void(const QByteArray &key, const QByteArray &value)> &resultHandler,
533 const std::function<void(const DataStore::Error &error)> &errorHandler) const 584 const std::function<void(const DataStore::Error &error)> &errorHandler) const
534{ 585{
@@ -602,6 +653,17 @@ void DataStore::NamedDatabase::findLatest(const QByteArray &k, const std::functi
602 return; 653 return;
603} 654}
604 655
656int DataStore::NamedDatabase::findAllInRange(const size_t lowerBound, const size_t upperBound,
657 const std::function<void(size_t key, const QByteArray &value)> &resultHandler,
658 const std::function<void(const DataStore::Error &error)> &errorHandler) const
659{
660 return findAllInRange(sizeTToByteArray(lowerBound), sizeTToByteArray(upperBound),
661 [&resultHandler](const QByteArray &key, const QByteArray &value) {
662 resultHandler(byteArrayToSizeT(value), value);
663 },
664 errorHandler);
665}
666
605int DataStore::NamedDatabase::findAllInRange(const QByteArray &lowerBound, const QByteArray &upperBound, 667int DataStore::NamedDatabase::findAllInRange(const QByteArray &lowerBound, const QByteArray &upperBound,
606 const std::function<void(const QByteArray &key, const QByteArray &value)> &resultHandler, 668 const std::function<void(const QByteArray &key, const QByteArray &value)> &resultHandler,
607 const std::function<void(const DataStore::Error &error)> &errorHandler) const 669 const std::function<void(const DataStore::Error &error)> &errorHandler) const
@@ -839,30 +901,8 @@ void DataStore::Transaction::abort()
839 d->transaction = nullptr; 901 d->transaction = nullptr;
840} 902}
841 903
842//Ensure that we opened the correct database by comparing the expected identifier with the one 904DataStore::NamedDatabase DataStore::Transaction::openDatabase(const QByteArray &db,
843//we write to the database on first open. 905 const std::function<void(const DataStore::Error &error)> &errorHandler, int flags) const
844static bool ensureCorrectDb(DataStore::NamedDatabase &database, const QByteArray &db, bool readOnly)
845{
846 bool openedTheWrongDatabase = false;
847 auto count = database.scan("__internal_dbname", [db, &openedTheWrongDatabase](const QByteArray &key, const QByteArray &value) ->bool {
848 if (value != db) {
849 SinkWarning() << "Opened the wrong database, got " << value << " instead of " << db;
850 openedTheWrongDatabase = true;
851 }
852 return false;
853 },
854 [&](const DataStore::Error &) {
855 }, false);
856 //This is the first time we open this database in a write transaction, write the db name
857 if (!count) {
858 if (!readOnly) {
859 database.write("__internal_dbname", db);
860 }
861 }
862 return !openedTheWrongDatabase;
863}
864
865DataStore::NamedDatabase DataStore::Transaction::openDatabase(const QByteArray &db, const std::function<void(const DataStore::Error &error)> &errorHandler, bool allowDuplicates) const
866{ 906{
867 if (!d) { 907 if (!d) {
868 SinkError() << "Tried to open database on invalid transaction: " << db; 908 SinkError() << "Tried to open database on invalid transaction: " << db;
@@ -871,7 +911,8 @@ DataStore::NamedDatabase DataStore::Transaction::openDatabase(const QByteArray &
871 Q_ASSERT(d->transaction); 911 Q_ASSERT(d->transaction);
872 // We don't now if anything changed 912 // We don't now if anything changed
873 d->implicitCommit = true; 913 d->implicitCommit = true;
874 auto p = new DataStore::NamedDatabase::Private(db, allowDuplicates, d->defaultErrorHandler, d->name, d->transaction); 914 auto p = new DataStore::NamedDatabase::Private(
915 db, flags, d->defaultErrorHandler, d->name, d->transaction);
875 auto ret = p->openDatabase(d->requestedRead, errorHandler); 916 auto ret = p->openDatabase(d->requestedRead, errorHandler);
876 if (!ret) { 917 if (!ret) {
877 delete p; 918 delete p;
@@ -883,11 +924,6 @@ DataStore::NamedDatabase DataStore::Transaction::openDatabase(const QByteArray &
883 } 924 }
884 925
885 auto database = DataStore::NamedDatabase(p); 926 auto database = DataStore::NamedDatabase(p);
886 if (!ensureCorrectDb(database, db, d->requestedRead)) {
887 SinkWarning() << "Failed to open the database correctly" << db;
888 Q_ASSERT(false);
889 return DataStore::NamedDatabase();
890 }
891 return database; 927 return database;
892} 928}
893 929
@@ -1049,11 +1085,11 @@ public:
1049 1085
1050 //Create dbis from the given layout. 1086 //Create dbis from the given layout.
1051 for (auto it = layout.tables.constBegin(); it != layout.tables.constEnd(); it++) { 1087 for (auto it = layout.tables.constBegin(); it != layout.tables.constEnd(); it++) {
1052 const bool allowDuplicates = it.value(); 1088 const int flags = it.value();
1053 MDB_dbi dbi = 0; 1089 MDB_dbi dbi = 0;
1054 const auto db = it.key(); 1090 const auto db = it.key();
1055 const auto dbiName = name + db; 1091 const auto dbiName = name + db;
1056 if (createDbi(transaction, db, readOnly, allowDuplicates, dbi)) { 1092 if (createDbi(transaction, db, readOnly, flags, dbi)) {
1057 sDbis.insert(dbiName, dbi); 1093 sDbis.insert(dbiName, dbi);
1058 } 1094 }
1059 } 1095 }
@@ -1063,8 +1099,8 @@ public:
1063 MDB_dbi dbi = 0; 1099 MDB_dbi dbi = 0;
1064 const auto dbiName = name + db; 1100 const auto dbiName = name + db;
1065 //We're going to load the flags anyways. 1101 //We're going to load the flags anyways.
1066 bool allowDuplicates = false; 1102 const int flags = 0;
1067 if (createDbi(transaction, db, readOnly, allowDuplicates, dbi)) { 1103 if (createDbi(transaction, db, readOnly, flags, dbi)) {
1068 sDbis.insert(dbiName, dbi); 1104 sDbis.insert(dbiName, dbi);
1069 } 1105 }
1070 } 1106 }