summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-08-19 14:05:05 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-08-19 14:05:05 +0200
commit67bb6035b6333fe0d6d8566b5962f83c5870185f (patch)
tree39f2fdbeb4ad814cbe0066f1df627b56328f5fe1 /common
parentb6502ce1137b3ef7af8a908a9fa5d8fbeed6ed32 (diff)
downloadsink-67bb6035b6333fe0d6d8566b5962f83c5870185f.tar.gz
sink-67bb6035b6333fe0d6d8566b5962f83c5870185f.zip
Transactions in the pipeline
Diffstat (limited to 'common')
-rw-r--r--common/genericresource.cpp34
-rw-r--r--common/genericresource.h2
-rw-r--r--common/pipeline.cpp48
-rw-r--r--common/pipeline.h3
4 files changed, 61 insertions, 26 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index bbd992b..3b3fdb0 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -101,26 +101,26 @@ private slots:
101 //Process all messages of this queue 101 //Process all messages of this queue
102 KAsync::Job<void> processQueue(MessageQueue *queue) 102 KAsync::Job<void> processQueue(MessageQueue *queue)
103 { 103 {
104 //TODO use something like: 104 return KAsync::start<void>([this](){
105 //KAsync::foreach("pass iterator here").each("process value here").join(); 105 mPipeline->startTransaction();
106 //KAsync::foreach("pass iterator here").parallel("process value here").join(); 106 }).then(KAsync::dowhile(
107 return KAsync::dowhile( 107 [queue]() { return !queue->isEmpty(); },
108 [this, queue](KAsync::Future<bool> &future) { 108 [this, queue](KAsync::Future<void> &future) {
109 queue->dequeueBatch(100, [this](const QByteArray &data) { 109 queue->dequeueBatch(100, [this](const QByteArray &data) {
110 Trace() << "Got value"; 110 Trace() << "Got value";
111 return processQueuedCommand(data); 111 return processQueuedCommand(data);
112 } 112 }
113 ).then<void>([&future](){ 113 ).then<void>([&future, queue](){
114 future.setValue(true);
115 future.setFinished(); 114 future.setFinished();
116 }, 115 },
117 [&future](int i, QString error) { 116 [&future](int i, QString error) {
118 Warning() << "Error while getting message from messagequeue: " << error; 117 Warning() << "Error while getting message from messagequeue: " << error;
119 future.setValue(false);
120 future.setFinished(); 118 future.setFinished();
121 }).exec(); 119 }).exec();
122 } 120 }
123 ); 121 )).then<void>([this]() {
122 mPipeline->commit();
123 });
124 } 124 }
125 125
126 KAsync::Job<void> processPipeline() 126 KAsync::Job<void> processPipeline()
@@ -158,6 +158,10 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
158 mProcessor = new Processor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); 158 mProcessor = new Processor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue);
159 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 159 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
160 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 160 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
161
162 mCommitQueueTimer.setInterval(100);
163 mCommitQueueTimer.setSingleShot(true);
164 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit);
161} 165}
162 166
163GenericResource::~GenericResource() 167GenericResource::~GenericResource()
@@ -187,10 +191,16 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt
187 191
188void GenericResource::processCommand(int commandId, const QByteArray &data) 192void GenericResource::processCommand(int commandId, const QByteArray &data)
189{ 193{
190 //TODO instead of copying the command including the full entity first into the command queue, we could directly 194 static int modifications = 0;
191 //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). 195 mUserQueue.startTransaction();
192 //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire).
193 enqueueCommand(mUserQueue, commandId, data); 196 enqueueCommand(mUserQueue, commandId, data);
197 modifications++;
198 if (modifications >= 100) {
199 mUserQueue.commit();
200 modifications = 0;
201 } else {
202 mCommitQueueTimer.start();
203 }
194} 204}
195 205
196static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue) 206static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue)
diff --git a/common/genericresource.h b/common/genericresource.h
index 4a285ea..532632e 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -23,6 +23,7 @@
23#include <resource.h> 23#include <resource.h>
24#include <messagequeue.h> 24#include <messagequeue.h>
25#include <flatbuffers/flatbuffers.h> 25#include <flatbuffers/flatbuffers.h>
26#include <QTimer>
26 27
27class Processor; 28class Processor;
28 29
@@ -56,6 +57,7 @@ protected:
56private: 57private:
57 Processor *mProcessor; 58 Processor *mProcessor;
58 int mError; 59 int mError;
60 QTimer mCommitQueueTimer;
59}; 61};
60 62
61} 63}
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 207cc5e..27b9deb 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -47,6 +47,7 @@ public:
47 } 47 }
48 48
49 Storage storage; 49 Storage storage;
50 Storage::Transaction transaction;
50 QHash<QString, QVector<Preprocessor *> > nullPipeline; 51 QHash<QString, QVector<Preprocessor *> > nullPipeline;
51 QHash<QString, QVector<Preprocessor *> > newPipeline; 52 QHash<QString, QVector<Preprocessor *> > newPipeline;
52 QHash<QString, QVector<Preprocessor *> > modifiedPipeline; 53 QHash<QString, QVector<Preprocessor *> > modifiedPipeline;
@@ -89,6 +90,27 @@ void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFac
89 d->adaptorFactory.insert(entityType, factory); 90 d->adaptorFactory.insert(entityType, factory);
90} 91}
91 92
93void Pipeline::startTransaction()
94{
95 if (d->transaction) {
96 return;
97 }
98 d->transaction = std::move(storage().createTransaction(Akonadi2::Storage::ReadWrite));
99}
100
101void Pipeline::commit()
102{
103 if (d->transaction) {
104 d->transaction.commit();
105 }
106 d->transaction = Storage::Transaction();
107}
108
109Storage::Transaction &Pipeline::transaction()
110{
111 return d->transaction;
112}
113
92Storage &Pipeline::storage() const 114Storage &Pipeline::storage() const
93{ 115{
94 return d->storage; 116 return d->storage;
@@ -109,7 +131,7 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
109 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. 131 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys.
110 const auto key = QUuid::createUuid().toString().toUtf8(); 132 const auto key = QUuid::createUuid().toString().toUtf8();
111 133
112 const qint64 newRevision = storage().maxRevision() + 1; 134 const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1;
113 135
114 { 136 {
115 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 137 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
@@ -143,10 +165,8 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
143 flatbuffers::FlatBufferBuilder fbb; 165 flatbuffers::FlatBufferBuilder fbb;
144 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); 166 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
145 167
146 auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite); 168 d->transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
147 transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 169 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
148 Akonadi2::Storage::setMaxRevision(transaction, newRevision);
149 transaction.commit();
150 Log() << "Pipeline: wrote entity: " << key << newRevision; 170 Log() << "Pipeline: wrote entity: " << key << newRevision;
151 171
152 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { 172 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
@@ -162,7 +182,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
162{ 182{
163 Log() << "Pipeline: Modified Entity"; 183 Log() << "Pipeline: Modified Entity";
164 184
165 const qint64 newRevision = storage().maxRevision() + 1; 185 const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1;
166 186
167 { 187 {
168 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 188 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
@@ -245,9 +265,8 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
245 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 265 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
246 266
247 //TODO don't overwrite the old entry, but instead store a new revision 267 //TODO don't overwrite the old entry, but instead store a new revision
248 auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite); 268 d->transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
249 transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 269 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
250 Akonadi2::Storage::setMaxRevision(transaction, newRevision);
251 270
252 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { 271 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
253 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [&future]() { 272 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [&future]() {
@@ -262,7 +281,7 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
262{ 281{
263 Log() << "Pipeline: Deleted Entity"; 282 Log() << "Pipeline: Deleted Entity";
264 283
265 const qint64 newRevision = storage().maxRevision() + 1; 284 const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1;
266 285
267 { 286 {
268 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 287 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
@@ -277,9 +296,8 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
277 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 296 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
278 297
279 //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted 298 //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted
280 auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite); 299 d->transaction.remove(key);
281 transaction.remove(key); 300 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
282 Akonadi2::Storage::setMaxRevision(transaction, newRevision);
283 Log() << "Pipeline: deleted entity: "<< newRevision; 301 Log() << "Pipeline: deleted entity: "<< newRevision;
284 302
285 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { 303 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
@@ -418,7 +436,9 @@ void PipelineState::step()
418 //TODO skip step if already processed 436 //TODO skip step if already processed
419 //FIXME error handling if no result is found 437 //FIXME error handling if no result is found
420 auto preprocessor = d->filterIt.next(); 438 auto preprocessor = d->filterIt.next();
421 d->pipeline->storage().createTransaction(Akonadi2::Storage::ReadOnly).scan(d->key, [this, preprocessor](const QByteArray &key, const QByteArray &value) -> bool { 439 //FIXME this read should not be necessary
440 //Perhaps simply use entity that is initially stored and synchronously process all filters. (Making the first filter somewhat redundant)
441 d->pipeline->transaction().scan(d->key, [this, preprocessor](const QByteArray &key, const QByteArray &value) -> bool {
422 auto entity = Akonadi2::GetEntity(value); 442 auto entity = Akonadi2::GetEntity(value);
423 preprocessor->process(*this, *entity); 443 preprocessor->process(*this, *entity);
424 return false; 444 return false;
diff --git a/common/pipeline.h b/common/pipeline.h
index 1a33f9a..7307b2e 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -51,6 +51,9 @@ public:
51 Storage &storage() const; 51 Storage &storage() const;
52 52
53 void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors); 53 void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors);
54 void startTransaction();
55 void commit();
56 Storage::Transaction &transaction();
54 57
55 void null(); 58 void null();
56 void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); 59 void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory);