diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-08-23 23:57:09 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-08-23 23:57:09 +0200 |
commit | 9c3d000e11d3b1fc6c6c205fe9e7ea26c11092c6 (patch) | |
tree | 7cf47fd2334487a0e4f2bfef73c8e09e165dc4cc /common | |
parent | ae0255bf8eb913004082278af27d69047503ea71 (diff) | |
download | sink-9c3d000e11d3b1fc6c6c205fe9e7ea26c11092c6.tar.gz sink-9c3d000e11d3b1fc6c6c205fe9e7ea26c11092c6.zip |
Removed convenience API
Diffstat (limited to 'common')
-rw-r--r-- | common/entitystorage.cpp | 2 | ||||
-rw-r--r-- | common/messagequeue.cpp | 6 | ||||
-rw-r--r-- | common/pipeline.cpp | 8 | ||||
-rw-r--r-- | common/pipeline.h | 2 | ||||
-rw-r--r-- | common/storage.h | 7 | ||||
-rw-r--r-- | common/storage_common.cpp | 4 | ||||
-rw-r--r-- | common/storage_lmdb.cpp | 34 |
7 files changed, 11 insertions, 52 deletions
diff --git a/common/entitystorage.cpp b/common/entitystorage.cpp index 22fd9e6..8a3391e 100644 --- a/common/entitystorage.cpp +++ b/common/entitystorage.cpp | |||
@@ -21,7 +21,7 @@ | |||
21 | 21 | ||
22 | static void scan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, std::function<bool(const QByteArray &key, const Akonadi2::Entity &entity)> callback) | 22 | static void scan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, std::function<bool(const QByteArray &key, const Akonadi2::Entity &entity)> callback) |
23 | { | 23 | { |
24 | transaction.scan(key, [=](const QByteArray &key, const QByteArray &value) -> bool { | 24 | transaction.openDatabase().scan(key, [=](const QByteArray &key, const QByteArray &value) -> bool { |
25 | //Skip internals | 25 | //Skip internals |
26 | if (Akonadi2::Storage::isInternalKey(key)) { | 26 | if (Akonadi2::Storage::isInternalKey(key)) { |
27 | return true; | 27 | return true; |
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index f8bcd46..8e3d7d7 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp | |||
@@ -76,7 +76,7 @@ void MessageQueue::enqueue(const QByteArray &value) | |||
76 | } | 76 | } |
77 | const qint64 revision = Akonadi2::Storage::maxRevision(mWriteTransaction) + 1; | 77 | const qint64 revision = Akonadi2::Storage::maxRevision(mWriteTransaction) + 1; |
78 | const QByteArray key = QString("%1").arg(revision).toUtf8(); | 78 | const QByteArray key = QString("%1").arg(revision).toUtf8(); |
79 | mWriteTransaction.write(key, value); | 79 | mWriteTransaction.openDatabase().write(key, value); |
80 | Akonadi2::Storage::setMaxRevision(mWriteTransaction, revision); | 80 | Akonadi2::Storage::setMaxRevision(mWriteTransaction, revision); |
81 | if (implicitTransaction) { | 81 | if (implicitTransaction) { |
82 | commit(); | 82 | commit(); |
@@ -90,7 +90,7 @@ void MessageQueue::processRemovals() | |||
90 | } | 90 | } |
91 | auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); | 91 | auto transaction = std::move(mStorage.createTransaction(Akonadi2::Storage::ReadWrite)); |
92 | for (const auto &key : mPendingRemoval) { | 92 | for (const auto &key : mPendingRemoval) { |
93 | transaction.remove(key); | 93 | transaction.openDatabase().remove(key); |
94 | } | 94 | } |
95 | transaction.commit(); | 95 | transaction.commit(); |
96 | mPendingRemoval.clear(); | 96 | mPendingRemoval.clear(); |
@@ -117,7 +117,7 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi | |||
117 | return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount](KAsync::Future<void> &future) { | 117 | return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount](KAsync::Future<void> &future) { |
118 | int count = 0; | 118 | int count = 0; |
119 | QList<KAsync::Future<void> > waitCondition; | 119 | QList<KAsync::Future<void> > waitCondition; |
120 | mStorage.createTransaction(Akonadi2::Storage::ReadOnly).scan("", [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { | 120 | mStorage.createTransaction(Akonadi2::Storage::ReadOnly).openDatabase().scan("", [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { |
121 | if (Akonadi2::Storage::isInternalKey(key) || mPendingRemoval.contains(key)) { | 121 | if (Akonadi2::Storage::isInternalKey(key) || mPendingRemoval.contains(key)) { |
122 | return true; | 122 | return true; |
123 | } | 123 | } |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index c5e36ee..8ef6187 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -165,7 +165,7 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
165 | flatbuffers::FlatBufferBuilder fbb; | 165 | flatbuffers::FlatBufferBuilder fbb; |
166 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); | 166 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); |
167 | 167 | ||
168 | d->transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | 168 | d->transaction.openDatabase().write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); |
169 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); | 169 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); |
170 | Log() << "Pipeline: wrote entity: " << key << newRevision; | 170 | Log() << "Pipeline: wrote entity: " << key << newRevision; |
171 | 171 | ||
@@ -220,7 +220,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | |||
220 | auto diff = adaptorFactory->createAdaptor(*diffEntity); | 220 | auto diff = adaptorFactory->createAdaptor(*diffEntity); |
221 | 221 | ||
222 | QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; | 222 | QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; |
223 | storage().createTransaction(Akonadi2::Storage::ReadOnly).scan(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | 223 | storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase().scan(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { |
224 | Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 224 | Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
225 | if (!buffer.isValid()) { | 225 | if (!buffer.isValid()) { |
226 | Warning() << "Read invalid buffer from disk"; | 226 | Warning() << "Read invalid buffer from disk"; |
@@ -265,7 +265,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | |||
265 | adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | 265 | adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); |
266 | 266 | ||
267 | //TODO don't overwrite the old entry, but instead store a new revision | 267 | //TODO don't overwrite the old entry, but instead store a new revision |
268 | d->transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | 268 | d->transaction.openDatabase().write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); |
269 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); | 269 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); |
270 | 270 | ||
271 | return KAsync::start<void>([this, key, entityType, newRevision](KAsync::Future<void> &future) { | 271 | return KAsync::start<void>([this, key, entityType, newRevision](KAsync::Future<void> &future) { |
@@ -296,7 +296,7 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size) | |||
296 | const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); | 296 | const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); |
297 | 297 | ||
298 | //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted | 298 | //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted |
299 | d->transaction.remove(key); | 299 | d->transaction.openDatabase().remove(key); |
300 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); | 300 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); |
301 | Log() << "Pipeline: deleted entity: "<< newRevision; | 301 | Log() << "Pipeline: deleted entity: "<< newRevision; |
302 | 302 | ||
diff --git a/common/pipeline.h b/common/pipeline.h index fee6a5e..a3b3735 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -142,7 +142,7 @@ public: | |||
142 | 142 | ||
143 | void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE | 143 | void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE |
144 | { | 144 | { |
145 | transaction.scan(state.key(), [this, &state, &transaction](const QByteArray &key, const QByteArray &value) -> bool { | 145 | transaction.openDatabase().scan(state.key(), [this, &state, &transaction](const QByteArray &key, const QByteArray &value) -> bool { |
146 | auto entity = Akonadi2::GetEntity(value); | 146 | auto entity = Akonadi2::GetEntity(value); |
147 | mFunction(state, *entity, transaction); | 147 | mFunction(state, *entity, transaction); |
148 | processingCompleted(state); | 148 | processingCompleted(state); |
diff --git a/common/storage.h b/common/storage.h index 8a2e51f..191f535 100644 --- a/common/storage.h +++ b/common/storage.h | |||
@@ -130,13 +130,6 @@ public: | |||
130 | return (d != nullptr); | 130 | return (d != nullptr); |
131 | } | 131 | } |
132 | 132 | ||
133 | bool write(const QByteArray &key, const QByteArray &value, const std::function<void(const Storage::Error &error)> &errorHandler = std::function<void(const Storage::Error &error)>()); | ||
134 | |||
135 | void remove(const QByteArray &key, | ||
136 | const std::function<void(const Storage::Error &error)> &errorHandler = std::function<void(const Storage::Error &error)>()); | ||
137 | int scan(const QByteArray &k, | ||
138 | const std::function<bool(const QByteArray &key, const QByteArray &value)> &resultHandler, | ||
139 | const std::function<void(const Storage::Error &error)> &errorHandler = std::function<void(const Storage::Error &error)>()) const; | ||
140 | private: | 133 | private: |
141 | Transaction(Transaction& other); | 134 | Transaction(Transaction& other); |
142 | Transaction& operator=(Transaction& other); | 135 | Transaction& operator=(Transaction& other); |
diff --git a/common/storage_common.cpp b/common/storage_common.cpp index 4de585d..a506cf8 100644 --- a/common/storage_common.cpp +++ b/common/storage_common.cpp | |||
@@ -62,7 +62,7 @@ void Storage::setMaxRevision(qint64 revision) | |||
62 | 62 | ||
63 | void Storage::setMaxRevision(Akonadi2::Storage::Transaction &transaction, qint64 revision) | 63 | void Storage::setMaxRevision(Akonadi2::Storage::Transaction &transaction, qint64 revision) |
64 | { | 64 | { |
65 | transaction.write("__internal_maxRevision", QByteArray::number(revision)); | 65 | transaction.openDatabase().write("__internal_maxRevision", QByteArray::number(revision)); |
66 | } | 66 | } |
67 | 67 | ||
68 | qint64 Storage::maxRevision() | 68 | qint64 Storage::maxRevision() |
@@ -74,7 +74,7 @@ qint64 Storage::maxRevision() | |||
74 | qint64 Storage::maxRevision(const Akonadi2::Storage::Transaction &transaction) | 74 | qint64 Storage::maxRevision(const Akonadi2::Storage::Transaction &transaction) |
75 | { | 75 | { |
76 | qint64 r = 0; | 76 | qint64 r = 0; |
77 | transaction.scan("__internal_maxRevision", [&](const QByteArray &, const QByteArray &revision) -> bool { | 77 | transaction.openDatabase().scan("__internal_maxRevision", [&](const QByteArray &, const QByteArray &revision) -> bool { |
78 | r = revision.toLongLong(); | 78 | r = revision.toLongLong(); |
79 | return false; | 79 | return false; |
80 | }, [](const Error &error){ | 80 | }, [](const Error &error){ |
diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp index 7fed830..a048a71 100644 --- a/common/storage_lmdb.cpp +++ b/common/storage_lmdb.cpp | |||
@@ -337,40 +337,6 @@ Storage::NamedDatabase Storage::Transaction::openDatabase(const QByteArray &db, | |||
337 | return Storage::NamedDatabase(p); | 337 | return Storage::NamedDatabase(p); |
338 | } | 338 | } |
339 | 339 | ||
340 | bool Storage::Transaction::write(const QByteArray &key, const QByteArray &value, const std::function<void(const Storage::Error &error)> &errorHandler) | ||
341 | { | ||
342 | auto eHandler = [this, errorHandler](const Storage::Error &error) { | ||
343 | d->error = true; | ||
344 | errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); | ||
345 | }; | ||
346 | openDatabase("default", eHandler).write(key, value, eHandler); | ||
347 | d->implicitCommit = true; | ||
348 | |||
349 | return !d->error; | ||
350 | } | ||
351 | |||
352 | void Storage::Transaction::remove(const QByteArray &k, | ||
353 | const std::function<void(const Storage::Error &error)> &errorHandler) | ||
354 | { | ||
355 | auto eHandler = [this, errorHandler](const Storage::Error &error) { | ||
356 | d->error = true; | ||
357 | errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); | ||
358 | }; | ||
359 | openDatabase("default", eHandler).remove(k, eHandler); | ||
360 | d->implicitCommit = true; | ||
361 | } | ||
362 | |||
363 | int Storage::Transaction::scan(const QByteArray &k, | ||
364 | const std::function<bool(const QByteArray &key, const QByteArray &value)> &resultHandler, | ||
365 | const std::function<void(const Storage::Error &error)> &errorHandler) const | ||
366 | { | ||
367 | auto db = openDatabase("default", std::function<void(const Storage::Error &error)>()); | ||
368 | if (db) { | ||
369 | return db.scan(k, resultHandler, errorHandler); | ||
370 | } | ||
371 | return 0; | ||
372 | } | ||
373 | |||
374 | 340 | ||
375 | 341 | ||
376 | 342 | ||