Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--   common/pipeline.cpp   67
1 file changed, 49 insertions(+), 18 deletions(-)
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 4fed41f..c108540 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -124,7 +124,7 @@ void Pipeline::null()
     // state.step();
 }
 
-KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
+KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
 {
     Log() << "Pipeline: New Entity";
 
@@ -137,7 +137,7 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
         if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) {
             Warning() << "invalid buffer, not a create entity buffer";
-            return KAsync::error<void>();
+            return KAsync::error<qint64>(0);
         }
     }
     auto createEntity = Akonadi2::Commands::GetCreateEntity(command);
@@ -148,13 +148,13 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
         if (!Akonadi2::VerifyEntityBuffer(verifyer)) {
             Warning() << "invalid buffer, not an entity buffer";
-            return KAsync::error<void>();
+            return KAsync::error<qint64>(0);
         }
     }
     auto entity = Akonadi2::GetEntity(createEntity->delta()->Data());
     if (!entity->resource()->size() && !entity->local()->size()) {
         Warning() << "No local and no resource buffer while trying to create entity.";
-        return KAsync::error<void>();
+        return KAsync::error<qint64>(0);
     }
 
     //Add metadata buffer
@@ -175,10 +175,12 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
         }
     );
     Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
+    Akonadi2::Storage::recordRevision(d->transaction, newRevision, key, bufferType);
     Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
 
-    return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) {
-        PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future]() {
+    return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) {
+        PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future, newRevision]() {
+            future.setValue(newRevision);
             future.setFinished();
         }, bufferType);
         d->activePipelines << state;
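With the return type changed from KAsync::Job<void> to KAsync::Job<qint64>, newEntity(), modifiedEntity() and deletedEntity() report the revision a command produced instead of completing silently. A minimal caller sketch, assuming KAsync's then<Out, In> continuation style; the pipeline instance and surrounding names are illustrative and not part of this commit:

    // Hedged sketch: consume the revision reported by the pipeline job.
    // "pipeline", commandBuffer and bufferSize are hypothetical names.
    pipeline->newEntity(commandBuffer, bufferSize)
        .then<void, qint64>([](qint64 revision) {
            // e.g. notify clients that this revision is now available
            Log() << "New entity stored as revision " << revision;
        }).exec();
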
@@ -186,7 +188,7 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
     });
 }
 
-KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
+KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
 {
     Log() << "Pipeline: Modified Entity";
 
@@ -196,7 +198,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
         if (!Akonadi2::Commands::VerifyModifyEntityBuffer(verifyer)) {
             Warning() << "invalid buffer, not a modify entity buffer";
-            return KAsync::error<void>();
+            return KAsync::error<qint64>(0);
         }
     }
     auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command);
@@ -208,20 +210,20 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
     const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
     if (bufferType.isEmpty() || key.isEmpty()) {
         Warning() << "entity type or key " << bufferType << key;
-        return KAsync::error<void>();
+        return KAsync::error<qint64>(0);
     }
     {
         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size());
         if (!Akonadi2::VerifyEntityBuffer(verifyer)) {
             Warning() << "invalid buffer, not an entity buffer";
-            return KAsync::error<void>();
+            return KAsync::error<qint64>(0);
         }
     }
 
     auto adaptorFactory = d->adaptorFactory.value(bufferType);
     if (!adaptorFactory) {
         Warning() << "no adaptor factory for type " << bufferType;
-        return KAsync::error<void>();
+        return KAsync::error<qint64>(0);
     }
 
     auto diffEntity = Akonadi2::GetEntity(modifyEntity->delta()->Data());
@@ -244,7 +246,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
 
     if (!current) {
         Warning() << "Failed to read local value " << key;
-        return KAsync::error<void>();
+        return KAsync::error<qint64>(0);
     }
 
     //resource and uid don't matter at this point
@@ -278,10 +280,12 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
     //TODO don't overwrite the old entry, but instead store a new revision
     d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
     Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
+    Akonadi2::Storage::recordRevision(d->transaction, newRevision, key, bufferType);
     Log() << "Pipeline: modified entity: " << key << newRevision << bufferType;
 
-    return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) {
-        PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future]() {
+    return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) {
+        PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future, newRevision]() {
+            future.setValue(newRevision);
             future.setFinished();
         }, bufferType);
         d->activePipelines << state;
@@ -289,7 +293,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
     });
 }
 
-KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
+KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
 {
     Log() << "Pipeline: Deleted Entity";
 
@@ -299,7 +303,7 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
         if (!Akonadi2::Commands::VerifyDeleteEntityBuffer(verifyer)) {
             Warning() << "invalid buffer, not a delete entity buffer";
-            return KAsync::error<void>();
+            return KAsync::error<qint64>(0);
         }
     }
     auto deleteEntity = Akonadi2::Commands::GetDeleteEntity(command);
@@ -312,10 +316,12 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
     //TODO remove all revisions?
     d->transaction.openDatabase(bufferType + ".main").remove(Akonadi2::Storage::assembleKey(key, baseRevision));
     Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
+    Akonadi2::Storage::recordRevision(d->transaction, newRevision, key, bufferType);
     Log() << "Pipeline: deleted entity: "<< newRevision;
 
-    return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) {
-        PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[bufferType], newRevision, [&future](){
+    return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) {
+        PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[bufferType], newRevision, [&future, newRevision](){
+            future.setValue(newRevision);
             future.setFinished();
         }, bufferType);
         d->activePipelines << state;
@@ -323,6 +329,31 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
     });
 }
 
+void Pipeline::cleanupRevision(qint64 revision)
+{
+    const auto uid = Akonadi2::Storage::getUidFromRevision(d->transaction, revision);
+    const auto bufferType = Akonadi2::Storage::getTypeFromRevision(d->transaction, revision);
+    Trace() << "Cleaning up revision " << revision << uid << bufferType;
+    d->transaction.openDatabase(bufferType + ".main").scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool {
+        Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
+        if (!buffer.isValid()) {
+            Warning() << "Read invalid buffer from disk";
+        } else {
+            const auto metadata = flatbuffers::GetRoot<Akonadi2::Metadata>(buffer.metadataBuffer());
+            const qint64 rev = metadata->revision();
+            //Remove old revisions, and the current if the entity has already been removed
+            if (rev < revision || metadata->operation() == Akonadi2::Operation_Removal) {
+                Akonadi2::Storage::removeRevision(d->transaction, rev);
+                d->transaction.openDatabase(bufferType + ".main").remove(key);
+            }
+        }
+
+        return true;
+    }, [](const Akonadi2::Storage::Error &error) {
+        Warning() << "Error while reading: " << error.message;
+    }, true);
+}
+
 void Pipeline::pipelineStepped(const PipelineState &state)
 {
     scheduleStep();
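
The new cleanupRevision() resolves a revision back to its entity via the uid/type records written by recordRevision(), scans that entity's entries in the ".main" database, and drops every entry whose metadata revision is older than the given one (or the entry itself if the entity was already removed). A hedged sketch of how it might be driven, assuming hypothetical lastCleanedRevision/lowerBoundRevision bookkeeping that is not part of this diff:

    // Hedged sketch: walk revisions up to the oldest revision any client still
    // needs and let the pipeline drop superseded entries. Only
    // Pipeline::cleanupRevision() is real; the bounds are assumed to be tracked
    // elsewhere.
    #include "pipeline.h"  // assumed header for Pipeline

    void cleanupOldRevisions(Pipeline *pipeline, qint64 lastCleanedRevision, qint64 lowerBoundRevision)
    {
        for (qint64 revision = lastCleanedRevision + 1; revision <= lowerBoundRevision; revision++) {
            pipeline->cleanupRevision(revision);
        }
    }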