diff options
-rw-r--r-- | common/genericresource.cpp | 5 | ||||
-rw-r--r-- | common/messagequeue.cpp | 29 | ||||
-rw-r--r-- | common/messagequeue.h | 3 | ||||
-rw-r--r-- | common/storage_lmdb.cpp | 41 | ||||
-rw-r--r-- | tests/genericresourcetest.cpp | 3 | ||||
-rw-r--r-- | tests/messagequeuetest.cpp | 2 | ||||
-rw-r--r-- | tests/storagetest.cpp | 7 |
7 files changed, 55 insertions, 35 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 112ce64..3ffc56b 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -107,14 +107,15 @@ private slots: | |||
107 | [queue]() { return !queue->isEmpty(); }, | 107 | [queue]() { return !queue->isEmpty(); }, |
108 | [this, queue](KAsync::Future<void> &future) { | 108 | [this, queue](KAsync::Future<void> &future) { |
109 | queue->dequeueBatch(100, [this](const QByteArray &data) { | 109 | queue->dequeueBatch(100, [this](const QByteArray &data) { |
110 | Trace() << "Got value"; | ||
111 | return processQueuedCommand(data); | 110 | return processQueuedCommand(data); |
112 | } | 111 | } |
113 | ).then<void>([&future, queue](){ | 112 | ).then<void>([&future, queue](){ |
114 | future.setFinished(); | 113 | future.setFinished(); |
115 | }, | 114 | }, |
116 | [&future](int i, QString error) { | 115 | [&future](int i, QString error) { |
117 | Warning() << "Error while getting message from messagequeue: " << error; | 116 | if (i != MessageQueue::ErrorCodes::NoMessageFound) { |
117 | Warning() << "Error while getting message from messagequeue: " << error; | ||
118 | } | ||
118 | future.setFinished(); | 119 | future.setFinished(); |
119 | }).exec(); | 120 | }).exec(); |
120 | } | 121 | } |
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index ab4b1cf..f8bcd46 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp | |||
@@ -113,7 +113,6 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu | |||
113 | 113 | ||
114 | KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler) | 114 | KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler) |
115 | { | 115 | { |
116 | Trace() << "Dequeue batch"; | ||
117 | auto resultCount = QSharedPointer<int>::create(0); | 116 | auto resultCount = QSharedPointer<int>::create(0); |
118 | return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount](KAsync::Future<void> &future) { | 117 | return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount](KAsync::Future<void> &future) { |
119 | int count = 0; | 118 | int count = 0; |
@@ -123,14 +122,12 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi | |||
123 | return true; | 122 | return true; |
124 | } | 123 | } |
125 | *resultCount += 1; | 124 | *resultCount += 1; |
126 | Trace() << "Dequeue value"; | ||
127 | //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) | 125 | //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) |
128 | mPendingRemoval << QByteArray(key.constData(), key.size()); | 126 | mPendingRemoval << QByteArray(key.constData(), key.size()); |
129 | 127 | ||
130 | waitCondition << resultHandler(value).exec(); | 128 | waitCondition << resultHandler(value).exec(); |
131 | 129 | ||
132 | count++; | 130 | count++; |
133 | Trace() << count << maxBatchSize; | ||
134 | if (count < maxBatchSize) { | 131 | if (count < maxBatchSize) { |
135 | return true; | 132 | return true; |
136 | } | 133 | } |
@@ -145,7 +142,7 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi | |||
145 | ::waitForCompletion(waitCondition).then<void>([this, resultCount, &future]() { | 142 | ::waitForCompletion(waitCondition).then<void>([this, resultCount, &future]() { |
146 | processRemovals(); | 143 | processRemovals(); |
147 | if (*resultCount == 0) { | 144 | if (*resultCount == 0) { |
148 | future.setError(-1, "No message found"); | 145 | future.setError(static_cast<int>(ErrorCodes::NoMessageFound), "No message found"); |
149 | future.setFinished(); | 146 | future.setFinished(); |
150 | } else { | 147 | } else { |
151 | if (isEmpty()) { | 148 | if (isEmpty()) { |
@@ -160,16 +157,20 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi | |||
160 | bool MessageQueue::isEmpty() | 157 | bool MessageQueue::isEmpty() |
161 | { | 158 | { |
162 | int count = 0; | 159 | int count = 0; |
163 | mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [&count, this](const QByteArray &key, const QByteArray &value) -> bool { | 160 | auto t = mStorage.createTransaction(Akonadi2::Storage::ReadOnly); |
164 | if (!Akonadi2::Storage::isInternalKey(key) && !mPendingRemoval.contains(key)) { | 161 | auto db = t.openDatabase(); |
165 | count++; | 162 | if (db) { |
166 | return false; | 163 | db.scan("", [&count, this](const QByteArray &key, const QByteArray &value) -> bool { |
167 | } | 164 | if (!Akonadi2::Storage::isInternalKey(key) && !mPendingRemoval.contains(key)) { |
168 | return true; | 165 | count++; |
169 | }, | 166 | return false; |
170 | [](const Akonadi2::Storage::Error &error) { | 167 | } |
171 | ErrorMsg() << "Error while checking if empty" << error.message; | 168 | return true; |
172 | }); | 169 | }, |
170 | [](const Akonadi2::Storage::Error &error) { | ||
171 | ErrorMsg() << "Error while checking if empty" << error.message; | ||
172 | }); | ||
173 | } | ||
173 | return count == 0; | 174 | return count == 0; |
174 | } | 175 | } |
175 | 176 | ||
diff --git a/common/messagequeue.h b/common/messagequeue.h index b6c2614..a04e22f 100644 --- a/common/messagequeue.h +++ b/common/messagequeue.h | |||
@@ -15,6 +15,9 @@ class MessageQueue : public QObject | |||
15 | { | 15 | { |
16 | Q_OBJECT | 16 | Q_OBJECT |
17 | public: | 17 | public: |
18 | enum ErrorCodes { | ||
19 | NoMessageFound | ||
20 | }; | ||
18 | class Error | 21 | class Error |
19 | { | 22 | { |
20 | public: | 23 | public: |
diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp index ebb3be3..3e7c962 100644 --- a/common/storage_lmdb.cpp +++ b/common/storage_lmdb.cpp | |||
@@ -71,18 +71,23 @@ public: | |||
71 | std::function<void(const Storage::Error &error)> defaultErrorHandler; | 71 | std::function<void(const Storage::Error &error)> defaultErrorHandler; |
72 | QString name; | 72 | QString name; |
73 | 73 | ||
74 | bool openDatabase(std::function<void(const Storage::Error &error)> errorHandler) | 74 | bool openDatabase(bool readOnly, std::function<void(const Storage::Error &error)> errorHandler) |
75 | { | 75 | { |
76 | unsigned int flags = MDB_CREATE; | 76 | unsigned int flags = 0; |
77 | if (!readOnly) { | ||
78 | flags |= MDB_CREATE; | ||
79 | } | ||
77 | if (allowDuplicates) { | 80 | if (allowDuplicates) { |
78 | flags |= MDB_DUPSORT; | 81 | flags |= MDB_DUPSORT; |
79 | } | 82 | } |
80 | if (const int rc = mdb_dbi_open(transaction, db.constData(), flags, &dbi)) { | 83 | if (const int rc = mdb_dbi_open(transaction, db.constData(), flags, &dbi)) { |
81 | qWarning() << "Failed to open: " << rc << db; | ||
82 | dbi = 0; | 84 | dbi = 0; |
83 | transaction = 0; | 85 | transaction = 0; |
84 | Error error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening database: " + QByteArray(mdb_strerror(rc))); | 86 | //The database is not existing, ignore in read-only mode |
85 | errorHandler ? errorHandler(error) : defaultErrorHandler(error); | 87 | if (!(readOnly && rc == MDB_NOTFOUND)) { |
88 | Error error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening database: " + QByteArray(mdb_strerror(rc))); | ||
89 | errorHandler ? errorHandler(error) : defaultErrorHandler(error); | ||
90 | } | ||
86 | return false; | 91 | return false; |
87 | } | 92 | } |
88 | return true; | 93 | return true; |
@@ -108,7 +113,7 @@ Storage::NamedDatabase::~NamedDatabase() | |||
108 | bool Storage::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sValue, const std::function<void(const Storage::Error &error)> &errorHandler) | 113 | bool Storage::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sValue, const std::function<void(const Storage::Error &error)> &errorHandler) |
109 | { | 114 | { |
110 | if (!d || !d->transaction) { | 115 | if (!d || !d->transaction) { |
111 | Error error(d->name.toLatin1(), ErrorCodes::GenericError, "Not open"); | 116 | Error error("", ErrorCodes::GenericError, "Not open"); |
112 | errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); | 117 | errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); |
113 | return false; | 118 | return false; |
114 | } | 119 | } |
@@ -167,8 +172,7 @@ int Storage::NamedDatabase::scan(const QByteArray &k, | |||
167 | const std::function<void(const Storage::Error &error)> &errorHandler) const | 172 | const std::function<void(const Storage::Error &error)> &errorHandler) const |
168 | { | 173 | { |
169 | if (!d || !d->transaction) { | 174 | if (!d || !d->transaction) { |
170 | // Error error(d->name.toLatin1(), ErrorCodes::NotOpen, "Not open"); | 175 | //Not an error. We rely on this to read nothing from non-existing databases. |
171 | // errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); | ||
172 | return 0; | 176 | return 0; |
173 | } | 177 | } |
174 | 178 | ||
@@ -328,16 +332,20 @@ Storage::NamedDatabase Storage::Transaction::openDatabase(const QByteArray &db, | |||
328 | //We don't now if anything changed | 332 | //We don't now if anything changed |
329 | d->implicitCommit = true; | 333 | d->implicitCommit = true; |
330 | auto p = new Storage::NamedDatabase::Private(db, d->allowDuplicates, d->defaultErrorHandler, d->name, d->transaction); | 334 | auto p = new Storage::NamedDatabase::Private(db, d->allowDuplicates, d->defaultErrorHandler, d->name, d->transaction); |
331 | p->openDatabase(errorHandler); | 335 | if (!p->openDatabase(d->requestedRead, errorHandler)) { |
336 | return Storage::NamedDatabase(); | ||
337 | delete p; | ||
338 | } | ||
332 | return Storage::NamedDatabase(p); | 339 | return Storage::NamedDatabase(p); |
333 | } | 340 | } |
334 | 341 | ||
335 | bool Storage::Transaction::write(const QByteArray &key, const QByteArray &value, const std::function<void(const Storage::Error &error)> &errorHandler) | 342 | bool Storage::Transaction::write(const QByteArray &key, const QByteArray &value, const std::function<void(const Storage::Error &error)> &errorHandler) |
336 | { | 343 | { |
337 | openDatabase().write(key, value, [this, errorHandler](const Storage::Error &error) { | 344 | auto eHandler = [this, errorHandler](const Storage::Error &error) { |
338 | d->error = true; | 345 | d->error = true; |
339 | errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); | 346 | errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); |
340 | }); | 347 | }; |
348 | openDatabase("default", eHandler).write(key, value, eHandler); | ||
341 | d->implicitCommit = true; | 349 | d->implicitCommit = true; |
342 | 350 | ||
343 | return !d->error; | 351 | return !d->error; |
@@ -346,10 +354,11 @@ bool Storage::Transaction::write(const QByteArray &key, const QByteArray &value, | |||
346 | void Storage::Transaction::remove(const QByteArray &k, | 354 | void Storage::Transaction::remove(const QByteArray &k, |
347 | const std::function<void(const Storage::Error &error)> &errorHandler) | 355 | const std::function<void(const Storage::Error &error)> &errorHandler) |
348 | { | 356 | { |
349 | openDatabase().remove(k, [this, errorHandler](const Storage::Error &error) { | 357 | auto eHandler = [this, errorHandler](const Storage::Error &error) { |
350 | d->error = true; | 358 | d->error = true; |
351 | errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); | 359 | errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); |
352 | }); | 360 | }; |
361 | openDatabase("default", eHandler).remove(k, eHandler); | ||
353 | d->implicitCommit = true; | 362 | d->implicitCommit = true; |
354 | } | 363 | } |
355 | 364 | ||
@@ -357,7 +366,11 @@ int Storage::Transaction::scan(const QByteArray &k, | |||
357 | const std::function<bool(const QByteArray &key, const QByteArray &value)> &resultHandler, | 366 | const std::function<bool(const QByteArray &key, const QByteArray &value)> &resultHandler, |
358 | const std::function<void(const Storage::Error &error)> &errorHandler) const | 367 | const std::function<void(const Storage::Error &error)> &errorHandler) const |
359 | { | 368 | { |
360 | return openDatabase().scan(k, resultHandler, errorHandler); | 369 | auto db = openDatabase("default"); |
370 | if (db) { | ||
371 | return db.scan(k, resultHandler, errorHandler); | ||
372 | } | ||
373 | return 0; | ||
361 | } | 374 | } |
362 | 375 | ||
363 | 376 | ||
diff --git a/tests/genericresourcetest.cpp b/tests/genericresourcetest.cpp index 0c5db15..fa6da22 100644 --- a/tests/genericresourcetest.cpp +++ b/tests/genericresourcetest.cpp | |||
@@ -44,9 +44,6 @@ private Q_SLOTS: | |||
44 | removeFromDisk("org.kde.test.instance1.userqueue"); | 44 | removeFromDisk("org.kde.test.instance1.userqueue"); |
45 | removeFromDisk("org.kde.test.instance1.synchronizerqueue"); | 45 | removeFromDisk("org.kde.test.instance1.synchronizerqueue"); |
46 | Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Trace); | 46 | Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Trace); |
47 | qDebug(); | ||
48 | qDebug() << "-----------------------------------------"; | ||
49 | qDebug(); | ||
50 | } | 47 | } |
51 | 48 | ||
52 | void testProcessCommand() | 49 | void testProcessCommand() |
diff --git a/tests/messagequeuetest.cpp b/tests/messagequeuetest.cpp index 9c2aa16..22ce161 100644 --- a/tests/messagequeuetest.cpp +++ b/tests/messagequeuetest.cpp | |||
@@ -49,7 +49,7 @@ private Q_SLOTS: | |||
49 | gotError = true; | 49 | gotError = true; |
50 | }); | 50 | }); |
51 | QVERIFY(!gotValue); | 51 | QVERIFY(!gotValue); |
52 | QVERIFY(gotError); | 52 | QVERIFY(!gotError); |
53 | } | 53 | } |
54 | 54 | ||
55 | void testEnqueue() | 55 | void testEnqueue() |
diff --git a/tests/storagetest.cpp b/tests/storagetest.cpp index fe80bb7..e872c44 100644 --- a/tests/storagetest.cpp +++ b/tests/storagetest.cpp | |||
@@ -159,7 +159,12 @@ private Q_SLOTS: | |||
159 | bool gotResult = false; | 159 | bool gotResult = false; |
160 | bool gotError = false; | 160 | bool gotError = false; |
161 | Akonadi2::Storage store(testDataPath, dbName, Akonadi2::Storage::ReadWrite); | 161 | Akonadi2::Storage store(testDataPath, dbName, Akonadi2::Storage::ReadWrite); |
162 | int numValues = store.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [&](const QByteArray &key, const QByteArray &value) -> bool { | 162 | auto transaction = store.createTransaction(Akonadi2::Storage::ReadOnly); |
163 | auto db = transaction.openDatabase("default", [&](const Akonadi2::Storage::Error &error) { | ||
164 | qDebug() << error.message; | ||
165 | gotError = true; | ||
166 | }); | ||
167 | int numValues = db.scan("", [&](const QByteArray &key, const QByteArray &value) -> bool { | ||
163 | gotResult = true; | 168 | gotResult = true; |
164 | return false; | 169 | return false; |
165 | }, | 170 | }, |