diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-10-19 15:28:42 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-10-21 09:18:49 +0200 |
commit | ba7c8b890c45d735216888204ec88882ef58c918 (patch) | |
tree | cb00c9b51e5353ba3726216679c81c0e2fe9ac35 /common/storage | |
parent | da1c86b80f230c3a2023f97c0048020a12e38de4 (diff) | |
download | sink-ba7c8b890c45d735216888204ec88882ef58c918.tar.gz sink-ba7c8b890c45d735216888204ec88882ef58c918.zip |
Ported the pipeline to the entitystore
Diffstat (limited to 'common/storage')
-rw-r--r-- | common/storage/entitystore.cpp | 185 | ||||
-rw-r--r-- | common/storage/entitystore.h | 11 |
2 files changed, 193 insertions, 3 deletions
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index fe63f0b..30c7a71 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp | |||
@@ -25,6 +25,8 @@ | |||
25 | #include "definitions.h" | 25 | #include "definitions.h" |
26 | #include "resourcecontext.h" | 26 | #include "resourcecontext.h" |
27 | #include "index.h" | 27 | #include "index.h" |
28 | #include "bufferutils.h" | ||
29 | #include "entity_generated.h" | ||
28 | 30 | ||
29 | #include "mail.h" | 31 | #include "mail.h" |
30 | #include "folder.h" | 32 | #include "folder.h" |
@@ -108,16 +110,199 @@ void EntityStore::startTransaction(Sink::Storage::DataStore::AccessMode accessMo | |||
108 | 110 | ||
109 | void EntityStore::commitTransaction() | 111 | void EntityStore::commitTransaction() |
110 | { | 112 | { |
113 | SinkTrace() << "Committing transaction"; | ||
111 | d->transaction.commit(); | 114 | d->transaction.commit(); |
112 | d->transaction = Storage::DataStore::Transaction(); | 115 | d->transaction = Storage::DataStore::Transaction(); |
113 | } | 116 | } |
114 | 117 | ||
115 | void EntityStore::abortTransaction() | 118 | void EntityStore::abortTransaction() |
116 | { | 119 | { |
120 | SinkTrace() << "Aborting transaction"; | ||
117 | d->transaction.abort(); | 121 | d->transaction.abort(); |
118 | d->transaction = Storage::DataStore::Transaction(); | 122 | d->transaction = Storage::DataStore::Transaction(); |
119 | } | 123 | } |
120 | 124 | ||
125 | bool EntityStore::add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &entity_, bool replayToSource, const PreprocessCreation &preprocess) | ||
126 | { | ||
127 | if (entity_.identifier().isEmpty()) { | ||
128 | SinkWarning() << "Can't write entity with an empty identifier"; | ||
129 | return false; | ||
130 | } | ||
131 | |||
132 | auto entity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(entity_, entity_.availableProperties()); | ||
133 | entity.setChangedProperties(entity.availableProperties().toSet()); | ||
134 | |||
135 | preprocess(entity); | ||
136 | d->typeIndex(type).add(entity.identifier(), entity, d->transaction); | ||
137 | |||
138 | //The maxRevision may have changed meanwhile if the entity created sub-entities | ||
139 | const qint64 newRevision = maxRevision() + 1; | ||
140 | |||
141 | // Add metadata buffer | ||
142 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
143 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
144 | metadataBuilder.add_revision(newRevision); | ||
145 | metadataBuilder.add_operation(Operation_Creation); | ||
146 | metadataBuilder.add_replayToSource(replayToSource); | ||
147 | auto metadataBuffer = metadataBuilder.Finish(); | ||
148 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
149 | |||
150 | flatbuffers::FlatBufferBuilder fbb; | ||
151 | d->resourceContext.adaptorFactory(type).createBuffer(entity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | ||
152 | |||
153 | DataStore::mainDatabase(d->transaction, type) | ||
154 | .write(DataStore::assembleKey(entity.identifier(), newRevision), BufferUtils::extractBuffer(fbb), | ||
155 | [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << entity.identifier() << newRevision; }); | ||
156 | DataStore::setMaxRevision(d->transaction, newRevision); | ||
157 | DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type); | ||
158 | SinkTrace() << "Wrote entity: " << entity.identifier() << type << newRevision; | ||
159 | return true; | ||
160 | } | ||
161 | |||
162 | bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &preprocess) | ||
163 | { | ||
164 | auto changeset = diff.changedProperties(); | ||
165 | //TODO handle errors | ||
166 | const auto current = readLatest(type, diff.identifier()); | ||
167 | if (current.identifier().isEmpty()) { | ||
168 | SinkWarning() << "Failed to read current version: " << diff.identifier(); | ||
169 | return false; | ||
170 | } | ||
171 | |||
172 | auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties()); | ||
173 | |||
174 | // Apply diff | ||
175 | //SinkTrace() << "Applying changed properties: " << changeset; | ||
176 | for (const auto &property : changeset) { | ||
177 | const auto value = diff.getProperty(property); | ||
178 | if (value.isValid()) { | ||
179 | //SinkTrace() << "Setting property: " << property; | ||
180 | newEntity.setProperty(property, value); | ||
181 | } | ||
182 | } | ||
183 | |||
184 | // Remove deletions | ||
185 | for (const auto property : deletions) { | ||
186 | //SinkTrace() << "Removing property: " << property; | ||
187 | newEntity.setProperty(property, QVariant()); | ||
188 | } | ||
189 | |||
190 | preprocess(current, newEntity); | ||
191 | d->typeIndex(type).remove(current.identifier(), current, d->transaction); | ||
192 | d->typeIndex(type).add(newEntity.identifier(), newEntity, d->transaction); | ||
193 | |||
194 | const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; | ||
195 | |||
196 | // Add metadata buffer | ||
197 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
198 | { | ||
199 | //We add availableProperties to account for the properties that have been changed by the preprocessors | ||
200 | auto modifiedProperties = BufferUtils::toVector(metadataFbb, changeset + newEntity.changedProperties()); | ||
201 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
202 | metadataBuilder.add_revision(newRevision); | ||
203 | metadataBuilder.add_operation(Operation_Modification); | ||
204 | metadataBuilder.add_replayToSource(replayToSource); | ||
205 | metadataBuilder.add_modifiedProperties(modifiedProperties); | ||
206 | auto metadataBuffer = metadataBuilder.Finish(); | ||
207 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
208 | } | ||
209 | |||
210 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); | ||
211 | SinkTrace() << "All properties: " << newEntity.availableProperties(); | ||
212 | |||
213 | flatbuffers::FlatBufferBuilder fbb; | ||
214 | d->resourceContext.adaptorFactory(type).createBuffer(newEntity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | ||
215 | |||
216 | DataStore::mainDatabase(d->transaction, type) | ||
217 | .write(DataStore::assembleKey(newEntity.identifier(), newRevision), BufferUtils::extractBuffer(fbb), | ||
218 | [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << newEntity.identifier() << newRevision; }); | ||
219 | DataStore::setMaxRevision(d->transaction, newRevision); | ||
220 | DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type); | ||
221 | SinkTrace() << "Wrote modified entity: " << newEntity.identifier() << type << newRevision; | ||
222 | return true; | ||
223 | } | ||
224 | |||
225 | bool EntityStore::remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &preprocess) | ||
226 | { | ||
227 | bool found = false; | ||
228 | bool alreadyRemoved = false; | ||
229 | DataStore::mainDatabase(d->transaction, type) | ||
230 | .findLatest(uid, | ||
231 | [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { | ||
232 | auto entity = GetEntity(data.data()); | ||
233 | if (entity && entity->metadata()) { | ||
234 | auto metadata = GetMetadata(entity->metadata()->Data()); | ||
235 | found = true; | ||
236 | if (metadata->operation() == Operation_Removal) { | ||
237 | alreadyRemoved = true; | ||
238 | } | ||
239 | } | ||
240 | return false; | ||
241 | }, | ||
242 | [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); | ||
243 | |||
244 | if (!found) { | ||
245 | SinkWarning() << "Failed to find entity " << uid; | ||
246 | return false; | ||
247 | } | ||
248 | if (alreadyRemoved) { | ||
249 | SinkWarning() << "Entity is already removed " << uid; | ||
250 | return false; | ||
251 | } | ||
252 | |||
253 | const auto current = readLatest(type, uid); | ||
254 | preprocess(current); | ||
255 | d->typeIndex(type).remove(current.identifier(), current, d->transaction); | ||
256 | |||
257 | const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; | ||
258 | |||
259 | // Add metadata buffer | ||
260 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
261 | auto metadataBuilder = MetadataBuilder(metadataFbb); | ||
262 | metadataBuilder.add_revision(newRevision); | ||
263 | metadataBuilder.add_operation(Operation_Removal); | ||
264 | metadataBuilder.add_replayToSource(replayToSource); | ||
265 | auto metadataBuffer = metadataBuilder.Finish(); | ||
266 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
267 | |||
268 | flatbuffers::FlatBufferBuilder fbb; | ||
269 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); | ||
270 | |||
271 | DataStore::mainDatabase(d->transaction, type) | ||
272 | .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), | ||
273 | [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); | ||
274 | DataStore::setMaxRevision(d->transaction, newRevision); | ||
275 | DataStore::recordRevision(d->transaction, newRevision, uid, type); | ||
276 | return true; | ||
277 | } | ||
278 | |||
279 | void EntityStore::cleanupRevision(qint64 revision) | ||
280 | { | ||
281 | const auto uid = DataStore::getUidFromRevision(d->transaction, revision); | ||
282 | const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision); | ||
283 | SinkTrace() << "Cleaning up revision " << revision << uid << bufferType; | ||
284 | DataStore::mainDatabase(d->transaction, bufferType) | ||
285 | .scan(uid, | ||
286 | [&](const QByteArray &key, const QByteArray &data) -> bool { | ||
287 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
288 | if (!buffer.isValid()) { | ||
289 | SinkWarning() << "Read invalid buffer from disk"; | ||
290 | } else { | ||
291 | const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); | ||
292 | const qint64 rev = metadata->revision(); | ||
293 | // Remove old revisions, and the current if the entity has already been removed | ||
294 | if (rev < revision || metadata->operation() == Operation_Removal) { | ||
295 | DataStore::removeRevision(d->transaction, rev); | ||
296 | DataStore::mainDatabase(d->transaction, bufferType).remove(key); | ||
297 | } | ||
298 | } | ||
299 | |||
300 | return true; | ||
301 | }, | ||
302 | [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); | ||
303 | DataStore::setCleanedUpRevision(d->transaction, revision); | ||
304 | } | ||
305 | |||
121 | QVector<QByteArray> EntityStore::fullScan(const QByteArray &type) | 306 | QVector<QByteArray> EntityStore::fullScan(const QByteArray &type) |
122 | { | 307 | { |
123 | SinkTrace() << "Looking for : " << type; | 308 | SinkTrace() << "Looking for : " << type; |
diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h index 455e9c3..65bff50 100644 --- a/common/storage/entitystore.h +++ b/common/storage/entitystore.h | |||
@@ -38,9 +38,14 @@ public: | |||
38 | typedef QSharedPointer<EntityStore> Ptr; | 38 | typedef QSharedPointer<EntityStore> Ptr; |
39 | EntityStore(const ResourceContext &resourceContext); | 39 | EntityStore(const ResourceContext &resourceContext); |
40 | 40 | ||
41 | void add(const ApplicationDomain::ApplicationDomainType &); | 41 | typedef std::function<void(const ApplicationDomain::ApplicationDomainType &, ApplicationDomain::ApplicationDomainType &)> PreprocessModification; |
42 | void modify(const ApplicationDomain::ApplicationDomainType &); | 42 | typedef std::function<void(ApplicationDomain::ApplicationDomainType &)> PreprocessCreation; |
43 | void remove(const ApplicationDomain::ApplicationDomainType &); | 43 | typedef std::function<void(const ApplicationDomain::ApplicationDomainType &)> PreprocessRemoval; |
44 | |||
45 | bool add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, bool replayToSource, const PreprocessCreation &); | ||
46 | bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &); | ||
47 | bool remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &); | ||
48 | void cleanupRevision(qint64 revision); | ||
44 | 49 | ||
45 | void startTransaction(Sink::Storage::DataStore::AccessMode); | 50 | void startTransaction(Sink::Storage::DataStore::AccessMode); |
46 | void commitTransaction(); | 51 | void commitTransaction(); |