summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-02-10 10:33:31 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-02-10 10:33:31 +0100
commitc1c336a9064557bb987c30582bce84bab3f869bc (patch)
tree7868cfdb5662bf508c30d8f29c222f6aab76bb3d
parent1f547bac86c0b4dc3f4ce6d872fe49bbfd77ea51 (diff)
downloadsink-c1c336a9064557bb987c30582bce84bab3f869bc.tar.gz
sink-c1c336a9064557bb987c30582bce84bab3f869bc.zip
Gather more timings
-rw-r--r--common/genericresource.cpp15
-rw-r--r--common/pipeline.cpp12
2 files changed, 21 insertions, 6 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index c097893..2688df0 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -234,15 +234,17 @@ private slots:
234 //Process all messages of this queue 234 //Process all messages of this queue
235 KAsync::Job<void> processQueue(MessageQueue *queue) 235 KAsync::Job<void> processQueue(MessageQueue *queue)
236 { 236 {
237 auto time = QSharedPointer<QTime>::create();
237 return KAsync::start<void>([this](){ 238 return KAsync::start<void>([this](){
238 mPipeline->startTransaction(); 239 mPipeline->startTransaction();
239 }).then(KAsync::dowhile( 240 }).then(KAsync::dowhile(
240 [queue]() { return !queue->isEmpty(); }, 241 [queue]() { return !queue->isEmpty(); },
241 [this, queue](KAsync::Future<void> &future) { 242 [this, queue, time](KAsync::Future<void> &future) {
242 queue->dequeueBatch(sBatchSize, [this](const QByteArray &data) { 243 queue->dequeueBatch(sBatchSize, [this, time](const QByteArray &data) {
243 return KAsync::start<void>([this, data](KAsync::Future<void> &future) { 244 time->start();
244 processQueuedCommand(data).then<void, qint64>([&future, this](qint64 createdRevision) { 245 return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) {
245 Trace() << "Created revision " << createdRevision; 246 processQueuedCommand(data).then<void, qint64>([&future, this, time](qint64 createdRevision) {
247 Trace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed());
246 future.setFinished(); 248 future.setFinished();
247 }).exec(); 249 }).exec();
248 }); 250 });
@@ -264,12 +266,15 @@ private slots:
264 266
265 KAsync::Job<void> processPipeline() 267 KAsync::Job<void> processPipeline()
266 { 268 {
269 auto time = QSharedPointer<QTime>::create();
270 time->start();
267 mPipeline->startTransaction(); 271 mPipeline->startTransaction();
268 Trace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision; 272 Trace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision;
269 for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) { 273 for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) {
270 mPipeline->cleanupRevision(revision); 274 mPipeline->cleanupRevision(revision);
271 } 275 }
272 mPipeline->commit(); 276 mPipeline->commit();
277 Trace() << "Cleanup done." << Log::TraceTime(time->elapsed());
273 278
274 //Go through all message queues 279 //Go through all message queues
275 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); 280 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues);
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 627550e..93d8236 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -25,6 +25,7 @@
25#include <QVector> 25#include <QVector>
26#include <QUuid> 26#include <QUuid>
27#include <QDebug> 27#include <QDebug>
28#include <QTime>
28#include "entity_generated.h" 29#include "entity_generated.h"
29#include "metadata_generated.h" 30#include "metadata_generated.h"
30#include "createentity_generated.h" 31#include "createentity_generated.h"
@@ -57,6 +58,8 @@ public:
57 QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; 58 QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory;
58 bool revisionChanged; 59 bool revisionChanged;
59 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); 60 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid);
61 QTime transactionTime;
62 int transactionItemCount;
60}; 63};
61 64
62void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) 65void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
@@ -103,6 +106,9 @@ void Pipeline::startTransaction()
103 if (d->transaction) { 106 if (d->transaction) {
104 return; 107 return;
105 } 108 }
109 Trace() << "Starting transaction.";
110 d->transactionTime.start();
111 d->transactionItemCount = 0;
106 d->transaction = std::move(storage().createTransaction(Storage::ReadWrite)); 112 d->transaction = std::move(storage().createTransaction(Storage::ReadWrite));
107} 113}
108 114
@@ -114,7 +120,8 @@ void Pipeline::commit()
114 // processor->finalize(); 120 // processor->finalize();
115 // } 121 // }
116 const auto revision = Storage::maxRevision(d->transaction); 122 const auto revision = Storage::maxRevision(d->transaction);
117 Trace() << "Committing " << revision; 123 const auto elapsed = d->transactionTime.elapsed();
124 Trace() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " << (double)elapsed/(double)qMax(d->transactionItemCount, 1) << "[ms/item]";
118 if (d->transaction) { 125 if (d->transaction) {
119 d->transaction.commit(); 126 d->transaction.commit();
120 } 127 }
@@ -138,6 +145,7 @@ Storage &Pipeline::storage() const
138KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) 145KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
139{ 146{
140 Trace() << "Pipeline: New Entity"; 147 Trace() << "Pipeline: New Entity";
148 d->transactionItemCount++;
141 149
142 { 150 {
143 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 151 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
@@ -217,6 +225,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
217KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) 225KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
218{ 226{
219 Trace() << "Pipeline: Modified Entity"; 227 Trace() << "Pipeline: Modified Entity";
228 d->transactionItemCount++;
220 229
221 const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; 230 const qint64 newRevision = Storage::maxRevision(d->transaction) + 1;
222 231
@@ -337,6 +346,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
337KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) 346KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
338{ 347{
339 Trace() << "Pipeline: Deleted Entity"; 348 Trace() << "Pipeline: Deleted Entity";
349 d->transactionItemCount++;
340 350
341 { 351 {
342 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 352 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);