summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/pipeline.cpp19
-rw-r--r--common/storage_lmdb.cpp8
-rw-r--r--tests/CMakeLists.txt1
-rw-r--r--tests/pipelinetest.cpp241
4 files changed, 260 insertions, 9 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 14450aa..4fed41f 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -169,7 +169,11 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
169 flatbuffers::FlatBufferBuilder fbb; 169 flatbuffers::FlatBufferBuilder fbb;
170 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); 170 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
171 171
172 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 172 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()),
173 [](const Akonadi2::Storage::Error &error) {
174 Warning() << "Failed to write entity";
175 }
176 );
173 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 177 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
174 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; 178 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
175 179
@@ -198,6 +202,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
198 auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command); 202 auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command);
199 Q_ASSERT(modifyEntity); 203 Q_ASSERT(modifyEntity);
200 204
205 const qint64 baseRevision = modifyEntity->revision();
201 //TODO rename modifyEntity->domainType to bufferType 206 //TODO rename modifyEntity->domainType to bufferType
202 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); 207 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size());
203 const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); 208 const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
@@ -224,8 +229,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
224 auto diff = adaptorFactory->createAdaptor(*diffEntity); 229 auto diff = adaptorFactory->createAdaptor(*diffEntity);
225 230
226 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; 231 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current;
227 //FIXME: read the revision that this modification is based on, not just the latest one 232 storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, baseRevision), [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
228 storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
229 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 233 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
230 if (!buffer.isValid()) { 234 if (!buffer.isValid()) {
231 Warning() << "Read invalid buffer from disk"; 235 Warning() << "Read invalid buffer from disk";
@@ -234,10 +238,9 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
234 } 238 }
235 return false; 239 return false;
236 }, 240 },
237 [](const Storage::Error &error) { 241 [baseRevision](const Storage::Error &error) {
238 Warning() << "Failed to read value from storage: " << error.message; 242 Warning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision;
239 }); 243 });
240 //TODO error handler
241 244
242 if (!current) { 245 if (!current) {
243 Warning() << "Failed to read local value " << key; 246 Warning() << "Failed to read local value " << key;
@@ -275,6 +278,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
275 //TODO don't overwrite the old entry, but instead store a new revision 278 //TODO don't overwrite the old entry, but instead store a new revision
276 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 279 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
277 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 280 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
281 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType;
278 282
279 return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) { 283 return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) {
280 PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future]() { 284 PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future]() {
@@ -302,10 +306,11 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
302 306
303 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); 307 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
304 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 308 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
309 const qint64 baseRevision = deleteEntity->revision();
305 310
306 //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted 311 //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted
307 //TODO remove all revisions? 312 //TODO remove all revisions?
308 d->transaction.openDatabase(bufferType + ".main").remove(key); 313 d->transaction.openDatabase(bufferType + ".main").remove(Akonadi2::Storage::assembleKey(key, baseRevision));
309 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 314 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
310 Log() << "Pipeline: deleted entity: "<< newRevision; 315 Log() << "Pipeline: deleted entity: "<< newRevision;
311 316
diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp
index 3073d37..be5a9da 100644
--- a/common/storage_lmdb.cpp
+++ b/common/storage_lmdb.cpp
@@ -114,7 +114,9 @@ bool Storage::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sVa
114{ 114{
115 if (!d || !d->transaction) { 115 if (!d || !d->transaction) {
116 Error error("", ErrorCodes::GenericError, "Not open"); 116 Error error("", ErrorCodes::GenericError, "Not open");
117 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 117 if (d) {
118 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
119 }
118 return false; 120 return false;
119 } 121 }
120 const void *keyPtr = sKey.data(); 122 const void *keyPtr = sKey.data();
@@ -149,7 +151,9 @@ void Storage::NamedDatabase::remove(const QByteArray &k,
149{ 151{
150 if (!d || !d->transaction) { 152 if (!d || !d->transaction) {
151 Error error(d->name.toLatin1(), ErrorCodes::GenericError, "Not open"); 153 Error error(d->name.toLatin1(), ErrorCodes::GenericError, "Not open");
152 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 154 if (d) {
155 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
156 }
153 return; 157 return;
154 } 158 }
155 159
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 251e780..b2201ff 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -44,6 +44,7 @@ auto_tests (
44 genericresourcetest 44 genericresourcetest
45 genericfacadetest 45 genericfacadetest
46 resourcecommunicationtest 46 resourcecommunicationtest
47 pipelinetest
47) 48)
48 49
49target_link_libraries(dummyresourcetest akonadi2_resource_dummy) 50target_link_libraries(dummyresourcetest akonadi2_resource_dummy)
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp
new file mode 100644
index 0000000..96448e2
--- /dev/null
+++ b/tests/pipelinetest.cpp
@@ -0,0 +1,241 @@
1#include <QtTest>
2
3#include <QString>
4
5#include "event_generated.h"
6#include "entity_generated.h"
7#include "metadata_generated.h"
8#include "createentity_generated.h"
9#include "modifyentity_generated.h"
10#include "deleteentity_generated.h"
11#include "dummyresource/resourcefactory.h"
12#include "clientapi.h"
13#include "synclistresult.h"
14#include "commands.h"
15#include "entitybuffer.h"
16#include "resourceconfig.h"
17#include "pipeline.h"
18#include "log.h"
19#include "domainadaptor.h"
20
21class TestEventAdaptorFactory : public DomainTypeAdaptorFactory<Akonadi2::ApplicationDomain::Event, Akonadi2::ApplicationDomain::Buffer::Event, Akonadi2::ApplicationDomain::Buffer::EventBuilder>
22{
23public:
24 TestEventAdaptorFactory()
25 : DomainTypeAdaptorFactory()
26 {
27 }
28
29 virtual ~TestEventAdaptorFactory() {};
30};
31
32static void removeFromDisk(const QString &name)
33{
34 Akonadi2::Storage store(Akonadi2::Store::storageLocation(), name, Akonadi2::Storage::ReadWrite);
35 store.removeFromDisk();
36}
37
38static QList<QByteArray> getKeys(const QByteArray &dbEnv, const QByteArray &name)
39{
40 Akonadi2::Storage store(Akonadi2::storageLocation(), dbEnv, Akonadi2::Storage::ReadOnly);
41 auto transaction = store.createTransaction(Akonadi2::Storage::ReadOnly);
42 auto db = transaction.openDatabase(name, nullptr, false);
43 QList<QByteArray> result;
44 db.scan("", [&](const QByteArray &key, const QByteArray &value) {
45 result << key;
46 return true;
47 });
48 return result;
49}
50
51static QByteArray getEntity(const QByteArray &dbEnv, const QByteArray &name, const QByteArray &uid)
52{
53 Akonadi2::Storage store(Akonadi2::storageLocation(), dbEnv, Akonadi2::Storage::ReadOnly);
54 auto transaction = store.createTransaction(Akonadi2::Storage::ReadOnly);
55 auto db = transaction.openDatabase(name, nullptr, false);
56 QByteArray result;
57 db.scan(uid, [&](const QByteArray &key, const QByteArray &value) {
58 result = value;
59 return true;
60 });
61 return result;
62}
63
64flatbuffers::FlatBufferBuilder &createEvent(flatbuffers::FlatBufferBuilder &entityFbb, const QString &s = QString("summary"))
65{
66 flatbuffers::FlatBufferBuilder eventFbb;
67 eventFbb.Clear();
68 {
69 Akonadi2::ApplicationDomain::Buffer::EventBuilder eventBuilder(eventFbb);
70 auto eventLocation = eventBuilder.Finish();
71 Akonadi2::ApplicationDomain::Buffer::FinishEventBuffer(eventFbb, eventLocation);
72 }
73
74 flatbuffers::FlatBufferBuilder localFbb;
75 {
76 auto uid = localFbb.CreateString("testuid");
77 auto summary = localFbb.CreateString(s.toStdString());
78 auto localBuilder = Akonadi2::ApplicationDomain::Buffer::EventBuilder(localFbb);
79 localBuilder.add_uid(uid);
80 localBuilder.add_summary(summary);
81 auto location = localBuilder.Finish();
82 Akonadi2::ApplicationDomain::Buffer::FinishEventBuffer(localFbb, location);
83 }
84
85 Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, eventFbb.GetBufferPointer(), eventFbb.GetSize(), localFbb.GetBufferPointer(), localFbb.GetSize());
86 return entityFbb;
87}
88
89QByteArray createEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb)
90{
91 flatbuffers::FlatBufferBuilder fbb;
92 auto type = fbb.CreateString(Akonadi2::ApplicationDomain::getTypeName<Akonadi2::ApplicationDomain::Event>().toStdString().data());
93 auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize());
94 Akonadi2::Commands::CreateEntityBuilder builder(fbb);
95 builder.add_domainType(type);
96 builder.add_delta(delta);
97 auto location = builder.Finish();
98 Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location);
99
100 const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize());
101 {
102 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command.data()), command.size());
103 Q_ASSERT(Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer));
104 }
105 return command;
106}
107
108QByteArray modifyEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb, const QByteArray &uid, qint64 revision)
109{
110 flatbuffers::FlatBufferBuilder fbb;
111 auto type = fbb.CreateString(Akonadi2::ApplicationDomain::getTypeName<Akonadi2::ApplicationDomain::Event>().toStdString().data());
112 auto id = fbb.CreateString(std::string(uid.constData(), uid.size()));
113 auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize());
114 // auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size());
115 Akonadi2::Commands::ModifyEntityBuilder builder(fbb);
116 builder.add_domainType(type);
117 builder.add_delta(delta);
118 builder.add_revision(revision);
119 builder.add_entityId(id);
120 auto location = builder.Finish();
121 Akonadi2::Commands::FinishModifyEntityBuffer(fbb, location);
122
123 const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize());
124 {
125 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command.data()), command.size());
126 Q_ASSERT(Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer));
127 }
128 return command;
129}
130
131QByteArray deleteEntityCommand(const QByteArray &uid, qint64 revision)
132{
133 flatbuffers::FlatBufferBuilder fbb;
134 auto type = fbb.CreateString(Akonadi2::ApplicationDomain::getTypeName<Akonadi2::ApplicationDomain::Event>().toStdString().data());
135 auto id = fbb.CreateString(std::string(uid.constData(), uid.size()));
136 Akonadi2::Commands::DeleteEntityBuilder builder(fbb);
137 builder.add_domainType(type);
138 builder.add_revision(revision);
139 builder.add_entityId(id);
140 auto location = builder.Finish();
141 Akonadi2::Commands::FinishDeleteEntityBuffer(fbb, location);
142
143 const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize());
144 {
145 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command.data()), command.size());
146 Q_ASSERT(Akonadi2::Commands::VerifyDeleteEntityBuffer(verifyer));
147 }
148 return command;
149}
150
151class PipelineTest : public QObject
152{
153 Q_OBJECT
154private Q_SLOTS:
155 void initTestCase()
156 {
157 Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Trace);
158 }
159
160 void init()
161 {
162 removeFromDisk("org.kde.pipelinetest.instance1");
163 }
164
165 void testCreate()
166 {
167 flatbuffers::FlatBufferBuilder entityFbb;
168 auto command = createEntityCommand(createEvent(entityFbb));
169
170 Akonadi2::Pipeline pipeline("org.kde.pipelinetest.instance1");
171 pipeline.startTransaction();
172 pipeline.newEntity(command.constData(), command.size());
173 pipeline.commit();
174
175 auto result = getKeys("org.kde.pipelinetest.instance1", "event.main");
176 QCOMPARE(result.size(), 1);
177 }
178
179 void testModify()
180 {
181 flatbuffers::FlatBufferBuilder entityFbb;
182 auto command = createEntityCommand(createEvent(entityFbb));
183
184 Akonadi2::Pipeline pipeline("org.kde.pipelinetest.instance1");
185
186 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create();
187 pipeline.setAdaptorFactory("event", adaptorFactory);
188
189 //Create the initial revision
190 pipeline.startTransaction();
191 pipeline.newEntity(command.constData(), command.size());
192 pipeline.commit();
193
194 //Get uid of written entity
195 auto keys = getKeys("org.kde.pipelinetest.instance1", "event.main");
196 QCOMPARE(keys.size(), 1);
197 const auto key = keys.first();
198 const auto uid = Akonadi2::Storage::uidFromKey(key);
199
200 //Execute the modification
201 entityFbb.Clear();
202 auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summary2"), uid, 1);
203 pipeline.startTransaction();
204 pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size());
205 pipeline.commit();
206
207 //Ensure we've got the new revision with the modification
208 auto buffer = getEntity("org.kde.pipelinetest.instance1", "event.main", Akonadi2::Storage::assembleKey(uid, 2));
209 QVERIFY(!buffer.isEmpty());
210 Akonadi2::EntityBuffer entityBuffer(buffer.data(), buffer.size());
211 auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity());
212 QCOMPARE(adaptor->getProperty("summary").toString(), QString("summary2"));
213 }
214
215 void testDelete()
216 {
217 flatbuffers::FlatBufferBuilder entityFbb;
218 auto command = createEntityCommand(createEvent(entityFbb));
219
220 //Create the initial revision
221 Akonadi2::Pipeline pipeline("org.kde.pipelinetest.instance1");
222 pipeline.startTransaction();
223 pipeline.newEntity(command.constData(), command.size());
224 pipeline.commit();
225
226 // const auto uid = Akonadi2::Storage::uidFromKey(key);
227 auto result = getKeys("org.kde.pipelinetest.instance1", "event.main");
228 QCOMPARE(result.size(), 1);
229
230 const auto uid = Akonadi2::Storage::uidFromKey(result.first());
231
232 //Delete entity
233 auto deleteCommand = deleteEntityCommand(uid,1);
234 pipeline.startTransaction();
235 pipeline.deletedEntity(deleteCommand.constData(), deleteCommand.size());
236 pipeline.commit();
237 }
238};
239
240QTEST_MAIN(PipelineTest)
241#include "pipelinetest.moc"