summaryrefslogtreecommitdiffstats
path: root/common/storage_lmdb.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-08-09 23:17:42 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-08-09 23:17:42 +0200
commita8263a28f5d3a74581e289289d0807e6b656104b (patch)
tree66d3033df141b0a73d1b89b8345dbcad7ba6eb7f /common/storage_lmdb.cpp
parentb6e7be78f3e13275a8f217a4e01b304d97538641 (diff)
downloadsink-a8263a28f5d3a74581e289289d0807e6b656104b.tar.gz
sink-a8263a28f5d3a74581e289289d0807e6b656104b.zip
Transaction class for storage
The beginning of a cleaner and less bare-bones API for the storage. The lifetime of transactions is now handled in (movable) transaction objects.
Diffstat (limited to 'common/storage_lmdb.cpp')
-rw-r--r--common/storage_lmdb.cpp306
1 files changed, 278 insertions, 28 deletions
diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp
index 7bbf8b5..230409f 100644
--- a/common/storage_lmdb.cpp
+++ b/common/storage_lmdb.cpp
@@ -47,6 +47,220 @@ int getErrorCode(int e)
47 return -1; 47 return -1;
48} 48}
49 49
50
51
52class Storage::Transaction::Private
53{
54public:
55 Private(MDB_txn *_txn, MDB_dbi _dbi, bool _allowDuplicates, const std::function<void(const Storage::Error &error)> &_defaultErrorHandler, const QString &_name)
56 : transaction(_txn),
57 dbi(_dbi),
58 allowDuplicates(_allowDuplicates),
59 defaultErrorHandler(_defaultErrorHandler),
60 name(_name),
61 implicitCommit(false),
62 error(false)
63 {
64
65 }
66 ~Private()
67 {
68
69 }
70
71 MDB_txn *transaction;
72 MDB_dbi dbi;
73 bool allowDuplicates;
74 std::function<void(const Storage::Error &error)> defaultErrorHandler;
75 QString name;
76 bool implicitCommit;
77 bool error;
78};
79
80Storage::Transaction::Transaction()
81 : d(0)
82{
83
84}
85
86Storage::Transaction::Transaction(Transaction::Private *prv)
87 : d(prv)
88{
89
90}
91
92Storage::Transaction::~Transaction()
93{
94 if (d && d->transaction) {
95 if (d->implicitCommit && !d->error) {
96 commit();
97 } else {
98 mdb_txn_abort(d->transaction);
99 }
100 }
101 delete d;
102}
103
104bool Storage::Transaction::commit(const std::function<void(const Storage::Error &error)> &errorHandler)
105{
106 if (!d) {
107 return false;
108 }
109
110 const int rc = mdb_txn_commit(d->transaction);
111 if (rc) {
112 mdb_txn_abort(d->transaction);
113 Error error(d->name.toLatin1(), ErrorCodes::GenericError, "Error during transaction commit: " + QByteArray(mdb_strerror(rc)));
114 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
115 }
116 d->transaction = nullptr;
117
118 return !rc;
119}
120
121void Storage::Transaction::abort()
122{
123 if (!d || !d->transaction) {
124 return;
125 }
126
127 mdb_txn_abort(d->transaction);
128 d->transaction = nullptr;
129}
130
131bool Storage::Transaction::write(const QByteArray &sKey, const QByteArray &sValue, const std::function<void(const Storage::Error &error)> &errorHandler)
132{
133 if (!d || !d->transaction) {
134 return false;
135 }
136 const void *keyPtr = sKey.data();
137 const size_t keySize = sKey.size();
138 const void *valuePtr = sValue.data();
139 const size_t valueSize = sValue.size();
140
141 if (!keyPtr || keySize == 0) {
142 Error error(d->name.toLatin1(), ErrorCodes::GenericError, "Tried to write empty key.");
143 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
144 return false;
145 }
146
147 int rc;
148 MDB_val key, data;
149 key.mv_size = keySize;
150 key.mv_data = const_cast<void*>(keyPtr);
151 data.mv_size = valueSize;
152 data.mv_data = const_cast<void*>(valuePtr);
153 rc = mdb_put(d->transaction, d->dbi, &key, &data, 0);
154
155 if (rc) {
156 d->error = true;
157 Error error(d->name.toLatin1(), ErrorCodes::GenericError, "mdb_put: " + QByteArray(mdb_strerror(rc)));
158 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
159 } else {
160 d->implicitCommit = true;
161 }
162
163 return !rc;
164}
165
166int Storage::Transaction::scan(const QByteArray &k,
167 const std::function<bool(const QByteArray &key, const QByteArray &value)> &resultHandler,
168 const std::function<void(const Storage::Error &error)> &errorHandler)
169{
170 if (!d || !d->transaction) {
171 Error error(d->name.toLatin1(), ErrorCodes::NotOpen, "Not open");
172 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
173 return 0;
174 }
175
176 int rc;
177 MDB_val key;
178 MDB_val data;
179 MDB_cursor *cursor;
180
181 key.mv_data = (void*)k.constData();
182 key.mv_size = k.size();
183
184 rc = mdb_cursor_open(d->transaction, d->dbi, &cursor);
185 if (rc) {
186 Error error(d->name.toLatin1(), getErrorCode(rc), QByteArray("Error during mdb_cursor open: ") + QByteArray(mdb_strerror(rc)));
187 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
188 return 0;
189 }
190
191 int numberOfRetrievedValues = 0;
192
193 if (k.isEmpty() || d->allowDuplicates) {
194 if ((rc = mdb_cursor_get(cursor, &key, &data, d->allowDuplicates ? MDB_SET_RANGE : MDB_FIRST)) == 0) {
195 numberOfRetrievedValues++;
196 if (resultHandler(QByteArray::fromRawData((char*)key.mv_data, key.mv_size), QByteArray::fromRawData((char*)data.mv_data, data.mv_size))) {
197 while ((rc = mdb_cursor_get(cursor, &key, &data, d->allowDuplicates ? MDB_NEXT_DUP : MDB_NEXT)) == 0) {
198 numberOfRetrievedValues++;
199 if (!resultHandler(QByteArray::fromRawData((char*)key.mv_data, key.mv_size), QByteArray::fromRawData((char*)data.mv_data, data.mv_size))) {
200 break;
201 }
202 }
203 }
204 }
205
206 //We never find the last value
207 if (rc == MDB_NOTFOUND) {
208 rc = 0;
209 }
210 } else {
211 if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_SET)) == 0) {
212 numberOfRetrievedValues++;
213 resultHandler(QByteArray::fromRawData((char*)key.mv_data, key.mv_size), QByteArray::fromRawData((char*)data.mv_data, data.mv_size));
214 }
215 }
216
217 mdb_cursor_close(cursor);
218
219 if (rc) {
220 Error error(d->name.toLatin1(), getErrorCode(rc), QByteArray("Key: ") + k + " : " + QByteArray(mdb_strerror(rc)));
221 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
222 }
223
224 return numberOfRetrievedValues;
225}
226
227void Storage::Transaction::remove(const QByteArray &k,
228 const std::function<void(const Storage::Error &error)> &errorHandler)
229{
230 if (!d) {
231 Error error(d->name.toLatin1(), ErrorCodes::GenericError, "Not open");
232 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
233 return;
234 }
235
236 int rc;
237 MDB_val key;
238 key.mv_size = k.size();
239 key.mv_data = const_cast<void*>(static_cast<const void*>(k.data()));
240 rc = mdb_del(d->transaction, d->dbi, &key, 0);
241
242 if (rc) {
243 d->error = true;
244 Error error(d->name.toLatin1(), ErrorCodes::GenericError, QString("Error on mdb_del: %1 %2").arg(rc).arg(mdb_strerror(rc)).toLatin1());
245 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
246 } else {
247 d->implicitCommit = true;
248 }
249
250 return;
251}
252
253
254
255
256
257
258
259
260
261
262
263
50class Storage::Private 264class Storage::Private
51{ 265{
52public: 266public:
@@ -81,33 +295,38 @@ Storage::Private::Private(const QString &s, const QString &n, AccessMode m, bool
81 allowDuplicates(duplicates) 295 allowDuplicates(duplicates)
82{ 296{
83 const QString fullPath(storageRoot + '/' + name); 297 const QString fullPath(storageRoot + '/' + name);
84 QDir dir; 298 QFileInfo dirInfo(fullPath);
85 dir.mkpath(storageRoot); 299 if (!dirInfo.exists() && mode == ReadWrite) {
86 dir.mkdir(fullPath); 300 QDir().mkpath(fullPath);
87 301 dirInfo.refresh();
88 //Ensure the environment is only created once 302 }
89 QMutexLocker locker(&sMutex); 303 if (mode == ReadWrite && !dirInfo.permission(QFile::WriteOwner)) {
90 304 qCritical() << fullPath << "does not have write permissions. Aborting";
91 int rc = 0; 305 } else if (dirInfo.exists()) {
92 /* 306 //Ensure the environment is only created once
93 * It seems we can only ever have one environment open in the process. 307 QMutexLocker locker(&sMutex);
94 * Otherwise multi-threading breaks. 308
95 */ 309 /*
96 env = sEnvironments.value(fullPath); 310 * It seems we can only ever have one environment open in the process.
97 if (!env) { 311 * Otherwise multi-threading breaks.
98 if ((rc = mdb_env_create(&env))) { 312 */
99 // TODO: handle error 313 env = sEnvironments.value(fullPath);
100 std::cerr << "mdb_env_create: " << rc << " " << mdb_strerror(rc) << std::endl; 314 if (!env) {
101 } else { 315 int rc = 0;
102 if ((rc = mdb_env_open(env, fullPath.toStdString().data(), mode == ReadOnly ? MDB_RDONLY : 0 , 0664))) { 316 if ((rc = mdb_env_create(&env))) {
103 std::cerr << "mdb_env_open: " << rc << " " << mdb_strerror(rc) << std::endl; 317 // TODO: handle error
104 mdb_env_close(env); 318 std::cerr << "mdb_env_create: " << rc << " " << mdb_strerror(rc) << std::endl;
105 env = 0;
106 } else { 319 } else {
107 //FIXME: dynamic resize 320 if ((rc = mdb_env_open(env, fullPath.toStdString().data(), mode == ReadOnly ? MDB_RDONLY : 0 , 0664))) {
108 const size_t dbSize = (size_t)10485760 * (size_t)100 * (size_t)80; //10MB * 800 321 std::cerr << "mdb_env_open: " << rc << " " << mdb_strerror(rc) << std::endl;
109 mdb_env_set_mapsize(env, dbSize); 322 mdb_env_close(env);
110 sEnvironments.insert(fullPath, env); 323 env = 0;
324 } else {
325 //FIXME: dynamic resize
326 const size_t dbSize = (size_t)10485760 * (size_t)8000; //1MB * 8000
327 mdb_env_set_mapsize(env, dbSize);
328 sEnvironments.insert(fullPath, env);
329 }
111 } 330 }
112 } 331 }
113 } 332 }
@@ -142,9 +361,40 @@ bool Storage::exists() const
142 return (d->env != 0); 361 return (d->env != 0);
143} 362}
144 363
145bool Storage::isInTransaction() const 364Storage::Transaction Storage::createTransaction(AccessMode type, const std::function<void(const Storage::Error &error)> &errorHandlerArg)
146{ 365{
147 return d->transaction; 366 auto errorHandler = errorHandlerArg ? errorHandlerArg : defaultErrorHandler();
367 if (!d->env) {
368 errorHandler(Error(d->name.toLatin1(), ErrorCodes::GenericError, "Missing database environment"));
369 return Transaction();
370 }
371
372 bool requestedRead = type == ReadOnly;
373
374 if (d->mode == ReadOnly && !requestedRead) {
375 errorHandler(Error(d->name.toLatin1(), ErrorCodes::GenericError, "Requested read/write transaction in read-only mode."));
376 return Transaction();
377 }
378
379 int rc;
380 MDB_txn *txn;
381 rc = mdb_txn_begin(d->env, NULL, requestedRead ? MDB_RDONLY : 0, &txn);
382 if (!rc) {
383 //TODO: Move opening of dbi into Transaction for different named databases
384 MDB_dbi dbi;
385 rc = mdb_dbi_open(txn, NULL, d->allowDuplicates ? MDB_DUPSORT : 0, &dbi);
386 if (rc) {
387 errorHandler(Error(d->name.toLatin1(), ErrorCodes::GenericError, "Error while opening transaction: " + QByteArray(mdb_strerror(rc))));
388 return Transaction();
389 }
390 return Transaction(new Transaction::Private(txn, dbi, d->allowDuplicates, defaultErrorHandler(), d->name));
391 } else {
392 if (rc) {
393 errorHandler(Error(d->name.toLatin1(), ErrorCodes::GenericError, "Error while beginning transaction: " + QByteArray(mdb_strerror(rc))));
394 return Transaction();
395 }
396 }
397 return Transaction();
148} 398}
149 399
150bool Storage::startTransaction(AccessMode type, 400bool Storage::startTransaction(AccessMode type,