From a8263a28f5d3a74581e289289d0807e6b656104b Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 9 Aug 2015 23:17:42 +0200 Subject: Transaction class for storage The beginning of a cleaner and less bare-bones API for the storage. The lifetime of transactions is now handled in (movable) transaction objects. --- common/storage.h | 56 +++++++++ common/storage_lmdb.cpp | 306 +++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 334 insertions(+), 28 deletions(-) (limited to 'common') diff --git a/common/storage.h b/common/storage.h index 09365b0..77f11f9 100644 --- a/common/storage.h +++ b/common/storage.h @@ -51,8 +51,64 @@ public: int code; }; + class Transaction + { + public: + ~Transaction(); + bool commit(const std::function &errorHandler = std::function()); + void abort(); + + /** + * Write a value + */ + bool write(const QByteArray &key, const QByteArray &value, const std::function &errorHandler = std::function()); + + /** + * Remove a value + */ + void remove(const QByteArray &key, + const std::function &errorHandler); + /** + * Read values with a given key. + * + * * An empty @param key results in a full scan + * * If duplicates are existing (revisions), all values are returned. + * * The pointers of the returned values are valid during the execution of the @param resultHandler + * + * @return The number of values retrieved. + */ + int scan(const QByteArray &k, + const std::function &resultHandler, + const std::function &errorHandler = std::function()); + + Transaction(Transaction&& other) : d(other.d) + { + d = other.d; + other.d = nullptr; + } + Transaction& operator=(Transaction&& other) { + d = other.d; + other.d = nullptr; + return *this; + } + private: + Transaction(Transaction& other); + Transaction& operator=(Transaction& other); + friend Storage; + class Private; + Transaction(); + Transaction(Private*); + Private *d; + }; + Storage(const QString &storageRoot, const QString &name, AccessMode mode = ReadOnly, bool allowDuplicates = false); ~Storage(); + + Transaction createTransaction(AccessMode mode = ReadWrite, + const std::function &errorHandler = std::function()); + + + //Old API bool isInTransaction() const; bool startTransaction(AccessMode mode = ReadWrite, const std::function &errorHandler = std::function()); bool commitTransaction(const std::function &errorHandler = std::function()); diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp index 7bbf8b5..230409f 100644 --- a/common/storage_lmdb.cpp +++ b/common/storage_lmdb.cpp @@ -47,6 +47,220 @@ int getErrorCode(int e) return -1; } + + +class Storage::Transaction::Private +{ +public: + Private(MDB_txn *_txn, MDB_dbi _dbi, bool _allowDuplicates, const std::function &_defaultErrorHandler, const QString &_name) + : transaction(_txn), + dbi(_dbi), + allowDuplicates(_allowDuplicates), + defaultErrorHandler(_defaultErrorHandler), + name(_name), + implicitCommit(false), + error(false) + { + + } + ~Private() + { + + } + + MDB_txn *transaction; + MDB_dbi dbi; + bool allowDuplicates; + std::function defaultErrorHandler; + QString name; + bool implicitCommit; + bool error; +}; + +Storage::Transaction::Transaction() + : d(0) +{ + +} + +Storage::Transaction::Transaction(Transaction::Private *prv) + : d(prv) +{ + +} + +Storage::Transaction::~Transaction() +{ + if (d && d->transaction) { + if (d->implicitCommit && !d->error) { + commit(); + } else { + mdb_txn_abort(d->transaction); + } + } + delete d; +} + +bool Storage::Transaction::commit(const std::function &errorHandler) +{ + if (!d) { + return false; + } + + const int rc = mdb_txn_commit(d->transaction); + if (rc) { + mdb_txn_abort(d->transaction); + Error error(d->name.toLatin1(), ErrorCodes::GenericError, "Error during transaction commit: " + QByteArray(mdb_strerror(rc))); + errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + } + d->transaction = nullptr; + + return !rc; +} + +void Storage::Transaction::abort() +{ + if (!d || !d->transaction) { + return; + } + + mdb_txn_abort(d->transaction); + d->transaction = nullptr; +} + +bool Storage::Transaction::write(const QByteArray &sKey, const QByteArray &sValue, const std::function &errorHandler) +{ + if (!d || !d->transaction) { + return false; + } + const void *keyPtr = sKey.data(); + const size_t keySize = sKey.size(); + const void *valuePtr = sValue.data(); + const size_t valueSize = sValue.size(); + + if (!keyPtr || keySize == 0) { + Error error(d->name.toLatin1(), ErrorCodes::GenericError, "Tried to write empty key."); + errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + return false; + } + + int rc; + MDB_val key, data; + key.mv_size = keySize; + key.mv_data = const_cast(keyPtr); + data.mv_size = valueSize; + data.mv_data = const_cast(valuePtr); + rc = mdb_put(d->transaction, d->dbi, &key, &data, 0); + + if (rc) { + d->error = true; + Error error(d->name.toLatin1(), ErrorCodes::GenericError, "mdb_put: " + QByteArray(mdb_strerror(rc))); + errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + } else { + d->implicitCommit = true; + } + + return !rc; +} + +int Storage::Transaction::scan(const QByteArray &k, + const std::function &resultHandler, + const std::function &errorHandler) +{ + if (!d || !d->transaction) { + Error error(d->name.toLatin1(), ErrorCodes::NotOpen, "Not open"); + errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + return 0; + } + + int rc; + MDB_val key; + MDB_val data; + MDB_cursor *cursor; + + key.mv_data = (void*)k.constData(); + key.mv_size = k.size(); + + rc = mdb_cursor_open(d->transaction, d->dbi, &cursor); + if (rc) { + Error error(d->name.toLatin1(), getErrorCode(rc), QByteArray("Error during mdb_cursor open: ") + QByteArray(mdb_strerror(rc))); + errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + return 0; + } + + int numberOfRetrievedValues = 0; + + if (k.isEmpty() || d->allowDuplicates) { + if ((rc = mdb_cursor_get(cursor, &key, &data, d->allowDuplicates ? MDB_SET_RANGE : MDB_FIRST)) == 0) { + numberOfRetrievedValues++; + if (resultHandler(QByteArray::fromRawData((char*)key.mv_data, key.mv_size), QByteArray::fromRawData((char*)data.mv_data, data.mv_size))) { + while ((rc = mdb_cursor_get(cursor, &key, &data, d->allowDuplicates ? MDB_NEXT_DUP : MDB_NEXT)) == 0) { + numberOfRetrievedValues++; + if (!resultHandler(QByteArray::fromRawData((char*)key.mv_data, key.mv_size), QByteArray::fromRawData((char*)data.mv_data, data.mv_size))) { + break; + } + } + } + } + + //We never find the last value + if (rc == MDB_NOTFOUND) { + rc = 0; + } + } else { + if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_SET)) == 0) { + numberOfRetrievedValues++; + resultHandler(QByteArray::fromRawData((char*)key.mv_data, key.mv_size), QByteArray::fromRawData((char*)data.mv_data, data.mv_size)); + } + } + + mdb_cursor_close(cursor); + + if (rc) { + Error error(d->name.toLatin1(), getErrorCode(rc), QByteArray("Key: ") + k + " : " + QByteArray(mdb_strerror(rc))); + errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + } + + return numberOfRetrievedValues; +} + +void Storage::Transaction::remove(const QByteArray &k, + const std::function &errorHandler) +{ + if (!d) { + Error error(d->name.toLatin1(), ErrorCodes::GenericError, "Not open"); + errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + return; + } + + int rc; + MDB_val key; + key.mv_size = k.size(); + key.mv_data = const_cast(static_cast(k.data())); + rc = mdb_del(d->transaction, d->dbi, &key, 0); + + if (rc) { + d->error = true; + Error error(d->name.toLatin1(), ErrorCodes::GenericError, QString("Error on mdb_del: %1 %2").arg(rc).arg(mdb_strerror(rc)).toLatin1()); + errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); + } else { + d->implicitCommit = true; + } + + return; +} + + + + + + + + + + + + class Storage::Private { public: @@ -81,33 +295,38 @@ Storage::Private::Private(const QString &s, const QString &n, AccessMode m, bool allowDuplicates(duplicates) { const QString fullPath(storageRoot + '/' + name); - QDir dir; - dir.mkpath(storageRoot); - dir.mkdir(fullPath); - - //Ensure the environment is only created once - QMutexLocker locker(&sMutex); - - int rc = 0; - /* - * It seems we can only ever have one environment open in the process. - * Otherwise multi-threading breaks. - */ - env = sEnvironments.value(fullPath); - if (!env) { - if ((rc = mdb_env_create(&env))) { - // TODO: handle error - std::cerr << "mdb_env_create: " << rc << " " << mdb_strerror(rc) << std::endl; - } else { - if ((rc = mdb_env_open(env, fullPath.toStdString().data(), mode == ReadOnly ? MDB_RDONLY : 0 , 0664))) { - std::cerr << "mdb_env_open: " << rc << " " << mdb_strerror(rc) << std::endl; - mdb_env_close(env); - env = 0; + QFileInfo dirInfo(fullPath); + if (!dirInfo.exists() && mode == ReadWrite) { + QDir().mkpath(fullPath); + dirInfo.refresh(); + } + if (mode == ReadWrite && !dirInfo.permission(QFile::WriteOwner)) { + qCritical() << fullPath << "does not have write permissions. Aborting"; + } else if (dirInfo.exists()) { + //Ensure the environment is only created once + QMutexLocker locker(&sMutex); + + /* + * It seems we can only ever have one environment open in the process. + * Otherwise multi-threading breaks. + */ + env = sEnvironments.value(fullPath); + if (!env) { + int rc = 0; + if ((rc = mdb_env_create(&env))) { + // TODO: handle error + std::cerr << "mdb_env_create: " << rc << " " << mdb_strerror(rc) << std::endl; } else { - //FIXME: dynamic resize - const size_t dbSize = (size_t)10485760 * (size_t)100 * (size_t)80; //10MB * 800 - mdb_env_set_mapsize(env, dbSize); - sEnvironments.insert(fullPath, env); + if ((rc = mdb_env_open(env, fullPath.toStdString().data(), mode == ReadOnly ? MDB_RDONLY : 0 , 0664))) { + std::cerr << "mdb_env_open: " << rc << " " << mdb_strerror(rc) << std::endl; + mdb_env_close(env); + env = 0; + } else { + //FIXME: dynamic resize + const size_t dbSize = (size_t)10485760 * (size_t)8000; //1MB * 8000 + mdb_env_set_mapsize(env, dbSize); + sEnvironments.insert(fullPath, env); + } } } } @@ -142,9 +361,40 @@ bool Storage::exists() const return (d->env != 0); } -bool Storage::isInTransaction() const +Storage::Transaction Storage::createTransaction(AccessMode type, const std::function &errorHandlerArg) { - return d->transaction; + auto errorHandler = errorHandlerArg ? errorHandlerArg : defaultErrorHandler(); + if (!d->env) { + errorHandler(Error(d->name.toLatin1(), ErrorCodes::GenericError, "Missing database environment")); + return Transaction(); + } + + bool requestedRead = type == ReadOnly; + + if (d->mode == ReadOnly && !requestedRead) { + errorHandler(Error(d->name.toLatin1(), ErrorCodes::GenericError, "Requested read/write transaction in read-only mode.")); + return Transaction(); + } + + int rc; + MDB_txn *txn; + rc = mdb_txn_begin(d->env, NULL, requestedRead ? MDB_RDONLY : 0, &txn); + if (!rc) { + //TODO: Move opening of dbi into Transaction for different named databases + MDB_dbi dbi; + rc = mdb_dbi_open(txn, NULL, d->allowDuplicates ? MDB_DUPSORT : 0, &dbi); + if (rc) { + errorHandler(Error(d->name.toLatin1(), ErrorCodes::GenericError, "Error while opening transaction: " + QByteArray(mdb_strerror(rc)))); + return Transaction(); + } + return Transaction(new Transaction::Private(txn, dbi, d->allowDuplicates, defaultErrorHandler(), d->name)); + } else { + if (rc) { + errorHandler(Error(d->name.toLatin1(), ErrorCodes::GenericError, "Error while beginning transaction: " + QByteArray(mdb_strerror(rc)))); + return Transaction(); + } + } + return Transaction(); } bool Storage::startTransaction(AccessMode type, -- cgit v1.2.3