summaryrefslogtreecommitdiffstats
path: root/common/storage_lmdb.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-02-27 22:19:21 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-02-27 22:50:23 +0100
commit1b31818b3cd48be19e2e29eefe91ecec2cbb69e2 (patch)
tree1c245ae319dc8b99ed62e04d4f1b2f6cc399de24 /common/storage_lmdb.cpp
parentc51f447a476b1f5c3781bcea0ad19e969ba7564e (diff)
downloadsink-1b31818b3cd48be19e2e29eefe91ecec2cbb69e2.tar.gz
sink-1b31818b3cd48be19e2e29eefe91ecec2cbb69e2.zip
Make opening dbis non-racy
Dbis can only be opened by one thread and should then be shared accross all threads after committing the transaction to create the dbi. This requires us to initially open all db's, which in turn requires us to know the correct flags. This patch stores the flags to open each db in a separate db, and then opens up all databases on initial start. If a new database is created that dbi is as well shared as soon as the transaction is committed (before the dbi is private to the transaction).
Diffstat (limited to 'common/storage_lmdb.cpp')
-rw-r--r--common/storage_lmdb.cpp133
1 files changed, 108 insertions, 25 deletions
diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp
index 64f7db9..31aaebf 100644
--- a/common/storage_lmdb.cpp
+++ b/common/storage_lmdb.cpp
@@ -41,6 +41,7 @@ SINK_DEBUG_AREA("storage")
41namespace Sink { 41namespace Sink {
42namespace Storage { 42namespace Storage {
43 43
44 //TODO a QReadWriteLock would be a better choice because we mostly do reads.
44extern QMutex sMutex; 45extern QMutex sMutex;
45extern QHash<QString, MDB_env *> sEnvironments; 46extern QHash<QString, MDB_env *> sEnvironments;
46extern QHash<QString, MDB_dbi> sDbis; 47extern QHash<QString, MDB_dbi> sDbis;
@@ -79,35 +80,80 @@ public:
79 bool allowDuplicates; 80 bool allowDuplicates;
80 std::function<void(const DataStore::Error &error)> defaultErrorHandler; 81 std::function<void(const DataStore::Error &error)> defaultErrorHandler;
81 QString name; 82 QString name;
83 bool createdNewDbi = false;
84 QString createdDbName;
82 85
83 bool openDatabase(bool readOnly, std::function<void(const DataStore::Error &error)> errorHandler) 86 bool openDatabase(bool readOnly, std::function<void(const DataStore::Error &error)> errorHandler)
84 { 87 {
85 unsigned int flags = 0; 88 unsigned int flags = 0;
86 if (!readOnly) {
87 flags |= MDB_CREATE;
88 }
89 if (allowDuplicates) { 89 if (allowDuplicates) {
90 flags |= MDB_DUPSORT; 90 flags |= MDB_DUPSORT;
91 } 91 }
92 92
93 QMutexLocker locker(&sMutex);
94 const auto dbiName = name + db; 93 const auto dbiName = name + db;
95 if (sDbis.contains(dbiName)) { 94 if (sDbis.contains(dbiName)) {
96 dbi = sDbis.value(dbiName); 95 dbi = sDbis.value(dbiName);
97 } else { 96 } else {
97 MDB_dbi flagtableDbi;
98 if (const int rc = mdb_dbi_open(transaction, "__flagtable", readOnly ? 0 : MDB_CREATE, &flagtableDbi)) {
99 if (!readOnly) {
100 SinkWarning() << "Failed to to open flagdb: " << QByteArray(mdb_strerror(rc));
101 }
102 } else {
103 MDB_val key, value;
104 key.mv_data = const_cast<void*>(static_cast<const void*>(db.constData()));
105 key.mv_size = db.size();
106 if (const auto rc = mdb_get(transaction, flagtableDbi, &key, &value)) {
107 //We expect this to fail for new databases
108 if (rc != MDB_NOTFOUND) {
109 SinkWarning() << "Failed to read flags from flag db: " << QByteArray(mdb_strerror(rc));
110 }
111 } else {
112 //Found the flags
113 const auto ba = QByteArray::fromRawData((char *)value.mv_data, value.mv_size);
114 flags = ba.toInt();
115 }
116 }
117
98 Q_ASSERT(transaction); 118 Q_ASSERT(transaction);
99 if (const int rc = mdb_dbi_open(transaction, db.constData(), flags, &dbi)) { 119 if (const int rc = mdb_dbi_open(transaction, db.constData(), flags, &dbi)) {
100 dbi = 0; 120 //Create the db if it is not existing already
101 transaction = 0; 121 if (rc == MDB_NOTFOUND && !readOnly) {
102 // The database is not existing, ignore in read-only mode 122 if (const int rc = mdb_dbi_open(transaction, db.constData(), flags | MDB_CREATE, &dbi)) {
103 if (!(readOnly && rc == MDB_NOTFOUND)) { 123 SinkWarning() << "Failed to create db " << QByteArray(mdb_strerror(rc));
104 SinkWarning() << "Failed to open db " << QByteArray(mdb_strerror(rc)); 124 Error error(name.toLatin1(), ErrorCodes::GenericError, "Error while creating database: " + QByteArray(mdb_strerror(rc)));
105 Error error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening database: " + QByteArray(mdb_strerror(rc))); 125 errorHandler ? errorHandler(error) : defaultErrorHandler(error);
106 errorHandler ? errorHandler(error) : defaultErrorHandler(error); 126 return false;
127 }
128 //Record the db flags
129 MDB_val key, value;
130 key.mv_data = const_cast<void*>(static_cast<const void*>(db.constData()));
131 key.mv_size = db.size();
132 //Store the flags without the create option
133 const auto ba = QByteArray::number(flags);
134 value.mv_data = const_cast<void*>(static_cast<const void*>(db.constData()));
135 value.mv_size = db.size();
136 if (const int rc = mdb_put(transaction, flagtableDbi, &key, &value, MDB_NOOVERWRITE)) {
137 //We expect this to fail if we're only creating the dbi but not the db
138 if (rc != MDB_KEYEXIST) {
139 SinkWarning() << "Failed to write flags to flag db: " << QByteArray(mdb_strerror(rc));
140 }
141 }
142 } else {
143 dbi = 0;
144 transaction = 0;
145 //It's not an error if we only want to read
146 if (!readOnly) {
147 SinkWarning() << "Failed to open db " << QByteArray(mdb_strerror(rc));
148 Error error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening database: " + QByteArray(mdb_strerror(rc)));
149 errorHandler ? errorHandler(error) : defaultErrorHandler(error);
150 }
151 return false;
107 } 152 }
108 return false;
109 } 153 }
110 sDbis.insert(dbiName, dbi); 154
155 createdNewDbi = true;
156 createdDbName = dbiName;
111 } 157 }
112 return true; 158 return true;
113 } 159 }
@@ -316,7 +362,7 @@ void DataStore::NamedDatabase::findLatest(const QByteArray &k, const std::functi
316 362
317 rc = mdb_cursor_open(d->transaction, d->dbi, &cursor); 363 rc = mdb_cursor_open(d->transaction, d->dbi, &cursor);
318 if (rc) { 364 if (rc) {
319 Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during mdb_cursor open: ") + QByteArray(mdb_strerror(rc))); 365 Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during mdb_cursor_open: ") + QByteArray(mdb_strerror(rc)));
320 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 366 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
321 return; 367 return;
322 } 368 }
@@ -391,8 +437,8 @@ qint64 DataStore::NamedDatabase::getSize()
391class DataStore::Transaction::Private 437class DataStore::Transaction::Private
392{ 438{
393public: 439public:
394 Private(bool _requestRead, const std::function<void(const DataStore::Error &error)> &_defaultErrorHandler, const QString &_name, MDB_env *_env) 440 Private(bool _requestRead, const std::function<void(const DataStore::Error &error)> &_defaultErrorHandler, const QString &_name, MDB_env *_env, bool _noLock = false)
395 : env(_env), transaction(nullptr), requestedRead(_requestRead), defaultErrorHandler(_defaultErrorHandler), name(_name), implicitCommit(false), error(false), modificationCounter(0) 441 : env(_env), transaction(nullptr), requestedRead(_requestRead), defaultErrorHandler(_defaultErrorHandler), name(_name), implicitCommit(false), error(false), modificationCounter(0), noLock(_noLock)
396 { 442 {
397 } 443 }
398 ~Private() 444 ~Private()
@@ -408,6 +454,9 @@ public:
408 bool implicitCommit; 454 bool implicitCommit;
409 bool error; 455 bool error;
410 int modificationCounter; 456 int modificationCounter;
457 bool noLock;
458
459 QMap<QString, MDB_dbi> createdDbs;
411 460
412 void startTransaction() 461 void startTransaction()
413 { 462 {
@@ -486,6 +535,21 @@ bool DataStore::Transaction::commit(const std::function<void(const DataStore::Er
486 } 535 }
487 d->transaction = nullptr; 536 d->transaction = nullptr;
488 537
538 //Add the created dbis to the shared environment
539 if (!d->createdDbs.isEmpty()) {
540 if (!d->noLock) {
541 sMutex.lock();
542 }
543 for (auto it = d->createdDbs.constBegin(); it != d->createdDbs.constEnd(); it++) {
544 Q_ASSERT(!sDbis.contains(it.key()));
545 sDbis.insert(it.key(), it.value());
546 }
547 d->createdDbs.clear();
548 if (!d->noLock) {
549 sMutex.unlock();
550 }
551 }
552
489 return !rc; 553 return !rc;
490} 554}
491 555
@@ -495,6 +559,7 @@ void DataStore::Transaction::abort()
495 return; 559 return;
496 } 560 }
497 561
562 d->createdDbs.clear();
498 // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction; 563 // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction;
499 Q_ASSERT(sEnvironments.values().contains(d->env)); 564 Q_ASSERT(sEnvironments.values().contains(d->env));
500 mdb_txn_abort(d->transaction); 565 mdb_txn_abort(d->transaction);
@@ -527,14 +592,6 @@ static bool ensureCorrectDb(DataStore::NamedDatabase &database, const QByteArray
527 592
528bool DataStore::Transaction::validateNamedDatabases() 593bool DataStore::Transaction::validateNamedDatabases()
529{ 594{
530 auto databases = getDatabaseNames();
531 for (const auto &dbName : databases) {
532 auto db = openDatabase(dbName);
533 if (!db) {
534 SinkWarning() << "Failed to open the database: " << dbName;
535 return false;
536 }
537 }
538 return true; 595 return true;
539} 596}
540 597
@@ -548,13 +605,27 @@ DataStore::NamedDatabase DataStore::Transaction::openDatabase(const QByteArray &
548 // We don't now if anything changed 605 // We don't now if anything changed
549 d->implicitCommit = true; 606 d->implicitCommit = true;
550 auto p = new DataStore::NamedDatabase::Private(db, allowDuplicates, d->defaultErrorHandler, d->name, d->transaction); 607 auto p = new DataStore::NamedDatabase::Private(db, allowDuplicates, d->defaultErrorHandler, d->name, d->transaction);
608 if (!d->noLock) {
609 //This wouldn't be necessary with a read-write lock, we could read-only lock here
610 sMutex.lock();
611 }
551 if (!p->openDatabase(d->requestedRead, errorHandler)) { 612 if (!p->openDatabase(d->requestedRead, errorHandler)) {
613 if (!d->noLock) {
614 sMutex.unlock();
615 }
552 delete p; 616 delete p;
553 return DataStore::NamedDatabase(); 617 return DataStore::NamedDatabase();
554 } 618 }
619 if (!d->noLock) {
620 sMutex.unlock();
621 }
622 if (p->createdNewDbi) {
623 d->createdDbs.insert(p->createdDbName, p->dbi);
624 }
555 auto database = DataStore::NamedDatabase(p); 625 auto database = DataStore::NamedDatabase(p);
556 if (!ensureCorrectDb(database, db, d->requestedRead)) { 626 if (!ensureCorrectDb(database, db, d->requestedRead)) {
557 SinkWarning() << "Failed to open the database" << db; 627 SinkWarning() << "Failed to open the database correctly" << db;
628 Q_ASSERT(false);
558 return DataStore::NamedDatabase(); 629 return DataStore::NamedDatabase();
559 } 630 }
560 return database; 631 return database;
@@ -569,6 +640,7 @@ QList<QByteArray> DataStore::Transaction::getDatabaseNames() const
569 640
570 int rc; 641 int rc;
571 QList<QByteArray> list; 642 QList<QByteArray> list;
643 Q_ASSERT(d->transaction);
572 if ((rc = mdb_dbi_open(d->transaction, nullptr, 0, &d->dbi) == 0)) { 644 if ((rc = mdb_dbi_open(d->transaction, nullptr, 0, &d->dbi) == 0)) {
573 MDB_val key; 645 MDB_val key;
574 MDB_val data; 646 MDB_val data;
@@ -657,6 +729,17 @@ DataStore::Private::Private(const QString &s, const QString &n, AccessMode m) :
657 mdb_env_set_mapsize(env, dbSize); 729 mdb_env_set_mapsize(env, dbSize);
658 } 730 }
659 sEnvironments.insert(fullPath, env); 731 sEnvironments.insert(fullPath, env);
732 //Open all available dbi's
733 bool noLock = true;
734 bool requestedRead = m == ReadOnly;
735 auto t = Transaction(new Transaction::Private(requestedRead, nullptr, name, env, noLock));
736 for (const auto &db : t.getDatabaseNames()) {
737 SinkLog() << "Opening initial db: " << db;
738 //Get dbi to store for future use.
739 t.openDatabase(db);
740 }
741 //To persist the dbis
742 t.commit();
660 } 743 }
661 } 744 }
662 } 745 }