From 6746247a49f09287ae4924c5c3df791f9bf61cbc Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sat, 22 Aug 2015 11:25:26 +0200 Subject: Use named databases in storage. This will allow us to create indexes in the same store. --- common/storage.h | 61 ++++++++-- common/storage_lmdb.cpp | 297 +++++++++++++++++++++++++++------------------ tests/indextest.cpp | 21 ++-- tests/storagebenchmark.cpp | 8 +- tests/storagetest.cpp | 114 +++++++++++++++++ 5 files changed, 362 insertions(+), 139 deletions(-) diff --git a/common/storage.h b/common/storage.h index a7241a7..2f7a2df 100644 --- a/common/storage.h +++ b/common/storage.h @@ -51,16 +51,12 @@ public: int code; }; - class Transaction + class Transaction; + class NamedDatabase { public: - Transaction(); - ~Transaction(); - bool commit(const std::function &errorHandler = std::function()); - void abort(); - - void setAutocommit(int interval); - + NamedDatabase(); + ~NamedDatabase(); /** * Write a value */ @@ -73,22 +69,57 @@ public: const std::function &errorHandler = std::function()); /** * Read values with a given key. - * + * * * An empty @param key results in a full scan * * If duplicates are existing (revisions), all values are returned. * * The pointers of the returned values are valid during the execution of the @param resultHandler - * + * * @return The number of values retrieved. */ int scan(const QByteArray &k, const std::function &resultHandler, const std::function &errorHandler = std::function()) const; + NamedDatabase(NamedDatabase&& other) : d(other.d) + { + d = other.d; + other.d = nullptr; + } + + NamedDatabase& operator=(NamedDatabase&& other) { + d = other.d; + other.d = nullptr; + return *this; + } + + operator bool() const { + return (d != nullptr); + } + + private: + friend Transaction; + NamedDatabase(NamedDatabase& other); + NamedDatabase& operator=(NamedDatabase& other); + class Private; + NamedDatabase(Private*); + Private *d; + }; + + class Transaction + { + public: + Transaction(); + ~Transaction(); + bool commit(const std::function &errorHandler = std::function()); + void abort(); + + NamedDatabase openDatabase(const QByteArray &name = QByteArray("default"), const std::function &errorHandler = std::function()) const; + Transaction(Transaction&& other) : d(other.d) { d = other.d; other.d = nullptr; - } + } Transaction& operator=(Transaction&& other) { d = other.d; other.d = nullptr; @@ -98,6 +129,14 @@ public: operator bool() const { return (d != nullptr); } + + bool write(const QByteArray &key, const QByteArray &value, const std::function &errorHandler = std::function()); + + void remove(const QByteArray &key, + const std::function &errorHandler = std::function()); + int scan(const QByteArray &k, + const std::function &resultHandler, + const std::function &errorHandler = std::function()) const; private: Transaction(Transaction& other); Transaction& operator=(Transaction& other); diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp index 0618d61..ebb3be3 100644 --- a/common/storage_lmdb.cpp +++ b/common/storage_lmdb.cpp @@ -47,120 +47,69 @@ int getErrorCode(int e) return -1; } - - -class Storage::Transaction::Private +class Storage::NamedDatabase::Private { public: - Private(bool _requestRead, bool _allowDuplicates, const std::function &_defaultErrorHandler, const QString &_name, MDB_env *_env) - : env(_env), - requestedRead(_requestRead), + Private(const QByteArray &_db, bool _allowDuplicates, const std::function &_defaultErrorHandler, const QString &_name, MDB_txn *_txn) + : db(_db), + transaction(_txn), allowDuplicates(_allowDuplicates), defaultErrorHandler(_defaultErrorHandler), - name(_name), - implicitCommit(false), - error(false), - autoCommitInterval(0), - modificationCounter(0) + name(_name) { - } + ~Private() { } - MDB_env *env; + QByteArray db; MDB_txn *transaction; MDB_dbi dbi; - bool requestedRead; bool allowDuplicates; std::function defaultErrorHandler; QString name; - bool implicitCommit; - bool error; - int autoCommitInterval; - int modificationCounter; - void startTransaction() + bool openDatabase(std::function errorHandler) { - const int rc = mdb_txn_begin(env, NULL, requestedRead ? MDB_RDONLY : 0, &transaction); - if (rc) { - defaultErrorHandler(Error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening transaction: " + QByteArray(mdb_strerror(rc)))); + unsigned int flags = MDB_CREATE; + if (allowDuplicates) { + flags |= MDB_DUPSORT; } - } - - void openDatabase() - { - const int rc = mdb_dbi_open(transaction, NULL, allowDuplicates ? MDB_DUPSORT : 0, &dbi); - if (rc) { - defaultErrorHandler(Error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening database: " + QByteArray(mdb_strerror(rc)))); + if (const int rc = mdb_dbi_open(transaction, db.constData(), flags, &dbi)) { + qWarning() << "Failed to open: " << rc << db; + dbi = 0; + transaction = 0; + Error error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening database: " + QByteArray(mdb_strerror(rc))); + errorHandler ? errorHandler(error) : defaultErrorHandler(error); + return false; } + return true; } }; -Storage::Transaction::Transaction() - : d(0) +Storage::NamedDatabase::NamedDatabase() + : d(nullptr) { } -Storage::Transaction::Transaction(Transaction::Private *prv) +Storage::NamedDatabase::NamedDatabase(NamedDatabase::Private *prv) : d(prv) { - d->startTransaction(); - d->openDatabase(); } -Storage::Transaction::~Transaction() +Storage::NamedDatabase::~NamedDatabase() { - if (d && d->transaction) { - if (d->implicitCommit && !d->error) { - commit(); - } else { - mdb_txn_abort(d->transaction); - } - } delete d; } -bool Storage::Transaction::commit(const std::function &errorHandler) -{ - if (!d) { - return false; - } - - const int rc = mdb_txn_commit(d->transaction); - if (rc) { - mdb_txn_abort(d->transaction); - Error error(d->name.toLatin1(), ErrorCodes::GenericError, "Error during transaction commit: " + QByteArray(mdb_strerror(rc))); - errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); - } - d->transaction = nullptr; - - return !rc; -} - -void Storage::Transaction::abort() -{ - if (!d || !d->transaction) { - return; - } - - mdb_txn_abort(d->transaction); - d->transaction = nullptr; -} - -void Storage::Transaction::setAutocommit(int interval) -{ - if (d) { - d->autoCommitInterval = interval; - } -} - -bool Storage::Transaction::write(const QByteArray &sKey, const QByteArray &sValue, const std::function &errorHandler) +bool Storage::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sValue, const std::function &errorHandler) { if (!d || !d->transaction) { + Error error(d->name.toLatin1(), ErrorCodes::GenericError, "Not open"); + errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); return false; } const void *keyPtr = sKey.data(); @@ -183,33 +132,43 @@ bool Storage::Transaction::write(const QByteArray &sKey, const QByteArray &sValu rc = mdb_put(d->transaction, d->dbi, &key, &data, 0); if (rc) { - d->error = true; Error error(d->name.toLatin1(), ErrorCodes::GenericError, "mdb_put: " + QByteArray(mdb_strerror(rc))); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); - } else { - d->implicitCommit = true; } - if (d->autoCommitInterval > 0) { - d->modificationCounter++; - if (d->modificationCounter >= d->autoCommitInterval) { - commit(); - d->startTransaction(); - d->openDatabase(); - d->modificationCounter = 0; - } + return !rc; +} + +void Storage::NamedDatabase::remove(const QByteArray &k, + const std::function &errorHandler) +{ + if (!d || !d->transaction) { + Error error(d->name.toLatin1(), ErrorCodes::GenericError, "Not open"); + errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + return; } - return !rc; + int rc; + MDB_val key; + key.mv_size = k.size(); + key.mv_data = const_cast(static_cast(k.data())); + rc = mdb_del(d->transaction, d->dbi, &key, 0); + + if (rc) { + Error error(d->name.toLatin1(), ErrorCodes::GenericError, QString("Error on mdb_del: %1 %2").arg(rc).arg(mdb_strerror(rc)).toLatin1()); + errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + } + + return; } -int Storage::Transaction::scan(const QByteArray &k, +int Storage::NamedDatabase::scan(const QByteArray &k, const std::function &resultHandler, const std::function &errorHandler) const { if (!d || !d->transaction) { - Error error(d->name.toLatin1(), ErrorCodes::NotOpen, "Not open"); - errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + // Error error(d->name.toLatin1(), ErrorCodes::NotOpen, "Not open"); + // errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); return 0; } @@ -231,7 +190,7 @@ int Storage::Transaction::scan(const QByteArray &k, int numberOfRetrievedValues = 0; if (k.isEmpty() || d->allowDuplicates) { - if ((rc = mdb_cursor_get(cursor, &key, &data, d->allowDuplicates ? MDB_SET_RANGE : MDB_FIRST)) == 0) { + if ((rc = mdb_cursor_get(cursor, &key, &data, d->allowDuplicates ? MDB_SET : MDB_FIRST)) == 0) { numberOfRetrievedValues++; if (resultHandler(QByteArray::fromRawData((char*)key.mv_data, key.mv_size), QByteArray::fromRawData((char*)data.mv_data, data.mv_size))) { while ((rc = mdb_cursor_get(cursor, &key, &data, d->allowDuplicates ? MDB_NEXT_DUP : MDB_NEXT)) == 0) { @@ -264,34 +223,142 @@ int Storage::Transaction::scan(const QByteArray &k, return numberOfRetrievedValues; } -void Storage::Transaction::remove(const QByteArray &k, - const std::function &errorHandler) + + + +class Storage::Transaction::Private +{ +public: + Private(bool _requestRead, bool _allowDuplicates, const std::function &_defaultErrorHandler, const QString &_name, MDB_env *_env) + : env(_env), + requestedRead(_requestRead), + allowDuplicates(_allowDuplicates), + defaultErrorHandler(_defaultErrorHandler), + name(_name), + implicitCommit(false), + error(false), + modificationCounter(0) + { + + } + ~Private() + { + + } + + MDB_env *env; + MDB_txn *transaction; + MDB_dbi dbi; + bool requestedRead; + bool allowDuplicates; + std::function defaultErrorHandler; + QString name; + bool implicitCommit; + bool error; + int modificationCounter; + + void startTransaction() + { + // qDebug() << "Opening transaction " << requestedRead; + const int rc = mdb_txn_begin(env, NULL, requestedRead ? MDB_RDONLY : 0, &transaction); + if (rc) { + defaultErrorHandler(Error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening transaction: " + QByteArray(mdb_strerror(rc)))); + } + } +}; + +Storage::Transaction::Transaction() + : d(nullptr) +{ + +} + +Storage::Transaction::Transaction(Transaction::Private *prv) + : d(prv) +{ + d->startTransaction(); +} + +Storage::Transaction::~Transaction() +{ + if (d && d->transaction) { + if (d->implicitCommit && !d->error) { + // qDebug() << "implicit commit"; + commit(); + } else { + // qDebug() << "Aorting transaction"; + mdb_txn_abort(d->transaction); + } + } + delete d; +} + +bool Storage::Transaction::commit(const std::function &errorHandler) { if (!d || !d->transaction) { - Error error(d->name.toLatin1(), ErrorCodes::GenericError, "Not open"); + return false; + } + + const int rc = mdb_txn_commit(d->transaction); + if (rc) { + mdb_txn_abort(d->transaction); + Error error(d->name.toLatin1(), ErrorCodes::GenericError, "Error during transaction commit: " + QByteArray(mdb_strerror(rc))); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + } + d->transaction = nullptr; + + return !rc; +} + +void Storage::Transaction::abort() +{ + if (!d || !d->transaction) { return; } - int rc; - MDB_val key; - key.mv_size = k.size(); - key.mv_data = const_cast(static_cast(k.data())); - rc = mdb_del(d->transaction, d->dbi, &key, 0); + mdb_txn_abort(d->transaction); + d->transaction = nullptr; +} - if (rc) { +Storage::NamedDatabase Storage::Transaction::openDatabase(const QByteArray &db, const std::function &errorHandler) const +{ + if (!d) { + return Storage::NamedDatabase(); + } + //We don't now if anything changed + d->implicitCommit = true; + auto p = new Storage::NamedDatabase::Private(db, d->allowDuplicates, d->defaultErrorHandler, d->name, d->transaction); + p->openDatabase(errorHandler); + return Storage::NamedDatabase(p); +} + +bool Storage::Transaction::write(const QByteArray &key, const QByteArray &value, const std::function &errorHandler) +{ + openDatabase().write(key, value, [this, errorHandler](const Storage::Error &error) { d->error = true; - Error error(d->name.toLatin1(), ErrorCodes::GenericError, QString("Error on mdb_del: %1 %2").arg(rc).arg(mdb_strerror(rc)).toLatin1()); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); - } else { - d->implicitCommit = true; - } + }); + d->implicitCommit = true; - return; + return !d->error; } +void Storage::Transaction::remove(const QByteArray &k, + const std::function &errorHandler) +{ + openDatabase().remove(k, [this, errorHandler](const Storage::Error &error) { + d->error = true; + errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + }); + d->implicitCommit = true; +} - +int Storage::Transaction::scan(const QByteArray &k, + const std::function &resultHandler, + const std::function &errorHandler) const +{ + return openDatabase().scan(k, resultHandler, errorHandler); +} @@ -310,9 +377,7 @@ public: QString storageRoot; QString name; - MDB_dbi dbi; MDB_env *env; - MDB_txn *transaction; AccessMode mode; bool readTransaction; bool firstOpen; @@ -328,7 +393,6 @@ Storage::Private::Private(const QString &s, const QString &n, AccessMode m, bool : storageRoot(s), name(n), env(0), - transaction(0), mode(m), readTransaction(false), firstOpen(true), @@ -357,7 +421,12 @@ Storage::Private::Private(const QString &s, const QString &n, AccessMode m, bool // TODO: handle error std::cerr << "mdb_env_create: " << rc << " " << mdb_strerror(rc) << std::endl; } else { - if ((rc = mdb_env_open(env, fullPath.toStdString().data(), mode == ReadOnly ? MDB_RDONLY : 0 | MDB_NOTLS, 0664))) { + mdb_env_set_maxdbs(env, 10); + unsigned int flags = MDB_NOTLS; + if (mode == ReadOnly) { + flags |= MDB_RDONLY; + } + if ((rc = mdb_env_open(env, fullPath.toStdString().data(), flags, 0664))) { std::cerr << "mdb_env_open: " << rc << " " << mdb_strerror(rc) << std::endl; mdb_env_close(env); env = 0; @@ -374,10 +443,6 @@ Storage::Private::Private(const QString &s, const QString &n, AccessMode m, bool Storage::Private::~Private() { - if (transaction) { - mdb_txn_abort(transaction); - } - //Since we can have only one environment open per process, we currently leak the environments. // if (env) { // //mdb_dbi_close should not be necessary and is potentially dangerous (see docs) diff --git a/tests/indextest.cpp b/tests/indextest.cpp index 24f90c8..e3eabcc 100644 --- a/tests/indextest.cpp +++ b/tests/indextest.cpp @@ -13,38 +13,39 @@ class IndexTest : public QObject private Q_SLOTS: void initTestCase() { - Akonadi2::Storage store(Akonadi2::Store::storageLocation(), "org.kde.dummy.testindex", Akonadi2::Storage::ReadWrite); + Akonadi2::Storage store("./testindex", "org.kde.dummy.testindex", Akonadi2::Storage::ReadWrite); store.removeFromDisk(); } void cleanup() { - Akonadi2::Storage store(Akonadi2::Store::storageLocation(), "org.kde.dummy.testindex", Akonadi2::Storage::ReadWrite); + Akonadi2::Storage store("./testindex", "org.kde.dummy.testindex", Akonadi2::Storage::ReadWrite); store.removeFromDisk(); } void testIndex() { - Index index(Akonadi2::Store::storageLocation(), "org.kde.dummy.testindex", Akonadi2::Storage::ReadWrite); - index.add("key1", "value1"); - index.add("key1", "value2"); - index.add("key2", "value3"); + Index index("./testindex", "org.kde.dummy.testindex", Akonadi2::Storage::ReadWrite); + //The first key is specifically a substring of the second key + index.add("key", "value1"); + index.add("keyFoo", "value2"); + index.add("keyFoo", "value3"); { QList values; - index.lookup(QByteArray("key1"), [&values](const QByteArray &value) { + index.lookup(QByteArray("key"), [&values](const QByteArray &value) { values << value; }, [](const Index::Error &error){ qWarning() << "Error: "; }); - QCOMPARE(values.size(), 2); + QCOMPARE(values.size(), 1); } { QList values; - index.lookup(QByteArray("key2"), [&values](const QByteArray &value) { + index.lookup(QByteArray("keyFoo"), [&values](const QByteArray &value) { values << value; }, [](const Index::Error &error){ qWarning() << "Error: "; }); - QCOMPARE(values.size(), 1); + QCOMPARE(values.size(), 2); } { QList values; diff --git a/tests/storagebenchmark.cpp b/tests/storagebenchmark.cpp index ce1005d..f143c4d 100644 --- a/tests/storagebenchmark.cpp +++ b/tests/storagebenchmark.cpp @@ -97,9 +97,12 @@ private Q_SLOTS: auto event = createEvent(); if (store) { auto transaction = store->createTransaction(Akonadi2::Storage::ReadWrite); - transaction.setAutocommit(10000); for (int i = 0; i < count; i++) { transaction.write(keyPrefix + QByteArray::number(i), event); + if ((i % 10000) == 0) { + transaction.commit(); + transaction = store->createTransaction(Akonadi2::Storage::ReadWrite); + } } transaction.commit(); } else { @@ -116,8 +119,9 @@ private Q_SLOTS: { if (store) { auto transaction = store->createTransaction(Akonadi2::Storage::ReadOnly); + auto db = transaction.openDatabase(); for (int i = 0; i < count; i++) { - transaction.scan(keyPrefix + QByteArray::number(i), [](const QByteArray &key, const QByteArray &value) -> bool { return true; }); + db.scan(keyPrefix + QByteArray::number(i), [](const QByteArray &key, const QByteArray &value) -> bool { return true; }); } } } diff --git a/tests/storagetest.cpp b/tests/storagetest.cpp index 55ec888..fe80bb7 100644 --- a/tests/storagetest.cpp +++ b/tests/storagetest.cpp @@ -211,6 +211,120 @@ private Q_SLOTS: storage2.removeFromDisk(); } } + + void testNoDuplicates() + { + bool gotResult = false; + bool gotError = false; + Akonadi2::Storage store(testDataPath, dbName, Akonadi2::Storage::ReadWrite, false); + auto transaction = store.createTransaction(Akonadi2::Storage::ReadWrite); + auto db = transaction.openDatabase(); + db.write("key","value"); + db.write("key","value"); + + int numValues = db.scan("", [&](const QByteArray &key, const QByteArray &value) -> bool { + gotResult = true; + return true; + }, + [&](const Akonadi2::Storage::Error &error) { + qDebug() << error.message; + gotError = true; + }); + + QCOMPARE(numValues, 1); + QVERIFY(!gotError); + } + + void testDuplicates() + { + bool gotResult = false; + bool gotError = false; + Akonadi2::Storage store(testDataPath, dbName, Akonadi2::Storage::ReadWrite, true); + auto transaction = store.createTransaction(Akonadi2::Storage::ReadWrite); + auto db = transaction.openDatabase(); + db.write("key","value1"); + db.write("key","value2"); + int numValues = db.scan("key", [&](const QByteArray &key, const QByteArray &value) -> bool { + gotResult = true; + return true; + }, + [&](const Akonadi2::Storage::Error &error) { + qDebug() << error.message; + gotError = true; + }); + + QCOMPARE(numValues, 2); + QVERIFY(!gotError); + } + + void testNonexitingNamedDb() + { + bool gotResult = false; + bool gotError = false; + Akonadi2::Storage store(testDataPath, dbName, Akonadi2::Storage::ReadOnly); + int numValues = store.createTransaction(Akonadi2::Storage::ReadOnly).openDatabase("test").scan("", [&](const QByteArray &key, const QByteArray &value) -> bool { + gotResult = true; + return false; + }, + [&](const Akonadi2::Storage::Error &error) { + qDebug() << error.message; + gotError = true; + }); + QCOMPARE(numValues, 0); + QVERIFY(!gotResult); + QVERIFY(!gotError); + } + + void testWriteToNamedDb() + { + bool gotError = false; + Akonadi2::Storage store(testDataPath, dbName, Akonadi2::Storage::ReadWrite); + store.createTransaction(Akonadi2::Storage::ReadWrite).openDatabase("test").write("key1", "value1", [&](const Akonadi2::Storage::Error &error) { + qDebug() << error.message; + gotError = true; + }); + QVERIFY(!gotError); + } + + void testWriteDuplicatesToNamedDb() + { + bool gotError = false; + Akonadi2::Storage store(testDataPath, dbName, Akonadi2::Storage::ReadWrite, true); + store.createTransaction(Akonadi2::Storage::ReadWrite).openDatabase("test").write("key1", "value1", [&](const Akonadi2::Storage::Error &error) { + qDebug() << error.message; + gotError = true; + }); + QVERIFY(!gotError); + } + + //By default we want only exact matches + void testSubstringKeys() + { + Akonadi2::Storage store(testDataPath, dbName, Akonadi2::Storage::ReadWrite, true); + auto transaction = store.createTransaction(Akonadi2::Storage::ReadWrite); + auto db = transaction.openDatabase(); + db.write("sub","value1"); + db.write("subsub","value2"); + int numValues = db.scan("sub", [&](const QByteArray &key, const QByteArray &value) -> bool { + return true; + }); + + QCOMPARE(numValues, 1); + } + + //Ensure we don't retrieve a key that is greater than the current key. We only want equal keys. + void testKeyRange() + { + Akonadi2::Storage store(testDataPath, dbName, Akonadi2::Storage::ReadWrite, true); + auto transaction = store.createTransaction(Akonadi2::Storage::ReadWrite); + auto db = transaction.openDatabase(); + db.write("sub1","value1"); + int numValues = db.scan("sub", [&](const QByteArray &key, const QByteArray &value) -> bool { + return true; + }); + + QCOMPARE(numValues, 0); + } }; QTEST_MAIN(StorageTest) -- cgit v1.2.3