diff options
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 174 |
1 files changed, 101 insertions, 73 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 401c26d..93d8236 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -25,6 +25,7 @@ | |||
25 | #include <QVector> | 25 | #include <QVector> |
26 | #include <QUuid> | 26 | #include <QUuid> |
27 | #include <QDebug> | 27 | #include <QDebug> |
28 | #include <QTime> | ||
28 | #include "entity_generated.h" | 29 | #include "entity_generated.h" |
29 | #include "metadata_generated.h" | 30 | #include "metadata_generated.h" |
30 | #include "createentity_generated.h" | 31 | #include "createentity_generated.h" |
@@ -36,6 +37,9 @@ | |||
36 | #include "definitions.h" | 37 | #include "definitions.h" |
37 | #include "bufferutils.h" | 38 | #include "bufferutils.h" |
38 | 39 | ||
40 | #undef DEBUG_AREA | ||
41 | #define DEBUG_AREA "resource.pipeline" | ||
42 | |||
39 | namespace Sink | 43 | namespace Sink |
40 | { | 44 | { |
41 | 45 | ||
@@ -53,8 +57,24 @@ public: | |||
53 | QHash<QString, QVector<Preprocessor *> > processors; | 57 | QHash<QString, QVector<Preprocessor *> > processors; |
54 | QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; | 58 | QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; |
55 | bool revisionChanged; | 59 | bool revisionChanged; |
60 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); | ||
61 | QTime transactionTime; | ||
62 | int transactionItemCount; | ||
56 | }; | 63 | }; |
57 | 64 | ||
65 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | ||
66 | { | ||
67 | Storage::mainDatabase(transaction, bufferType).write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), | ||
68 | [uid, newRevision](const Storage::Error &error) { | ||
69 | Warning() << "Failed to write entity" << uid << newRevision; | ||
70 | } | ||
71 | ); | ||
72 | revisionChanged = true; | ||
73 | Storage::setMaxRevision(transaction, newRevision); | ||
74 | Storage::recordRevision(transaction, newRevision, uid, bufferType); | ||
75 | } | ||
76 | |||
77 | |||
58 | Pipeline::Pipeline(const QString &resourceName, QObject *parent) | 78 | Pipeline::Pipeline(const QString &resourceName, QObject *parent) |
59 | : QObject(parent), | 79 | : QObject(parent), |
60 | d(new Private(resourceName)) | 80 | d(new Private(resourceName)) |
@@ -86,7 +106,10 @@ void Pipeline::startTransaction() | |||
86 | if (d->transaction) { | 106 | if (d->transaction) { |
87 | return; | 107 | return; |
88 | } | 108 | } |
89 | d->transaction = std::move(storage().createTransaction(Sink::Storage::ReadWrite)); | 109 | Trace() << "Starting transaction."; |
110 | d->transactionTime.start(); | ||
111 | d->transactionItemCount = 0; | ||
112 | d->transaction = std::move(storage().createTransaction(Storage::ReadWrite)); | ||
90 | } | 113 | } |
91 | 114 | ||
92 | void Pipeline::commit() | 115 | void Pipeline::commit() |
@@ -96,8 +119,9 @@ void Pipeline::commit() | |||
96 | // for (auto processor : d->processors[bufferType]) { | 119 | // for (auto processor : d->processors[bufferType]) { |
97 | // processor->finalize(); | 120 | // processor->finalize(); |
98 | // } | 121 | // } |
99 | const auto revision = Sink::Storage::maxRevision(d->transaction); | 122 | const auto revision = Storage::maxRevision(d->transaction); |
100 | Trace() << "Committing " << revision; | 123 | const auto elapsed = d->transactionTime.elapsed(); |
124 | Trace() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " << (double)elapsed/(double)qMax(d->transactionItemCount, 1) << "[ms/item]"; | ||
101 | if (d->transaction) { | 125 | if (d->transaction) { |
102 | d->transaction.commit(); | 126 | d->transaction.commit(); |
103 | } | 127 | } |
@@ -118,41 +142,30 @@ Storage &Pipeline::storage() const | |||
118 | return d->storage; | 142 | return d->storage; |
119 | } | 143 | } |
120 | 144 | ||
121 | void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | ||
122 | { | ||
123 | d->transaction.openDatabase(bufferType + ".main").write(Sink::Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), | ||
124 | [](const Sink::Storage::Error &error) { | ||
125 | Warning() << "Failed to write entity"; | ||
126 | } | ||
127 | ); | ||
128 | d->revisionChanged = true; | ||
129 | Sink::Storage::setMaxRevision(d->transaction, newRevision); | ||
130 | Sink::Storage::recordRevision(d->transaction, newRevision, uid, bufferType); | ||
131 | } | ||
132 | |||
133 | KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | 145 | KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) |
134 | { | 146 | { |
135 | Trace() << "Pipeline: New Entity"; | 147 | Trace() << "Pipeline: New Entity"; |
148 | d->transactionItemCount++; | ||
136 | 149 | ||
137 | { | 150 | { |
138 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 151 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
139 | if (!Sink::Commands::VerifyCreateEntityBuffer(verifyer)) { | 152 | if (!Commands::VerifyCreateEntityBuffer(verifyer)) { |
140 | Warning() << "invalid buffer, not a create entity buffer"; | 153 | Warning() << "invalid buffer, not a create entity buffer"; |
141 | return KAsync::error<qint64>(0); | 154 | return KAsync::error<qint64>(0); |
142 | } | 155 | } |
143 | } | 156 | } |
144 | auto createEntity = Sink::Commands::GetCreateEntity(command); | 157 | auto createEntity = Commands::GetCreateEntity(command); |
145 | 158 | ||
146 | const bool replayToSource = createEntity->replayToSource(); | 159 | const bool replayToSource = createEntity->replayToSource(); |
147 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); | 160 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); |
148 | { | 161 | { |
149 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); | 162 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); |
150 | if (!Sink::VerifyEntityBuffer(verifyer)) { | 163 | if (!VerifyEntityBuffer(verifyer)) { |
151 | Warning() << "invalid buffer, not an entity buffer"; | 164 | Warning() << "invalid buffer, not an entity buffer"; |
152 | return KAsync::error<qint64>(0); | 165 | return KAsync::error<qint64>(0); |
153 | } | 166 | } |
154 | } | 167 | } |
155 | auto entity = Sink::GetEntity(createEntity->delta()->Data()); | 168 | auto entity = GetEntity(createEntity->delta()->Data()); |
156 | if (!entity->resource()->size() && !entity->local()->size()) { | 169 | if (!entity->resource()->size() && !entity->local()->size()) { |
157 | Warning() << "No local and no resource buffer while trying to create entity."; | 170 | Warning() << "No local and no resource buffer while trying to create entity."; |
158 | return KAsync::error<qint64>(0); | 171 | return KAsync::error<qint64>(0); |
@@ -161,7 +174,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
161 | QByteArray key; | 174 | QByteArray key; |
162 | if (createEntity->entityId()) { | 175 | if (createEntity->entityId()) { |
163 | key = QByteArray(reinterpret_cast<char const*>(createEntity->entityId()->Data()), createEntity->entityId()->size()); | 176 | key = QByteArray(reinterpret_cast<char const*>(createEntity->entityId()->Data()), createEntity->entityId()->size()); |
164 | if (d->transaction.openDatabase(bufferType + ".main").contains(key)) { | 177 | if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) { |
165 | ErrorMsg() << "An entity with this id already exists: " << key; | 178 | ErrorMsg() << "An entity with this id already exists: " << key; |
166 | return KAsync::error<qint64>(0); | 179 | return KAsync::error<qint64>(0); |
167 | } | 180 | } |
@@ -171,21 +184,21 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
171 | key = QUuid::createUuid().toString().toUtf8(); | 184 | key = QUuid::createUuid().toString().toUtf8(); |
172 | } | 185 | } |
173 | Q_ASSERT(!key.isEmpty()); | 186 | Q_ASSERT(!key.isEmpty()); |
174 | const qint64 newRevision = Sink::Storage::maxRevision(d->transaction) + 1; | 187 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; |
175 | 188 | ||
176 | //Add metadata buffer | 189 | //Add metadata buffer |
177 | flatbuffers::FlatBufferBuilder metadataFbb; | 190 | flatbuffers::FlatBufferBuilder metadataFbb; |
178 | auto metadataBuilder = Sink::MetadataBuilder(metadataFbb); | 191 | auto metadataBuilder = MetadataBuilder(metadataFbb); |
179 | metadataBuilder.add_revision(newRevision); | 192 | metadataBuilder.add_revision(newRevision); |
180 | metadataBuilder.add_operation(Sink::Operation_Creation); | 193 | metadataBuilder.add_operation(Operation_Creation); |
181 | metadataBuilder.add_replayToSource(replayToSource); | 194 | metadataBuilder.add_replayToSource(replayToSource); |
182 | auto metadataBuffer = metadataBuilder.Finish(); | 195 | auto metadataBuffer = metadataBuilder.Finish(); |
183 | Sink::FinishMetadataBuffer(metadataFbb, metadataBuffer); | 196 | FinishMetadataBuffer(metadataFbb, metadataBuffer); |
184 | 197 | ||
185 | flatbuffers::FlatBufferBuilder fbb; | 198 | flatbuffers::FlatBufferBuilder fbb; |
186 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); | 199 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); |
187 | 200 | ||
188 | storeNewRevision(newRevision, fbb, bufferType, key); | 201 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
189 | 202 | ||
190 | auto adaptorFactory = d->adaptorFactory.value(bufferType); | 203 | auto adaptorFactory = d->adaptorFactory.value(bufferType); |
191 | if (!adaptorFactory) { | 204 | if (!adaptorFactory) { |
@@ -194,14 +207,14 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
194 | } | 207 | } |
195 | 208 | ||
196 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; | 209 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; |
197 | d->transaction.openDatabase(bufferType + ".main").scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { | 210 | Storage::mainDatabase(d->transaction, bufferType).scan(Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { |
198 | auto entity = Sink::GetEntity(value); | 211 | auto entity = GetEntity(value); |
199 | auto adaptor = adaptorFactory->createAdaptor(*entity); | 212 | auto adaptor = adaptorFactory->createAdaptor(*entity); |
200 | for (auto processor : d->processors[bufferType]) { | 213 | for (auto processor : d->processors[bufferType]) { |
201 | processor->newEntity(key, newRevision, *adaptor, d->transaction); | 214 | processor->newEntity(key, newRevision, *adaptor, d->transaction); |
202 | } | 215 | } |
203 | return false; | 216 | return false; |
204 | }, [this](const Sink::Storage::Error &error) { | 217 | }, [this](const Storage::Error &error) { |
205 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | 218 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; |
206 | }); | 219 | }); |
207 | return KAsync::start<qint64>([newRevision](){ | 220 | return KAsync::start<qint64>([newRevision](){ |
@@ -212,17 +225,18 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
212 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | 225 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) |
213 | { | 226 | { |
214 | Trace() << "Pipeline: Modified Entity"; | 227 | Trace() << "Pipeline: Modified Entity"; |
228 | d->transactionItemCount++; | ||
215 | 229 | ||
216 | const qint64 newRevision = Sink::Storage::maxRevision(d->transaction) + 1; | 230 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; |
217 | 231 | ||
218 | { | 232 | { |
219 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 233 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
220 | if (!Sink::Commands::VerifyModifyEntityBuffer(verifyer)) { | 234 | if (!Commands::VerifyModifyEntityBuffer(verifyer)) { |
221 | Warning() << "invalid buffer, not a modify entity buffer"; | 235 | Warning() << "invalid buffer, not a modify entity buffer"; |
222 | return KAsync::error<qint64>(0); | 236 | return KAsync::error<qint64>(0); |
223 | } | 237 | } |
224 | } | 238 | } |
225 | auto modifyEntity = Sink::Commands::GetModifyEntity(command); | 239 | auto modifyEntity = Commands::GetModifyEntity(command); |
226 | Q_ASSERT(modifyEntity); | 240 | Q_ASSERT(modifyEntity); |
227 | 241 | ||
228 | const qint64 baseRevision = modifyEntity->revision(); | 242 | const qint64 baseRevision = modifyEntity->revision(); |
@@ -236,7 +250,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
236 | } | 250 | } |
237 | { | 251 | { |
238 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); | 252 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); |
239 | if (!Sink::VerifyEntityBuffer(verifyer)) { | 253 | if (!VerifyEntityBuffer(verifyer)) { |
240 | Warning() << "invalid buffer, not an entity buffer"; | 254 | Warning() << "invalid buffer, not an entity buffer"; |
241 | return KAsync::error<qint64>(0); | 255 | return KAsync::error<qint64>(0); |
242 | } | 256 | } |
@@ -249,13 +263,13 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
249 | return KAsync::error<qint64>(0); | 263 | return KAsync::error<qint64>(0); |
250 | } | 264 | } |
251 | 265 | ||
252 | auto diffEntity = Sink::GetEntity(modifyEntity->delta()->Data()); | 266 | auto diffEntity = GetEntity(modifyEntity->delta()->Data()); |
253 | Q_ASSERT(diffEntity); | 267 | Q_ASSERT(diffEntity); |
254 | auto diff = adaptorFactory->createAdaptor(*diffEntity); | 268 | auto diff = adaptorFactory->createAdaptor(*diffEntity); |
255 | 269 | ||
256 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | 270 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; |
257 | d->transaction.openDatabase(bufferType + ".main").findLatest(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | 271 | Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { |
258 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 272 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
259 | if (!buffer.isValid()) { | 273 | if (!buffer.isValid()) { |
260 | Warning() << "Read invalid buffer from disk"; | 274 | Warning() << "Read invalid buffer from disk"; |
261 | } else { | 275 | } else { |
@@ -273,15 +287,22 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
273 | } | 287 | } |
274 | 288 | ||
275 | //resource and uid don't matter at this point | 289 | //resource and uid don't matter at this point |
276 | const Sink::ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current); | 290 | const ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current); |
277 | auto newObject = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<Sink::ApplicationDomain::ApplicationDomainType>(existingObject); | 291 | auto newObject = ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(existingObject); |
278 | 292 | ||
279 | //Apply diff | 293 | //Apply diff |
280 | //FIXME only apply the properties that are available in the buffer | 294 | //FIXME only apply the properties that are available in the buffer |
281 | Trace() << "Applying changed properties: " << diff->availableProperties(); | 295 | Trace() << "Applying changed properties: " << diff->availableProperties(); |
296 | QSet<QByteArray> changeset; | ||
282 | for (const auto &property : diff->availableProperties()) { | 297 | for (const auto &property : diff->availableProperties()) { |
283 | newObject->setProperty(property, diff->getProperty(property)); | 298 | changeset << property; |
299 | const auto value = diff->getProperty(property); | ||
300 | if (value.isValid()) { | ||
301 | newObject->setProperty(property, value); | ||
302 | } | ||
284 | } | 303 | } |
304 | //Altough we only set some properties, we want all to be serialized | ||
305 | newObject->setChangedProperties(changeset); | ||
285 | 306 | ||
286 | //Remove deletions | 307 | //Remove deletions |
287 | if (modifyEntity->deletions()) { | 308 | if (modifyEntity->deletions()) { |
@@ -292,26 +313,29 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
292 | 313 | ||
293 | //Add metadata buffer | 314 | //Add metadata buffer |
294 | flatbuffers::FlatBufferBuilder metadataFbb; | 315 | flatbuffers::FlatBufferBuilder metadataFbb; |
295 | auto metadataBuilder = Sink::MetadataBuilder(metadataFbb); | 316 | auto metadataBuilder = MetadataBuilder(metadataFbb); |
296 | metadataBuilder.add_revision(newRevision); | 317 | metadataBuilder.add_revision(newRevision); |
297 | metadataBuilder.add_operation(Sink::Operation_Modification); | 318 | metadataBuilder.add_operation(Operation_Modification); |
298 | metadataBuilder.add_replayToSource(replayToSource); | 319 | metadataBuilder.add_replayToSource(replayToSource); |
299 | auto metadataBuffer = metadataBuilder.Finish(); | 320 | auto metadataBuffer = metadataBuilder.Finish(); |
300 | Sink::FinishMetadataBuffer(metadataFbb, metadataBuffer); | 321 | FinishMetadataBuffer(metadataFbb, metadataBuffer); |
301 | 322 | ||
302 | flatbuffers::FlatBufferBuilder fbb; | 323 | flatbuffers::FlatBufferBuilder fbb; |
303 | adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | 324 | adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); |
304 | 325 | ||
305 | storeNewRevision(newRevision, fbb, bufferType, key); | 326 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
306 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; | 327 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; |
307 | d->transaction.openDatabase(bufferType + ".main").scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &, const QByteArray &value) -> bool { | 328 | Storage::mainDatabase(d->transaction, bufferType).scan(Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool { |
308 | auto entity = Sink::GetEntity(value); | 329 | if (value.isEmpty()) { |
330 | ErrorMsg() << "Read buffer is empty."; | ||
331 | } | ||
332 | auto entity = GetEntity(value.data()); | ||
309 | auto newEntity = adaptorFactory->createAdaptor(*entity); | 333 | auto newEntity = adaptorFactory->createAdaptor(*entity); |
310 | for (auto processor : d->processors[bufferType]) { | 334 | for (auto processor : d->processors[bufferType]) { |
311 | processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); | 335 | processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); |
312 | } | 336 | } |
313 | return false; | 337 | return false; |
314 | }, [this](const Sink::Storage::Error &error) { | 338 | }, [this](const Storage::Error &error) { |
315 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | 339 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; |
316 | }); | 340 | }); |
317 | return KAsync::start<qint64>([newRevision](){ | 341 | return KAsync::start<qint64>([newRevision](){ |
@@ -322,15 +346,16 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
322 | KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | 346 | KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) |
323 | { | 347 | { |
324 | Trace() << "Pipeline: Deleted Entity"; | 348 | Trace() << "Pipeline: Deleted Entity"; |
349 | d->transactionItemCount++; | ||
325 | 350 | ||
326 | { | 351 | { |
327 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 352 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
328 | if (!Sink::Commands::VerifyDeleteEntityBuffer(verifyer)) { | 353 | if (!Commands::VerifyDeleteEntityBuffer(verifyer)) { |
329 | Warning() << "invalid buffer, not a delete entity buffer"; | 354 | Warning() << "invalid buffer, not a delete entity buffer"; |
330 | return KAsync::error<qint64>(0); | 355 | return KAsync::error<qint64>(0); |
331 | } | 356 | } |
332 | } | 357 | } |
333 | auto deleteEntity = Sink::Commands::GetDeleteEntity(command); | 358 | auto deleteEntity = Commands::GetDeleteEntity(command); |
334 | 359 | ||
335 | const bool replayToSource = deleteEntity->replayToSource(); | 360 | const bool replayToSource = deleteEntity->replayToSource(); |
336 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); | 361 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); |
@@ -338,13 +363,12 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
338 | 363 | ||
339 | bool found = false; | 364 | bool found = false; |
340 | bool alreadyRemoved = false; | 365 | bool alreadyRemoved = false; |
341 | d->transaction.openDatabase(bufferType + ".main").findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { | 366 | Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { |
342 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 367 | auto entity = GetEntity(data.data()); |
343 | auto entity = Sink::GetEntity(data.data()); | ||
344 | if (entity && entity->metadata()) { | 368 | if (entity && entity->metadata()) { |
345 | auto metadata = Sink::GetMetadata(entity->metadata()->Data()); | 369 | auto metadata = GetMetadata(entity->metadata()->Data()); |
346 | found = true; | 370 | found = true; |
347 | if (metadata->operation() == Sink::Operation_Removal) { | 371 | if (metadata->operation() == Operation_Removal) { |
348 | alreadyRemoved = true; | 372 | alreadyRemoved = true; |
349 | } | 373 | } |
350 | 374 | ||
@@ -364,16 +388,16 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
364 | return KAsync::error<qint64>(0); | 388 | return KAsync::error<qint64>(0); |
365 | } | 389 | } |
366 | 390 | ||
367 | const qint64 newRevision = Sink::Storage::maxRevision(d->transaction) + 1; | 391 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; |
368 | 392 | ||
369 | //Add metadata buffer | 393 | //Add metadata buffer |
370 | flatbuffers::FlatBufferBuilder metadataFbb; | 394 | flatbuffers::FlatBufferBuilder metadataFbb; |
371 | auto metadataBuilder = Sink::MetadataBuilder(metadataFbb); | 395 | auto metadataBuilder = MetadataBuilder(metadataFbb); |
372 | metadataBuilder.add_revision(newRevision); | 396 | metadataBuilder.add_revision(newRevision); |
373 | metadataBuilder.add_operation(Sink::Operation_Removal); | 397 | metadataBuilder.add_operation(Operation_Removal); |
374 | metadataBuilder.add_replayToSource(replayToSource); | 398 | metadataBuilder.add_replayToSource(replayToSource); |
375 | auto metadataBuffer = metadataBuilder.Finish(); | 399 | auto metadataBuffer = metadataBuilder.Finish(); |
376 | Sink::FinishMetadataBuffer(metadataFbb, metadataBuffer); | 400 | FinishMetadataBuffer(metadataFbb, metadataBuffer); |
377 | 401 | ||
378 | flatbuffers::FlatBufferBuilder fbb; | 402 | flatbuffers::FlatBufferBuilder fbb; |
379 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); | 403 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); |
@@ -384,20 +408,20 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
384 | return KAsync::error<qint64>(0); | 408 | return KAsync::error<qint64>(0); |
385 | } | 409 | } |
386 | 410 | ||
387 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | 411 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; |
388 | d->transaction.openDatabase(bufferType + ".main").findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { | 412 | Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { |
389 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 413 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
390 | if (!buffer.isValid()) { | 414 | if (!buffer.isValid()) { |
391 | Warning() << "Read invalid buffer from disk"; | 415 | Warning() << "Read invalid buffer from disk"; |
392 | } else { | 416 | } else { |
393 | current = adaptorFactory->createAdaptor(buffer.entity()); | 417 | current = adaptorFactory->createAdaptor(buffer.entity()); |
394 | } | 418 | } |
395 | return false; | 419 | return false; |
396 | }, [this](const Sink::Storage::Error &error) { | 420 | }, [this](const Storage::Error &error) { |
397 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | 421 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; |
398 | }); | 422 | }); |
399 | 423 | ||
400 | storeNewRevision(newRevision, fbb, bufferType, key); | 424 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
401 | Log() << "Pipeline: deleted entity: "<< newRevision; | 425 | Log() << "Pipeline: deleted entity: "<< newRevision; |
402 | 426 | ||
403 | for (auto processor : d->processors[bufferType]) { | 427 | for (auto processor : d->processors[bufferType]) { |
@@ -411,33 +435,33 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
411 | 435 | ||
412 | void Pipeline::cleanupRevision(qint64 revision) | 436 | void Pipeline::cleanupRevision(qint64 revision) |
413 | { | 437 | { |
414 | const auto uid = Sink::Storage::getUidFromRevision(d->transaction, revision); | 438 | const auto uid = Storage::getUidFromRevision(d->transaction, revision); |
415 | const auto bufferType = Sink::Storage::getTypeFromRevision(d->transaction, revision); | 439 | const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision); |
416 | Trace() << "Cleaning up revision " << revision << uid << bufferType; | 440 | Trace() << "Cleaning up revision " << revision << uid << bufferType; |
417 | d->transaction.openDatabase(bufferType + ".main").scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool { | 441 | Storage::mainDatabase(d->transaction, bufferType).scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool { |
418 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 442 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
419 | if (!buffer.isValid()) { | 443 | if (!buffer.isValid()) { |
420 | Warning() << "Read invalid buffer from disk"; | 444 | Warning() << "Read invalid buffer from disk"; |
421 | } else { | 445 | } else { |
422 | const auto metadata = flatbuffers::GetRoot<Sink::Metadata>(buffer.metadataBuffer()); | 446 | const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); |
423 | const qint64 rev = metadata->revision(); | 447 | const qint64 rev = metadata->revision(); |
424 | //Remove old revisions, and the current if the entity has already been removed | 448 | //Remove old revisions, and the current if the entity has already been removed |
425 | if (rev < revision || metadata->operation() == Sink::Operation_Removal) { | 449 | if (rev < revision || metadata->operation() == Operation_Removal) { |
426 | Sink::Storage::removeRevision(d->transaction, rev); | 450 | Storage::removeRevision(d->transaction, rev); |
427 | d->transaction.openDatabase(bufferType + ".main").remove(key); | 451 | Storage::mainDatabase(d->transaction, bufferType).remove(key); |
428 | } | 452 | } |
429 | } | 453 | } |
430 | 454 | ||
431 | return true; | 455 | return true; |
432 | }, [](const Sink::Storage::Error &error) { | 456 | }, [](const Storage::Error &error) { |
433 | Warning() << "Error while reading: " << error.message; | 457 | Warning() << "Error while reading: " << error.message; |
434 | }, true); | 458 | }, true); |
435 | Sink::Storage::setCleanedUpRevision(d->transaction, revision); | 459 | Storage::setCleanedUpRevision(d->transaction, revision); |
436 | } | 460 | } |
437 | 461 | ||
438 | qint64 Pipeline::cleanedUpRevision() | 462 | qint64 Pipeline::cleanedUpRevision() |
439 | { | 463 | { |
440 | return Sink::Storage::cleanedUpRevision(d->transaction); | 464 | return Storage::cleanedUpRevision(d->transaction); |
441 | } | 465 | } |
442 | 466 | ||
443 | Preprocessor::Preprocessor() | 467 | Preprocessor::Preprocessor() |
@@ -459,3 +483,7 @@ void Preprocessor::finalize() | |||
459 | 483 | ||
460 | } // namespace Sink | 484 | } // namespace Sink |
461 | 485 | ||
486 | #pragma clang diagnostic push | ||
487 | #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" | ||
488 | #include "moc_pipeline.cpp" | ||
489 | #pragma clang diagnostic pop | ||