diff options
-rw-r--r-- | common/commands/deleteentity.fbs | 3 | ||||
-rw-r--r-- | common/commands/modifyentity.fbs | 2 | ||||
-rw-r--r-- | common/genericresource.cpp | 7 | ||||
-rw-r--r-- | common/pipeline.cpp | 119 | ||||
-rw-r--r-- | common/pipeline.h | 6 | ||||
-rw-r--r-- | examples/dummyresource/resourcefactory.cpp | 1 |
6 files changed, 121 insertions, 17 deletions
diff --git a/common/commands/deleteentity.fbs b/common/commands/deleteentity.fbs index c9b7850..4f32b54 100644 --- a/common/commands/deleteentity.fbs +++ b/common/commands/deleteentity.fbs | |||
@@ -1,8 +1,9 @@ | |||
1 | namespace Akonadi2; | 1 | namespace Akonadi2.Commands; |
2 | 2 | ||
3 | table DeleteEntity { | 3 | table DeleteEntity { |
4 | revision: ulong; | 4 | revision: ulong; |
5 | entityId: string; | 5 | entityId: string; |
6 | domainType: string; | ||
6 | } | 7 | } |
7 | 8 | ||
8 | root_type DeleteEntity; | 9 | root_type DeleteEntity; |
diff --git a/common/commands/modifyentity.fbs b/common/commands/modifyentity.fbs index d26051e..a59eb9b 100644 --- a/common/commands/modifyentity.fbs +++ b/common/commands/modifyentity.fbs | |||
@@ -1,4 +1,4 @@ | |||
1 | namespace Akonadi2; | 1 | namespace Akonadi2.Commands; |
2 | 2 | ||
3 | table ModifyEntity { | 3 | table ModifyEntity { |
4 | revision: ulong; | 4 | revision: ulong; |
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 139ae98..a500aed 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -9,7 +9,6 @@ | |||
9 | #include "clientapi.h" | 9 | #include "clientapi.h" |
10 | #include "index.h" | 10 | #include "index.h" |
11 | #include "log.h" | 11 | #include "log.h" |
12 | #include <assert.h> | ||
13 | 12 | ||
14 | using namespace Akonadi2; | 13 | using namespace Akonadi2; |
15 | 14 | ||
@@ -67,11 +66,9 @@ private slots: | |||
67 | //Throw command into appropriate pipeline | 66 | //Throw command into appropriate pipeline |
68 | switch (queuedCommand->commandId()) { | 67 | switch (queuedCommand->commandId()) { |
69 | case Akonadi2::Commands::DeleteEntityCommand: | 68 | case Akonadi2::Commands::DeleteEntityCommand: |
70 | //mPipeline->removedEntity | 69 | return mPipeline->deletedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); |
71 | return KAsync::null<void>(); | ||
72 | case Akonadi2::Commands::ModifyEntityCommand: | 70 | case Akonadi2::Commands::ModifyEntityCommand: |
73 | //mPipeline->modifiedEntity | 71 | return mPipeline->modifiedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); |
74 | return KAsync::null<void>(); | ||
75 | case Akonadi2::Commands::CreateEntityCommand: | 72 | case Akonadi2::Commands::CreateEntityCommand: |
76 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); | 73 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); |
77 | default: | 74 | default: |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 21cf1c5..afb9e34 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -28,8 +28,11 @@ | |||
28 | #include "entity_generated.h" | 28 | #include "entity_generated.h" |
29 | #include "metadata_generated.h" | 29 | #include "metadata_generated.h" |
30 | #include "createentity_generated.h" | 30 | #include "createentity_generated.h" |
31 | #include "modifyentity_generated.h" | ||
32 | #include "deleteentity_generated.h" | ||
31 | #include "entitybuffer.h" | 33 | #include "entitybuffer.h" |
32 | #include "log.h" | 34 | #include "log.h" |
35 | #include "domain/applicationdomaintype.h" | ||
33 | 36 | ||
34 | namespace Akonadi2 | 37 | namespace Akonadi2 |
35 | { | 38 | { |
@@ -50,6 +53,7 @@ public: | |||
50 | QHash<QString, QVector<Preprocessor *> > deletedPipeline; | 53 | QHash<QString, QVector<Preprocessor *> > deletedPipeline; |
51 | QVector<PipelineState> activePipelines; | 54 | QVector<PipelineState> activePipelines; |
52 | bool stepScheduled; | 55 | bool stepScheduled; |
56 | QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; | ||
53 | }; | 57 | }; |
54 | 58 | ||
55 | Pipeline::Pipeline(const QString &resourceName, QObject *parent) | 59 | Pipeline::Pipeline(const QString &resourceName, QObject *parent) |
@@ -81,8 +85,10 @@ void Pipeline::setPreprocessors(const QString &entityType, Type pipelineType, co | |||
81 | } | 85 | } |
82 | 86 | ||
83 | Storage &Pipeline::storage() const | 87 | Storage &Pipeline::storage() const |
88 | void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) | ||
84 | { | 89 | { |
85 | return d->storage; | 90 | return d->storage; |
91 | d->adaptorFactory.insert(entityType, factory); | ||
86 | } | 92 | } |
87 | 93 | ||
88 | void Pipeline::null() | 94 | void Pipeline::null() |
@@ -147,18 +153,115 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
147 | }); | 153 | }); |
148 | } | 154 | } |
149 | 155 | ||
150 | void Pipeline::modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size) | 156 | KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) |
151 | { | 157 | { |
152 | PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [](){}); | 158 | Log() << "Pipeline: Modified Entity"; |
153 | d->activePipelines << state; | 159 | |
154 | state.step(); | 160 | const qint64 newRevision = storage().maxRevision() + 1; |
161 | |||
162 | { | ||
163 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | ||
164 | if (!Akonadi2::Commands::VerifyModifyEntityBuffer(verifyer)) { | ||
165 | qWarning() << "invalid buffer, not a modify entity buffer"; | ||
166 | return KAsync::error<void>(); | ||
167 | } | ||
168 | } | ||
169 | auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command); | ||
170 | |||
171 | //TODO rename modifyEntity->domainType to bufferType | ||
172 | const QByteArray entityType = QByteArray(reinterpret_cast<char const*>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); | ||
173 | const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); | ||
174 | { | ||
175 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); | ||
176 | if (!Akonadi2::VerifyEntityBuffer(verifyer)) { | ||
177 | qWarning() << "invalid buffer, not an entity buffer"; | ||
178 | return KAsync::error<void>(); | ||
179 | } | ||
180 | } | ||
181 | |||
182 | auto adaptorFactory = d->adaptorFactory.value(entityType); | ||
183 | if (adaptorFactory) { | ||
184 | qWarning() << "no adaptor factory"; | ||
185 | return KAsync::error<void>(); | ||
186 | } | ||
187 | |||
188 | auto diffEntity = Akonadi2::GetEntity(modifyEntity->delta()->Data()); | ||
189 | auto diff = adaptorFactory->createAdaptor(*diffEntity); | ||
190 | |||
191 | Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr domainType; | ||
192 | storage().scan(QByteArray::fromRawData(key.data(), key.size()), [&domainType](const QByteArray &data) -> bool { | ||
193 | auto existingEntity = Akonadi2::GetEntity(data.data()); | ||
194 | domainType = getDomainType(*existingEntity); | ||
195 | return false; | ||
196 | }); | ||
197 | //TODO error handler | ||
198 | |||
199 | //Apply diff | ||
200 | //FIXME only apply the properties that are available in the buffer | ||
201 | for (const auto &property : diff->availableProperties()) { | ||
202 | domainType->setProperty(property, diff->getProperty(property)); | ||
203 | } | ||
204 | //Remove deletions | ||
205 | for (const auto &property : *modifyEntity->deletions()) { | ||
206 | domainType->setProperty(QByteArray::fromRawData(property->data(), property->size()), QVariant()); | ||
207 | } | ||
208 | |||
209 | |||
210 | //Add metadata buffer | ||
211 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
212 | auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); | ||
213 | metadataBuilder.add_revision(newRevision); | ||
214 | metadataBuilder.add_processed(false); | ||
215 | auto metadataBuffer = metadataBuilder.Finish(); | ||
216 | Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
217 | |||
218 | |||
219 | flatbuffers::FlatBufferBuilder fbb; | ||
220 | adaptorFactory->createBuffer(*domainType, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | ||
221 | |||
222 | //TODO don't overwrite the old entry, but instead store a new revision | ||
223 | storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); | ||
224 | storage().setMaxRevision(newRevision); | ||
225 | |||
226 | return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { | ||
227 | PipelineState state(this, ModifiedPipeline, key, d->newPipeline[entityType], [&future]() { | ||
228 | future.setFinished(); | ||
229 | }); | ||
230 | d->activePipelines << state; | ||
231 | state.step(); | ||
232 | }); | ||
155 | } | 233 | } |
156 | 234 | ||
157 | void Pipeline::deletedEntity(const QString &entityType, const QByteArray &key) | 235 | KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size) |
158 | { | 236 | { |
159 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType], [](){}); | 237 | Log() << "Pipeline: Deleted Entity"; |
160 | d->activePipelines << state; | 238 | |
161 | state.step(); | 239 | const qint64 newRevision = storage().maxRevision() + 1; |
240 | |||
241 | { | ||
242 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | ||
243 | if (!Akonadi2::Commands::VerifyDeleteEntityBuffer(verifyer)) { | ||
244 | qWarning() << "invalid buffer, not a delete entity buffer"; | ||
245 | return KAsync::error<void>(); | ||
246 | } | ||
247 | } | ||
248 | auto deleteEntity = Akonadi2::Commands::GetDeleteEntity(command); | ||
249 | |||
250 | const QByteArray entityType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); | ||
251 | const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); | ||
252 | |||
253 | //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted | ||
254 | storage().remove(key.data(), key.size()); | ||
255 | storage().setMaxRevision(newRevision); | ||
256 | Log() << "Pipeline: deleted entity: "<< newRevision; | ||
257 | |||
258 | return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { | ||
259 | PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType], [&future](){ | ||
260 | future.setFinished(); | ||
261 | }); | ||
262 | d->activePipelines << state; | ||
263 | state.step(); | ||
264 | }); | ||
162 | } | 265 | } |
163 | 266 | ||
164 | void Pipeline::pipelineStepped(const PipelineState &state) | 267 | void Pipeline::pipelineStepped(const PipelineState &state) |
diff --git a/common/pipeline.h b/common/pipeline.h index 6df2d76..a6696ec 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -31,6 +31,7 @@ | |||
31 | #include <Async/Async> | 31 | #include <Async/Async> |
32 | 32 | ||
33 | #include "entity_generated.h" | 33 | #include "entity_generated.h" |
34 | #include "domainadaptor.h" | ||
34 | 35 | ||
35 | namespace Akonadi2 | 36 | namespace Akonadi2 |
36 | { | 37 | { |
@@ -53,10 +54,11 @@ public: | |||
53 | void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors); | 54 | void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors); |
54 | 55 | ||
55 | void null(); | 56 | void null(); |
57 | void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); | ||
56 | 58 | ||
57 | KAsync::Job<void> newEntity(void const *command, size_t size); | 59 | KAsync::Job<void> newEntity(void const *command, size_t size); |
58 | void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size); | 60 | KAsync::Job<void> modifiedEntity(void const *command, size_t size); |
59 | void deletedEntity(const QString &entityType, const QByteArray &key); | 61 | KAsync::Job<void> deletedEntity(void const *command, size_t size); |
60 | 62 | ||
61 | Q_SIGNALS: | 63 | Q_SIGNALS: |
62 | void revisionUpdated(); | 64 | void revisionUpdated(); |
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp index 9e8a2fc..b9123d5 100644 --- a/examples/dummyresource/resourcefactory.cpp +++ b/examples/dummyresource/resourcefactory.cpp | |||
@@ -58,6 +58,7 @@ void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) | |||
58 | }); | 58 | }); |
59 | 59 | ||
60 | pipeline->setPreprocessors(ENTITY_TYPE_EVENT, Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer); | 60 | pipeline->setPreprocessors(ENTITY_TYPE_EVENT, Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer); |
61 | pipeline->setAdaptorFactory(ENTITY_TYPE_EVENT, eventFactory); | ||
61 | //TODO cleanup indexes during removal | 62 | //TODO cleanup indexes during removal |
62 | GenericResource::configurePipeline(pipeline); | 63 | GenericResource::configurePipeline(pipeline); |
63 | } | 64 | } |