summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-09-29 00:52:07 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-09-29 00:52:07 +0200
commitc3f6e72c2d46906a4699127b558ca248729ce577 (patch)
treea2066e43b45a01226e19bbb309fabf7064018a06 /common/genericresource.cpp
parentb43c422a2b1b899ce5ac27a0bc381e8a49f05d86 (diff)
downloadsink-c3f6e72c2d46906a4699127b558ca248729ce577.tar.gz
sink-c3f6e72c2d46906a4699127b558ca248729ce577.zip
Revision cleanup
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp31
1 files changed, 21 insertions, 10 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 3ffc56b..4abcecd 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -60,7 +60,7 @@ private slots:
60 }).exec(); 60 }).exec();
61 } 61 }
62 62
63 KAsync::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) 63 KAsync::Job<qint64> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand)
64 { 64 {
65 Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); 65 Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId());
66 //Throw command into appropriate pipeline 66 //Throw command into appropriate pipeline
@@ -72,25 +72,27 @@ private slots:
72 case Akonadi2::Commands::CreateEntityCommand: 72 case Akonadi2::Commands::CreateEntityCommand:
73 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); 73 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
74 default: 74 default:
75 return KAsync::error<void>(-1, "Unhandled command"); 75 return KAsync::error<qint64>(-1, "Unhandled command");
76 } 76 }
77 return KAsync::null<void>(); 77 return KAsync::null<qint64>();
78 } 78 }
79 79
80 KAsync::Job<void> processQueuedCommand(const QByteArray &data) 80 KAsync::Job<qint64, qint64> processQueuedCommand(const QByteArray &data)
81 { 81 {
82 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); 82 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size());
83 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { 83 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) {
84 Warning() << "invalid buffer"; 84 Warning() << "invalid buffer";
85 return KAsync::error<void>(1, "Invalid Buffer"); 85 // return KAsync::error<void, qint64>(1, "Invalid Buffer");
86 } 86 }
87 auto queuedCommand = Akonadi2::GetQueuedCommand(data.constData()); 87 auto queuedCommand = Akonadi2::GetQueuedCommand(data.constData());
88 const auto commandId = queuedCommand->commandId(); 88 const auto commandId = queuedCommand->commandId();
89 Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId); 89 Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId);
90 return processQueuedCommand(queuedCommand).then<void>( 90 return processQueuedCommand(queuedCommand).then<qint64, qint64>(
91 [commandId]() { 91 [commandId](qint64 createdRevision) -> qint64 {
92 Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId); 92 Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId);
93 }, 93 return createdRevision;
94 }
95 ,
94 [](int errorCode, QString errorMessage) { 96 [](int errorCode, QString errorMessage) {
95 //FIXME propagate error, we didn't handle it 97 //FIXME propagate error, we didn't handle it
96 Warning() << "Error while processing queue command: " << errorMessage; 98 Warning() << "Error while processing queue command: " << errorMessage;
@@ -106,8 +108,17 @@ private slots:
106 }).then(KAsync::dowhile( 108 }).then(KAsync::dowhile(
107 [queue]() { return !queue->isEmpty(); }, 109 [queue]() { return !queue->isEmpty(); },
108 [this, queue](KAsync::Future<void> &future) { 110 [this, queue](KAsync::Future<void> &future) {
109 queue->dequeueBatch(100, [this](const QByteArray &data) { 111 const int batchSize = 100;
110 return processQueuedCommand(data); 112 queue->dequeueBatch(batchSize, [this](const QByteArray &data) {
113 return KAsync::start<void>([this, data](KAsync::Future<void> &future) {
114 processQueuedCommand(data).then<void, qint64>([&future, this](qint64 createdRevision) {
115 Trace() << "Created revision " << createdRevision;
116 //We don't have a writeback yet, so we cleanup revisions immediately
117 //TODO: only cleanup once writeback is done
118 mPipeline->cleanupRevision(createdRevision);
119 future.setFinished();
120 }).exec();
121 });
111 } 122 }
112 ).then<void>([&future, queue](){ 123 ).then<void>([&future, queue](){
113 future.setFinished(); 124 future.setFinished();