diff options
-rw-r--r-- | common/genericresource.cpp | 31 | ||||
-rw-r--r-- | common/metadata.fbs | 3 | ||||
-rw-r--r-- | common/pipeline.cpp | 67 | ||||
-rw-r--r-- | common/pipeline.h | 12 | ||||
-rw-r--r-- | common/storage.h | 5 | ||||
-rw-r--r-- | common/storage_common.cpp | 42 | ||||
-rw-r--r-- | tests/pipelinetest.cpp | 21 |
7 files changed, 149 insertions, 32 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 3ffc56b..4abcecd 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -60,7 +60,7 @@ private slots: | |||
60 | }).exec(); | 60 | }).exec(); |
61 | } | 61 | } |
62 | 62 | ||
63 | KAsync::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) | 63 | KAsync::Job<qint64> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) |
64 | { | 64 | { |
65 | Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); | 65 | Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); |
66 | //Throw command into appropriate pipeline | 66 | //Throw command into appropriate pipeline |
@@ -72,25 +72,27 @@ private slots: | |||
72 | case Akonadi2::Commands::CreateEntityCommand: | 72 | case Akonadi2::Commands::CreateEntityCommand: |
73 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); | 73 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); |
74 | default: | 74 | default: |
75 | return KAsync::error<void>(-1, "Unhandled command"); | 75 | return KAsync::error<qint64>(-1, "Unhandled command"); |
76 | } | 76 | } |
77 | return KAsync::null<void>(); | 77 | return KAsync::null<qint64>(); |
78 | } | 78 | } |
79 | 79 | ||
80 | KAsync::Job<void> processQueuedCommand(const QByteArray &data) | 80 | KAsync::Job<qint64, qint64> processQueuedCommand(const QByteArray &data) |
81 | { | 81 | { |
82 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); | 82 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); |
83 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { | 83 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { |
84 | Warning() << "invalid buffer"; | 84 | Warning() << "invalid buffer"; |
85 | return KAsync::error<void>(1, "Invalid Buffer"); | 85 | // return KAsync::error<void, qint64>(1, "Invalid Buffer"); |
86 | } | 86 | } |
87 | auto queuedCommand = Akonadi2::GetQueuedCommand(data.constData()); | 87 | auto queuedCommand = Akonadi2::GetQueuedCommand(data.constData()); |
88 | const auto commandId = queuedCommand->commandId(); | 88 | const auto commandId = queuedCommand->commandId(); |
89 | Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId); | 89 | Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId); |
90 | return processQueuedCommand(queuedCommand).then<void>( | 90 | return processQueuedCommand(queuedCommand).then<qint64, qint64>( |
91 | [commandId]() { | 91 | [commandId](qint64 createdRevision) -> qint64 { |
92 | Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId); | 92 | Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId); |
93 | }, | 93 | return createdRevision; |
94 | } | ||
95 | , | ||
94 | [](int errorCode, QString errorMessage) { | 96 | [](int errorCode, QString errorMessage) { |
95 | //FIXME propagate error, we didn't handle it | 97 | //FIXME propagate error, we didn't handle it |
96 | Warning() << "Error while processing queue command: " << errorMessage; | 98 | Warning() << "Error while processing queue command: " << errorMessage; |
@@ -106,8 +108,17 @@ private slots: | |||
106 | }).then(KAsync::dowhile( | 108 | }).then(KAsync::dowhile( |
107 | [queue]() { return !queue->isEmpty(); }, | 109 | [queue]() { return !queue->isEmpty(); }, |
108 | [this, queue](KAsync::Future<void> &future) { | 110 | [this, queue](KAsync::Future<void> &future) { |
109 | queue->dequeueBatch(100, [this](const QByteArray &data) { | 111 | const int batchSize = 100; |
110 | return processQueuedCommand(data); | 112 | queue->dequeueBatch(batchSize, [this](const QByteArray &data) { |
113 | return KAsync::start<void>([this, data](KAsync::Future<void> &future) { | ||
114 | processQueuedCommand(data).then<void, qint64>([&future, this](qint64 createdRevision) { | ||
115 | Trace() << "Created revision " << createdRevision; | ||
116 | //We don't have a writeback yet, so we cleanup revisions immediately | ||
117 | //TODO: only cleanup once writeback is done | ||
118 | mPipeline->cleanupRevision(createdRevision); | ||
119 | future.setFinished(); | ||
120 | }).exec(); | ||
121 | }); | ||
111 | } | 122 | } |
112 | ).then<void>([&future, queue](){ | 123 | ).then<void>([&future, queue](){ |
113 | future.setFinished(); | 124 | future.setFinished(); |
diff --git a/common/metadata.fbs b/common/metadata.fbs index bb1163d..1455238 100644 --- a/common/metadata.fbs +++ b/common/metadata.fbs | |||
@@ -1,9 +1,12 @@ | |||
1 | namespace Akonadi2; | 1 | namespace Akonadi2; |
2 | 2 | ||
3 | enum Operation : byte { Creation = 1, Modification, Removal } | ||
4 | |||
3 | table Metadata { | 5 | table Metadata { |
4 | revision: ulong; | 6 | revision: ulong; |
5 | processed: bool = true; | 7 | processed: bool = true; |
6 | processingProgress: [string]; | 8 | processingProgress: [string]; |
9 | operation: Operation = Modification; | ||
7 | } | 10 | } |
8 | 11 | ||
9 | root_type Metadata; | 12 | root_type Metadata; |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 4fed41f..c108540 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -124,7 +124,7 @@ void Pipeline::null() | |||
124 | // state.step(); | 124 | // state.step(); |
125 | } | 125 | } |
126 | 126 | ||
127 | KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size) | 127 | KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) |
128 | { | 128 | { |
129 | Log() << "Pipeline: New Entity"; | 129 | Log() << "Pipeline: New Entity"; |
130 | 130 | ||
@@ -137,7 +137,7 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
137 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 137 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
138 | if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { | 138 | if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { |
139 | Warning() << "invalid buffer, not a create entity buffer"; | 139 | Warning() << "invalid buffer, not a create entity buffer"; |
140 | return KAsync::error<void>(); | 140 | return KAsync::error<qint64>(0); |
141 | } | 141 | } |
142 | } | 142 | } |
143 | auto createEntity = Akonadi2::Commands::GetCreateEntity(command); | 143 | auto createEntity = Akonadi2::Commands::GetCreateEntity(command); |
@@ -148,13 +148,13 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
148 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); | 148 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); |
149 | if (!Akonadi2::VerifyEntityBuffer(verifyer)) { | 149 | if (!Akonadi2::VerifyEntityBuffer(verifyer)) { |
150 | Warning() << "invalid buffer, not an entity buffer"; | 150 | Warning() << "invalid buffer, not an entity buffer"; |
151 | return KAsync::error<void>(); | 151 | return KAsync::error<qint64>(0); |
152 | } | 152 | } |
153 | } | 153 | } |
154 | auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); | 154 | auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); |
155 | if (!entity->resource()->size() && !entity->local()->size()) { | 155 | if (!entity->resource()->size() && !entity->local()->size()) { |
156 | Warning() << "No local and no resource buffer while trying to create entity."; | 156 | Warning() << "No local and no resource buffer while trying to create entity."; |
157 | return KAsync::error<void>(); | 157 | return KAsync::error<qint64>(0); |
158 | } | 158 | } |
159 | 159 | ||
160 | //Add metadata buffer | 160 | //Add metadata buffer |
@@ -175,10 +175,12 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
175 | } | 175 | } |
176 | ); | 176 | ); |
177 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); | 177 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); |
178 | Akonadi2::Storage::recordRevision(d->transaction, newRevision, key, bufferType); | ||
178 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; | 179 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; |
179 | 180 | ||
180 | return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) { | 181 | return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { |
181 | PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future]() { | 182 | PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future, newRevision]() { |
183 | future.setValue(newRevision); | ||
182 | future.setFinished(); | 184 | future.setFinished(); |
183 | }, bufferType); | 185 | }, bufferType); |
184 | d->activePipelines << state; | 186 | d->activePipelines << state; |
@@ -186,7 +188,7 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
186 | }); | 188 | }); |
187 | } | 189 | } |
188 | 190 | ||
189 | KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | 191 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) |
190 | { | 192 | { |
191 | Log() << "Pipeline: Modified Entity"; | 193 | Log() << "Pipeline: Modified Entity"; |
192 | 194 | ||
@@ -196,7 +198,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | |||
196 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 198 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
197 | if (!Akonadi2::Commands::VerifyModifyEntityBuffer(verifyer)) { | 199 | if (!Akonadi2::Commands::VerifyModifyEntityBuffer(verifyer)) { |
198 | Warning() << "invalid buffer, not a modify entity buffer"; | 200 | Warning() << "invalid buffer, not a modify entity buffer"; |
199 | return KAsync::error<void>(); | 201 | return KAsync::error<qint64>(0); |
200 | } | 202 | } |
201 | } | 203 | } |
202 | auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command); | 204 | auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command); |
@@ -208,20 +210,20 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | |||
208 | const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); | 210 | const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); |
209 | if (bufferType.isEmpty() || key.isEmpty()) { | 211 | if (bufferType.isEmpty() || key.isEmpty()) { |
210 | Warning() << "entity type or key " << bufferType << key; | 212 | Warning() << "entity type or key " << bufferType << key; |
211 | return KAsync::error<void>(); | 213 | return KAsync::error<qint64>(0); |
212 | } | 214 | } |
213 | { | 215 | { |
214 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); | 216 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); |
215 | if (!Akonadi2::VerifyEntityBuffer(verifyer)) { | 217 | if (!Akonadi2::VerifyEntityBuffer(verifyer)) { |
216 | Warning() << "invalid buffer, not an entity buffer"; | 218 | Warning() << "invalid buffer, not an entity buffer"; |
217 | return KAsync::error<void>(); | 219 | return KAsync::error<qint64>(0); |
218 | } | 220 | } |
219 | } | 221 | } |
220 | 222 | ||
221 | auto adaptorFactory = d->adaptorFactory.value(bufferType); | 223 | auto adaptorFactory = d->adaptorFactory.value(bufferType); |
222 | if (!adaptorFactory) { | 224 | if (!adaptorFactory) { |
223 | Warning() << "no adaptor factory for type " << bufferType; | 225 | Warning() << "no adaptor factory for type " << bufferType; |
224 | return KAsync::error<void>(); | 226 | return KAsync::error<qint64>(0); |
225 | } | 227 | } |
226 | 228 | ||
227 | auto diffEntity = Akonadi2::GetEntity(modifyEntity->delta()->Data()); | 229 | auto diffEntity = Akonadi2::GetEntity(modifyEntity->delta()->Data()); |
@@ -244,7 +246,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | |||
244 | 246 | ||
245 | if (!current) { | 247 | if (!current) { |
246 | Warning() << "Failed to read local value " << key; | 248 | Warning() << "Failed to read local value " << key; |
247 | return KAsync::error<void>(); | 249 | return KAsync::error<qint64>(0); |
248 | } | 250 | } |
249 | 251 | ||
250 | //resource and uid don't matter at this point | 252 | //resource and uid don't matter at this point |
@@ -278,10 +280,12 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | |||
278 | //TODO don't overwrite the old entry, but instead store a new revision | 280 | //TODO don't overwrite the old entry, but instead store a new revision |
279 | d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | 281 | d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); |
280 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); | 282 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); |
283 | Akonadi2::Storage::recordRevision(d->transaction, newRevision, key, bufferType); | ||
281 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; | 284 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; |
282 | 285 | ||
283 | return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) { | 286 | return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { |
284 | PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future]() { | 287 | PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future, newRevision]() { |
288 | future.setValue(newRevision); | ||
285 | future.setFinished(); | 289 | future.setFinished(); |
286 | }, bufferType); | 290 | }, bufferType); |
287 | d->activePipelines << state; | 291 | d->activePipelines << state; |
@@ -289,7 +293,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | |||
289 | }); | 293 | }); |
290 | } | 294 | } |
291 | 295 | ||
292 | KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size) | 296 | KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) |
293 | { | 297 | { |
294 | Log() << "Pipeline: Deleted Entity"; | 298 | Log() << "Pipeline: Deleted Entity"; |
295 | 299 | ||
@@ -299,7 +303,7 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size) | |||
299 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 303 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
300 | if (!Akonadi2::Commands::VerifyDeleteEntityBuffer(verifyer)) { | 304 | if (!Akonadi2::Commands::VerifyDeleteEntityBuffer(verifyer)) { |
301 | Warning() << "invalid buffer, not a delete entity buffer"; | 305 | Warning() << "invalid buffer, not a delete entity buffer"; |
302 | return KAsync::error<void>(); | 306 | return KAsync::error<qint64>(0); |
303 | } | 307 | } |
304 | } | 308 | } |
305 | auto deleteEntity = Akonadi2::Commands::GetDeleteEntity(command); | 309 | auto deleteEntity = Akonadi2::Commands::GetDeleteEntity(command); |
@@ -312,10 +316,12 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size) | |||
312 | //TODO remove all revisions? | 316 | //TODO remove all revisions? |
313 | d->transaction.openDatabase(bufferType + ".main").remove(Akonadi2::Storage::assembleKey(key, baseRevision)); | 317 | d->transaction.openDatabase(bufferType + ".main").remove(Akonadi2::Storage::assembleKey(key, baseRevision)); |
314 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); | 318 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); |
319 | Akonadi2::Storage::recordRevision(d->transaction, newRevision, key, bufferType); | ||
315 | Log() << "Pipeline: deleted entity: "<< newRevision; | 320 | Log() << "Pipeline: deleted entity: "<< newRevision; |
316 | 321 | ||
317 | return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) { | 322 | return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { |
318 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[bufferType], newRevision, [&future](){ | 323 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[bufferType], newRevision, [&future, newRevision](){ |
324 | future.setValue(newRevision); | ||
319 | future.setFinished(); | 325 | future.setFinished(); |
320 | }, bufferType); | 326 | }, bufferType); |
321 | d->activePipelines << state; | 327 | d->activePipelines << state; |
@@ -323,6 +329,31 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size) | |||
323 | }); | 329 | }); |
324 | } | 330 | } |
325 | 331 | ||
332 | void Pipeline::cleanupRevision(qint64 revision) | ||
333 | { | ||
334 | const auto uid = Akonadi2::Storage::getUidFromRevision(d->transaction, revision); | ||
335 | const auto bufferType = Akonadi2::Storage::getTypeFromRevision(d->transaction, revision); | ||
336 | Trace() << "Cleaning up revision " << revision << uid << bufferType; | ||
337 | d->transaction.openDatabase(bufferType + ".main").scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool { | ||
338 | Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
339 | if (!buffer.isValid()) { | ||
340 | Warning() << "Read invalid buffer from disk"; | ||
341 | } else { | ||
342 | const auto metadata = flatbuffers::GetRoot<Akonadi2::Metadata>(buffer.metadataBuffer()); | ||
343 | const qint64 rev = metadata->revision(); | ||
344 | //Remove old revisions, and the current if the entity has already been removed | ||
345 | if (rev < revision || metadata->operation() == Akonadi2::Operation_Removal) { | ||
346 | Akonadi2::Storage::removeRevision(d->transaction, rev); | ||
347 | d->transaction.openDatabase(bufferType + ".main").remove(key); | ||
348 | } | ||
349 | } | ||
350 | |||
351 | return true; | ||
352 | }, [](const Akonadi2::Storage::Error &error) { | ||
353 | Warning() << "Error while reading: " << error.message; | ||
354 | }, true); | ||
355 | } | ||
356 | |||
326 | void Pipeline::pipelineStepped(const PipelineState &state) | 357 | void Pipeline::pipelineStepped(const PipelineState &state) |
327 | { | 358 | { |
328 | scheduleStep(); | 359 | scheduleStep(); |
diff --git a/common/pipeline.h b/common/pipeline.h index 573af73..89232d0 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -58,9 +58,15 @@ public: | |||
58 | void null(); | 58 | void null(); |
59 | void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); | 59 | void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); |
60 | 60 | ||
61 | KAsync::Job<void> newEntity(void const *command, size_t size); | 61 | KAsync::Job<qint64> newEntity(void const *command, size_t size); |
62 | KAsync::Job<void> modifiedEntity(void const *command, size_t size); | 62 | KAsync::Job<qint64> modifiedEntity(void const *command, size_t size); |
63 | KAsync::Job<void> deletedEntity(void const *command, size_t size); | 63 | KAsync::Job<qint64> deletedEntity(void const *command, size_t size); |
64 | /* | ||
65 | * Cleans up a single revision. | ||
66 | * | ||
67 | * This has to be called for every revision in consecutive order. | ||
68 | */ | ||
69 | void cleanupRevision(qint64 revision); | ||
64 | 70 | ||
65 | Q_SIGNALS: | 71 | Q_SIGNALS: |
66 | void revisionUpdated(qint64); | 72 | void revisionUpdated(qint64); |
diff --git a/common/storage.h b/common/storage.h index 98b12ed..9459f04 100644 --- a/common/storage.h +++ b/common/storage.h | |||
@@ -168,6 +168,11 @@ public: | |||
168 | static qint64 maxRevision(const Akonadi2::Storage::Transaction &); | 168 | static qint64 maxRevision(const Akonadi2::Storage::Transaction &); |
169 | static void setMaxRevision(Akonadi2::Storage::Transaction &, qint64 revision); | 169 | static void setMaxRevision(Akonadi2::Storage::Transaction &, qint64 revision); |
170 | 170 | ||
171 | static QByteArray getUidFromRevision(const Akonadi2::Storage::Transaction &, qint64 revision); | ||
172 | static QByteArray getTypeFromRevision(const Akonadi2::Storage::Transaction &, qint64 revision); | ||
173 | static void recordRevision(Akonadi2::Storage::Transaction &, qint64 revision, const QByteArray &uid, const QByteArray &type); | ||
174 | static void removeRevision(Akonadi2::Storage::Transaction &, qint64 revision); | ||
175 | |||
171 | bool exists() const; | 176 | bool exists() const; |
172 | 177 | ||
173 | static bool isInternalKey(const char *key); | 178 | static bool isInternalKey(const char *key); |
diff --git a/common/storage_common.cpp b/common/storage_common.cpp index 28fb4c2..dc02aec 100644 --- a/common/storage_common.cpp +++ b/common/storage_common.cpp | |||
@@ -74,6 +74,48 @@ qint64 Storage::maxRevision(const Akonadi2::Storage::Transaction &transaction) | |||
74 | return r; | 74 | return r; |
75 | } | 75 | } |
76 | 76 | ||
77 | QByteArray Storage::getUidFromRevision(const Akonadi2::Storage::Transaction &transaction, qint64 revision) | ||
78 | { | ||
79 | QByteArray uid; | ||
80 | transaction.openDatabase("revisions").scan(QByteArray::number(revision), [&](const QByteArray &, const QByteArray &value) -> bool { | ||
81 | uid = value; | ||
82 | return false; | ||
83 | }, [](const Error &error){ | ||
84 | if (error.code != ErrorCodes::NotFound) { | ||
85 | //FIXME | ||
86 | // defaultErrorHandler()(error); | ||
87 | } | ||
88 | }); | ||
89 | return uid; | ||
90 | } | ||
91 | |||
92 | QByteArray Storage::getTypeFromRevision(const Akonadi2::Storage::Transaction &transaction, qint64 revision) | ||
93 | { | ||
94 | QByteArray type; | ||
95 | transaction.openDatabase("revisionType").scan(QByteArray::number(revision), [&](const QByteArray &, const QByteArray &value) -> bool { | ||
96 | type = value; | ||
97 | return false; | ||
98 | }, [](const Error &error){ | ||
99 | if (error.code != ErrorCodes::NotFound) { | ||
100 | //FIXME | ||
101 | // defaultErrorHandler()(error); | ||
102 | } | ||
103 | }); | ||
104 | return type; | ||
105 | } | ||
106 | |||
107 | void Storage::recordRevision(Akonadi2::Storage::Transaction &transaction, qint64 revision, const QByteArray &uid, const QByteArray &type) | ||
108 | { | ||
109 | //TODO use integerkeys | ||
110 | transaction.openDatabase("revisions").write(QByteArray::number(revision), uid); | ||
111 | transaction.openDatabase("revisionType").write(QByteArray::number(revision), type); | ||
112 | } | ||
113 | |||
114 | void Storage::removeRevision(Akonadi2::Storage::Transaction &transaction, qint64 revision) | ||
115 | { | ||
116 | transaction.openDatabase("revisions").remove(QByteArray::number(revision)); | ||
117 | } | ||
118 | |||
77 | bool Storage::isInternalKey(const char *key) | 119 | bool Storage::isInternalKey(const char *key) |
78 | { | 120 | { |
79 | return key && strncmp(key, s_internalPrefix, s_internalPrefixSize) == 0; | 121 | return key && strncmp(key, s_internalPrefix, s_internalPrefixSize) == 0; |
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp index 96448e2..5dede64 100644 --- a/tests/pipelinetest.cpp +++ b/tests/pipelinetest.cpp | |||
@@ -210,6 +210,17 @@ private Q_SLOTS: | |||
210 | Akonadi2::EntityBuffer entityBuffer(buffer.data(), buffer.size()); | 210 | Akonadi2::EntityBuffer entityBuffer(buffer.data(), buffer.size()); |
211 | auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); | 211 | auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); |
212 | QCOMPARE(adaptor->getProperty("summary").toString(), QString("summary2")); | 212 | QCOMPARE(adaptor->getProperty("summary").toString(), QString("summary2")); |
213 | |||
214 | //Both revisions are in the store at this point | ||
215 | QCOMPARE(getKeys("org.kde.pipelinetest.instance1", "event.main").size(), 2); | ||
216 | |||
217 | //Cleanup old revisions | ||
218 | pipeline.startTransaction(); | ||
219 | pipeline.cleanupRevision(2); | ||
220 | pipeline.commit(); | ||
221 | |||
222 | //And now only the latest revision is left | ||
223 | QCOMPARE(getKeys("org.kde.pipelinetest.instance1", "event.main").size(), 1); | ||
213 | } | 224 | } |
214 | 225 | ||
215 | void testDelete() | 226 | void testDelete() |
@@ -230,10 +241,18 @@ private Q_SLOTS: | |||
230 | const auto uid = Akonadi2::Storage::uidFromKey(result.first()); | 241 | const auto uid = Akonadi2::Storage::uidFromKey(result.first()); |
231 | 242 | ||
232 | //Delete entity | 243 | //Delete entity |
233 | auto deleteCommand = deleteEntityCommand(uid,1); | 244 | auto deleteCommand = deleteEntityCommand(uid, 1); |
234 | pipeline.startTransaction(); | 245 | pipeline.startTransaction(); |
235 | pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size()); | 246 | pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size()); |
236 | pipeline.commit(); | 247 | pipeline.commit(); |
248 | |||
249 | //Cleanup old revisions | ||
250 | pipeline.startTransaction(); | ||
251 | pipeline.cleanupRevision(2); | ||
252 | pipeline.commit(); | ||
253 | |||
254 | //And all revisions are gone | ||
255 | QCOMPARE(getKeys("org.kde.pipelinetest.instance1", "event.main").size(), 0); | ||
237 | } | 256 | } |
238 | }; | 257 | }; |
239 | 258 | ||