diff options
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 67 |
1 files changed, 49 insertions, 18 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 4fed41f..c108540 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -124,7 +124,7 @@ void Pipeline::null() | |||
124 | // state.step(); | 124 | // state.step(); |
125 | } | 125 | } |
126 | 126 | ||
127 | KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size) | 127 | KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) |
128 | { | 128 | { |
129 | Log() << "Pipeline: New Entity"; | 129 | Log() << "Pipeline: New Entity"; |
130 | 130 | ||
@@ -137,7 +137,7 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
137 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 137 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
138 | if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { | 138 | if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { |
139 | Warning() << "invalid buffer, not a create entity buffer"; | 139 | Warning() << "invalid buffer, not a create entity buffer"; |
140 | return KAsync::error<void>(); | 140 | return KAsync::error<qint64>(0); |
141 | } | 141 | } |
142 | } | 142 | } |
143 | auto createEntity = Akonadi2::Commands::GetCreateEntity(command); | 143 | auto createEntity = Akonadi2::Commands::GetCreateEntity(command); |
@@ -148,13 +148,13 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
148 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); | 148 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); |
149 | if (!Akonadi2::VerifyEntityBuffer(verifyer)) { | 149 | if (!Akonadi2::VerifyEntityBuffer(verifyer)) { |
150 | Warning() << "invalid buffer, not an entity buffer"; | 150 | Warning() << "invalid buffer, not an entity buffer"; |
151 | return KAsync::error<void>(); | 151 | return KAsync::error<qint64>(0); |
152 | } | 152 | } |
153 | } | 153 | } |
154 | auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); | 154 | auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); |
155 | if (!entity->resource()->size() && !entity->local()->size()) { | 155 | if (!entity->resource()->size() && !entity->local()->size()) { |
156 | Warning() << "No local and no resource buffer while trying to create entity."; | 156 | Warning() << "No local and no resource buffer while trying to create entity."; |
157 | return KAsync::error<void>(); | 157 | return KAsync::error<qint64>(0); |
158 | } | 158 | } |
159 | 159 | ||
160 | //Add metadata buffer | 160 | //Add metadata buffer |
@@ -175,10 +175,12 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
175 | } | 175 | } |
176 | ); | 176 | ); |
177 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); | 177 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); |
178 | Akonadi2::Storage::recordRevision(d->transaction, newRevision, key, bufferType); | ||
178 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; | 179 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; |
179 | 180 | ||
180 | return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) { | 181 | return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { |
181 | PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future]() { | 182 | PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future, newRevision]() { |
183 | future.setValue(newRevision); | ||
182 | future.setFinished(); | 184 | future.setFinished(); |
183 | }, bufferType); | 185 | }, bufferType); |
184 | d->activePipelines << state; | 186 | d->activePipelines << state; |
@@ -186,7 +188,7 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
186 | }); | 188 | }); |
187 | } | 189 | } |
188 | 190 | ||
189 | KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | 191 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) |
190 | { | 192 | { |
191 | Log() << "Pipeline: Modified Entity"; | 193 | Log() << "Pipeline: Modified Entity"; |
192 | 194 | ||
@@ -196,7 +198,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | |||
196 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 198 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
197 | if (!Akonadi2::Commands::VerifyModifyEntityBuffer(verifyer)) { | 199 | if (!Akonadi2::Commands::VerifyModifyEntityBuffer(verifyer)) { |
198 | Warning() << "invalid buffer, not a modify entity buffer"; | 200 | Warning() << "invalid buffer, not a modify entity buffer"; |
199 | return KAsync::error<void>(); | 201 | return KAsync::error<qint64>(0); |
200 | } | 202 | } |
201 | } | 203 | } |
202 | auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command); | 204 | auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command); |
@@ -208,20 +210,20 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | |||
208 | const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); | 210 | const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); |
209 | if (bufferType.isEmpty() || key.isEmpty()) { | 211 | if (bufferType.isEmpty() || key.isEmpty()) { |
210 | Warning() << "entity type or key " << bufferType << key; | 212 | Warning() << "entity type or key " << bufferType << key; |
211 | return KAsync::error<void>(); | 213 | return KAsync::error<qint64>(0); |
212 | } | 214 | } |
213 | { | 215 | { |
214 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); | 216 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); |
215 | if (!Akonadi2::VerifyEntityBuffer(verifyer)) { | 217 | if (!Akonadi2::VerifyEntityBuffer(verifyer)) { |
216 | Warning() << "invalid buffer, not an entity buffer"; | 218 | Warning() << "invalid buffer, not an entity buffer"; |
217 | return KAsync::error<void>(); | 219 | return KAsync::error<qint64>(0); |
218 | } | 220 | } |
219 | } | 221 | } |
220 | 222 | ||
221 | auto adaptorFactory = d->adaptorFactory.value(bufferType); | 223 | auto adaptorFactory = d->adaptorFactory.value(bufferType); |
222 | if (!adaptorFactory) { | 224 | if (!adaptorFactory) { |
223 | Warning() << "no adaptor factory for type " << bufferType; | 225 | Warning() << "no adaptor factory for type " << bufferType; |
224 | return KAsync::error<void>(); | 226 | return KAsync::error<qint64>(0); |
225 | } | 227 | } |
226 | 228 | ||
227 | auto diffEntity = Akonadi2::GetEntity(modifyEntity->delta()->Data()); | 229 | auto diffEntity = Akonadi2::GetEntity(modifyEntity->delta()->Data()); |
@@ -244,7 +246,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | |||
244 | 246 | ||
245 | if (!current) { | 247 | if (!current) { |
246 | Warning() << "Failed to read local value " << key; | 248 | Warning() << "Failed to read local value " << key; |
247 | return KAsync::error<void>(); | 249 | return KAsync::error<qint64>(0); |
248 | } | 250 | } |
249 | 251 | ||
250 | //resource and uid don't matter at this point | 252 | //resource and uid don't matter at this point |
@@ -278,10 +280,12 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | |||
278 | //TODO don't overwrite the old entry, but instead store a new revision | 280 | //TODO don't overwrite the old entry, but instead store a new revision |
279 | d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | 281 | d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); |
280 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); | 282 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); |
283 | Akonadi2::Storage::recordRevision(d->transaction, newRevision, key, bufferType); | ||
281 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; | 284 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; |
282 | 285 | ||
283 | return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) { | 286 | return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { |
284 | PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future]() { | 287 | PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future, newRevision]() { |
288 | future.setValue(newRevision); | ||
285 | future.setFinished(); | 289 | future.setFinished(); |
286 | }, bufferType); | 290 | }, bufferType); |
287 | d->activePipelines << state; | 291 | d->activePipelines << state; |
@@ -289,7 +293,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | |||
289 | }); | 293 | }); |
290 | } | 294 | } |
291 | 295 | ||
292 | KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size) | 296 | KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) |
293 | { | 297 | { |
294 | Log() << "Pipeline: Deleted Entity"; | 298 | Log() << "Pipeline: Deleted Entity"; |
295 | 299 | ||
@@ -299,7 +303,7 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size) | |||
299 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 303 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
300 | if (!Akonadi2::Commands::VerifyDeleteEntityBuffer(verifyer)) { | 304 | if (!Akonadi2::Commands::VerifyDeleteEntityBuffer(verifyer)) { |
301 | Warning() << "invalid buffer, not a delete entity buffer"; | 305 | Warning() << "invalid buffer, not a delete entity buffer"; |
302 | return KAsync::error<void>(); | 306 | return KAsync::error<qint64>(0); |
303 | } | 307 | } |
304 | } | 308 | } |
305 | auto deleteEntity = Akonadi2::Commands::GetDeleteEntity(command); | 309 | auto deleteEntity = Akonadi2::Commands::GetDeleteEntity(command); |
@@ -312,10 +316,12 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size) | |||
312 | //TODO remove all revisions? | 316 | //TODO remove all revisions? |
313 | d->transaction.openDatabase(bufferType + ".main").remove(Akonadi2::Storage::assembleKey(key, baseRevision)); | 317 | d->transaction.openDatabase(bufferType + ".main").remove(Akonadi2::Storage::assembleKey(key, baseRevision)); |
314 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); | 318 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); |
319 | Akonadi2::Storage::recordRevision(d->transaction, newRevision, key, bufferType); | ||
315 | Log() << "Pipeline: deleted entity: "<< newRevision; | 320 | Log() << "Pipeline: deleted entity: "<< newRevision; |
316 | 321 | ||
317 | return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) { | 322 | return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { |
318 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[bufferType], newRevision, [&future](){ | 323 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[bufferType], newRevision, [&future, newRevision](){ |
324 | future.setValue(newRevision); | ||
319 | future.setFinished(); | 325 | future.setFinished(); |
320 | }, bufferType); | 326 | }, bufferType); |
321 | d->activePipelines << state; | 327 | d->activePipelines << state; |
@@ -323,6 +329,31 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size) | |||
323 | }); | 329 | }); |
324 | } | 330 | } |
325 | 331 | ||
332 | void Pipeline::cleanupRevision(qint64 revision) | ||
333 | { | ||
334 | const auto uid = Akonadi2::Storage::getUidFromRevision(d->transaction, revision); | ||
335 | const auto bufferType = Akonadi2::Storage::getTypeFromRevision(d->transaction, revision); | ||
336 | Trace() << "Cleaning up revision " << revision << uid << bufferType; | ||
337 | d->transaction.openDatabase(bufferType + ".main").scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool { | ||
338 | Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
339 | if (!buffer.isValid()) { | ||
340 | Warning() << "Read invalid buffer from disk"; | ||
341 | } else { | ||
342 | const auto metadata = flatbuffers::GetRoot<Akonadi2::Metadata>(buffer.metadataBuffer()); | ||
343 | const qint64 rev = metadata->revision(); | ||
344 | //Remove old revisions, and the current if the entity has already been removed | ||
345 | if (rev < revision || metadata->operation() == Akonadi2::Operation_Removal) { | ||
346 | Akonadi2::Storage::removeRevision(d->transaction, rev); | ||
347 | d->transaction.openDatabase(bufferType + ".main").remove(key); | ||
348 | } | ||
349 | } | ||
350 | |||
351 | return true; | ||
352 | }, [](const Akonadi2::Storage::Error &error) { | ||
353 | Warning() << "Error while reading: " << error.message; | ||
354 | }, true); | ||
355 | } | ||
356 | |||
326 | void Pipeline::pipelineStepped(const PipelineState &state) | 357 | void Pipeline::pipelineStepped(const PipelineState &state) |
327 | { | 358 | { |
328 | scheduleStep(); | 359 | scheduleStep(); |