summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-08-14 17:11:40 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-08-14 17:11:40 +0200
commit9f3a6ff5d27e4983ee626231e43210d2bbb95dd6 (patch)
tree992cdc5c6d51ae3b430dbcd9fadb30b7ea3d76e1 /common/genericresource.cpp
parent4385c6bae1a66aa94beb703dcc16e12bdf0ebb0e (diff)
downloadsink-9f3a6ff5d27e4983ee626231e43210d2bbb95dd6.tar.gz
sink-9f3a6ff5d27e4983ee626231e43210d2bbb95dd6.zip
Almost working batch dequeues
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp75
1 files changed, 40 insertions, 35 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index a86b518..b3df389 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -77,6 +77,27 @@ private slots:
77 return KAsync::null<void>(); 77 return KAsync::null<void>();
78 } 78 }
79 79
80 KAsync::Job<void> processQueuedCommand(const QByteArray &data)
81 {
82 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size());
83 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) {
84 Warning() << "invalid buffer";
85 return KAsync::error<void>(1, "Invalid Buffer");
86 }
87 auto queuedCommand = Akonadi2::GetQueuedCommand(data.constData());
88 const auto commandId = queuedCommand->commandId();
89 Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId);
90 return processQueuedCommand(queuedCommand).then<void>(
91 [commandId]() {
92 Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId);
93 },
94 [](int errorCode, QString errorMessage) {
95 //FIXME propagate error, we didn't handle it
96 Warning() << "Error while processing queue command: " << errorMessage;
97 }
98 );
99 }
100
80 //Process all messages of this queue 101 //Process all messages of this queue
81 KAsync::Job<void> processQueue(MessageQueue *queue) 102 KAsync::Job<void> processQueue(MessageQueue *queue)
82 { 103 {
@@ -85,45 +106,29 @@ private slots:
85 //KAsync::foreach("pass iterator here").parallel("process value here").join(); 106 //KAsync::foreach("pass iterator here").parallel("process value here").join();
86 return KAsync::dowhile( 107 return KAsync::dowhile(
87 [this, queue](KAsync::Future<bool> &future) { 108 [this, queue](KAsync::Future<bool> &future) {
88 if (queue->isEmpty()) { 109 queue->dequeueBatch(100, [this](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) {
89 future.setValue(false); 110 Trace() << "Got value";
90 future.setFinished(); 111 processQueuedCommand(QByteArray::fromRawData(static_cast<char*>(ptr), size)).then<void>(
91 return; 112 [&messageQueueCallback]() {
92 } 113 Trace() << "done";
93 queue->dequeue( 114 messageQueueCallback(true);
94 [this, &future](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) {
95 auto callback = [messageQueueCallback, &future](bool success) {
96 messageQueueCallback(true);
97 future.setValue(!success);
98 future.setFinished();
99 };
100
101 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size);
102 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) {
103 Warning() << "invalid buffer";
104 callback(false);
105 return;
106 }
107 auto queuedCommand = Akonadi2::GetQueuedCommand(ptr);
108 const auto commandId = queuedCommand->commandId();
109 Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId);
110 processQueuedCommand(queuedCommand).then<void>(
111 [callback, commandId]() {
112 Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId);
113 callback(true);
114 }, 115 },
115 [callback](int errorCode, QString errorMessage) { 116 [&messageQueueCallback](int errorCode, QString errorMessage) {
116 Warning() << "Error while processing queue command: " << errorMessage; 117 //Use false?
117 callback(false); 118 //For now we use true to make sure we don't get stuck on messages we fail to process
119 messageQueueCallback(true);
118 } 120 }
119 ).exec(); 121 ).exec();
120 },
121 [&future](const MessageQueue::Error &error) {
122 Warning() << "Error while getting message from messagequeue: " << error.message;
123 future.setValue(false);
124 future.setFinished();
125 } 122 }
126 ); 123 ).then<void>([&future](){
124 future.setValue(true);
125 future.setFinished();
126 },
127 [&future](int i, QString error) {
128 Warning() << "Error while getting message from messagequeue: " << error;
129 future.setValue(false);
130 future.setFinished();
131 }).exec();
127 } 132 }
128 ); 133 );
129 } 134 }