summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/storage/entitystore.cpp1
-rw-r--r--common/storage_lmdb.cpp133
-rw-r--r--tests/storagetest.cpp36
3 files changed, 144 insertions, 26 deletions
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp
index 9156dd4..909f1b2 100644
--- a/common/storage/entitystore.cpp
+++ b/common/storage/entitystore.cpp
@@ -59,7 +59,6 @@ public:
59 59
60 Sink::Storage::DataStore store(Sink::storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly); 60 Sink::Storage::DataStore store(Sink::storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly);
61 transaction = store.createTransaction(DataStore::ReadOnly); 61 transaction = store.createTransaction(DataStore::ReadOnly);
62 Q_ASSERT(transaction.validateNamedDatabases());
63 return transaction; 62 return transaction;
64 } 63 }
65 64
diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp
index 64f7db9..31aaebf 100644
--- a/common/storage_lmdb.cpp
+++ b/common/storage_lmdb.cpp
@@ -41,6 +41,7 @@ SINK_DEBUG_AREA("storage")
41namespace Sink { 41namespace Sink {
42namespace Storage { 42namespace Storage {
43 43
44 //TODO a QReadWriteLock would be a better choice because we mostly do reads.
44extern QMutex sMutex; 45extern QMutex sMutex;
45extern QHash<QString, MDB_env *> sEnvironments; 46extern QHash<QString, MDB_env *> sEnvironments;
46extern QHash<QString, MDB_dbi> sDbis; 47extern QHash<QString, MDB_dbi> sDbis;
@@ -79,35 +80,80 @@ public:
79 bool allowDuplicates; 80 bool allowDuplicates;
80 std::function<void(const DataStore::Error &error)> defaultErrorHandler; 81 std::function<void(const DataStore::Error &error)> defaultErrorHandler;
81 QString name; 82 QString name;
83 bool createdNewDbi = false;
84 QString createdDbName;
82 85
83 bool openDatabase(bool readOnly, std::function<void(const DataStore::Error &error)> errorHandler) 86 bool openDatabase(bool readOnly, std::function<void(const DataStore::Error &error)> errorHandler)
84 { 87 {
85 unsigned int flags = 0; 88 unsigned int flags = 0;
86 if (!readOnly) {
87 flags |= MDB_CREATE;
88 }
89 if (allowDuplicates) { 89 if (allowDuplicates) {
90 flags |= MDB_DUPSORT; 90 flags |= MDB_DUPSORT;
91 } 91 }
92 92
93 QMutexLocker locker(&sMutex);
94 const auto dbiName = name + db; 93 const auto dbiName = name + db;
95 if (sDbis.contains(dbiName)) { 94 if (sDbis.contains(dbiName)) {
96 dbi = sDbis.value(dbiName); 95 dbi = sDbis.value(dbiName);
97 } else { 96 } else {
97 MDB_dbi flagtableDbi;
98 if (const int rc = mdb_dbi_open(transaction, "__flagtable", readOnly ? 0 : MDB_CREATE, &flagtableDbi)) {
99 if (!readOnly) {
100 SinkWarning() << "Failed to to open flagdb: " << QByteArray(mdb_strerror(rc));
101 }
102 } else {
103 MDB_val key, value;
104 key.mv_data = const_cast<void*>(static_cast<const void*>(db.constData()));
105 key.mv_size = db.size();
106 if (const auto rc = mdb_get(transaction, flagtableDbi, &key, &value)) {
107 //We expect this to fail for new databases
108 if (rc != MDB_NOTFOUND) {
109 SinkWarning() << "Failed to read flags from flag db: " << QByteArray(mdb_strerror(rc));
110 }
111 } else {
112 //Found the flags
113 const auto ba = QByteArray::fromRawData((char *)value.mv_data, value.mv_size);
114 flags = ba.toInt();
115 }
116 }
117
98 Q_ASSERT(transaction); 118 Q_ASSERT(transaction);
99 if (const int rc = mdb_dbi_open(transaction, db.constData(), flags, &dbi)) { 119 if (const int rc = mdb_dbi_open(transaction, db.constData(), flags, &dbi)) {
100 dbi = 0; 120 //Create the db if it is not existing already
101 transaction = 0; 121 if (rc == MDB_NOTFOUND && !readOnly) {
102 // The database is not existing, ignore in read-only mode 122 if (const int rc = mdb_dbi_open(transaction, db.constData(), flags | MDB_CREATE, &dbi)) {
103 if (!(readOnly && rc == MDB_NOTFOUND)) { 123 SinkWarning() << "Failed to create db " << QByteArray(mdb_strerror(rc));
104 SinkWarning() << "Failed to open db " << QByteArray(mdb_strerror(rc)); 124 Error error(name.toLatin1(), ErrorCodes::GenericError, "Error while creating database: " + QByteArray(mdb_strerror(rc)));
105 Error error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening database: " + QByteArray(mdb_strerror(rc))); 125 errorHandler ? errorHandler(error) : defaultErrorHandler(error);
106 errorHandler ? errorHandler(error) : defaultErrorHandler(error); 126 return false;
127 }
128 //Record the db flags
129 MDB_val key, value;
130 key.mv_data = const_cast<void*>(static_cast<const void*>(db.constData()));
131 key.mv_size = db.size();
132 //Store the flags without the create option
133 const auto ba = QByteArray::number(flags);
134 value.mv_data = const_cast<void*>(static_cast<const void*>(db.constData()));
135 value.mv_size = db.size();
136 if (const int rc = mdb_put(transaction, flagtableDbi, &key, &value, MDB_NOOVERWRITE)) {
137 //We expect this to fail if we're only creating the dbi but not the db
138 if (rc != MDB_KEYEXIST) {
139 SinkWarning() << "Failed to write flags to flag db: " << QByteArray(mdb_strerror(rc));
140 }
141 }
142 } else {
143 dbi = 0;
144 transaction = 0;
145 //It's not an error if we only want to read
146 if (!readOnly) {
147 SinkWarning() << "Failed to open db " << QByteArray(mdb_strerror(rc));
148 Error error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening database: " + QByteArray(mdb_strerror(rc)));
149 errorHandler ? errorHandler(error) : defaultErrorHandler(error);
150 }
151 return false;
107 } 152 }
108 return false;
109 } 153 }
110 sDbis.insert(dbiName, dbi); 154
155 createdNewDbi = true;
156 createdDbName = dbiName;
111 } 157 }
112 return true; 158 return true;
113 } 159 }
@@ -316,7 +362,7 @@ void DataStore::NamedDatabase::findLatest(const QByteArray &k, const std::functi
316 362
317 rc = mdb_cursor_open(d->transaction, d->dbi, &cursor); 363 rc = mdb_cursor_open(d->transaction, d->dbi, &cursor);
318 if (rc) { 364 if (rc) {
319 Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during mdb_cursor open: ") + QByteArray(mdb_strerror(rc))); 365 Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during mdb_cursor_open: ") + QByteArray(mdb_strerror(rc)));
320 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 366 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
321 return; 367 return;
322 } 368 }
@@ -391,8 +437,8 @@ qint64 DataStore::NamedDatabase::getSize()
391class DataStore::Transaction::Private 437class DataStore::Transaction::Private
392{ 438{
393public: 439public:
394 Private(bool _requestRead, const std::function<void(const DataStore::Error &error)> &_defaultErrorHandler, const QString &_name, MDB_env *_env) 440 Private(bool _requestRead, const std::function<void(const DataStore::Error &error)> &_defaultErrorHandler, const QString &_name, MDB_env *_env, bool _noLock = false)
395 : env(_env), transaction(nullptr), requestedRead(_requestRead), defaultErrorHandler(_defaultErrorHandler), name(_name), implicitCommit(false), error(false), modificationCounter(0) 441 : env(_env), transaction(nullptr), requestedRead(_requestRead), defaultErrorHandler(_defaultErrorHandler), name(_name), implicitCommit(false), error(false), modificationCounter(0), noLock(_noLock)
396 { 442 {
397 } 443 }
398 ~Private() 444 ~Private()
@@ -408,6 +454,9 @@ public:
408 bool implicitCommit; 454 bool implicitCommit;
409 bool error; 455 bool error;
410 int modificationCounter; 456 int modificationCounter;
457 bool noLock;
458
459 QMap<QString, MDB_dbi> createdDbs;
411 460
412 void startTransaction() 461 void startTransaction()
413 { 462 {
@@ -486,6 +535,21 @@ bool DataStore::Transaction::commit(const std::function<void(const DataStore::Er
486 } 535 }
487 d->transaction = nullptr; 536 d->transaction = nullptr;
488 537
538 //Add the created dbis to the shared environment
539 if (!d->createdDbs.isEmpty()) {
540 if (!d->noLock) {
541 sMutex.lock();
542 }
543 for (auto it = d->createdDbs.constBegin(); it != d->createdDbs.constEnd(); it++) {
544 Q_ASSERT(!sDbis.contains(it.key()));
545 sDbis.insert(it.key(), it.value());
546 }
547 d->createdDbs.clear();
548 if (!d->noLock) {
549 sMutex.unlock();
550 }
551 }
552
489 return !rc; 553 return !rc;
490} 554}
491 555
@@ -495,6 +559,7 @@ void DataStore::Transaction::abort()
495 return; 559 return;
496 } 560 }
497 561
562 d->createdDbs.clear();
498 // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction; 563 // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction;
499 Q_ASSERT(sEnvironments.values().contains(d->env)); 564 Q_ASSERT(sEnvironments.values().contains(d->env));
500 mdb_txn_abort(d->transaction); 565 mdb_txn_abort(d->transaction);
@@ -527,14 +592,6 @@ static bool ensureCorrectDb(DataStore::NamedDatabase &database, const QByteArray
527 592
528bool DataStore::Transaction::validateNamedDatabases() 593bool DataStore::Transaction::validateNamedDatabases()
529{ 594{
530 auto databases = getDatabaseNames();
531 for (const auto &dbName : databases) {
532 auto db = openDatabase(dbName);
533 if (!db) {
534 SinkWarning() << "Failed to open the database: " << dbName;
535 return false;
536 }
537 }
538 return true; 595 return true;
539} 596}
540 597
@@ -548,13 +605,27 @@ DataStore::NamedDatabase DataStore::Transaction::openDatabase(const QByteArray &
548 // We don't now if anything changed 605 // We don't now if anything changed
549 d->implicitCommit = true; 606 d->implicitCommit = true;
550 auto p = new DataStore::NamedDatabase::Private(db, allowDuplicates, d->defaultErrorHandler, d->name, d->transaction); 607 auto p = new DataStore::NamedDatabase::Private(db, allowDuplicates, d->defaultErrorHandler, d->name, d->transaction);
608 if (!d->noLock) {
609 //This wouldn't be necessary with a read-write lock, we could read-only lock here
610 sMutex.lock();
611 }
551 if (!p->openDatabase(d->requestedRead, errorHandler)) { 612 if (!p->openDatabase(d->requestedRead, errorHandler)) {
613 if (!d->noLock) {
614 sMutex.unlock();
615 }
552 delete p; 616 delete p;
553 return DataStore::NamedDatabase(); 617 return DataStore::NamedDatabase();
554 } 618 }
619 if (!d->noLock) {
620 sMutex.unlock();
621 }
622 if (p->createdNewDbi) {
623 d->createdDbs.insert(p->createdDbName, p->dbi);
624 }
555 auto database = DataStore::NamedDatabase(p); 625 auto database = DataStore::NamedDatabase(p);
556 if (!ensureCorrectDb(database, db, d->requestedRead)) { 626 if (!ensureCorrectDb(database, db, d->requestedRead)) {
557 SinkWarning() << "Failed to open the database" << db; 627 SinkWarning() << "Failed to open the database correctly" << db;
628 Q_ASSERT(false);
558 return DataStore::NamedDatabase(); 629 return DataStore::NamedDatabase();
559 } 630 }
560 return database; 631 return database;
@@ -569,6 +640,7 @@ QList<QByteArray> DataStore::Transaction::getDatabaseNames() const
569 640
570 int rc; 641 int rc;
571 QList<QByteArray> list; 642 QList<QByteArray> list;
643 Q_ASSERT(d->transaction);
572 if ((rc = mdb_dbi_open(d->transaction, nullptr, 0, &d->dbi) == 0)) { 644 if ((rc = mdb_dbi_open(d->transaction, nullptr, 0, &d->dbi) == 0)) {
573 MDB_val key; 645 MDB_val key;
574 MDB_val data; 646 MDB_val data;
@@ -657,6 +729,17 @@ DataStore::Private::Private(const QString &s, const QString &n, AccessMode m) :
657 mdb_env_set_mapsize(env, dbSize); 729 mdb_env_set_mapsize(env, dbSize);
658 } 730 }
659 sEnvironments.insert(fullPath, env); 731 sEnvironments.insert(fullPath, env);
732 //Open all available dbi's
733 bool noLock = true;
734 bool requestedRead = m == ReadOnly;
735 auto t = Transaction(new Transaction::Private(requestedRead, nullptr, name, env, noLock));
736 for (const auto &db : t.getDatabaseNames()) {
737 SinkLog() << "Opening initial db: " << db;
738 //Get dbi to store for future use.
739 t.openDatabase(db);
740 }
741 //To persist the dbis
742 t.commit();
660 } 743 }
661 } 744 }
662 } 745 }
diff --git a/tests/storagetest.cpp b/tests/storagetest.cpp
index 5a517c7..7202628 100644
--- a/tests/storagetest.cpp
+++ b/tests/storagetest.cpp
@@ -451,6 +451,42 @@ private slots:
451 db.findLatest(uid, [&](const QByteArray &key, const QByteArray &value) { result = value; }); 451 db.findLatest(uid, [&](const QByteArray &key, const QByteArray &value) { result = value; });
452 QCOMPARE(result, QByteArray("value2")); 452 QCOMPARE(result, QByteArray("value2"));
453 } 453 }
454
455 void testTransactionVisibility()
456 {
457 auto readValue = [](const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray) {
458 QByteArray result;
459 db.scan("key1", [&](const QByteArray &, const QByteArray &value) {
460 result = value;
461 return true;
462 });
463 return result;
464 };
465 {
466 Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite);
467 auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite);
468
469 auto db = transaction.openDatabase("testTransactionVisibility", nullptr, false);
470 db.write("key1", "foo");
471 QCOMPARE(readValue(db, "key1"), QByteArray("foo"));
472
473 {
474 auto transaction2 = store.createTransaction(Sink::Storage::DataStore::ReadOnly);
475 auto db2 = transaction2
476 .openDatabase("testTransactionVisibility", nullptr, false);
477 QCOMPARE(readValue(db2, "key1"), QByteArray());
478 }
479 transaction.commit();
480 {
481 auto transaction2 = store.createTransaction(Sink::Storage::DataStore::ReadOnly);
482 auto db2 = transaction2
483 .openDatabase("testTransactionVisibility", nullptr, false);
484 QCOMPARE(readValue(db2, "key1"), QByteArray("foo"));
485 }
486
487 }
488 }
489
454}; 490};
455 491
456QTEST_MAIN(StorageTest) 492QTEST_MAIN(StorageTest)