summaryrefslogtreecommitdiffstats
path: root/common/storage/entitystore.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-19 15:28:42 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-21 09:18:49 +0200
commitba7c8b890c45d735216888204ec88882ef58c918 (patch)
treecb00c9b51e5353ba3726216679c81c0e2fe9ac35 /common/storage/entitystore.cpp
parentda1c86b80f230c3a2023f97c0048020a12e38de4 (diff)
downloadsink-ba7c8b890c45d735216888204ec88882ef58c918.tar.gz
sink-ba7c8b890c45d735216888204ec88882ef58c918.zip
Ported the pipeline to the entitystore
Diffstat (limited to 'common/storage/entitystore.cpp')
-rw-r--r--common/storage/entitystore.cpp185
1 files changed, 185 insertions, 0 deletions
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp
index fe63f0b..30c7a71 100644
--- a/common/storage/entitystore.cpp
+++ b/common/storage/entitystore.cpp
@@ -25,6 +25,8 @@
25#include "definitions.h" 25#include "definitions.h"
26#include "resourcecontext.h" 26#include "resourcecontext.h"
27#include "index.h" 27#include "index.h"
28#include "bufferutils.h"
29#include "entity_generated.h"
28 30
29#include "mail.h" 31#include "mail.h"
30#include "folder.h" 32#include "folder.h"
@@ -108,16 +110,199 @@ void EntityStore::startTransaction(Sink::Storage::DataStore::AccessMode accessMo
108 110
109void EntityStore::commitTransaction() 111void EntityStore::commitTransaction()
110{ 112{
113 SinkTrace() << "Committing transaction";
111 d->transaction.commit(); 114 d->transaction.commit();
112 d->transaction = Storage::DataStore::Transaction(); 115 d->transaction = Storage::DataStore::Transaction();
113} 116}
114 117
115void EntityStore::abortTransaction() 118void EntityStore::abortTransaction()
116{ 119{
120 SinkTrace() << "Aborting transaction";
117 d->transaction.abort(); 121 d->transaction.abort();
118 d->transaction = Storage::DataStore::Transaction(); 122 d->transaction = Storage::DataStore::Transaction();
119} 123}
120 124
125bool EntityStore::add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &entity_, bool replayToSource, const PreprocessCreation &preprocess)
126{
127 if (entity_.identifier().isEmpty()) {
128 SinkWarning() << "Can't write entity with an empty identifier";
129 return false;
130 }
131
132 auto entity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(entity_, entity_.availableProperties());
133 entity.setChangedProperties(entity.availableProperties().toSet());
134
135 preprocess(entity);
136 d->typeIndex(type).add(entity.identifier(), entity, d->transaction);
137
138 //The maxRevision may have changed meanwhile if the entity created sub-entities
139 const qint64 newRevision = maxRevision() + 1;
140
141 // Add metadata buffer
142 flatbuffers::FlatBufferBuilder metadataFbb;
143 auto metadataBuilder = MetadataBuilder(metadataFbb);
144 metadataBuilder.add_revision(newRevision);
145 metadataBuilder.add_operation(Operation_Creation);
146 metadataBuilder.add_replayToSource(replayToSource);
147 auto metadataBuffer = metadataBuilder.Finish();
148 FinishMetadataBuffer(metadataFbb, metadataBuffer);
149
150 flatbuffers::FlatBufferBuilder fbb;
151 d->resourceContext.adaptorFactory(type).createBuffer(entity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
152
153 DataStore::mainDatabase(d->transaction, type)
154 .write(DataStore::assembleKey(entity.identifier(), newRevision), BufferUtils::extractBuffer(fbb),
155 [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << entity.identifier() << newRevision; });
156 DataStore::setMaxRevision(d->transaction, newRevision);
157 DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type);
158 SinkTrace() << "Wrote entity: " << entity.identifier() << type << newRevision;
159 return true;
160}
161
162bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &preprocess)
163{
164 auto changeset = diff.changedProperties();
165 //TODO handle errors
166 const auto current = readLatest(type, diff.identifier());
167 if (current.identifier().isEmpty()) {
168 SinkWarning() << "Failed to read current version: " << diff.identifier();
169 return false;
170 }
171
172 auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties());
173
174 // Apply diff
175 //SinkTrace() << "Applying changed properties: " << changeset;
176 for (const auto &property : changeset) {
177 const auto value = diff.getProperty(property);
178 if (value.isValid()) {
179 //SinkTrace() << "Setting property: " << property;
180 newEntity.setProperty(property, value);
181 }
182 }
183
184 // Remove deletions
185 for (const auto property : deletions) {
186 //SinkTrace() << "Removing property: " << property;
187 newEntity.setProperty(property, QVariant());
188 }
189
190 preprocess(current, newEntity);
191 d->typeIndex(type).remove(current.identifier(), current, d->transaction);
192 d->typeIndex(type).add(newEntity.identifier(), newEntity, d->transaction);
193
194 const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1;
195
196 // Add metadata buffer
197 flatbuffers::FlatBufferBuilder metadataFbb;
198 {
199 //We add availableProperties to account for the properties that have been changed by the preprocessors
200 auto modifiedProperties = BufferUtils::toVector(metadataFbb, changeset + newEntity.changedProperties());
201 auto metadataBuilder = MetadataBuilder(metadataFbb);
202 metadataBuilder.add_revision(newRevision);
203 metadataBuilder.add_operation(Operation_Modification);
204 metadataBuilder.add_replayToSource(replayToSource);
205 metadataBuilder.add_modifiedProperties(modifiedProperties);
206 auto metadataBuffer = metadataBuilder.Finish();
207 FinishMetadataBuffer(metadataFbb, metadataBuffer);
208 }
209
210 newEntity.setChangedProperties(newEntity.availableProperties().toSet());
211 SinkTrace() << "All properties: " << newEntity.availableProperties();
212
213 flatbuffers::FlatBufferBuilder fbb;
214 d->resourceContext.adaptorFactory(type).createBuffer(newEntity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
215
216 DataStore::mainDatabase(d->transaction, type)
217 .write(DataStore::assembleKey(newEntity.identifier(), newRevision), BufferUtils::extractBuffer(fbb),
218 [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << newEntity.identifier() << newRevision; });
219 DataStore::setMaxRevision(d->transaction, newRevision);
220 DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type);
221 SinkTrace() << "Wrote modified entity: " << newEntity.identifier() << type << newRevision;
222 return true;
223}
224
225bool EntityStore::remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &preprocess)
226{
227 bool found = false;
228 bool alreadyRemoved = false;
229 DataStore::mainDatabase(d->transaction, type)
230 .findLatest(uid,
231 [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool {
232 auto entity = GetEntity(data.data());
233 if (entity && entity->metadata()) {
234 auto metadata = GetMetadata(entity->metadata()->Data());
235 found = true;
236 if (metadata->operation() == Operation_Removal) {
237 alreadyRemoved = true;
238 }
239 }
240 return false;
241 },
242 [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; });
243
244 if (!found) {
245 SinkWarning() << "Failed to find entity " << uid;
246 return false;
247 }
248 if (alreadyRemoved) {
249 SinkWarning() << "Entity is already removed " << uid;
250 return false;
251 }
252
253 const auto current = readLatest(type, uid);
254 preprocess(current);
255 d->typeIndex(type).remove(current.identifier(), current, d->transaction);
256
257 const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1;
258
259 // Add metadata buffer
260 flatbuffers::FlatBufferBuilder metadataFbb;
261 auto metadataBuilder = MetadataBuilder(metadataFbb);
262 metadataBuilder.add_revision(newRevision);
263 metadataBuilder.add_operation(Operation_Removal);
264 metadataBuilder.add_replayToSource(replayToSource);
265 auto metadataBuffer = metadataBuilder.Finish();
266 FinishMetadataBuffer(metadataFbb, metadataBuffer);
267
268 flatbuffers::FlatBufferBuilder fbb;
269 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0);
270
271 DataStore::mainDatabase(d->transaction, type)
272 .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
273 [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; });
274 DataStore::setMaxRevision(d->transaction, newRevision);
275 DataStore::recordRevision(d->transaction, newRevision, uid, type);
276 return true;
277}
278
279void EntityStore::cleanupRevision(qint64 revision)
280{
281 const auto uid = DataStore::getUidFromRevision(d->transaction, revision);
282 const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision);
283 SinkTrace() << "Cleaning up revision " << revision << uid << bufferType;
284 DataStore::mainDatabase(d->transaction, bufferType)
285 .scan(uid,
286 [&](const QByteArray &key, const QByteArray &data) -> bool {
287 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
288 if (!buffer.isValid()) {
289 SinkWarning() << "Read invalid buffer from disk";
290 } else {
291 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer());
292 const qint64 rev = metadata->revision();
293 // Remove old revisions, and the current if the entity has already been removed
294 if (rev < revision || metadata->operation() == Operation_Removal) {
295 DataStore::removeRevision(d->transaction, rev);
296 DataStore::mainDatabase(d->transaction, bufferType).remove(key);
297 }
298 }
299
300 return true;
301 },
302 [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true);
303 DataStore::setCleanedUpRevision(d->transaction, revision);
304}
305
121QVector<QByteArray> EntityStore::fullScan(const QByteArray &type) 306QVector<QByteArray> EntityStore::fullScan(const QByteArray &type)
122{ 307{
123 SinkTrace() << "Looking for : " << type; 308 SinkTrace() << "Looking for : " << type;