summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp65
1 files changed, 44 insertions, 21 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index afb9e34..1197408 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -84,13 +84,16 @@ void Pipeline::setPreprocessors(const QString &entityType, Type pipelineType, co
84 }; 84 };
85} 85}
86 86
87Storage &Pipeline::storage() const
88void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) 87void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory)
89{ 88{
90 return d->storage;
91 d->adaptorFactory.insert(entityType, factory); 89 d->adaptorFactory.insert(entityType, factory);
92} 90}
93 91
92Storage &Pipeline::storage() const
93{
94 return d->storage;
95}
96
94void Pipeline::null() 97void Pipeline::null()
95{ 98{
96 //TODO: is there really any need for the null pipeline? if so, it should be doing something ;) 99 //TODO: is there really any need for the null pipeline? if so, it should be doing something ;)
@@ -111,7 +114,7 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
111 { 114 {
112 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 115 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
113 if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { 116 if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) {
114 qWarning() << "invalid buffer, not a create entity buffer"; 117 Warning() << "invalid buffer, not a create entity buffer";
115 return KAsync::error<void>(); 118 return KAsync::error<void>();
116 } 119 }
117 } 120 }
@@ -122,7 +125,7 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
122 { 125 {
123 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); 126 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
124 if (!Akonadi2::VerifyEntityBuffer(verifyer)) { 127 if (!Akonadi2::VerifyEntityBuffer(verifyer)) {
125 qWarning() << "invalid buffer, not an entity buffer"; 128 Warning() << "invalid buffer, not an entity buffer";
126 return KAsync::error<void>(); 129 return KAsync::error<void>();
127 } 130 }
128 } 131 }
@@ -142,7 +145,7 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
142 145
143 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); 146 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize());
144 storage().setMaxRevision(newRevision); 147 storage().setMaxRevision(newRevision);
145 Log() << "Pipeline: wrote entity: "<< newRevision; 148 Log() << "Pipeline: wrote entity: " << key << newRevision;
146 149
147 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { 150 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
148 PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { 151 PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() {
@@ -162,51 +165,72 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
162 { 165 {
163 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 166 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
164 if (!Akonadi2::Commands::VerifyModifyEntityBuffer(verifyer)) { 167 if (!Akonadi2::Commands::VerifyModifyEntityBuffer(verifyer)) {
165 qWarning() << "invalid buffer, not a modify entity buffer"; 168 Warning() << "invalid buffer, not a modify entity buffer";
166 return KAsync::error<void>(); 169 return KAsync::error<void>();
167 } 170 }
168 } 171 }
169 auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command); 172 auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command);
173 Q_ASSERT(modifyEntity);
170 174
171 //TODO rename modifyEntity->domainType to bufferType 175 //TODO rename modifyEntity->domainType to bufferType
172 const QByteArray entityType = QByteArray(reinterpret_cast<char const*>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); 176 const QByteArray entityType = QByteArray(reinterpret_cast<char const*>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size());
173 const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); 177 const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
178 if (entityType.isEmpty() || key.isEmpty()) {
179 Warning() << "entity type or key " << entityType << key;
180 return KAsync::error<void>();
181 }
174 { 182 {
175 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); 183 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size());
176 if (!Akonadi2::VerifyEntityBuffer(verifyer)) { 184 if (!Akonadi2::VerifyEntityBuffer(verifyer)) {
177 qWarning() << "invalid buffer, not an entity buffer"; 185 Warning() << "invalid buffer, not an entity buffer";
178 return KAsync::error<void>(); 186 return KAsync::error<void>();
179 } 187 }
180 } 188 }
181 189
182 auto adaptorFactory = d->adaptorFactory.value(entityType); 190 auto adaptorFactory = d->adaptorFactory.value(entityType);
183 if (adaptorFactory) { 191 if (!adaptorFactory) {
184 qWarning() << "no adaptor factory"; 192 Warning() << "no adaptor factory for type " << entityType;
185 return KAsync::error<void>(); 193 return KAsync::error<void>();
186 } 194 }
187 195
188 auto diffEntity = Akonadi2::GetEntity(modifyEntity->delta()->Data()); 196 auto diffEntity = Akonadi2::GetEntity(modifyEntity->delta()->Data());
197 Q_ASSERT(diffEntity);
189 auto diff = adaptorFactory->createAdaptor(*diffEntity); 198 auto diff = adaptorFactory->createAdaptor(*diffEntity);
190 199
191 Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr domainType; 200 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current;
192 storage().scan(QByteArray::fromRawData(key.data(), key.size()), [&domainType](const QByteArray &data) -> bool { 201 storage().scan(QByteArray::fromRawData(key.data(), key.size()), [&current, adaptorFactory](const QByteArray &data) -> bool {
193 auto existingEntity = Akonadi2::GetEntity(data.data()); 202 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
194 domainType = getDomainType(*existingEntity); 203 if (!buffer.isValid()) {
204 Warning() << "Read invalid buffer from disk";
205 } else {
206 current = adaptorFactory->createAdaptor(buffer.entity());
207 }
195 return false; 208 return false;
196 }); 209 });
197 //TODO error handler 210 //TODO error handler
198 211
212 if (!current) {
213 Warning() << "Failed to read local value ";
214 return KAsync::error<void>();
215 }
216
217 //resource and uid don't matter at this point
218 const Akonadi2::ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current);
219 auto newObject = Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<Akonadi2::ApplicationDomain::ApplicationDomainType>(existingObject);
220
199 //Apply diff 221 //Apply diff
200 //FIXME only apply the properties that are available in the buffer 222 //FIXME only apply the properties that are available in the buffer
201 for (const auto &property : diff->availableProperties()) { 223 for (const auto &property : diff->availableProperties()) {
202 domainType->setProperty(property, diff->getProperty(property)); 224 newObject->setProperty(property, diff->getProperty(property));
203 } 225 }
226
204 //Remove deletions 227 //Remove deletions
205 for (const auto &property : *modifyEntity->deletions()) { 228 if (modifyEntity->deletions()) {
206 domainType->setProperty(QByteArray::fromRawData(property->data(), property->size()), QVariant()); 229 for (const auto &property : *modifyEntity->deletions()) {
230 newObject->setProperty(QByteArray::fromRawData(property->data(), property->size()), QVariant());
231 }
207 } 232 }
208 233
209
210 //Add metadata buffer 234 //Add metadata buffer
211 flatbuffers::FlatBufferBuilder metadataFbb; 235 flatbuffers::FlatBufferBuilder metadataFbb;
212 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); 236 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb);
@@ -215,16 +239,15 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
215 auto metadataBuffer = metadataBuilder.Finish(); 239 auto metadataBuffer = metadataBuilder.Finish();
216 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); 240 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer);
217 241
218
219 flatbuffers::FlatBufferBuilder fbb; 242 flatbuffers::FlatBufferBuilder fbb;
220 adaptorFactory->createBuffer(*domainType, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 243 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
221 244
222 //TODO don't overwrite the old entry, but instead store a new revision 245 //TODO don't overwrite the old entry, but instead store a new revision
223 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); 246 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize());
224 storage().setMaxRevision(newRevision); 247 storage().setMaxRevision(newRevision);
225 248
226 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { 249 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
227 PipelineState state(this, ModifiedPipeline, key, d->newPipeline[entityType], [&future]() { 250 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [&future]() {
228 future.setFinished(); 251 future.setFinished();
229 }); 252 });
230 d->activePipelines << state; 253 d->activePipelines << state;
@@ -241,7 +264,7 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
241 { 264 {
242 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 265 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
243 if (!Akonadi2::Commands::VerifyDeleteEntityBuffer(verifyer)) { 266 if (!Akonadi2::Commands::VerifyDeleteEntityBuffer(verifyer)) {
244 qWarning() << "invalid buffer, not a delete entity buffer"; 267 Warning() << "invalid buffer, not a delete entity buffer";
245 return KAsync::error<void>(); 268 return KAsync::error<void>();
246 } 269 }
247 } 270 }