summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/genericresource.cpp31
-rw-r--r--common/metadata.fbs3
-rw-r--r--common/pipeline.cpp67
-rw-r--r--common/pipeline.h12
-rw-r--r--common/storage.h5
-rw-r--r--common/storage_common.cpp42
-rw-r--r--tests/pipelinetest.cpp21
7 files changed, 149 insertions, 32 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 3ffc56b..4abcecd 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -60,7 +60,7 @@ private slots:
60 }).exec(); 60 }).exec();
61 } 61 }
62 62
63 KAsync::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) 63 KAsync::Job<qint64> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand)
64 { 64 {
65 Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); 65 Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId());
66 //Throw command into appropriate pipeline 66 //Throw command into appropriate pipeline
@@ -72,25 +72,27 @@ private slots:
72 case Akonadi2::Commands::CreateEntityCommand: 72 case Akonadi2::Commands::CreateEntityCommand:
73 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); 73 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
74 default: 74 default:
75 return KAsync::error<void>(-1, "Unhandled command"); 75 return KAsync::error<qint64>(-1, "Unhandled command");
76 } 76 }
77 return KAsync::null<void>(); 77 return KAsync::null<qint64>();
78 } 78 }
79 79
80 KAsync::Job<void> processQueuedCommand(const QByteArray &data) 80 KAsync::Job<qint64, qint64> processQueuedCommand(const QByteArray &data)
81 { 81 {
82 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); 82 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size());
83 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { 83 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) {
84 Warning() << "invalid buffer"; 84 Warning() << "invalid buffer";
85 return KAsync::error<void>(1, "Invalid Buffer"); 85 // return KAsync::error<void, qint64>(1, "Invalid Buffer");
86 } 86 }
87 auto queuedCommand = Akonadi2::GetQueuedCommand(data.constData()); 87 auto queuedCommand = Akonadi2::GetQueuedCommand(data.constData());
88 const auto commandId = queuedCommand->commandId(); 88 const auto commandId = queuedCommand->commandId();
89 Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId); 89 Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId);
90 return processQueuedCommand(queuedCommand).then<void>( 90 return processQueuedCommand(queuedCommand).then<qint64, qint64>(
91 [commandId]() { 91 [commandId](qint64 createdRevision) -> qint64 {
92 Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId); 92 Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId);
93 }, 93 return createdRevision;
94 }
95 ,
94 [](int errorCode, QString errorMessage) { 96 [](int errorCode, QString errorMessage) {
95 //FIXME propagate error, we didn't handle it 97 //FIXME propagate error, we didn't handle it
96 Warning() << "Error while processing queue command: " << errorMessage; 98 Warning() << "Error while processing queue command: " << errorMessage;
@@ -106,8 +108,17 @@ private slots:
106 }).then(KAsync::dowhile( 108 }).then(KAsync::dowhile(
107 [queue]() { return !queue->isEmpty(); }, 109 [queue]() { return !queue->isEmpty(); },
108 [this, queue](KAsync::Future<void> &future) { 110 [this, queue](KAsync::Future<void> &future) {
109 queue->dequeueBatch(100, [this](const QByteArray &data) { 111 const int batchSize = 100;
110 return processQueuedCommand(data); 112 queue->dequeueBatch(batchSize, [this](const QByteArray &data) {
113 return KAsync::start<void>([this, data](KAsync::Future<void> &future) {
114 processQueuedCommand(data).then<void, qint64>([&future, this](qint64 createdRevision) {
115 Trace() << "Created revision " << createdRevision;
116 //We don't have a writeback yet, so we cleanup revisions immediately
117 //TODO: only cleanup once writeback is done
118 mPipeline->cleanupRevision(createdRevision);
119 future.setFinished();
120 }).exec();
121 });
111 } 122 }
112 ).then<void>([&future, queue](){ 123 ).then<void>([&future, queue](){
113 future.setFinished(); 124 future.setFinished();
diff --git a/common/metadata.fbs b/common/metadata.fbs
index bb1163d..1455238 100644
--- a/common/metadata.fbs
+++ b/common/metadata.fbs
@@ -1,9 +1,12 @@
1namespace Akonadi2; 1namespace Akonadi2;
2 2
3enum Operation : byte { Creation = 1, Modification, Removal }
4
3table Metadata { 5table Metadata {
4 revision: ulong; 6 revision: ulong;
5 processed: bool = true; 7 processed: bool = true;
6 processingProgress: [string]; 8 processingProgress: [string];
9 operation: Operation = Modification;
7} 10}
8 11
9root_type Metadata; 12root_type Metadata;
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 4fed41f..c108540 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -124,7 +124,7 @@ void Pipeline::null()
124 // state.step(); 124 // state.step();
125} 125}
126 126
127KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size) 127KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
128{ 128{
129 Log() << "Pipeline: New Entity"; 129 Log() << "Pipeline: New Entity";
130 130
@@ -137,7 +137,7 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
137 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 137 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
138 if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { 138 if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) {
139 Warning() << "invalid buffer, not a create entity buffer"; 139 Warning() << "invalid buffer, not a create entity buffer";
140 return KAsync::error<void>(); 140 return KAsync::error<qint64>(0);
141 } 141 }
142 } 142 }
143 auto createEntity = Akonadi2::Commands::GetCreateEntity(command); 143 auto createEntity = Akonadi2::Commands::GetCreateEntity(command);
@@ -148,13 +148,13 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
148 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); 148 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
149 if (!Akonadi2::VerifyEntityBuffer(verifyer)) { 149 if (!Akonadi2::VerifyEntityBuffer(verifyer)) {
150 Warning() << "invalid buffer, not an entity buffer"; 150 Warning() << "invalid buffer, not an entity buffer";
151 return KAsync::error<void>(); 151 return KAsync::error<qint64>(0);
152 } 152 }
153 } 153 }
154 auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); 154 auto entity = Akonadi2::GetEntity(createEntity->delta()->Data());
155 if (!entity->resource()->size() && !entity->local()->size()) { 155 if (!entity->resource()->size() && !entity->local()->size()) {
156 Warning() << "No local and no resource buffer while trying to create entity."; 156 Warning() << "No local and no resource buffer while trying to create entity.";
157 return KAsync::error<void>(); 157 return KAsync::error<qint64>(0);
158 } 158 }
159 159
160 //Add metadata buffer 160 //Add metadata buffer
@@ -175,10 +175,12 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
175 } 175 }
176 ); 176 );
177 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 177 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
178 Akonadi2::Storage::recordRevision(d->transaction, newRevision, key, bufferType);
178 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; 179 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
179 180
180 return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) { 181 return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) {
181 PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future]() { 182 PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future, newRevision]() {
183 future.setValue(newRevision);
182 future.setFinished(); 184 future.setFinished();
183 }, bufferType); 185 }, bufferType);
184 d->activePipelines << state; 186 d->activePipelines << state;
@@ -186,7 +188,7 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
186 }); 188 });
187} 189}
188 190
189KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) 191KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
190{ 192{
191 Log() << "Pipeline: Modified Entity"; 193 Log() << "Pipeline: Modified Entity";
192 194
@@ -196,7 +198,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
196 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 198 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
197 if (!Akonadi2::Commands::VerifyModifyEntityBuffer(verifyer)) { 199 if (!Akonadi2::Commands::VerifyModifyEntityBuffer(verifyer)) {
198 Warning() << "invalid buffer, not a modify entity buffer"; 200 Warning() << "invalid buffer, not a modify entity buffer";
199 return KAsync::error<void>(); 201 return KAsync::error<qint64>(0);
200 } 202 }
201 } 203 }
202 auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command); 204 auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command);
@@ -208,20 +210,20 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
208 const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); 210 const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
209 if (bufferType.isEmpty() || key.isEmpty()) { 211 if (bufferType.isEmpty() || key.isEmpty()) {
210 Warning() << "entity type or key " << bufferType << key; 212 Warning() << "entity type or key " << bufferType << key;
211 return KAsync::error<void>(); 213 return KAsync::error<qint64>(0);
212 } 214 }
213 { 215 {
214 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); 216 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size());
215 if (!Akonadi2::VerifyEntityBuffer(verifyer)) { 217 if (!Akonadi2::VerifyEntityBuffer(verifyer)) {
216 Warning() << "invalid buffer, not an entity buffer"; 218 Warning() << "invalid buffer, not an entity buffer";
217 return KAsync::error<void>(); 219 return KAsync::error<qint64>(0);
218 } 220 }
219 } 221 }
220 222
221 auto adaptorFactory = d->adaptorFactory.value(bufferType); 223 auto adaptorFactory = d->adaptorFactory.value(bufferType);
222 if (!adaptorFactory) { 224 if (!adaptorFactory) {
223 Warning() << "no adaptor factory for type " << bufferType; 225 Warning() << "no adaptor factory for type " << bufferType;
224 return KAsync::error<void>(); 226 return KAsync::error<qint64>(0);
225 } 227 }
226 228
227 auto diffEntity = Akonadi2::GetEntity(modifyEntity->delta()->Data()); 229 auto diffEntity = Akonadi2::GetEntity(modifyEntity->delta()->Data());
@@ -244,7 +246,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
244 246
245 if (!current) { 247 if (!current) {
246 Warning() << "Failed to read local value " << key; 248 Warning() << "Failed to read local value " << key;
247 return KAsync::error<void>(); 249 return KAsync::error<qint64>(0);
248 } 250 }
249 251
250 //resource and uid don't matter at this point 252 //resource and uid don't matter at this point
@@ -278,10 +280,12 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
278 //TODO don't overwrite the old entry, but instead store a new revision 280 //TODO don't overwrite the old entry, but instead store a new revision
279 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 281 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
280 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 282 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
283 Akonadi2::Storage::recordRevision(d->transaction, newRevision, key, bufferType);
281 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; 284 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType;
282 285
283 return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) { 286 return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) {
284 PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future]() { 287 PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future, newRevision]() {
288 future.setValue(newRevision);
285 future.setFinished(); 289 future.setFinished();
286 }, bufferType); 290 }, bufferType);
287 d->activePipelines << state; 291 d->activePipelines << state;
@@ -289,7 +293,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
289 }); 293 });
290} 294}
291 295
292KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size) 296KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
293{ 297{
294 Log() << "Pipeline: Deleted Entity"; 298 Log() << "Pipeline: Deleted Entity";
295 299
@@ -299,7 +303,7 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
299 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 303 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
300 if (!Akonadi2::Commands::VerifyDeleteEntityBuffer(verifyer)) { 304 if (!Akonadi2::Commands::VerifyDeleteEntityBuffer(verifyer)) {
301 Warning() << "invalid buffer, not a delete entity buffer"; 305 Warning() << "invalid buffer, not a delete entity buffer";
302 return KAsync::error<void>(); 306 return KAsync::error<qint64>(0);
303 } 307 }
304 } 308 }
305 auto deleteEntity = Akonadi2::Commands::GetDeleteEntity(command); 309 auto deleteEntity = Akonadi2::Commands::GetDeleteEntity(command);
@@ -312,10 +316,12 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
312 //TODO remove all revisions? 316 //TODO remove all revisions?
313 d->transaction.openDatabase(bufferType + ".main").remove(Akonadi2::Storage::assembleKey(key, baseRevision)); 317 d->transaction.openDatabase(bufferType + ".main").remove(Akonadi2::Storage::assembleKey(key, baseRevision));
314 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 318 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
319 Akonadi2::Storage::recordRevision(d->transaction, newRevision, key, bufferType);
315 Log() << "Pipeline: deleted entity: "<< newRevision; 320 Log() << "Pipeline: deleted entity: "<< newRevision;
316 321
317 return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) { 322 return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) {
318 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[bufferType], newRevision, [&future](){ 323 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[bufferType], newRevision, [&future, newRevision](){
324 future.setValue(newRevision);
319 future.setFinished(); 325 future.setFinished();
320 }, bufferType); 326 }, bufferType);
321 d->activePipelines << state; 327 d->activePipelines << state;
@@ -323,6 +329,31 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
323 }); 329 });
324} 330}
325 331
332void Pipeline::cleanupRevision(qint64 revision)
333{
334 const auto uid = Akonadi2::Storage::getUidFromRevision(d->transaction, revision);
335 const auto bufferType = Akonadi2::Storage::getTypeFromRevision(d->transaction, revision);
336 Trace() << "Cleaning up revision " << revision << uid << bufferType;
337 d->transaction.openDatabase(bufferType + ".main").scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool {
338 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
339 if (!buffer.isValid()) {
340 Warning() << "Read invalid buffer from disk";
341 } else {
342 const auto metadata = flatbuffers::GetRoot<Akonadi2::Metadata>(buffer.metadataBuffer());
343 const qint64 rev = metadata->revision();
344 //Remove old revisions, and the current if the entity has already been removed
345 if (rev < revision || metadata->operation() == Akonadi2::Operation_Removal) {
346 Akonadi2::Storage::removeRevision(d->transaction, rev);
347 d->transaction.openDatabase(bufferType + ".main").remove(key);
348 }
349 }
350
351 return true;
352 }, [](const Akonadi2::Storage::Error &error) {
353 Warning() << "Error while reading: " << error.message;
354 }, true);
355}
356
326void Pipeline::pipelineStepped(const PipelineState &state) 357void Pipeline::pipelineStepped(const PipelineState &state)
327{ 358{
328 scheduleStep(); 359 scheduleStep();
diff --git a/common/pipeline.h b/common/pipeline.h
index 573af73..89232d0 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -58,9 +58,15 @@ public:
58 void null(); 58 void null();
59 void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); 59 void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory);
60 60
61 KAsync::Job<void> newEntity(void const *command, size_t size); 61 KAsync::Job<qint64> newEntity(void const *command, size_t size);
62 KAsync::Job<void> modifiedEntity(void const *command, size_t size); 62 KAsync::Job<qint64> modifiedEntity(void const *command, size_t size);
63 KAsync::Job<void> deletedEntity(void const *command, size_t size); 63 KAsync::Job<qint64> deletedEntity(void const *command, size_t size);
64 /*
65 * Cleans up a single revision.
66 *
67 * This has to be called for every revision in consecutive order.
68 */
69 void cleanupRevision(qint64 revision);
64 70
65Q_SIGNALS: 71Q_SIGNALS:
66 void revisionUpdated(qint64); 72 void revisionUpdated(qint64);
diff --git a/common/storage.h b/common/storage.h
index 98b12ed..9459f04 100644
--- a/common/storage.h
+++ b/common/storage.h
@@ -168,6 +168,11 @@ public:
168 static qint64 maxRevision(const Akonadi2::Storage::Transaction &); 168 static qint64 maxRevision(const Akonadi2::Storage::Transaction &);
169 static void setMaxRevision(Akonadi2::Storage::Transaction &, qint64 revision); 169 static void setMaxRevision(Akonadi2::Storage::Transaction &, qint64 revision);
170 170
171 static QByteArray getUidFromRevision(const Akonadi2::Storage::Transaction &, qint64 revision);
172 static QByteArray getTypeFromRevision(const Akonadi2::Storage::Transaction &, qint64 revision);
173 static void recordRevision(Akonadi2::Storage::Transaction &, qint64 revision, const QByteArray &uid, const QByteArray &type);
174 static void removeRevision(Akonadi2::Storage::Transaction &, qint64 revision);
175
171 bool exists() const; 176 bool exists() const;
172 177
173 static bool isInternalKey(const char *key); 178 static bool isInternalKey(const char *key);
diff --git a/common/storage_common.cpp b/common/storage_common.cpp
index 28fb4c2..dc02aec 100644
--- a/common/storage_common.cpp
+++ b/common/storage_common.cpp
@@ -74,6 +74,48 @@ qint64 Storage::maxRevision(const Akonadi2::Storage::Transaction &transaction)
74 return r; 74 return r;
75} 75}
76 76
77QByteArray Storage::getUidFromRevision(const Akonadi2::Storage::Transaction &transaction, qint64 revision)
78{
79 QByteArray uid;
80 transaction.openDatabase("revisions").scan(QByteArray::number(revision), [&](const QByteArray &, const QByteArray &value) -> bool {
81 uid = value;
82 return false;
83 }, [](const Error &error){
84 if (error.code != ErrorCodes::NotFound) {
85 //FIXME
86 // defaultErrorHandler()(error);
87 }
88 });
89 return uid;
90}
91
92QByteArray Storage::getTypeFromRevision(const Akonadi2::Storage::Transaction &transaction, qint64 revision)
93{
94 QByteArray type;
95 transaction.openDatabase("revisionType").scan(QByteArray::number(revision), [&](const QByteArray &, const QByteArray &value) -> bool {
96 type = value;
97 return false;
98 }, [](const Error &error){
99 if (error.code != ErrorCodes::NotFound) {
100 //FIXME
101 // defaultErrorHandler()(error);
102 }
103 });
104 return type;
105}
106
107void Storage::recordRevision(Akonadi2::Storage::Transaction &transaction, qint64 revision, const QByteArray &uid, const QByteArray &type)
108{
109 //TODO use integerkeys
110 transaction.openDatabase("revisions").write(QByteArray::number(revision), uid);
111 transaction.openDatabase("revisionType").write(QByteArray::number(revision), type);
112}
113
114void Storage::removeRevision(Akonadi2::Storage::Transaction &transaction, qint64 revision)
115{
116 transaction.openDatabase("revisions").remove(QByteArray::number(revision));
117}
118
77bool Storage::isInternalKey(const char *key) 119bool Storage::isInternalKey(const char *key)
78{ 120{
79 return key && strncmp(key, s_internalPrefix, s_internalPrefixSize) == 0; 121 return key && strncmp(key, s_internalPrefix, s_internalPrefixSize) == 0;
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp
index 96448e2..5dede64 100644
--- a/tests/pipelinetest.cpp
+++ b/tests/pipelinetest.cpp
@@ -210,6 +210,17 @@ private Q_SLOTS:
210 Akonadi2::EntityBuffer entityBuffer(buffer.data(), buffer.size()); 210 Akonadi2::EntityBuffer entityBuffer(buffer.data(), buffer.size());
211 auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); 211 auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity());
212 QCOMPARE(adaptor->getProperty("summary").toString(), QString("summary2")); 212 QCOMPARE(adaptor->getProperty("summary").toString(), QString("summary2"));
213
214 //Both revisions are in the store at this point
215 QCOMPARE(getKeys("org.kde.pipelinetest.instance1", "event.main").size(), 2);
216
217 //Cleanup old revisions
218 pipeline.startTransaction();
219 pipeline.cleanupRevision(2);
220 pipeline.commit();
221
222 //And now only the latest revision is left
223 QCOMPARE(getKeys("org.kde.pipelinetest.instance1", "event.main").size(), 1);
213 } 224 }
214 225
215 void testDelete() 226 void testDelete()
@@ -230,10 +241,18 @@ private Q_SLOTS:
230 const auto uid = Akonadi2::Storage::uidFromKey(result.first()); 241 const auto uid = Akonadi2::Storage::uidFromKey(result.first());
231 242
232 //Delete entity 243 //Delete entity
233 auto deleteCommand = deleteEntityCommand(uid,1); 244 auto deleteCommand = deleteEntityCommand(uid, 1);
234 pipeline.startTransaction(); 245 pipeline.startTransaction();
235 pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size()); 246 pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size());
236 pipeline.commit(); 247 pipeline.commit();
248
249 //Cleanup old revisions
250 pipeline.startTransaction();
251 pipeline.cleanupRevision(2);
252 pipeline.commit();
253
254 //And all revisions are gone
255 QCOMPARE(getKeys("org.kde.pipelinetest.instance1", "event.main").size(), 0);
237 } 256 }
238}; 257};
239 258