diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-10-19 15:28:42 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-10-21 09:18:49 +0200 |
commit | ba7c8b890c45d735216888204ec88882ef58c918 (patch) | |
tree | cb00c9b51e5353ba3726216679c81c0e2fe9ac35 /common/pipeline.cpp | |
parent | da1c86b80f230c3a2023f97c0048020a12e38de4 (diff) | |
download | sink-ba7c8b890c45d735216888204ec88882ef58c918.tar.gz sink-ba7c8b890c45d735216888204ec88882ef58c918.zip |
Ported the pipeline to the entitystore
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 275 |
1 files changed, 55 insertions, 220 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index e257857..ea59ae9 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -37,6 +37,7 @@ | |||
37 | #include "adaptorfactoryregistry.h" | 37 | #include "adaptorfactoryregistry.h" |
38 | #include "definitions.h" | 38 | #include "definitions.h" |
39 | #include "bufferutils.h" | 39 | #include "bufferutils.h" |
40 | #include "storage/entitystore.h" | ||
40 | 41 | ||
41 | SINK_DEBUG_AREA("pipeline") | 42 | SINK_DEBUG_AREA("pipeline") |
42 | 43 | ||
@@ -46,31 +47,18 @@ using namespace Sink::Storage; | |||
46 | class Pipeline::Private | 47 | class Pipeline::Private |
47 | { | 48 | { |
48 | public: | 49 | public: |
49 | Private(const ResourceContext &context) : resourceContext(context), storage(Sink::storageLocation(), context.instanceId(), DataStore::ReadWrite), revisionChanged(false) | 50 | Private(const ResourceContext &context) : resourceContext(context), entityStore(context), revisionChanged(false) |
50 | { | 51 | { |
51 | } | 52 | } |
52 | 53 | ||
53 | ResourceContext resourceContext; | 54 | ResourceContext resourceContext; |
54 | DataStore storage; | 55 | Storage::EntityStore entityStore; |
55 | DataStore::Transaction transaction; | ||
56 | QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; | 56 | QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; |
57 | bool revisionChanged; | 57 | bool revisionChanged; |
58 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); | ||
59 | QTime transactionTime; | 58 | QTime transactionTime; |
60 | int transactionItemCount; | 59 | int transactionItemCount; |
61 | }; | 60 | }; |
62 | 61 | ||
63 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | ||
64 | { | ||
65 | SinkTrace() << "Committing new revision: " << uid << newRevision; | ||
66 | DataStore::mainDatabase(transaction, bufferType) | ||
67 | .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), | ||
68 | [uid, newRevision](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); | ||
69 | revisionChanged = true; | ||
70 | DataStore::setMaxRevision(transaction, newRevision); | ||
71 | DataStore::recordRevision(transaction, newRevision, uid, bufferType); | ||
72 | } | ||
73 | |||
74 | 62 | ||
75 | Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context)) | 63 | Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context)) |
76 | { | 64 | { |
@@ -78,7 +66,6 @@ Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Pri | |||
78 | 66 | ||
79 | Pipeline::~Pipeline() | 67 | Pipeline::~Pipeline() |
80 | { | 68 | { |
81 | d->transaction = DataStore::Transaction(); | ||
82 | } | 69 | } |
83 | 70 | ||
84 | void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) | 71 | void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) |
@@ -98,27 +85,10 @@ void Pipeline::startTransaction() | |||
98 | // for (auto processor : d->processors[bufferType]) { | 85 | // for (auto processor : d->processors[bufferType]) { |
99 | // processor->startBatch(); | 86 | // processor->startBatch(); |
100 | // } | 87 | // } |
101 | if (d->transaction) { | ||
102 | return; | ||
103 | } | ||
104 | SinkTrace() << "Starting transaction."; | 88 | SinkTrace() << "Starting transaction."; |
105 | d->transactionTime.start(); | 89 | d->transactionTime.start(); |
106 | d->transactionItemCount = 0; | 90 | d->transactionItemCount = 0; |
107 | d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) { | 91 | d->entityStore.startTransaction(DataStore::ReadWrite); |
108 | SinkWarning() << error.message; | ||
109 | }); | ||
110 | |||
111 | //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. | ||
112 | //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already). | ||
113 | //It seems like the validateNamedDatabase calls actually stops the mdb_put failures during sync... | ||
114 | if (d->storage.exists()) { | ||
115 | while (!d->transaction.validateNamedDatabases()) { | ||
116 | SinkWarning() << "Opened an invalid transaction!!!!!!"; | ||
117 | d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) { | ||
118 | SinkWarning() << error.message; | ||
119 | }); | ||
120 | } | ||
121 | } | ||
122 | } | 92 | } |
123 | 93 | ||
124 | void Pipeline::commit() | 94 | void Pipeline::commit() |
@@ -129,34 +99,20 @@ void Pipeline::commit() | |||
129 | // processor->finalize(); | 99 | // processor->finalize(); |
130 | // } | 100 | // } |
131 | if (!d->revisionChanged) { | 101 | if (!d->revisionChanged) { |
132 | d->transaction.abort(); | 102 | d->entityStore.abortTransaction(); |
133 | d->transaction = DataStore::Transaction(); | ||
134 | return; | 103 | return; |
135 | } | 104 | } |
136 | const auto revision = DataStore::maxRevision(d->transaction); | 105 | const auto revision = d->entityStore.maxRevision(); |
137 | const auto elapsed = d->transactionTime.elapsed(); | 106 | const auto elapsed = d->transactionTime.elapsed(); |
138 | SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " | 107 | SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " |
139 | << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; | 108 | << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; |
140 | if (d->transaction) { | 109 | d->entityStore.commitTransaction(); |
141 | d->transaction.commit(); | ||
142 | } | ||
143 | d->transaction = DataStore::Transaction(); | ||
144 | if (d->revisionChanged) { | 110 | if (d->revisionChanged) { |
145 | d->revisionChanged = false; | 111 | d->revisionChanged = false; |
146 | emit revisionUpdated(revision); | 112 | emit revisionUpdated(revision); |
147 | } | 113 | } |
148 | } | 114 | } |
149 | 115 | ||
150 | DataStore::Transaction &Pipeline::transaction() | ||
151 | { | ||
152 | return d->transaction; | ||
153 | } | ||
154 | |||
155 | DataStore &Pipeline::storage() const | ||
156 | { | ||
157 | return d->storage; | ||
158 | } | ||
159 | |||
160 | KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | 116 | KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) |
161 | { | 117 | { |
162 | d->transactionItemCount++; | 118 | d->transactionItemCount++; |
@@ -175,7 +131,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
175 | QByteArray key; | 131 | QByteArray key; |
176 | if (createEntity->entityId()) { | 132 | if (createEntity->entityId()) { |
177 | key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); | 133 | key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); |
178 | if (DataStore::mainDatabase(d->transaction, bufferType).contains(key)) { | 134 | if (d->entityStore.contains(bufferType, key)) { |
179 | SinkError() << "An entity with this id already exists: " << key; | 135 | SinkError() << "An entity with this id already exists: " << key; |
180 | return KAsync::error<qint64>(0); | 136 | return KAsync::error<qint64>(0); |
181 | } | 137 | } |
@@ -208,29 +164,23 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
208 | 164 | ||
209 | auto adaptor = adaptorFactory->createAdaptor(*entity); | 165 | auto adaptor = adaptorFactory->createAdaptor(*entity); |
210 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); | 166 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); |
211 | foreach (const auto &processor, d->processors[bufferType]) { | ||
212 | processor->newEntity(key, DataStore::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); | ||
213 | } | ||
214 | //The maxRevision may have changed meanwhile if the entity created sub-entities | ||
215 | const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; | ||
216 | |||
217 | // Add metadata buffer | ||
218 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
219 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
220 | metadataBuilder.add_revision(newRevision); | ||
221 | metadataBuilder.add_operation(Operation_Creation); | ||
222 | metadataBuilder.add_replayToSource(replayToSource); | ||
223 | auto metadataBuffer = metadataBuilder.Finish(); | ||
224 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
225 | 167 | ||
226 | flatbuffers::FlatBufferBuilder fbb; | 168 | d->revisionChanged = true; |
227 | adaptorFactory->createBuffer(memoryAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | 169 | auto revision = d->entityStore.maxRevision(); |
170 | auto o = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, revision, memoryAdaptor}; | ||
171 | o.setChangedProperties(o.availableProperties().toSet()); | ||
228 | 172 | ||
229 | d->storeNewRevision(newRevision, fbb, bufferType, key); | 173 | auto preprocess = [&, this](ApplicationDomain::ApplicationDomainType &newEntity) { |
174 | foreach (const auto &processor, d->processors[bufferType]) { | ||
175 | processor->newEntity(newEntity); | ||
176 | } | ||
177 | }; | ||
230 | 178 | ||
231 | //FIXME entityStore->create(bufferType, memoryAdaptor, replayToSource) | 179 | if (!d->entityStore.add(bufferType, o, replayToSource, preprocess)) { |
180 | return KAsync::error<qint64>(0); | ||
181 | } | ||
232 | 182 | ||
233 | return KAsync::value(newRevision); | 183 | return KAsync::value(d->entityStore.maxRevision()); |
234 | } | 184 | } |
235 | 185 | ||
236 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | 186 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) |
@@ -254,6 +204,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
254 | } | 204 | } |
255 | const qint64 baseRevision = modifyEntity->revision(); | 205 | const qint64 baseRevision = modifyEntity->revision(); |
256 | const bool replayToSource = modifyEntity->replayToSource(); | 206 | const bool replayToSource = modifyEntity->replayToSource(); |
207 | |||
257 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); | 208 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); |
258 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); | 209 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); |
259 | SinkTrace() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 210 | SinkTrace() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
@@ -269,7 +220,6 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
269 | } | 220 | } |
270 | } | 221 | } |
271 | 222 | ||
272 | // TODO use only readPropertyMapper and writePropertyMapper | ||
273 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); | 223 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); |
274 | if (!adaptorFactory) { | 224 | if (!adaptorFactory) { |
275 | SinkWarning() << "no adaptor factory for type " << bufferType; | 225 | SinkWarning() << "no adaptor factory for type " << bufferType; |
@@ -278,72 +228,26 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
278 | 228 | ||
279 | auto diffEntity = GetEntity(modifyEntity->delta()->Data()); | 229 | auto diffEntity = GetEntity(modifyEntity->delta()->Data()); |
280 | Q_ASSERT(diffEntity); | 230 | Q_ASSERT(diffEntity); |
281 | auto diff = adaptorFactory->createAdaptor(*diffEntity); | 231 | Sink::ApplicationDomain::ApplicationDomainType diff{d->resourceContext.instanceId(), key, baseRevision, adaptorFactory->createAdaptor(*diffEntity)}; |
282 | 232 | diff.setChangedProperties(changeset.toSet()); | |
283 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; | ||
284 | DataStore::mainDatabase(d->transaction, bufferType) | ||
285 | .findLatest(key, | ||
287 | [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | ||
287 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
288 | if (!buffer.isValid()) { | ||
289 | SinkWarning() << "Read invalid buffer from disk"; | ||
290 | } else { | ||
291 | current = adaptorFactory->createAdaptor(buffer.entity()); | ||
292 | } | ||
293 | return false; | ||
294 | }, | ||
295 | [baseRevision](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); | ||
296 | |||
297 | if (!current) { | ||
298 | SinkWarning() << "Failed to read local value " << key; | ||
299 | return KAsync::error<qint64>(0); | ||
300 | } | ||
301 | |||
302 | auto newAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(current), current->availableProperties()); | ||
303 | |||
304 | // Apply diff | ||
305 | // FIXME only apply the properties that are available in the buffer | ||
306 | SinkTrace() << "Applying changed properties: " << changeset; | ||
307 | for (const auto &property : changeset) { | ||
308 | const auto value = diff->getProperty(property); | ||
309 | if (value.isValid()) { | ||
310 | newAdaptor->setProperty(property, value); | ||
311 | } | ||
312 | } | ||
313 | 233 | ||
314 | // Remove deletions | 234 | QByteArrayList deletions; |
315 | if (modifyEntity->deletions()) { | 235 | if (modifyEntity->deletions()) { |
316 | for (const flatbuffers::String *property : *modifyEntity->deletions()) { | 236 | deletions = BufferUtils::fromVector(*modifyEntity->deletions()); |
317 | newAdaptor->setProperty(BufferUtils::extractBuffer(property), QVariant()); | ||
318 | } | ||
319 | } | 237 | } |
320 | 238 | ||
321 | newAdaptor->resetChangedProperties(); | 239 | auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) { |
322 | foreach (const auto &processor, d->processors[bufferType]) { | 240 | foreach (const auto &processor, d->processors[bufferType]) { |
323 | processor->modifiedEntity(key, DataStore::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); | 241 | processor->modifiedEntity(oldEntity, newEntity); |
324 | } | 242 | } |
325 | //The maxRevision may have changed meanwhile if the entity created sub-entities | 243 | }; |
326 | const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; | ||
327 | 244 | ||
328 | // Add metadata buffer | 245 | d->revisionChanged = true; |
329 | flatbuffers::FlatBufferBuilder metadataFbb; | 246 | if (!d->entityStore.modify(bufferType, diff, deletions, replayToSource, preprocess)) { |
330 | { | 247 | return KAsync::error<qint64>(0); |
331 | //We add availableProperties to account for the properties that have been changed by the preprocessors | ||
332 | auto modifiedProperties = BufferUtils::toVector(metadataFbb, changeset + newAdaptor->changedProperties()); | ||
333 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
334 | metadataBuilder.add_revision(newRevision); | ||
335 | metadataBuilder.add_operation(Operation_Modification); | ||
336 | metadataBuilder.add_replayToSource(replayToSource); | ||
337 | metadataBuilder.add_modifiedProperties(modifiedProperties); | ||
338 | auto metadataBuffer = metadataBuilder.Finish(); | ||
339 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
340 | } | 248 | } |
341 | 249 | ||
342 | flatbuffers::FlatBufferBuilder fbb; | 250 | return KAsync::value(d->entityStore.maxRevision()); |
343 | adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | ||
344 | |||
345 | d->storeNewRevision(newRevision, fbb, bufferType, key); | ||
346 | return KAsync::value(newRevision); | ||
347 | } | 251 | } |
348 | 252 | ||
349 | KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | 253 | KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) |
@@ -364,106 +268,38 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
364 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); | 268 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); |
365 | SinkTrace() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 269 | SinkTrace() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
366 | 270 | ||
367 | bool found = false; | 271 | auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity) { |
368 | bool alreadyRemoved = false; | 272 | foreach (const auto &processor, d->processors[bufferType]) { |
369 | DataStore::mainDatabase(d->transaction, bufferType) | 273 | processor->deletedEntity(oldEntity); |
370 | .findLatest(key, | 274 | } |
371 | [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { | 275 | }; |
372 | auto entity = GetEntity(data.data()); | ||
373 | if (entity && entity->metadata()) { | ||
374 | auto metadata = GetMetadata(entity->metadata()->Data()); | ||
375 | found = true; | ||
376 | if (metadata->operation() == Operation_Removal) { | ||
377 | alreadyRemoved = true; | ||
378 | } | ||
379 | } | ||
380 | return false; | ||
381 | }, | ||
382 | [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); | ||
383 | |||
384 | if (!found) { | ||
385 | SinkWarning() << "Failed to find entity " << key; | ||
386 | return KAsync::error<qint64>(0); | ||
387 | } | ||
388 | if (alreadyRemoved) { | ||
389 | SinkWarning() << "Entity is already removed " << key; | ||
390 | return KAsync::error<qint64>(0); | ||
391 | } | ||
392 | |||
393 | const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; | ||
394 | |||
395 | // Add metadata buffer | ||
396 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
397 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
398 | metadataBuilder.add_revision(newRevision); | ||
399 | metadataBuilder.add_operation(Operation_Removal); | ||
400 | metadataBuilder.add_replayToSource(replayToSource); | ||
401 | auto metadataBuffer = metadataBuilder.Finish(); | ||
402 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
403 | |||
404 | flatbuffers::FlatBufferBuilder fbb; | ||
405 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); | ||
406 | 276 | ||
407 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); | 277 | d->revisionChanged = true; |
408 | if (!adaptorFactory) { | 278 | if (!d->entityStore.remove(bufferType, key, replayToSource, preprocess)) { |
409 | SinkWarning() << "no adaptor factory for type " << bufferType; | ||
410 | return KAsync::error<qint64>(0); | 279 | return KAsync::error<qint64>(0); |
411 | } | 280 | } |
412 | 281 | ||
413 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; | 282 | return KAsync::value(d->entityStore.maxRevision()); |
414 | DataStore::mainDatabase(d->transaction, bufferType) | ||
415 | .findLatest(key, | ||
416 | [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool { | ||
417 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
418 | if (!buffer.isValid()) { | ||
419 | SinkWarning() << "Read invalid buffer from disk"; | ||
420 | } else { | ||
421 | current = adaptorFactory->createAdaptor(buffer.entity()); | ||
422 | } | ||
423 | return false; | ||
424 | }, | ||
425 | [this](const DataStore::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; }); | ||
426 | |||
427 | d->storeNewRevision(newRevision, fbb, bufferType, key); | ||
428 | |||
429 | foreach (const auto &processor, d->processors[bufferType]) { | ||
430 | processor->deletedEntity(key, newRevision, *current, d->transaction); | ||
431 | } | ||
432 | |||
433 | return KAsync::value(newRevision); | ||
434 | } | 283 | } |
435 | 284 | ||
436 | void Pipeline::cleanupRevision(qint64 revision) | 285 | void Pipeline::cleanupRevision(qint64 revision) |
437 | { | 286 | { |
287 | d->entityStore.cleanupRevision(revision); | ||
438 | d->revisionChanged = true; | 288 | d->revisionChanged = true; |
439 | const auto uid = DataStore::getUidFromRevision(d->transaction, revision); | ||
440 | const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision); | ||
441 | SinkTrace() << "Cleaning up revision " << revision << uid << bufferType; | ||
442 | DataStore::mainDatabase(d->transaction, bufferType) | ||
443 | .scan(uid, | ||
444 | [&](const QByteArray &key, const QByteArray &data) -> bool { | ||
445 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
446 | if (!buffer.isValid()) { | ||
447 | SinkWarning() << "Read invalid buffer from disk"; | ||
448 | } else { | ||
449 | const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); | ||
450 | const qint64 rev = metadata->revision(); | ||
451 | // Remove old revisions, and the current if the entity has already been removed | ||
452 | if (rev < revision || metadata->operation() == Operation_Removal) { | ||
453 | DataStore::removeRevision(d->transaction, rev); | ||
454 | DataStore::mainDatabase(d->transaction, bufferType).remove(key); | ||
455 | } | ||
456 | } | ||
457 | |||
458 | return true; | ||
459 | }, | ||
460 | [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); | ||
461 | DataStore::setCleanedUpRevision(d->transaction, revision); | ||
462 | } | 289 | } |
463 | 290 | ||
464 | qint64 Pipeline::cleanedUpRevision() | 291 | qint64 Pipeline::cleanedUpRevision() |
465 | { | 292 | { |
466 | return DataStore::cleanedUpRevision(d->transaction); | 293 | /* return d->entityStore.cleanedUpRevision(); */ |
294 | /* return DataStore::cleanedUpRevision(d->transaction); */ | ||
295 | //FIXME Just move the whole cleanup revision iteration into the entitystore | ||
296 | return 0; | ||
297 | } | ||
298 | |||
299 | qint64 Pipeline::revision() | ||
300 | { | ||
301 | //FIXME Just move the whole cleanup revision iteration into the entitystore | ||
302 | return 0; | ||
467 | } | 303 | } |
468 | 304 | ||
469 | class Preprocessor::Private { | 305 | class Preprocessor::Private { |
@@ -492,7 +328,7 @@ void Preprocessor::startBatch() | |||
492 | { | 328 | { |
493 | } | 329 | } |
494 | 330 | ||
495 | void Preprocessor::finalize() | 331 | void Preprocessor::finalizeBatch() |
496 | { | 332 | { |
497 | } | 333 | } |
498 | 334 | ||
@@ -510,7 +346,6 @@ void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomain | |||
510 | 346 | ||
511 | flatbuffers::FlatBufferBuilder fbb; | 347 | flatbuffers::FlatBufferBuilder fbb; |
512 | auto entityId = fbb.CreateString(entity.identifier()); | 348 | auto entityId = fbb.CreateString(entity.identifier()); |
513 | // This is the resource buffer type and not the domain type | ||
514 | auto type = fbb.CreateString(typeName); | 349 | auto type = fbb.CreateString(typeName); |
515 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityBuffer.constData(), entityBuffer.size()); | 350 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityBuffer.constData(), entityBuffer.size()); |
516 | auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta); | 351 | auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta); |