diff options
-rw-r--r-- | common/pipeline.cpp | 40 | ||||
-rw-r--r-- | common/pipeline.h | 1 |
2 files changed, 21 insertions, 20 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 6c75bde..93a79f5 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -124,14 +124,20 @@ void Pipeline::null() | |||
124 | // state.step(); | 124 | // state.step(); |
125 | } | 125 | } |
126 | 126 | ||
127 | KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | 127 | void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) |
128 | { | 128 | { |
129 | Log() << "Pipeline: New Entity"; | 129 | d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(uid, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()), |
130 | 130 | [](const Akonadi2::Storage::Error &error) { | |
131 | //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. | 131 | Warning() << "Failed to write entity"; |
132 | const auto key = QUuid::createUuid().toString().toUtf8(); | 132 | } |
133 | ); | ||
134 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); | ||
135 | Akonadi2::Storage::recordRevision(d->transaction, newRevision, uid, bufferType); | ||
136 | } | ||
133 | 137 | ||
134 | const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; | 138 | KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) |
139 | { | ||
140 | Trace() << "Pipeline: New Entity"; | ||
135 | 141 | ||
136 | { | 142 | { |
137 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 143 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
@@ -142,7 +148,6 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
142 | } | 148 | } |
143 | auto createEntity = Akonadi2::Commands::GetCreateEntity(command); | 149 | auto createEntity = Akonadi2::Commands::GetCreateEntity(command); |
144 | 150 | ||
145 | //TODO rename createEntitiy->domainType to bufferType | ||
146 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); | 151 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); |
147 | { | 152 | { |
148 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); | 153 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); |
@@ -157,6 +162,9 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
157 | return KAsync::error<qint64>(0); | 162 | return KAsync::error<qint64>(0); |
158 | } | 163 | } |
159 | 164 | ||
165 | const auto key = QUuid::createUuid().toString().toUtf8(); | ||
166 | const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; | ||
167 | |||
160 | //Add metadata buffer | 168 | //Add metadata buffer |
161 | flatbuffers::FlatBufferBuilder metadataFbb; | 169 | flatbuffers::FlatBufferBuilder metadataFbb; |
162 | auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); | 170 | auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); |
@@ -164,18 +172,12 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
164 | metadataBuilder.add_processed(false); | 172 | metadataBuilder.add_processed(false); |
165 | auto metadataBuffer = metadataBuilder.Finish(); | 173 | auto metadataBuffer = metadataBuilder.Finish(); |
166 | Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); | 174 | Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); |
167 | //TODO we should reserve some space in metadata for in-place updates | ||
168 | 175 | ||
169 | flatbuffers::FlatBufferBuilder fbb; | 176 | flatbuffers::FlatBufferBuilder fbb; |
170 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); | 177 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); |
171 | 178 | ||
172 | d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()), | 179 | storeNewRevision(newRevision, fbb, bufferType, key); |
173 | [](const Akonadi2::Storage::Error &error) { | 180 | |
174 | Warning() << "Failed to write entity"; | ||
175 | } | ||
176 | ); | ||
177 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); | ||
178 | Akonadi2::Storage::recordRevision(d->transaction, newRevision, key, bufferType); | ||
179 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; | 181 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; |
180 | 182 | ||
181 | return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { | 183 | return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { |
@@ -190,7 +192,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
190 | 192 | ||
191 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | 193 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) |
192 | { | 194 | { |
193 | Log() << "Pipeline: Modified Entity"; | 195 | Trace() << "Pipeline: Modified Entity"; |
194 | 196 | ||
195 | const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; | 197 | const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; |
196 | 198 | ||
@@ -279,9 +281,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
279 | flatbuffers::FlatBufferBuilder fbb; | 281 | flatbuffers::FlatBufferBuilder fbb; |
280 | adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | 282 | adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); |
281 | 283 | ||
282 | d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | 284 | storeNewRevision(newRevision, fbb, bufferType, key); |
283 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); | ||
284 | Akonadi2::Storage::recordRevision(d->transaction, newRevision, key, bufferType); | ||
285 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; | 285 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; |
286 | 286 | ||
287 | return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { | 287 | return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { |
@@ -296,7 +296,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
296 | 296 | ||
297 | KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | 297 | KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) |
298 | { | 298 | { |
299 | Log() << "Pipeline: Deleted Entity"; | 299 | Trace() << "Pipeline: Deleted Entity"; |
300 | 300 | ||
301 | const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; | 301 | const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; |
302 | 302 | ||
diff --git a/common/pipeline.h b/common/pipeline.h index 89232d0..f4e8ae0 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -80,6 +80,7 @@ private: | |||
80 | //Don't use a reference here (it would invalidate itself) | 80 | //Don't use a reference here (it would invalidate itself) |
81 | void pipelineCompleted(PipelineState state); | 81 | void pipelineCompleted(PipelineState state); |
82 | void scheduleStep(); | 82 | void scheduleStep(); |
83 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); | ||
83 | 84 | ||
84 | friend class PipelineState; | 85 | friend class PipelineState; |
85 | 86 | ||