diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-08-19 14:05:05 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-08-19 14:05:05 +0200 |
commit | 67bb6035b6333fe0d6d8566b5962f83c5870185f (patch) | |
tree | 39f2fdbeb4ad814cbe0066f1df627b56328f5fe1 /common | |
parent | b6502ce1137b3ef7af8a908a9fa5d8fbeed6ed32 (diff) | |
download | sink-67bb6035b6333fe0d6d8566b5962f83c5870185f.tar.gz sink-67bb6035b6333fe0d6d8566b5962f83c5870185f.zip |
Transactions in the pipeline
Diffstat (limited to 'common')
-rw-r--r-- | common/genericresource.cpp | 34 | ||||
-rw-r--r-- | common/genericresource.h | 2 | ||||
-rw-r--r-- | common/pipeline.cpp | 48 | ||||
-rw-r--r-- | common/pipeline.h | 3 |
4 files changed, 61 insertions, 26 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index bbd992b..3b3fdb0 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -101,26 +101,26 @@ private slots: | |||
101 | //Process all messages of this queue | 101 | //Process all messages of this queue |
102 | KAsync::Job<void> processQueue(MessageQueue *queue) | 102 | KAsync::Job<void> processQueue(MessageQueue *queue) |
103 | { | 103 | { |
104 | //TODO use something like: | 104 | return KAsync::start<void>([this](){ |
105 | //KAsync::foreach("pass iterator here").each("process value here").join(); | 105 | mPipeline->startTransaction(); |
106 | //KAsync::foreach("pass iterator here").parallel("process value here").join(); | 106 | }).then(KAsync::dowhile( |
107 | return KAsync::dowhile( | 107 | [queue]() { return !queue->isEmpty(); }, |
108 | [this, queue](KAsync::Future<bool> &future) { | 108 | [this, queue](KAsync::Future<void> &future) { |
109 | queue->dequeueBatch(100, [this](const QByteArray &data) { | 109 | queue->dequeueBatch(100, [this](const QByteArray &data) { |
110 | Trace() << "Got value"; | 110 | Trace() << "Got value"; |
111 | return processQueuedCommand(data); | 111 | return processQueuedCommand(data); |
112 | } | 112 | } |
113 | ).then<void>([&future](){ | 113 | ).then<void>([&future, queue](){ |
114 | future.setValue(true); | ||
115 | future.setFinished(); | 114 | future.setFinished(); |
116 | }, | 115 | }, |
117 | [&future](int i, QString error) { | 116 | [&future](int i, QString error) { |
118 | Warning() << "Error while getting message from messagequeue: " << error; | 117 | Warning() << "Error while getting message from messagequeue: " << error; |
119 | future.setValue(false); | ||
120 | future.setFinished(); | 118 | future.setFinished(); |
121 | }).exec(); | 119 | }).exec(); |
122 | } | 120 | } |
123 | ); | 121 | )).then<void>([this]() { |
122 | mPipeline->commit(); | ||
123 | }); | ||
124 | } | 124 | } |
125 | 125 | ||
126 | KAsync::Job<void> processPipeline() | 126 | KAsync::Job<void> processPipeline() |
@@ -158,6 +158,10 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
158 | mProcessor = new Processor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); | 158 | mProcessor = new Processor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); |
159 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 159 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
160 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 160 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
161 | |||
162 | mCommitQueueTimer.setInterval(100); | ||
163 | mCommitQueueTimer.setSingleShot(true); | ||
164 | QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); | ||
161 | } | 165 | } |
162 | 166 | ||
163 | GenericResource::~GenericResource() | 167 | GenericResource::~GenericResource() |
@@ -187,10 +191,16 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt | |||
187 | 191 | ||
188 | void GenericResource::processCommand(int commandId, const QByteArray &data) | 192 | void GenericResource::processCommand(int commandId, const QByteArray &data) |
189 | { | 193 | { |
190 | //TODO instead of copying the command including the full entity first into the command queue, we could directly | 194 | static int modifications = 0; |
191 | //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). | 195 | mUserQueue.startTransaction(); |
192 | //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). | ||
193 | enqueueCommand(mUserQueue, commandId, data); | 196 | enqueueCommand(mUserQueue, commandId, data); |
197 | modifications++; | ||
198 | if (modifications >= 100) { | ||
199 | mUserQueue.commit(); | ||
200 | modifications = 0; | ||
201 | } else { | ||
202 | mCommitQueueTimer.start(); | ||
203 | } | ||
194 | } | 204 | } |
195 | 205 | ||
196 | static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue) | 206 | static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue) |
diff --git a/common/genericresource.h b/common/genericresource.h index 4a285ea..532632e 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -23,6 +23,7 @@ | |||
23 | #include <resource.h> | 23 | #include <resource.h> |
24 | #include <messagequeue.h> | 24 | #include <messagequeue.h> |
25 | #include <flatbuffers/flatbuffers.h> | 25 | #include <flatbuffers/flatbuffers.h> |
26 | #include <QTimer> | ||
26 | 27 | ||
27 | class Processor; | 28 | class Processor; |
28 | 29 | ||
@@ -56,6 +57,7 @@ protected: | |||
56 | private: | 57 | private: |
57 | Processor *mProcessor; | 58 | Processor *mProcessor; |
58 | int mError; | 59 | int mError; |
60 | QTimer mCommitQueueTimer; | ||
59 | }; | 61 | }; |
60 | 62 | ||
61 | } | 63 | } |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 207cc5e..27b9deb 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -47,6 +47,7 @@ public: | |||
47 | } | 47 | } |
48 | 48 | ||
49 | Storage storage; | 49 | Storage storage; |
50 | Storage::Transaction transaction; | ||
50 | QHash<QString, QVector<Preprocessor *> > nullPipeline; | 51 | QHash<QString, QVector<Preprocessor *> > nullPipeline; |
51 | QHash<QString, QVector<Preprocessor *> > newPipeline; | 52 | QHash<QString, QVector<Preprocessor *> > newPipeline; |
52 | QHash<QString, QVector<Preprocessor *> > modifiedPipeline; | 53 | QHash<QString, QVector<Preprocessor *> > modifiedPipeline; |
@@ -89,6 +90,27 @@ void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFac | |||
89 | d->adaptorFactory.insert(entityType, factory); | 90 | d->adaptorFactory.insert(entityType, factory); |
90 | } | 91 | } |
91 | 92 | ||
93 | void Pipeline::startTransaction() | ||
94 | { | ||
95 | if (d->transaction) { | ||
96 | return; | ||
97 | } | ||
98 | d->transaction = std::move(storage().createTransaction(Akonadi2::Storage::ReadWrite)); | ||
99 | } | ||
100 | |||
101 | void Pipeline::commit() | ||
102 | { | ||
103 | if (d->transaction) { | ||
104 | d->transaction.commit(); | ||
105 | } | ||
106 | d->transaction = Storage::Transaction(); | ||
107 | } | ||
108 | |||
109 | Storage::Transaction &Pipeline::transaction() | ||
110 | { | ||
111 | return d->transaction; | ||
112 | } | ||
113 | |||
92 | Storage &Pipeline::storage() const | 114 | Storage &Pipeline::storage() const |
93 | { | 115 | { |
94 | return d->storage; | 116 | return d->storage; |
@@ -109,7 +131,7 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
109 | //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. | 131 | //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. |
110 | const auto key = QUuid::createUuid().toString().toUtf8(); | 132 | const auto key = QUuid::createUuid().toString().toUtf8(); |
111 | 133 | ||
112 | const qint64 newRevision = storage().maxRevision() + 1; | 134 | const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; |
113 | 135 | ||
114 | { | 136 | { |
115 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 137 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
@@ -143,10 +165,8 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size) | |||
143 | flatbuffers::FlatBufferBuilder fbb; | 165 | flatbuffers::FlatBufferBuilder fbb; |
144 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); | 166 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); |
145 | 167 | ||
146 | auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite); | 168 | d->transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); |
147 | transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | 169 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); |
148 | Akonadi2::Storage::setMaxRevision(transaction, newRevision); | ||
149 | transaction.commit(); | ||
150 | Log() << "Pipeline: wrote entity: " << key << newRevision; | 170 | Log() << "Pipeline: wrote entity: " << key << newRevision; |
151 | 171 | ||
152 | return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { | 172 | return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { |
@@ -162,7 +182,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | |||
162 | { | 182 | { |
163 | Log() << "Pipeline: Modified Entity"; | 183 | Log() << "Pipeline: Modified Entity"; |
164 | 184 | ||
165 | const qint64 newRevision = storage().maxRevision() + 1; | 185 | const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; |
166 | 186 | ||
167 | { | 187 | { |
168 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 188 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
@@ -245,9 +265,8 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size) | |||
245 | adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | 265 | adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); |
246 | 266 | ||
247 | //TODO don't overwrite the old entry, but instead store a new revision | 267 | //TODO don't overwrite the old entry, but instead store a new revision |
248 | auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite); | 268 | d->transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); |
249 | transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | 269 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); |
250 | Akonadi2::Storage::setMaxRevision(transaction, newRevision); | ||
251 | 270 | ||
252 | return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { | 271 | return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { |
253 | PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [&future]() { | 272 | PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [&future]() { |
@@ -262,7 +281,7 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size) | |||
262 | { | 281 | { |
263 | Log() << "Pipeline: Deleted Entity"; | 282 | Log() << "Pipeline: Deleted Entity"; |
264 | 283 | ||
265 | const qint64 newRevision = storage().maxRevision() + 1; | 284 | const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; |
266 | 285 | ||
267 | { | 286 | { |
268 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 287 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
@@ -277,9 +296,8 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size) | |||
277 | const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); | 296 | const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); |
278 | 297 | ||
279 | //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted | 298 | //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted |
280 | auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite); | 299 | d->transaction.remove(key); |
281 | transaction.remove(key); | 300 | Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); |
282 | Akonadi2::Storage::setMaxRevision(transaction, newRevision); | ||
283 | Log() << "Pipeline: deleted entity: "<< newRevision; | 301 | Log() << "Pipeline: deleted entity: "<< newRevision; |
284 | 302 | ||
285 | return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { | 303 | return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { |
@@ -418,7 +436,9 @@ void PipelineState::step() | |||
418 | //TODO skip step if already processed | 436 | //TODO skip step if already processed |
419 | //FIXME error handling if no result is found | 437 | //FIXME error handling if no result is found |
420 | auto preprocessor = d->filterIt.next(); | 438 | auto preprocessor = d->filterIt.next(); |
421 | d->pipeline->storage().createTransaction(Akonadi2::Storage::ReadOnly).scan(d->key, [this, preprocessor](const QByteArray &key, const QByteArray &value) -> bool { | 439 | //FIXME this read should not be necessary |
440 | //Perhaps simply use entity that is initially stored and synchronously process all filters. (Making the first filter somewhat redundant) | ||
441 | d->pipeline->transaction().scan(d->key, [this, preprocessor](const QByteArray &key, const QByteArray &value) -> bool { | ||
422 | auto entity = Akonadi2::GetEntity(value); | 442 | auto entity = Akonadi2::GetEntity(value); |
423 | preprocessor->process(*this, *entity); | 443 | preprocessor->process(*this, *entity); |
424 | return false; | 444 | return false; |
diff --git a/common/pipeline.h b/common/pipeline.h index 1a33f9a..7307b2e 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -51,6 +51,9 @@ public: | |||
51 | Storage &storage() const; | 51 | Storage &storage() const; |
52 | 52 | ||
53 | void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors); | 53 | void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors); |
54 | void startTransaction(); | ||
55 | void commit(); | ||
56 | Storage::Transaction &transaction(); | ||
54 | 57 | ||
55 | void null(); | 58 | void null(); |
56 | void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); | 59 | void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); |