diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-02-10 10:33:31 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-02-10 10:33:31 +0100 |
commit | c1c336a9064557bb987c30582bce84bab3f869bc (patch) | |
tree | 7868cfdb5662bf508c30d8f29c222f6aab76bb3d | |
parent | 1f547bac86c0b4dc3f4ce6d872fe49bbfd77ea51 (diff) | |
download | sink-c1c336a9064557bb987c30582bce84bab3f869bc.tar.gz sink-c1c336a9064557bb987c30582bce84bab3f869bc.zip |
Gather more timings
-rw-r--r-- | common/genericresource.cpp | 15 | ||||
-rw-r--r-- | common/pipeline.cpp | 12 |
2 files changed, 21 insertions, 6 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index c097893..2688df0 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -234,15 +234,17 @@ private slots: | |||
234 | //Process all messages of this queue | 234 | //Process all messages of this queue |
235 | KAsync::Job<void> processQueue(MessageQueue *queue) | 235 | KAsync::Job<void> processQueue(MessageQueue *queue) |
236 | { | 236 | { |
237 | auto time = QSharedPointer<QTime>::create(); | ||
237 | return KAsync::start<void>([this](){ | 238 | return KAsync::start<void>([this](){ |
238 | mPipeline->startTransaction(); | 239 | mPipeline->startTransaction(); |
239 | }).then(KAsync::dowhile( | 240 | }).then(KAsync::dowhile( |
240 | [queue]() { return !queue->isEmpty(); }, | 241 | [queue]() { return !queue->isEmpty(); }, |
241 | [this, queue](KAsync::Future<void> &future) { | 242 | [this, queue, time](KAsync::Future<void> &future) { |
242 | queue->dequeueBatch(sBatchSize, [this](const QByteArray &data) { | 243 | queue->dequeueBatch(sBatchSize, [this, time](const QByteArray &data) { |
243 | return KAsync::start<void>([this, data](KAsync::Future<void> &future) { | 244 | time->start(); |
244 | processQueuedCommand(data).then<void, qint64>([&future, this](qint64 createdRevision) { | 245 | return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) { |
245 | Trace() << "Created revision " << createdRevision; | 246 | processQueuedCommand(data).then<void, qint64>([&future, this, time](qint64 createdRevision) { |
247 | Trace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); | ||
246 | future.setFinished(); | 248 | future.setFinished(); |
247 | }).exec(); | 249 | }).exec(); |
248 | }); | 250 | }); |
@@ -264,12 +266,15 @@ private slots: | |||
264 | 266 | ||
265 | KAsync::Job<void> processPipeline() | 267 | KAsync::Job<void> processPipeline() |
266 | { | 268 | { |
269 | auto time = QSharedPointer<QTime>::create(); | ||
270 | time->start(); | ||
267 | mPipeline->startTransaction(); | 271 | mPipeline->startTransaction(); |
268 | Trace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision; | 272 | Trace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision; |
269 | for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) { | 273 | for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) { |
270 | mPipeline->cleanupRevision(revision); | 274 | mPipeline->cleanupRevision(revision); |
271 | } | 275 | } |
272 | mPipeline->commit(); | 276 | mPipeline->commit(); |
277 | Trace() << "Cleanup done." << Log::TraceTime(time->elapsed()); | ||
273 | 278 | ||
274 | //Go through all message queues | 279 | //Go through all message queues |
275 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); | 280 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 627550e..93d8236 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -25,6 +25,7 @@ | |||
25 | #include <QVector> | 25 | #include <QVector> |
26 | #include <QUuid> | 26 | #include <QUuid> |
27 | #include <QDebug> | 27 | #include <QDebug> |
28 | #include <QTime> | ||
28 | #include "entity_generated.h" | 29 | #include "entity_generated.h" |
29 | #include "metadata_generated.h" | 30 | #include "metadata_generated.h" |
30 | #include "createentity_generated.h" | 31 | #include "createentity_generated.h" |
@@ -57,6 +58,8 @@ public: | |||
57 | QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; | 58 | QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; |
58 | bool revisionChanged; | 59 | bool revisionChanged; |
59 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); | 60 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); |
61 | QTime transactionTime; | ||
62 | int transactionItemCount; | ||
60 | }; | 63 | }; |
61 | 64 | ||
62 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | 65 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) |
@@ -103,6 +106,9 @@ void Pipeline::startTransaction() | |||
103 | if (d->transaction) { | 106 | if (d->transaction) { |
104 | return; | 107 | return; |
105 | } | 108 | } |
109 | Trace() << "Starting transaction."; | ||
110 | d->transactionTime.start(); | ||
111 | d->transactionItemCount = 0; | ||
106 | d->transaction = std::move(storage().createTransaction(Storage::ReadWrite)); | 112 | d->transaction = std::move(storage().createTransaction(Storage::ReadWrite)); |
107 | } | 113 | } |
108 | 114 | ||
@@ -114,7 +120,8 @@ void Pipeline::commit() | |||
114 | // processor->finalize(); | 120 | // processor->finalize(); |
115 | // } | 121 | // } |
116 | const auto revision = Storage::maxRevision(d->transaction); | 122 | const auto revision = Storage::maxRevision(d->transaction); |
117 | Trace() << "Committing " << revision; | 123 | const auto elapsed = d->transactionTime.elapsed(); |
124 | Trace() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " << (double)elapsed/(double)qMax(d->transactionItemCount, 1) << "[ms/item]"; | ||
118 | if (d->transaction) { | 125 | if (d->transaction) { |
119 | d->transaction.commit(); | 126 | d->transaction.commit(); |
120 | } | 127 | } |
@@ -138,6 +145,7 @@ Storage &Pipeline::storage() const | |||
138 | KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | 145 | KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) |
139 | { | 146 | { |
140 | Trace() << "Pipeline: New Entity"; | 147 | Trace() << "Pipeline: New Entity"; |
148 | d->transactionItemCount++; | ||
141 | 149 | ||
142 | { | 150 | { |
143 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 151 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
@@ -217,6 +225,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
217 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | 225 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) |
218 | { | 226 | { |
219 | Trace() << "Pipeline: Modified Entity"; | 227 | Trace() << "Pipeline: Modified Entity"; |
228 | d->transactionItemCount++; | ||
220 | 229 | ||
221 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; | 230 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; |
222 | 231 | ||
@@ -337,6 +346,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
337 | KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | 346 | KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) |
338 | { | 347 | { |
339 | Trace() << "Pipeline: Deleted Entity"; | 348 | Trace() << "Pipeline: Deleted Entity"; |
349 | d->transactionItemCount++; | ||
340 | 350 | ||
341 | { | 351 | { |
342 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 352 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |