diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-02-01 15:18:13 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-02-01 15:18:13 +0100 |
commit | 9809045a2a71cf509857a762fac62e81c25de856 (patch) | |
tree | b74270913a41f9d9f3d243f7c8cc127dbe0e3c00 /common/pipeline.cpp | |
parent | 4aa007e5cf59288e7d548ea3f613a10ac0a6057e (diff) | |
download | sink-9809045a2a71cf509857a762fac62e81c25de856.tar.gz sink-9809045a2a71cf509857a762fac62e81c25de856.zip |
Centralized main database name
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 18 |
1 files changed, 9 insertions, 9 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index b7ca638..f861ab6 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -120,7 +120,7 @@ Storage &Pipeline::storage() const | |||
120 | 120 | ||
121 | void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | 121 | void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) |
122 | { | 122 | { |
123 | d->transaction.openDatabase(bufferType + ".main").write(Sink::Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), | 123 | Storage::mainDatabase(d->transaction, bufferType).write(Sink::Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), |
124 | [](const Sink::Storage::Error &error) { | 124 | [](const Sink::Storage::Error &error) { |
125 | Warning() << "Failed to write entity"; | 125 | Warning() << "Failed to write entity"; |
126 | } | 126 | } |
@@ -161,7 +161,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
161 | QByteArray key; | 161 | QByteArray key; |
162 | if (createEntity->entityId()) { | 162 | if (createEntity->entityId()) { |
163 | key = QByteArray(reinterpret_cast<char const*>(createEntity->entityId()->Data()), createEntity->entityId()->size()); | 163 | key = QByteArray(reinterpret_cast<char const*>(createEntity->entityId()->Data()), createEntity->entityId()->size()); |
164 | if (d->transaction.openDatabase(bufferType + ".main").contains(key)) { | 164 | if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) { |
165 | ErrorMsg() << "An entity with this id already exists: " << key; | 165 | ErrorMsg() << "An entity with this id already exists: " << key; |
166 | return KAsync::error<qint64>(0); | 166 | return KAsync::error<qint64>(0); |
167 | } | 167 | } |
@@ -194,7 +194,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
194 | } | 194 | } |
195 | 195 | ||
196 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; | 196 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; |
197 | d->transaction.openDatabase(bufferType + ".main").scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { | 197 | Storage::mainDatabase(d->transaction, bufferType).scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { |
198 | auto entity = Sink::GetEntity(value); | 198 | auto entity = Sink::GetEntity(value); |
199 | auto adaptor = adaptorFactory->createAdaptor(*entity); | 199 | auto adaptor = adaptorFactory->createAdaptor(*entity); |
200 | for (auto processor : d->processors[bufferType]) { | 200 | for (auto processor : d->processors[bufferType]) { |
@@ -254,7 +254,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
254 | auto diff = adaptorFactory->createAdaptor(*diffEntity); | 254 | auto diff = adaptorFactory->createAdaptor(*diffEntity); |
255 | 255 | ||
256 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | 256 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; |
257 | d->transaction.openDatabase(bufferType + ".main").findLatest(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | 257 | Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { |
258 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 258 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
259 | if (!buffer.isValid()) { | 259 | if (!buffer.isValid()) { |
260 | Warning() << "Read invalid buffer from disk"; | 260 | Warning() << "Read invalid buffer from disk"; |
@@ -311,7 +311,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
311 | 311 | ||
312 | storeNewRevision(newRevision, fbb, bufferType, key); | 312 | storeNewRevision(newRevision, fbb, bufferType, key); |
313 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; | 313 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; |
314 | d->transaction.openDatabase(bufferType + ".main").scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool { | 314 | Storage::mainDatabase(d->transaction, bufferType).scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool { |
315 | if (value.isEmpty()) { | 315 | if (value.isEmpty()) { |
316 | ErrorMsg() << "Read buffer is empty."; | 316 | ErrorMsg() << "Read buffer is empty."; |
317 | } | 317 | } |
@@ -348,7 +348,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
348 | 348 | ||
349 | bool found = false; | 349 | bool found = false; |
350 | bool alreadyRemoved = false; | 350 | bool alreadyRemoved = false; |
351 | d->transaction.openDatabase(bufferType + ".main").findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { | 351 | Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { |
352 | auto entity = Sink::GetEntity(data.data()); | 352 | auto entity = Sink::GetEntity(data.data()); |
353 | if (entity && entity->metadata()) { | 353 | if (entity && entity->metadata()) { |
354 | auto metadata = Sink::GetMetadata(entity->metadata()->Data()); | 354 | auto metadata = Sink::GetMetadata(entity->metadata()->Data()); |
@@ -394,7 +394,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
394 | } | 394 | } |
395 | 395 | ||
396 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | 396 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; |
397 | d->transaction.openDatabase(bufferType + ".main").findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { | 397 | Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { |
398 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 398 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
399 | if (!buffer.isValid()) { | 399 | if (!buffer.isValid()) { |
400 | Warning() << "Read invalid buffer from disk"; | 400 | Warning() << "Read invalid buffer from disk"; |
@@ -423,7 +423,7 @@ void Pipeline::cleanupRevision(qint64 revision) | |||
423 | const auto uid = Sink::Storage::getUidFromRevision(d->transaction, revision); | 423 | const auto uid = Sink::Storage::getUidFromRevision(d->transaction, revision); |
424 | const auto bufferType = Sink::Storage::getTypeFromRevision(d->transaction, revision); | 424 | const auto bufferType = Sink::Storage::getTypeFromRevision(d->transaction, revision); |
425 | Trace() << "Cleaning up revision " << revision << uid << bufferType; | 425 | Trace() << "Cleaning up revision " << revision << uid << bufferType; |
426 | d->transaction.openDatabase(bufferType + ".main").scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool { | 426 | Storage::mainDatabase(d->transaction, bufferType).scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool { |
427 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 427 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
428 | if (!buffer.isValid()) { | 428 | if (!buffer.isValid()) { |
429 | Warning() << "Read invalid buffer from disk"; | 429 | Warning() << "Read invalid buffer from disk"; |
@@ -433,7 +433,7 @@ void Pipeline::cleanupRevision(qint64 revision) | |||
433 | //Remove old revisions, and the current if the entity has already been removed | 433 | //Remove old revisions, and the current if the entity has already been removed |
434 | if (rev < revision || metadata->operation() == Sink::Operation_Removal) { | 434 | if (rev < revision || metadata->operation() == Sink::Operation_Removal) { |
435 | Sink::Storage::removeRevision(d->transaction, rev); | 435 | Sink::Storage::removeRevision(d->transaction, rev); |
436 | d->transaction.openDatabase(bufferType + ".main").remove(key); | 436 | Storage::mainDatabase(d->transaction, bufferType).remove(key); |
437 | } | 437 | } |
438 | } | 438 | } |
439 | 439 | ||