From 6746247a49f09287ae4924c5c3df791f9bf61cbc Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sat, 22 Aug 2015 11:25:26 +0200 Subject: Use named databases in storage. This will allow us to create indexes in the same store. --- common/storage.h | 61 ++++++++-- common/storage_lmdb.cpp | 297 +++++++++++++++++++++++++++++------------------- 2 files changed, 231 insertions(+), 127 deletions(-) (limited to 'common') diff --git a/common/storage.h b/common/storage.h index a7241a7..2f7a2df 100644 --- a/common/storage.h +++ b/common/storage.h @@ -51,16 +51,12 @@ public: int code; }; - class Transaction + class Transaction; + class NamedDatabase { public: - Transaction(); - ~Transaction(); - bool commit(const std::function &errorHandler = std::function()); - void abort(); - - void setAutocommit(int interval); - + NamedDatabase(); + ~NamedDatabase(); /** * Write a value */ @@ -73,22 +69,57 @@ public: const std::function &errorHandler = std::function()); /** * Read values with a given key. - * + * * * An empty @param key results in a full scan * * If duplicates are existing (revisions), all values are returned. * * The pointers of the returned values are valid during the execution of the @param resultHandler - * + * * @return The number of values retrieved. */ int scan(const QByteArray &k, const std::function &resultHandler, const std::function &errorHandler = std::function()) const; + NamedDatabase(NamedDatabase&& other) : d(other.d) + { + d = other.d; + other.d = nullptr; + } + + NamedDatabase& operator=(NamedDatabase&& other) { + d = other.d; + other.d = nullptr; + return *this; + } + + operator bool() const { + return (d != nullptr); + } + + private: + friend Transaction; + NamedDatabase(NamedDatabase& other); + NamedDatabase& operator=(NamedDatabase& other); + class Private; + NamedDatabase(Private*); + Private *d; + }; + + class Transaction + { + public: + Transaction(); + ~Transaction(); + bool commit(const std::function &errorHandler = std::function()); + void abort(); + + NamedDatabase openDatabase(const QByteArray &name = QByteArray("default"), const std::function &errorHandler = std::function()) const; + Transaction(Transaction&& other) : d(other.d) { d = other.d; other.d = nullptr; - } + } Transaction& operator=(Transaction&& other) { d = other.d; other.d = nullptr; @@ -98,6 +129,14 @@ public: operator bool() const { return (d != nullptr); } + + bool write(const QByteArray &key, const QByteArray &value, const std::function &errorHandler = std::function()); + + void remove(const QByteArray &key, + const std::function &errorHandler = std::function()); + int scan(const QByteArray &k, + const std::function &resultHandler, + const std::function &errorHandler = std::function()) const; private: Transaction(Transaction& other); Transaction& operator=(Transaction& other); diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp index 0618d61..ebb3be3 100644 --- a/common/storage_lmdb.cpp +++ b/common/storage_lmdb.cpp @@ -47,120 +47,69 @@ int getErrorCode(int e) return -1; } - - -class Storage::Transaction::Private +class Storage::NamedDatabase::Private { public: - Private(bool _requestRead, bool _allowDuplicates, const std::function &_defaultErrorHandler, const QString &_name, MDB_env *_env) - : env(_env), - requestedRead(_requestRead), + Private(const QByteArray &_db, bool _allowDuplicates, const std::function &_defaultErrorHandler, const QString &_name, MDB_txn *_txn) + : db(_db), + transaction(_txn), allowDuplicates(_allowDuplicates), defaultErrorHandler(_defaultErrorHandler), - name(_name), - implicitCommit(false), - error(false), - autoCommitInterval(0), - modificationCounter(0) + name(_name) { - } + ~Private() { } - MDB_env *env; + QByteArray db; MDB_txn *transaction; MDB_dbi dbi; - bool requestedRead; bool allowDuplicates; std::function defaultErrorHandler; QString name; - bool implicitCommit; - bool error; - int autoCommitInterval; - int modificationCounter; - void startTransaction() + bool openDatabase(std::function errorHandler) { - const int rc = mdb_txn_begin(env, NULL, requestedRead ? MDB_RDONLY : 0, &transaction); - if (rc) { - defaultErrorHandler(Error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening transaction: " + QByteArray(mdb_strerror(rc)))); + unsigned int flags = MDB_CREATE; + if (allowDuplicates) { + flags |= MDB_DUPSORT; } - } - - void openDatabase() - { - const int rc = mdb_dbi_open(transaction, NULL, allowDuplicates ? MDB_DUPSORT : 0, &dbi); - if (rc) { - defaultErrorHandler(Error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening database: " + QByteArray(mdb_strerror(rc)))); + if (const int rc = mdb_dbi_open(transaction, db.constData(), flags, &dbi)) { + qWarning() << "Failed to open: " << rc << db; + dbi = 0; + transaction = 0; + Error error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening database: " + QByteArray(mdb_strerror(rc))); + errorHandler ? errorHandler(error) : defaultErrorHandler(error); + return false; } + return true; } }; -Storage::Transaction::Transaction() - : d(0) +Storage::NamedDatabase::NamedDatabase() + : d(nullptr) { } -Storage::Transaction::Transaction(Transaction::Private *prv) +Storage::NamedDatabase::NamedDatabase(NamedDatabase::Private *prv) : d(prv) { - d->startTransaction(); - d->openDatabase(); } -Storage::Transaction::~Transaction() +Storage::NamedDatabase::~NamedDatabase() { - if (d && d->transaction) { - if (d->implicitCommit && !d->error) { - commit(); - } else { - mdb_txn_abort(d->transaction); - } - } delete d; } -bool Storage::Transaction::commit(const std::function &errorHandler) -{ - if (!d) { - return false; - } - - const int rc = mdb_txn_commit(d->transaction); - if (rc) { - mdb_txn_abort(d->transaction); - Error error(d->name.toLatin1(), ErrorCodes::GenericError, "Error during transaction commit: " + QByteArray(mdb_strerror(rc))); - errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); - } - d->transaction = nullptr; - - return !rc; -} - -void Storage::Transaction::abort() -{ - if (!d || !d->transaction) { - return; - } - - mdb_txn_abort(d->transaction); - d->transaction = nullptr; -} - -void Storage::Transaction::setAutocommit(int interval) -{ - if (d) { - d->autoCommitInterval = interval; - } -} - -bool Storage::Transaction::write(const QByteArray &sKey, const QByteArray &sValue, const std::function &errorHandler) +bool Storage::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sValue, const std::function &errorHandler) { if (!d || !d->transaction) { + Error error(d->name.toLatin1(), ErrorCodes::GenericError, "Not open"); + errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); return false; } const void *keyPtr = sKey.data(); @@ -183,33 +132,43 @@ bool Storage::Transaction::write(const QByteArray &sKey, const QByteArray &sValu rc = mdb_put(d->transaction, d->dbi, &key, &data, 0); if (rc) { - d->error = true; Error error(d->name.toLatin1(), ErrorCodes::GenericError, "mdb_put: " + QByteArray(mdb_strerror(rc))); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); - } else { - d->implicitCommit = true; } - if (d->autoCommitInterval > 0) { - d->modificationCounter++; - if (d->modificationCounter >= d->autoCommitInterval) { - commit(); - d->startTransaction(); - d->openDatabase(); - d->modificationCounter = 0; - } + return !rc; +} + +void Storage::NamedDatabase::remove(const QByteArray &k, + const std::function &errorHandler) +{ + if (!d || !d->transaction) { + Error error(d->name.toLatin1(), ErrorCodes::GenericError, "Not open"); + errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + return; } - return !rc; + int rc; + MDB_val key; + key.mv_size = k.size(); + key.mv_data = const_cast(static_cast(k.data())); + rc = mdb_del(d->transaction, d->dbi, &key, 0); + + if (rc) { + Error error(d->name.toLatin1(), ErrorCodes::GenericError, QString("Error on mdb_del: %1 %2").arg(rc).arg(mdb_strerror(rc)).toLatin1()); + errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + } + + return; } -int Storage::Transaction::scan(const QByteArray &k, +int Storage::NamedDatabase::scan(const QByteArray &k, const std::function &resultHandler, const std::function &errorHandler) const { if (!d || !d->transaction) { - Error error(d->name.toLatin1(), ErrorCodes::NotOpen, "Not open"); - errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + // Error error(d->name.toLatin1(), ErrorCodes::NotOpen, "Not open"); + // errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); return 0; } @@ -231,7 +190,7 @@ int Storage::Transaction::scan(const QByteArray &k, int numberOfRetrievedValues = 0; if (k.isEmpty() || d->allowDuplicates) { - if ((rc = mdb_cursor_get(cursor, &key, &data, d->allowDuplicates ? MDB_SET_RANGE : MDB_FIRST)) == 0) { + if ((rc = mdb_cursor_get(cursor, &key, &data, d->allowDuplicates ? MDB_SET : MDB_FIRST)) == 0) { numberOfRetrievedValues++; if (resultHandler(QByteArray::fromRawData((char*)key.mv_data, key.mv_size), QByteArray::fromRawData((char*)data.mv_data, data.mv_size))) { while ((rc = mdb_cursor_get(cursor, &key, &data, d->allowDuplicates ? MDB_NEXT_DUP : MDB_NEXT)) == 0) { @@ -264,34 +223,142 @@ int Storage::Transaction::scan(const QByteArray &k, return numberOfRetrievedValues; } -void Storage::Transaction::remove(const QByteArray &k, - const std::function &errorHandler) + + + +class Storage::Transaction::Private +{ +public: + Private(bool _requestRead, bool _allowDuplicates, const std::function &_defaultErrorHandler, const QString &_name, MDB_env *_env) + : env(_env), + requestedRead(_requestRead), + allowDuplicates(_allowDuplicates), + defaultErrorHandler(_defaultErrorHandler), + name(_name), + implicitCommit(false), + error(false), + modificationCounter(0) + { + + } + ~Private() + { + + } + + MDB_env *env; + MDB_txn *transaction; + MDB_dbi dbi; + bool requestedRead; + bool allowDuplicates; + std::function defaultErrorHandler; + QString name; + bool implicitCommit; + bool error; + int modificationCounter; + + void startTransaction() + { + // qDebug() << "Opening transaction " << requestedRead; + const int rc = mdb_txn_begin(env, NULL, requestedRead ? MDB_RDONLY : 0, &transaction); + if (rc) { + defaultErrorHandler(Error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening transaction: " + QByteArray(mdb_strerror(rc)))); + } + } +}; + +Storage::Transaction::Transaction() + : d(nullptr) +{ + +} + +Storage::Transaction::Transaction(Transaction::Private *prv) + : d(prv) +{ + d->startTransaction(); +} + +Storage::Transaction::~Transaction() +{ + if (d && d->transaction) { + if (d->implicitCommit && !d->error) { + // qDebug() << "implicit commit"; + commit(); + } else { + // qDebug() << "Aorting transaction"; + mdb_txn_abort(d->transaction); + } + } + delete d; +} + +bool Storage::Transaction::commit(const std::function &errorHandler) { if (!d || !d->transaction) { - Error error(d->name.toLatin1(), ErrorCodes::GenericError, "Not open"); + return false; + } + + const int rc = mdb_txn_commit(d->transaction); + if (rc) { + mdb_txn_abort(d->transaction); + Error error(d->name.toLatin1(), ErrorCodes::GenericError, "Error during transaction commit: " + QByteArray(mdb_strerror(rc))); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + } + d->transaction = nullptr; + + return !rc; +} + +void Storage::Transaction::abort() +{ + if (!d || !d->transaction) { return; } - int rc; - MDB_val key; - key.mv_size = k.size(); - key.mv_data = const_cast(static_cast(k.data())); - rc = mdb_del(d->transaction, d->dbi, &key, 0); + mdb_txn_abort(d->transaction); + d->transaction = nullptr; +} - if (rc) { +Storage::NamedDatabase Storage::Transaction::openDatabase(const QByteArray &db, const std::function &errorHandler) const +{ + if (!d) { + return Storage::NamedDatabase(); + } + //We don't now if anything changed + d->implicitCommit = true; + auto p = new Storage::NamedDatabase::Private(db, d->allowDuplicates, d->defaultErrorHandler, d->name, d->transaction); + p->openDatabase(errorHandler); + return Storage::NamedDatabase(p); +} + +bool Storage::Transaction::write(const QByteArray &key, const QByteArray &value, const std::function &errorHandler) +{ + openDatabase().write(key, value, [this, errorHandler](const Storage::Error &error) { d->error = true; - Error error(d->name.toLatin1(), ErrorCodes::GenericError, QString("Error on mdb_del: %1 %2").arg(rc).arg(mdb_strerror(rc)).toLatin1()); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); - } else { - d->implicitCommit = true; - } + }); + d->implicitCommit = true; - return; + return !d->error; } +void Storage::Transaction::remove(const QByteArray &k, + const std::function &errorHandler) +{ + openDatabase().remove(k, [this, errorHandler](const Storage::Error &error) { + d->error = true; + errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + }); + d->implicitCommit = true; +} - +int Storage::Transaction::scan(const QByteArray &k, + const std::function &resultHandler, + const std::function &errorHandler) const +{ + return openDatabase().scan(k, resultHandler, errorHandler); +} @@ -310,9 +377,7 @@ public: QString storageRoot; QString name; - MDB_dbi dbi; MDB_env *env; - MDB_txn *transaction; AccessMode mode; bool readTransaction; bool firstOpen; @@ -328,7 +393,6 @@ Storage::Private::Private(const QString &s, const QString &n, AccessMode m, bool : storageRoot(s), name(n), env(0), - transaction(0), mode(m), readTransaction(false), firstOpen(true), @@ -357,7 +421,12 @@ Storage::Private::Private(const QString &s, const QString &n, AccessMode m, bool // TODO: handle error std::cerr << "mdb_env_create: " << rc << " " << mdb_strerror(rc) << std::endl; } else { - if ((rc = mdb_env_open(env, fullPath.toStdString().data(), mode == ReadOnly ? MDB_RDONLY : 0 | MDB_NOTLS, 0664))) { + mdb_env_set_maxdbs(env, 10); + unsigned int flags = MDB_NOTLS; + if (mode == ReadOnly) { + flags |= MDB_RDONLY; + } + if ((rc = mdb_env_open(env, fullPath.toStdString().data(), flags, 0664))) { std::cerr << "mdb_env_open: " << rc << " " << mdb_strerror(rc) << std::endl; mdb_env_close(env); env = 0; @@ -374,10 +443,6 @@ Storage::Private::Private(const QString &s, const QString &n, AccessMode m, bool Storage::Private::~Private() { - if (transaction) { - mdb_txn_abort(transaction); - } - //Since we can have only one environment open per process, we currently leak the environments. // if (env) { // //mdb_dbi_close should not be necessary and is potentially dangerous (see docs) -- cgit v1.2.3