summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-10-08 19:19:04 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-10-10 10:40:01 +0200
commit745fb186eeb0c398a5ebf308fe5ee89cf7644869 (patch)
tree47cfcea22349eb5eb33e7288f2da83ede51b22de
parentf269a33d74e76f61f969a661a9428a76ed8e626e (diff)
downloadsink-745fb186eeb0c398a5ebf308fe5ee89cf7644869.tar.gz
sink-745fb186eeb0c398a5ebf308fe5ee89cf7644869.zip
StoreNewRevision function
-rw-r--r--common/pipeline.cpp40
-rw-r--r--common/pipeline.h1
2 files changed, 21 insertions, 20 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 6c75bde..93a79f5 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -124,14 +124,20 @@ void Pipeline::null()
124 // state.step(); 124 // state.step();
125} 125}
126 126
127KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) 127void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
128{ 128{
129 Log() << "Pipeline: New Entity"; 129 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(uid, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()),
130 130 [](const Akonadi2::Storage::Error &error) {
131 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. 131 Warning() << "Failed to write entity";
132 const auto key = QUuid::createUuid().toString().toUtf8(); 132 }
133 );
134 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
135 Akonadi2::Storage::recordRevision(d->transaction, newRevision, uid, bufferType);
136}
133 137
134 const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; 138KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
139{
140 Trace() << "Pipeline: New Entity";
135 141
136 { 142 {
137 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 143 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
@@ -142,7 +148,6 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
142 } 148 }
143 auto createEntity = Akonadi2::Commands::GetCreateEntity(command); 149 auto createEntity = Akonadi2::Commands::GetCreateEntity(command);
144 150
145 //TODO rename createEntitiy->domainType to bufferType
146 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); 151 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size());
147 { 152 {
148 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); 153 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
@@ -157,6 +162,9 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
157 return KAsync::error<qint64>(0); 162 return KAsync::error<qint64>(0);
158 } 163 }
159 164
165 const auto key = QUuid::createUuid().toString().toUtf8();
166 const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1;
167
160 //Add metadata buffer 168 //Add metadata buffer
161 flatbuffers::FlatBufferBuilder metadataFbb; 169 flatbuffers::FlatBufferBuilder metadataFbb;
162 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); 170 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb);
@@ -164,18 +172,12 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
164 metadataBuilder.add_processed(false); 172 metadataBuilder.add_processed(false);
165 auto metadataBuffer = metadataBuilder.Finish(); 173 auto metadataBuffer = metadataBuilder.Finish();
166 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); 174 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer);
167 //TODO we should reserve some space in metadata for in-place updates
168 175
169 flatbuffers::FlatBufferBuilder fbb; 176 flatbuffers::FlatBufferBuilder fbb;
170 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); 177 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
171 178
172 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()), 179 storeNewRevision(newRevision, fbb, bufferType, key);
173 [](const Akonadi2::Storage::Error &error) { 180
174 Warning() << "Failed to write entity";
175 }
176 );
177 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
178 Akonadi2::Storage::recordRevision(d->transaction, newRevision, key, bufferType);
179 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; 181 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
180 182
181 return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { 183 return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) {
@@ -190,7 +192,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
190 192
191KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) 193KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
192{ 194{
193 Log() << "Pipeline: Modified Entity"; 195 Trace() << "Pipeline: Modified Entity";
194 196
195 const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; 197 const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1;
196 198
@@ -279,9 +281,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
279 flatbuffers::FlatBufferBuilder fbb; 281 flatbuffers::FlatBufferBuilder fbb;
280 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 282 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
281 283
282 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 284 storeNewRevision(newRevision, fbb, bufferType, key);
283 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
284 Akonadi2::Storage::recordRevision(d->transaction, newRevision, key, bufferType);
285 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; 285 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType;
286 286
287 return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) { 287 return KAsync::start<qint64>([this, key, bufferType, newRevision](KAsync::Future<qint64> &future) {
@@ -296,7 +296,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
296 296
297KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) 297KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
298{ 298{
299 Log() << "Pipeline: Deleted Entity"; 299 Trace() << "Pipeline: Deleted Entity";
300 300
301 const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; 301 const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1;
302 302
diff --git a/common/pipeline.h b/common/pipeline.h
index 89232d0..f4e8ae0 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -80,6 +80,7 @@ private:
80 //Don't use a reference here (it would invalidate itself) 80 //Don't use a reference here (it would invalidate itself)
81 void pipelineCompleted(PipelineState state); 81 void pipelineCompleted(PipelineState state);
82 void scheduleStep(); 82 void scheduleStep();
83 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid);
83 84
84 friend class PipelineState; 85 friend class PipelineState;
85 86