diff options
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 114 |
1 files changed, 57 insertions, 57 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index a087def..401c26d 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -36,14 +36,14 @@ | |||
36 | #include "definitions.h" | 36 | #include "definitions.h" |
37 | #include "bufferutils.h" | 37 | #include "bufferutils.h" |
38 | 38 | ||
39 | namespace Akonadi2 | 39 | namespace Sink |
40 | { | 40 | { |
41 | 41 | ||
42 | class Pipeline::Private | 42 | class Pipeline::Private |
43 | { | 43 | { |
44 | public: | 44 | public: |
45 | Private(const QString &resourceName) | 45 | Private(const QString &resourceName) |
46 | : storage(Akonadi2::storageLocation(), resourceName, Storage::ReadWrite), | 46 | : storage(Sink::storageLocation(), resourceName, Storage::ReadWrite), |
47 | revisionChanged(false) | 47 | revisionChanged(false) |
48 | { | 48 | { |
49 | } | 49 | } |
@@ -86,7 +86,7 @@ void Pipeline::startTransaction() | |||
86 | if (d->transaction) { | 86 | if (d->transaction) { |
87 | return; | 87 | return; |
88 | } | 88 | } |
89 | d->transaction = std::move(storage().createTransaction(Akonadi2::Storage::ReadWrite)); | 89 | d->transaction = std::move(storage().createTransaction(Sink::Storage::ReadWrite)); |
90 | } | 90 | } |
91 | 91 | ||
92 | void Pipeline::commit() | 92 | void Pipeline::commit() |
@@ -96,7 +96,7 @@ void Pipeline::commit() | |||
96 | // for (auto processor : d->processors[bufferType]) { | 96 | // for (auto processor : d->processors[bufferType]) { |
97 | // processor->finalize(); | 97 | // processor->finalize(); |
98 | // } | 98 | // } |
99 | const auto revision = Akonadi2::Storage::maxRevision(d->transaction); | 99 | const auto revision = Sink::Storage::maxRevision(d->transaction); |
100 | Trace() << "Committing " << revision; | 100 | Trace() << "Committing " << revision; |
101 | if (d->transaction) { | 101 | if (d->transaction) { |
102 | d->transaction.commit(); | 102 | d->transaction.commit(); |
@@ -120,14 +120,14 @@ Storage &Pipeline::storage() const | |||
120 | 120 | ||
121 | void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | 121 | void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) |
122 | { | 122 | { |
123 | d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), | 123 | d->transaction.openDatabase(bufferType + ".main").write(Sink::Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), |
124 | [](const Akonadi2::Storage::Error &error) { | 124 | [](const Sink::Storage::Error &error) { |
125 | Warning() << "Failed to write entity"; | 125 | Warning() << "Failed to write entity"; |
126 | } | 126 | } |
127 | ); | 127 | ); |
128 | d->revisionChanged = true; | 128 | d->revisionChanged = true; |
129 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); | 129 | Sink::Storage::setMaxRevision(d->transaction, newRevision); |
130 | Akonadi2::Storage::recordRevision(d->transaction, newRevision, uid, bufferType); | 130 | Sink::Storage::recordRevision(d->transaction, newRevision, uid, bufferType); |
131 | } | 131 | } |
132 | 132 | ||
133 | KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | 133 | KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) |
@@ -136,23 +136,23 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
136 | 136 | ||
137 | { | 137 | { |
138 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 138 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
139 | if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { | 139 | if (!Sink::Commands::VerifyCreateEntityBuffer(verifyer)) { |
140 | Warning() << "invalid buffer, not a create entity buffer"; | 140 | Warning() << "invalid buffer, not a create entity buffer"; |
141 | return KAsync::error<qint64>(0); | 141 | return KAsync::error<qint64>(0); |
142 | } | 142 | } |
143 | } | 143 | } |
144 | auto createEntity = Akonadi2::Commands::GetCreateEntity(command); | 144 | auto createEntity = Sink::Commands::GetCreateEntity(command); |
145 | 145 | ||
146 | const bool replayToSource = createEntity->replayToSource(); | 146 | const bool replayToSource = createEntity->replayToSource(); |
147 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); | 147 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); |
148 | { | 148 | { |
149 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); | 149 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); |
150 | if (!Akonadi2::VerifyEntityBuffer(verifyer)) { | 150 | if (!Sink::VerifyEntityBuffer(verifyer)) { |
151 | Warning() << "invalid buffer, not an entity buffer"; | 151 | Warning() << "invalid buffer, not an entity buffer"; |
152 | return KAsync::error<qint64>(0); | 152 | return KAsync::error<qint64>(0); |
153 | } | 153 | } |
154 | } | 154 | } |
155 | auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); | 155 | auto entity = Sink::GetEntity(createEntity->delta()->Data()); |
156 | if (!entity->resource()->size() && !entity->local()->size()) { | 156 | if (!entity->resource()->size() && !entity->local()->size()) { |
157 | Warning() << "No local and no resource buffer while trying to create entity."; | 157 | Warning() << "No local and no resource buffer while trying to create entity."; |
158 | return KAsync::error<qint64>(0); | 158 | return KAsync::error<qint64>(0); |
@@ -171,16 +171,16 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
171 | key = QUuid::createUuid().toString().toUtf8(); | 171 | key = QUuid::createUuid().toString().toUtf8(); |
172 | } | 172 | } |
173 | Q_ASSERT(!key.isEmpty()); | 173 | Q_ASSERT(!key.isEmpty()); |
174 | const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; | 174 | const qint64 newRevision = Sink::Storage::maxRevision(d->transaction) + 1; |
175 | 175 | ||
176 | //Add metadata buffer | 176 | //Add metadata buffer |
177 | flatbuffers::FlatBufferBuilder metadataFbb; | 177 | flatbuffers::FlatBufferBuilder metadataFbb; |
178 | auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); | 178 | auto metadataBuilder = Sink::MetadataBuilder(metadataFbb); |
179 | metadataBuilder.add_revision(newRevision); | 179 | metadataBuilder.add_revision(newRevision); |
180 | metadataBuilder.add_operation(Akonadi2::Operation_Creation); | 180 | metadataBuilder.add_operation(Sink::Operation_Creation); |
181 | metadataBuilder.add_replayToSource(replayToSource); | 181 | metadataBuilder.add_replayToSource(replayToSource); |
182 | auto metadataBuffer = metadataBuilder.Finish(); | 182 | auto metadataBuffer = metadataBuilder.Finish(); |
183 | Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); | 183 | Sink::FinishMetadataBuffer(metadataFbb, metadataBuffer); |
184 | 184 | ||
185 | flatbuffers::FlatBufferBuilder fbb; | 185 | flatbuffers::FlatBufferBuilder fbb; |
186 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); | 186 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); |
@@ -194,14 +194,14 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
194 | } | 194 | } |
195 | 195 | ||
196 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; | 196 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; |
197 | d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { | 197 | d->transaction.openDatabase(bufferType + ".main").scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { |
198 | auto entity = Akonadi2::GetEntity(value); | 198 | auto entity = Sink::GetEntity(value); |
199 | auto adaptor = adaptorFactory->createAdaptor(*entity); | 199 | auto adaptor = adaptorFactory->createAdaptor(*entity); |
200 | for (auto processor : d->processors[bufferType]) { | 200 | for (auto processor : d->processors[bufferType]) { |
201 | processor->newEntity(key, newRevision, *adaptor, d->transaction); | 201 | processor->newEntity(key, newRevision, *adaptor, d->transaction); |
202 | } | 202 | } |
203 | return false; | 203 | return false; |
204 | }, [this](const Akonadi2::Storage::Error &error) { | 204 | }, [this](const Sink::Storage::Error &error) { |
205 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | 205 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; |
206 | }); | 206 | }); |
207 | return KAsync::start<qint64>([newRevision](){ | 207 | return KAsync::start<qint64>([newRevision](){ |
@@ -213,16 +213,16 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
213 | { | 213 | { |
214 | Trace() << "Pipeline: Modified Entity"; | 214 | Trace() << "Pipeline: Modified Entity"; |
215 | 215 | ||
216 | const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; | 216 | const qint64 newRevision = Sink::Storage::maxRevision(d->transaction) + 1; |
217 | 217 | ||
218 | { | 218 | { |
219 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 219 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
220 | if (!Akonadi2::Commands::VerifyModifyEntityBuffer(verifyer)) { | 220 | if (!Sink::Commands::VerifyModifyEntityBuffer(verifyer)) { |
221 | Warning() << "invalid buffer, not a modify entity buffer"; | 221 | Warning() << "invalid buffer, not a modify entity buffer"; |
222 | return KAsync::error<qint64>(0); | 222 | return KAsync::error<qint64>(0); |
223 | } | 223 | } |
224 | } | 224 | } |
225 | auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command); | 225 | auto modifyEntity = Sink::Commands::GetModifyEntity(command); |
226 | Q_ASSERT(modifyEntity); | 226 | Q_ASSERT(modifyEntity); |
227 | 227 | ||
228 | const qint64 baseRevision = modifyEntity->revision(); | 228 | const qint64 baseRevision = modifyEntity->revision(); |
@@ -236,7 +236,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
236 | } | 236 | } |
237 | { | 237 | { |
238 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); | 238 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); |
239 | if (!Akonadi2::VerifyEntityBuffer(verifyer)) { | 239 | if (!Sink::VerifyEntityBuffer(verifyer)) { |
240 | Warning() << "invalid buffer, not an entity buffer"; | 240 | Warning() << "invalid buffer, not an entity buffer"; |
241 | return KAsync::error<qint64>(0); | 241 | return KAsync::error<qint64>(0); |
242 | } | 242 | } |
@@ -249,13 +249,13 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
249 | return KAsync::error<qint64>(0); | 249 | return KAsync::error<qint64>(0); |
250 | } | 250 | } |
251 | 251 | ||
252 | auto diffEntity = Akonadi2::GetEntity(modifyEntity->delta()->Data()); | 252 | auto diffEntity = Sink::GetEntity(modifyEntity->delta()->Data()); |
253 | Q_ASSERT(diffEntity); | 253 | Q_ASSERT(diffEntity); |
254 | auto diff = adaptorFactory->createAdaptor(*diffEntity); | 254 | auto diff = adaptorFactory->createAdaptor(*diffEntity); |
255 | 255 | ||
256 | QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; | 256 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; |
257 | d->transaction.openDatabase(bufferType + ".main").findLatest(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | 257 | d->transaction.openDatabase(bufferType + ".main").findLatest(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { |
258 | Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 258 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
259 | if (!buffer.isValid()) { | 259 | if (!buffer.isValid()) { |
260 | Warning() << "Read invalid buffer from disk"; | 260 | Warning() << "Read invalid buffer from disk"; |
261 | } else { | 261 | } else { |
@@ -273,8 +273,8 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
273 | } | 273 | } |
274 | 274 | ||
275 | //resource and uid don't matter at this point | 275 | //resource and uid don't matter at this point |
276 | const Akonadi2::ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current); | 276 | const Sink::ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current); |
277 | auto newObject = Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<Akonadi2::ApplicationDomain::ApplicationDomainType>(existingObject); | 277 | auto newObject = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<Sink::ApplicationDomain::ApplicationDomainType>(existingObject); |
278 | 278 | ||
279 | //Apply diff | 279 | //Apply diff |
280 | //FIXME only apply the properties that are available in the buffer | 280 | //FIXME only apply the properties that are available in the buffer |
@@ -292,26 +292,26 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
292 | 292 | ||
293 | //Add metadata buffer | 293 | //Add metadata buffer |
294 | flatbuffers::FlatBufferBuilder metadataFbb; | 294 | flatbuffers::FlatBufferBuilder metadataFbb; |
295 | auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); | 295 | auto metadataBuilder = Sink::MetadataBuilder(metadataFbb); |
296 | metadataBuilder.add_revision(newRevision); | 296 | metadataBuilder.add_revision(newRevision); |
297 | metadataBuilder.add_operation(Akonadi2::Operation_Modification); | 297 | metadataBuilder.add_operation(Sink::Operation_Modification); |
298 | metadataBuilder.add_replayToSource(replayToSource); | 298 | metadataBuilder.add_replayToSource(replayToSource); |
299 | auto metadataBuffer = metadataBuilder.Finish(); | 299 | auto metadataBuffer = metadataBuilder.Finish(); |
300 | Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); | 300 | Sink::FinishMetadataBuffer(metadataFbb, metadataBuffer); |
301 | 301 | ||
302 | flatbuffers::FlatBufferBuilder fbb; | 302 | flatbuffers::FlatBufferBuilder fbb; |
303 | adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | 303 | adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); |
304 | 304 | ||
305 | storeNewRevision(newRevision, fbb, bufferType, key); | 305 | storeNewRevision(newRevision, fbb, bufferType, key); |
306 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; | 306 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; |
307 | d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &, const QByteArray &value) -> bool { | 307 | d->transaction.openDatabase(bufferType + ".main").scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &, const QByteArray &value) -> bool { |
308 | auto entity = Akonadi2::GetEntity(value); | 308 | auto entity = Sink::GetEntity(value); |
309 | auto newEntity = adaptorFactory->createAdaptor(*entity); | 309 | auto newEntity = adaptorFactory->createAdaptor(*entity); |
310 | for (auto processor : d->processors[bufferType]) { | 310 | for (auto processor : d->processors[bufferType]) { |
311 | processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); | 311 | processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); |
312 | } | 312 | } |
313 | return false; | 313 | return false; |
314 | }, [this](const Akonadi2::Storage::Error &error) { | 314 | }, [this](const Sink::Storage::Error &error) { |
315 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | 315 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; |
316 | }); | 316 | }); |
317 | return KAsync::start<qint64>([newRevision](){ | 317 | return KAsync::start<qint64>([newRevision](){ |
@@ -325,12 +325,12 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
325 | 325 | ||
326 | { | 326 | { |
327 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 327 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
328 | if (!Akonadi2::Commands::VerifyDeleteEntityBuffer(verifyer)) { | 328 | if (!Sink::Commands::VerifyDeleteEntityBuffer(verifyer)) { |
329 | Warning() << "invalid buffer, not a delete entity buffer"; | 329 | Warning() << "invalid buffer, not a delete entity buffer"; |
330 | return KAsync::error<qint64>(0); | 330 | return KAsync::error<qint64>(0); |
331 | } | 331 | } |
332 | } | 332 | } |
333 | auto deleteEntity = Akonadi2::Commands::GetDeleteEntity(command); | 333 | auto deleteEntity = Sink::Commands::GetDeleteEntity(command); |
334 | 334 | ||
335 | const bool replayToSource = deleteEntity->replayToSource(); | 335 | const bool replayToSource = deleteEntity->replayToSource(); |
336 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); | 336 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); |
@@ -339,12 +339,12 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
339 | bool found = false; | 339 | bool found = false; |
340 | bool alreadyRemoved = false; | 340 | bool alreadyRemoved = false; |
341 | d->transaction.openDatabase(bufferType + ".main").findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { | 341 | d->transaction.openDatabase(bufferType + ".main").findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { |
342 | Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 342 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
343 | auto entity = Akonadi2::GetEntity(data.data()); | 343 | auto entity = Sink::GetEntity(data.data()); |
344 | if (entity && entity->metadata()) { | 344 | if (entity && entity->metadata()) { |
345 | auto metadata = Akonadi2::GetMetadata(entity->metadata()->Data()); | 345 | auto metadata = Sink::GetMetadata(entity->metadata()->Data()); |
346 | found = true; | 346 | found = true; |
347 | if (metadata->operation() == Akonadi2::Operation_Removal) { | 347 | if (metadata->operation() == Sink::Operation_Removal) { |
348 | alreadyRemoved = true; | 348 | alreadyRemoved = true; |
349 | } | 349 | } |
350 | 350 | ||
@@ -364,16 +364,16 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
364 | return KAsync::error<qint64>(0); | 364 | return KAsync::error<qint64>(0); |
365 | } | 365 | } |
366 | 366 | ||
367 | const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; | 367 | const qint64 newRevision = Sink::Storage::maxRevision(d->transaction) + 1; |
368 | 368 | ||
369 | //Add metadata buffer | 369 | //Add metadata buffer |
370 | flatbuffers::FlatBufferBuilder metadataFbb; | 370 | flatbuffers::FlatBufferBuilder metadataFbb; |
371 | auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); | 371 | auto metadataBuilder = Sink::MetadataBuilder(metadataFbb); |
372 | metadataBuilder.add_revision(newRevision); | 372 | metadataBuilder.add_revision(newRevision); |
373 | metadataBuilder.add_operation(Akonadi2::Operation_Removal); | 373 | metadataBuilder.add_operation(Sink::Operation_Removal); |
374 | metadataBuilder.add_replayToSource(replayToSource); | 374 | metadataBuilder.add_replayToSource(replayToSource); |
375 | auto metadataBuffer = metadataBuilder.Finish(); | 375 | auto metadataBuffer = metadataBuilder.Finish(); |
376 | Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); | 376 | Sink::FinishMetadataBuffer(metadataFbb, metadataBuffer); |
377 | 377 | ||
378 | flatbuffers::FlatBufferBuilder fbb; | 378 | flatbuffers::FlatBufferBuilder fbb; |
379 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); | 379 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); |
@@ -384,16 +384,16 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
384 | return KAsync::error<qint64>(0); | 384 | return KAsync::error<qint64>(0); |
385 | } | 385 | } |
386 | 386 | ||
387 | QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; | 387 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; |
388 | d->transaction.openDatabase(bufferType + ".main").findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { | 388 | d->transaction.openDatabase(bufferType + ".main").findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { |
389 | Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 389 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
390 | if (!buffer.isValid()) { | 390 | if (!buffer.isValid()) { |
391 | Warning() << "Read invalid buffer from disk"; | 391 | Warning() << "Read invalid buffer from disk"; |
392 | } else { | 392 | } else { |
393 | current = adaptorFactory->createAdaptor(buffer.entity()); | 393 | current = adaptorFactory->createAdaptor(buffer.entity()); |
394 | } | 394 | } |
395 | return false; | 395 | return false; |
396 | }, [this](const Akonadi2::Storage::Error &error) { | 396 | }, [this](const Sink::Storage::Error &error) { |
397 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | 397 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; |
398 | }); | 398 | }); |
399 | 399 | ||
@@ -411,33 +411,33 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
411 | 411 | ||
412 | void Pipeline::cleanupRevision(qint64 revision) | 412 | void Pipeline::cleanupRevision(qint64 revision) |
413 | { | 413 | { |
414 | const auto uid = Akonadi2::Storage::getUidFromRevision(d->transaction, revision); | 414 | const auto uid = Sink::Storage::getUidFromRevision(d->transaction, revision); |
415 | const auto bufferType = Akonadi2::Storage::getTypeFromRevision(d->transaction, revision); | 415 | const auto bufferType = Sink::Storage::getTypeFromRevision(d->transaction, revision); |
416 | Trace() << "Cleaning up revision " << revision << uid << bufferType; | 416 | Trace() << "Cleaning up revision " << revision << uid << bufferType; |
417 | d->transaction.openDatabase(bufferType + ".main").scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool { | 417 | d->transaction.openDatabase(bufferType + ".main").scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool { |
418 | Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 418 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
419 | if (!buffer.isValid()) { | 419 | if (!buffer.isValid()) { |
420 | Warning() << "Read invalid buffer from disk"; | 420 | Warning() << "Read invalid buffer from disk"; |
421 | } else { | 421 | } else { |
422 | const auto metadata = flatbuffers::GetRoot<Akonadi2::Metadata>(buffer.metadataBuffer()); | 422 | const auto metadata = flatbuffers::GetRoot<Sink::Metadata>(buffer.metadataBuffer()); |
423 | const qint64 rev = metadata->revision(); | 423 | const qint64 rev = metadata->revision(); |
424 | //Remove old revisions, and the current if the entity has already been removed | 424 | //Remove old revisions, and the current if the entity has already been removed |
425 | if (rev < revision || metadata->operation() == Akonadi2::Operation_Removal) { | 425 | if (rev < revision || metadata->operation() == Sink::Operation_Removal) { |
426 | Akonadi2::Storage::removeRevision(d->transaction, rev); | 426 | Sink::Storage::removeRevision(d->transaction, rev); |
427 | d->transaction.openDatabase(bufferType + ".main").remove(key); | 427 | d->transaction.openDatabase(bufferType + ".main").remove(key); |
428 | } | 428 | } |
429 | } | 429 | } |
430 | 430 | ||
431 | return true; | 431 | return true; |
432 | }, [](const Akonadi2::Storage::Error &error) { | 432 | }, [](const Sink::Storage::Error &error) { |
433 | Warning() << "Error while reading: " << error.message; | 433 | Warning() << "Error while reading: " << error.message; |
434 | }, true); | 434 | }, true); |
435 | Akonadi2::Storage::setCleanedUpRevision(d->transaction, revision); | 435 | Sink::Storage::setCleanedUpRevision(d->transaction, revision); |
436 | } | 436 | } |
437 | 437 | ||
438 | qint64 Pipeline::cleanedUpRevision() | 438 | qint64 Pipeline::cleanedUpRevision() |
439 | { | 439 | { |
440 | return Akonadi2::Storage::cleanedUpRevision(d->transaction); | 440 | return Sink::Storage::cleanedUpRevision(d->transaction); |
441 | } | 441 | } |
442 | 442 | ||
443 | Preprocessor::Preprocessor() | 443 | Preprocessor::Preprocessor() |
@@ -457,5 +457,5 @@ void Preprocessor::finalize() | |||
457 | { | 457 | { |
458 | } | 458 | } |
459 | 459 | ||
460 | } // namespace Akonadi2 | 460 | } // namespace Sink |
461 | 461 | ||