diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-02-01 15:36:42 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-02-01 15:36:42 +0100 |
commit | b1571c2be7342a0f7474e6a94d9c55230241fa1c (patch) | |
tree | 2fd90359cfb5faba9d07490a480a9e210dba2097 /common/pipeline.cpp | |
parent | 19998fce1c67fc99ccf9f30530275925927b88f3 (diff) | |
download | sink-b1571c2be7342a0f7474e6a94d9c55230241fa1c.tar.gz sink-b1571c2be7342a0f7474e6a94d9c55230241fa1c.zip |
Avoid unnecessary namespace usage
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 106 |
1 files changed, 53 insertions, 53 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index f861ab6..0a71c8d 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -86,7 +86,7 @@ void Pipeline::startTransaction() | |||
86 | if (d->transaction) { | 86 | if (d->transaction) { |
87 | return; | 87 | return; |
88 | } | 88 | } |
89 | d->transaction = std::move(storage().createTransaction(Sink::Storage::ReadWrite)); | 89 | d->transaction = std::move(storage().createTransaction(Storage::ReadWrite)); |
90 | } | 90 | } |
91 | 91 | ||
92 | void Pipeline::commit() | 92 | void Pipeline::commit() |
@@ -96,7 +96,7 @@ void Pipeline::commit() | |||
96 | // for (auto processor : d->processors[bufferType]) { | 96 | // for (auto processor : d->processors[bufferType]) { |
97 | // processor->finalize(); | 97 | // processor->finalize(); |
98 | // } | 98 | // } |
99 | const auto revision = Sink::Storage::maxRevision(d->transaction); | 99 | const auto revision = Storage::maxRevision(d->transaction); |
100 | Trace() << "Committing " << revision; | 100 | Trace() << "Committing " << revision; |
101 | if (d->transaction) { | 101 | if (d->transaction) { |
102 | d->transaction.commit(); | 102 | d->transaction.commit(); |
@@ -120,14 +120,14 @@ Storage &Pipeline::storage() const | |||
120 | 120 | ||
121 | void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | 121 | void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) |
122 | { | 122 | { |
123 | Storage::mainDatabase(d->transaction, bufferType).write(Sink::Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), | 123 | Storage::mainDatabase(d->transaction, bufferType).write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), |
124 | [](const Sink::Storage::Error &error) { | 124 | [](const Storage::Error &error) { |
125 | Warning() << "Failed to write entity"; | 125 | Warning() << "Failed to write entity"; |
126 | } | 126 | } |
127 | ); | 127 | ); |
128 | d->revisionChanged = true; | 128 | d->revisionChanged = true; |
129 | Sink::Storage::setMaxRevision(d->transaction, newRevision); | 129 | Storage::setMaxRevision(d->transaction, newRevision); |
130 | Sink::Storage::recordRevision(d->transaction, newRevision, uid, bufferType); | 130 | Storage::recordRevision(d->transaction, newRevision, uid, bufferType); |
131 | } | 131 | } |
132 | 132 | ||
133 | KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | 133 | KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) |
@@ -136,23 +136,23 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
136 | 136 | ||
137 | { | 137 | { |
138 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 138 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
139 | if (!Sink::Commands::VerifyCreateEntityBuffer(verifyer)) { | 139 | if (!Commands::VerifyCreateEntityBuffer(verifyer)) { |
140 | Warning() << "invalid buffer, not a create entity buffer"; | 140 | Warning() << "invalid buffer, not a create entity buffer"; |
141 | return KAsync::error<qint64>(0); | 141 | return KAsync::error<qint64>(0); |
142 | } | 142 | } |
143 | } | 143 | } |
144 | auto createEntity = Sink::Commands::GetCreateEntity(command); | 144 | auto createEntity = Commands::GetCreateEntity(command); |
145 | 145 | ||
146 | const bool replayToSource = createEntity->replayToSource(); | 146 | const bool replayToSource = createEntity->replayToSource(); |
147 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); | 147 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); |
148 | { | 148 | { |
149 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); | 149 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); |
150 | if (!Sink::VerifyEntityBuffer(verifyer)) { | 150 | if (!VerifyEntityBuffer(verifyer)) { |
151 | Warning() << "invalid buffer, not an entity buffer"; | 151 | Warning() << "invalid buffer, not an entity buffer"; |
152 | return KAsync::error<qint64>(0); | 152 | return KAsync::error<qint64>(0); |
153 | } | 153 | } |
154 | } | 154 | } |
155 | auto entity = Sink::GetEntity(createEntity->delta()->Data()); | 155 | auto entity = GetEntity(createEntity->delta()->Data()); |
156 | if (!entity->resource()->size() && !entity->local()->size()) { | 156 | if (!entity->resource()->size() && !entity->local()->size()) { |
157 | Warning() << "No local and no resource buffer while trying to create entity."; | 157 | Warning() << "No local and no resource buffer while trying to create entity."; |
158 | return KAsync::error<qint64>(0); | 158 | return KAsync::error<qint64>(0); |
@@ -171,16 +171,16 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
171 | key = QUuid::createUuid().toString().toUtf8(); | 171 | key = QUuid::createUuid().toString().toUtf8(); |
172 | } | 172 | } |
173 | Q_ASSERT(!key.isEmpty()); | 173 | Q_ASSERT(!key.isEmpty()); |
174 | const qint64 newRevision = Sink::Storage::maxRevision(d->transaction) + 1; | 174 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; |
175 | 175 | ||
176 | //Add metadata buffer | 176 | //Add metadata buffer |
177 | flatbuffers::FlatBufferBuilder metadataFbb; | 177 | flatbuffers::FlatBufferBuilder metadataFbb; |
178 | auto metadataBuilder = Sink::MetadataBuilder(metadataFbb); | 178 | auto metadataBuilder = MetadataBuilder(metadataFbb); |
179 | metadataBuilder.add_revision(newRevision); | 179 | metadataBuilder.add_revision(newRevision); |
180 | metadataBuilder.add_operation(Sink::Operation_Creation); | 180 | metadataBuilder.add_operation(Operation_Creation); |
181 | metadataBuilder.add_replayToSource(replayToSource); | 181 | metadataBuilder.add_replayToSource(replayToSource); |
182 | auto metadataBuffer = metadataBuilder.Finish(); | 182 | auto metadataBuffer = metadataBuilder.Finish(); |
183 | Sink::FinishMetadataBuffer(metadataFbb, metadataBuffer); | 183 | FinishMetadataBuffer(metadataFbb, metadataBuffer); |
184 | 184 | ||
185 | flatbuffers::FlatBufferBuilder fbb; | 185 | flatbuffers::FlatBufferBuilder fbb; |
186 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); | 186 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); |
@@ -194,14 +194,14 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
194 | } | 194 | } |
195 | 195 | ||
196 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; | 196 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; |
197 | Storage::mainDatabase(d->transaction, bufferType).scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { | 197 | Storage::mainDatabase(d->transaction, bufferType).scan(Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { |
198 | auto entity = Sink::GetEntity(value); | 198 | auto entity = GetEntity(value); |
199 | auto adaptor = adaptorFactory->createAdaptor(*entity); | 199 | auto adaptor = adaptorFactory->createAdaptor(*entity); |
200 | for (auto processor : d->processors[bufferType]) { | 200 | for (auto processor : d->processors[bufferType]) { |
201 | processor->newEntity(key, newRevision, *adaptor, d->transaction); | 201 | processor->newEntity(key, newRevision, *adaptor, d->transaction); |
202 | } | 202 | } |
203 | return false; | 203 | return false; |
204 | }, [this](const Sink::Storage::Error &error) { | 204 | }, [this](const Storage::Error &error) { |
205 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | 205 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; |
206 | }); | 206 | }); |
207 | return KAsync::start<qint64>([newRevision](){ | 207 | return KAsync::start<qint64>([newRevision](){ |
@@ -213,16 +213,16 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
213 | { | 213 | { |
214 | Trace() << "Pipeline: Modified Entity"; | 214 | Trace() << "Pipeline: Modified Entity"; |
215 | 215 | ||
216 | const qint64 newRevision = Sink::Storage::maxRevision(d->transaction) + 1; | 216 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; |
217 | 217 | ||
218 | { | 218 | { |
219 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 219 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
220 | if (!Sink::Commands::VerifyModifyEntityBuffer(verifyer)) { | 220 | if (!Commands::VerifyModifyEntityBuffer(verifyer)) { |
221 | Warning() << "invalid buffer, not a modify entity buffer"; | 221 | Warning() << "invalid buffer, not a modify entity buffer"; |
222 | return KAsync::error<qint64>(0); | 222 | return KAsync::error<qint64>(0); |
223 | } | 223 | } |
224 | } | 224 | } |
225 | auto modifyEntity = Sink::Commands::GetModifyEntity(command); | 225 | auto modifyEntity = Commands::GetModifyEntity(command); |
226 | Q_ASSERT(modifyEntity); | 226 | Q_ASSERT(modifyEntity); |
227 | 227 | ||
228 | const qint64 baseRevision = modifyEntity->revision(); | 228 | const qint64 baseRevision = modifyEntity->revision(); |
@@ -236,7 +236,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
236 | } | 236 | } |
237 | { | 237 | { |
238 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); | 238 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); |
239 | if (!Sink::VerifyEntityBuffer(verifyer)) { | 239 | if (!VerifyEntityBuffer(verifyer)) { |
240 | Warning() << "invalid buffer, not an entity buffer"; | 240 | Warning() << "invalid buffer, not an entity buffer"; |
241 | return KAsync::error<qint64>(0); | 241 | return KAsync::error<qint64>(0); |
242 | } | 242 | } |
@@ -249,13 +249,13 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
249 | return KAsync::error<qint64>(0); | 249 | return KAsync::error<qint64>(0); |
250 | } | 250 | } |
251 | 251 | ||
252 | auto diffEntity = Sink::GetEntity(modifyEntity->delta()->Data()); | 252 | auto diffEntity = GetEntity(modifyEntity->delta()->Data()); |
253 | Q_ASSERT(diffEntity); | 253 | Q_ASSERT(diffEntity); |
254 | auto diff = adaptorFactory->createAdaptor(*diffEntity); | 254 | auto diff = adaptorFactory->createAdaptor(*diffEntity); |
255 | 255 | ||
256 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | 256 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; |
257 | Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | 257 | Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { |
258 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 258 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
259 | if (!buffer.isValid()) { | 259 | if (!buffer.isValid()) { |
260 | Warning() << "Read invalid buffer from disk"; | 260 | Warning() << "Read invalid buffer from disk"; |
261 | } else { | 261 | } else { |
@@ -273,8 +273,8 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
273 | } | 273 | } |
274 | 274 | ||
275 | //resource and uid don't matter at this point | 275 | //resource and uid don't matter at this point |
276 | const Sink::ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current); | 276 | const ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current); |
277 | auto newObject = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<Sink::ApplicationDomain::ApplicationDomainType>(existingObject); | 277 | auto newObject = ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(existingObject); |
278 | 278 | ||
279 | //Apply diff | 279 | //Apply diff |
280 | //FIXME only apply the properties that are available in the buffer | 280 | //FIXME only apply the properties that are available in the buffer |
@@ -299,29 +299,29 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
299 | 299 | ||
300 | //Add metadata buffer | 300 | //Add metadata buffer |
301 | flatbuffers::FlatBufferBuilder metadataFbb; | 301 | flatbuffers::FlatBufferBuilder metadataFbb; |
302 | auto metadataBuilder = Sink::MetadataBuilder(metadataFbb); | 302 | auto metadataBuilder = MetadataBuilder(metadataFbb); |
303 | metadataBuilder.add_revision(newRevision); | 303 | metadataBuilder.add_revision(newRevision); |
304 | metadataBuilder.add_operation(Sink::Operation_Modification); | 304 | metadataBuilder.add_operation(Operation_Modification); |
305 | metadataBuilder.add_replayToSource(replayToSource); | 305 | metadataBuilder.add_replayToSource(replayToSource); |
306 | auto metadataBuffer = metadataBuilder.Finish(); | 306 | auto metadataBuffer = metadataBuilder.Finish(); |
307 | Sink::FinishMetadataBuffer(metadataFbb, metadataBuffer); | 307 | FinishMetadataBuffer(metadataFbb, metadataBuffer); |
308 | 308 | ||
309 | flatbuffers::FlatBufferBuilder fbb; | 309 | flatbuffers::FlatBufferBuilder fbb; |
310 | adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | 310 | adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); |
311 | 311 | ||
312 | storeNewRevision(newRevision, fbb, bufferType, key); | 312 | storeNewRevision(newRevision, fbb, bufferType, key); |
313 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; | 313 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; |
314 | Storage::mainDatabase(d->transaction, bufferType).scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool { | 314 | Storage::mainDatabase(d->transaction, bufferType).scan(Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool { |
315 | if (value.isEmpty()) { | 315 | if (value.isEmpty()) { |
316 | ErrorMsg() << "Read buffer is empty."; | 316 | ErrorMsg() << "Read buffer is empty."; |
317 | } | 317 | } |
318 | auto entity = Sink::GetEntity(value.data()); | 318 | auto entity = GetEntity(value.data()); |
319 | auto newEntity = adaptorFactory->createAdaptor(*entity); | 319 | auto newEntity = adaptorFactory->createAdaptor(*entity); |
320 | for (auto processor : d->processors[bufferType]) { | 320 | for (auto processor : d->processors[bufferType]) { |
321 | processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); | 321 | processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); |
322 | } | 322 | } |
323 | return false; | 323 | return false; |
324 | }, [this](const Sink::Storage::Error &error) { | 324 | }, [this](const Storage::Error &error) { |
325 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | 325 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; |
326 | }); | 326 | }); |
327 | return KAsync::start<qint64>([newRevision](){ | 327 | return KAsync::start<qint64>([newRevision](){ |
@@ -335,12 +335,12 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
335 | 335 | ||
336 | { | 336 | { |
337 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 337 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
338 | if (!Sink::Commands::VerifyDeleteEntityBuffer(verifyer)) { | 338 | if (!Commands::VerifyDeleteEntityBuffer(verifyer)) { |
339 | Warning() << "invalid buffer, not a delete entity buffer"; | 339 | Warning() << "invalid buffer, not a delete entity buffer"; |
340 | return KAsync::error<qint64>(0); | 340 | return KAsync::error<qint64>(0); |
341 | } | 341 | } |
342 | } | 342 | } |
343 | auto deleteEntity = Sink::Commands::GetDeleteEntity(command); | 343 | auto deleteEntity = Commands::GetDeleteEntity(command); |
344 | 344 | ||
345 | const bool replayToSource = deleteEntity->replayToSource(); | 345 | const bool replayToSource = deleteEntity->replayToSource(); |
346 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); | 346 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); |
@@ -349,11 +349,11 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
349 | bool found = false; | 349 | bool found = false; |
350 | bool alreadyRemoved = false; | 350 | bool alreadyRemoved = false; |
351 | Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { | 351 | Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { |
352 | auto entity = Sink::GetEntity(data.data()); | 352 | auto entity = GetEntity(data.data()); |
353 | if (entity && entity->metadata()) { | 353 | if (entity && entity->metadata()) { |
354 | auto metadata = Sink::GetMetadata(entity->metadata()->Data()); | 354 | auto metadata = GetMetadata(entity->metadata()->Data()); |
355 | found = true; | 355 | found = true; |
356 | if (metadata->operation() == Sink::Operation_Removal) { | 356 | if (metadata->operation() == Operation_Removal) { |
357 | alreadyRemoved = true; | 357 | alreadyRemoved = true; |
358 | } | 358 | } |
359 | 359 | ||
@@ -373,16 +373,16 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
373 | return KAsync::error<qint64>(0); | 373 | return KAsync::error<qint64>(0); |
374 | } | 374 | } |
375 | 375 | ||
376 | const qint64 newRevision = Sink::Storage::maxRevision(d->transaction) + 1; | 376 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; |
377 | 377 | ||
378 | //Add metadata buffer | 378 | //Add metadata buffer |
379 | flatbuffers::FlatBufferBuilder metadataFbb; | 379 | flatbuffers::FlatBufferBuilder metadataFbb; |
380 | auto metadataBuilder = Sink::MetadataBuilder(metadataFbb); | 380 | auto metadataBuilder = MetadataBuilder(metadataFbb); |
381 | metadataBuilder.add_revision(newRevision); | 381 | metadataBuilder.add_revision(newRevision); |
382 | metadataBuilder.add_operation(Sink::Operation_Removal); | 382 | metadataBuilder.add_operation(Operation_Removal); |
383 | metadataBuilder.add_replayToSource(replayToSource); | 383 | metadataBuilder.add_replayToSource(replayToSource); |
384 | auto metadataBuffer = metadataBuilder.Finish(); | 384 | auto metadataBuffer = metadataBuilder.Finish(); |
385 | Sink::FinishMetadataBuffer(metadataFbb, metadataBuffer); | 385 | FinishMetadataBuffer(metadataFbb, metadataBuffer); |
386 | 386 | ||
387 | flatbuffers::FlatBufferBuilder fbb; | 387 | flatbuffers::FlatBufferBuilder fbb; |
388 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); | 388 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); |
@@ -393,16 +393,16 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
393 | return KAsync::error<qint64>(0); | 393 | return KAsync::error<qint64>(0); |
394 | } | 394 | } |
395 | 395 | ||
396 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | 396 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; |
397 | Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { | 397 | Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { |
398 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 398 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
399 | if (!buffer.isValid()) { | 399 | if (!buffer.isValid()) { |
400 | Warning() << "Read invalid buffer from disk"; | 400 | Warning() << "Read invalid buffer from disk"; |
401 | } else { | 401 | } else { |
402 | current = adaptorFactory->createAdaptor(buffer.entity()); | 402 | current = adaptorFactory->createAdaptor(buffer.entity()); |
403 | } | 403 | } |
404 | return false; | 404 | return false; |
405 | }, [this](const Sink::Storage::Error &error) { | 405 | }, [this](const Storage::Error &error) { |
406 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | 406 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; |
407 | }); | 407 | }); |
408 | 408 | ||
@@ -420,33 +420,33 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
420 | 420 | ||
421 | void Pipeline::cleanupRevision(qint64 revision) | 421 | void Pipeline::cleanupRevision(qint64 revision) |
422 | { | 422 | { |
423 | const auto uid = Sink::Storage::getUidFromRevision(d->transaction, revision); | 423 | const auto uid = Storage::getUidFromRevision(d->transaction, revision); |
424 | const auto bufferType = Sink::Storage::getTypeFromRevision(d->transaction, revision); | 424 | const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision); |
425 | Trace() << "Cleaning up revision " << revision << uid << bufferType; | 425 | Trace() << "Cleaning up revision " << revision << uid << bufferType; |
426 | Storage::mainDatabase(d->transaction, bufferType).scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool { | 426 | Storage::mainDatabase(d->transaction, bufferType).scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool { |
427 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 427 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
428 | if (!buffer.isValid()) { | 428 | if (!buffer.isValid()) { |
429 | Warning() << "Read invalid buffer from disk"; | 429 | Warning() << "Read invalid buffer from disk"; |
430 | } else { | 430 | } else { |
431 | const auto metadata = flatbuffers::GetRoot<Sink::Metadata>(buffer.metadataBuffer()); | 431 | const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); |
432 | const qint64 rev = metadata->revision(); | 432 | const qint64 rev = metadata->revision(); |
433 | //Remove old revisions, and the current if the entity has already been removed | 433 | //Remove old revisions, and the current if the entity has already been removed |
434 | if (rev < revision || metadata->operation() == Sink::Operation_Removal) { | 434 | if (rev < revision || metadata->operation() == Operation_Removal) { |
435 | Sink::Storage::removeRevision(d->transaction, rev); | 435 | Storage::removeRevision(d->transaction, rev); |
436 | Storage::mainDatabase(d->transaction, bufferType).remove(key); | 436 | Storage::mainDatabase(d->transaction, bufferType).remove(key); |
437 | } | 437 | } |
438 | } | 438 | } |
439 | 439 | ||
440 | return true; | 440 | return true; |
441 | }, [](const Sink::Storage::Error &error) { | 441 | }, [](const Storage::Error &error) { |
442 | Warning() << "Error while reading: " << error.message; | 442 | Warning() << "Error while reading: " << error.message; |
443 | }, true); | 443 | }, true); |
444 | Sink::Storage::setCleanedUpRevision(d->transaction, revision); | 444 | Storage::setCleanedUpRevision(d->transaction, revision); |
445 | } | 445 | } |
446 | 446 | ||
447 | qint64 Pipeline::cleanedUpRevision() | 447 | qint64 Pipeline::cleanedUpRevision() |
448 | { | 448 | { |
449 | return Sink::Storage::cleanedUpRevision(d->transaction); | 449 | return Storage::cleanedUpRevision(d->transaction); |
450 | } | 450 | } |
451 | 451 | ||
452 | Preprocessor::Preprocessor() | 452 | Preprocessor::Preprocessor() |