diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-08-14 17:11:40 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-08-14 17:11:40 +0200 |
commit | 9f3a6ff5d27e4983ee626231e43210d2bbb95dd6 (patch) | |
tree | 992cdc5c6d51ae3b430dbcd9fadb30b7ea3d76e1 /common/genericresource.cpp | |
parent | 4385c6bae1a66aa94beb703dcc16e12bdf0ebb0e (diff) | |
download | sink-9f3a6ff5d27e4983ee626231e43210d2bbb95dd6.tar.gz sink-9f3a6ff5d27e4983ee626231e43210d2bbb95dd6.zip |
Almost working batch dequeues
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 75 |
1 files changed, 40 insertions, 35 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index a86b518..b3df389 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -77,6 +77,27 @@ private slots: | |||
77 | return KAsync::null<void>(); | 77 | return KAsync::null<void>(); |
78 | } | 78 | } |
79 | 79 | ||
80 | KAsync::Job<void> processQueuedCommand(const QByteArray &data) | ||
81 | { | ||
82 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); | ||
83 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { | ||
84 | Warning() << "invalid buffer"; | ||
85 | return KAsync::error<void>(1, "Invalid Buffer"); | ||
86 | } | ||
87 | auto queuedCommand = Akonadi2::GetQueuedCommand(data.constData()); | ||
88 | const auto commandId = queuedCommand->commandId(); | ||
89 | Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId); | ||
90 | return processQueuedCommand(queuedCommand).then<void>( | ||
91 | [commandId]() { | ||
92 | Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId); | ||
93 | }, | ||
94 | [](int errorCode, QString errorMessage) { | ||
95 | //FIXME propagate error, we didn't handle it | ||
96 | Warning() << "Error while processing queue command: " << errorMessage; | ||
97 | } | ||
98 | ); | ||
99 | } | ||
100 | |||
80 | //Process all messages of this queue | 101 | //Process all messages of this queue |
81 | KAsync::Job<void> processQueue(MessageQueue *queue) | 102 | KAsync::Job<void> processQueue(MessageQueue *queue) |
82 | { | 103 | { |
@@ -85,45 +106,29 @@ private slots: | |||
85 | //KAsync::foreach("pass iterator here").parallel("process value here").join(); | 106 | //KAsync::foreach("pass iterator here").parallel("process value here").join(); |
86 | return KAsync::dowhile( | 107 | return KAsync::dowhile( |
87 | [this, queue](KAsync::Future<bool> &future) { | 108 | [this, queue](KAsync::Future<bool> &future) { |
88 | if (queue->isEmpty()) { | 109 | queue->dequeueBatch(100, [this](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { |
89 | future.setValue(false); | 110 | Trace() << "Got value"; |
90 | future.setFinished(); | 111 | processQueuedCommand(QByteArray::fromRawData(static_cast<char*>(ptr), size)).then<void>( |
91 | return; | 112 | [&messageQueueCallback]() { |
92 | } | 113 | Trace() << "done"; |
93 | queue->dequeue( | 114 | messageQueueCallback(true); |
94 | [this, &future](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { | ||
95 | auto callback = [messageQueueCallback, &future](bool success) { | ||
96 | messageQueueCallback(true); | ||
97 | future.setValue(!success); | ||
98 | future.setFinished(); | ||
99 | }; | ||
100 | |||
101 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); | ||
102 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { | ||
103 | Warning() << "invalid buffer"; | ||
104 | callback(false); | ||
105 | return; | ||
106 | } | ||
107 | auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); | ||
108 | const auto commandId = queuedCommand->commandId(); | ||
109 | Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId); | ||
110 | processQueuedCommand(queuedCommand).then<void>( | ||
111 | [callback, commandId]() { | ||
112 | Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId); | ||
113 | callback(true); | ||
114 | }, | 115 | }, |
115 | [callback](int errorCode, QString errorMessage) { | 116 | [&messageQueueCallback](int errorCode, QString errorMessage) { |
116 | Warning() << "Error while processing queue command: " << errorMessage; | 117 | //Use false? |
117 | callback(false); | 118 | //For now we use true to make sure we don't get stuck on messages we fail to process |
119 | messageQueueCallback(true); | ||
118 | } | 120 | } |
119 | ).exec(); | 121 | ).exec(); |
120 | }, | ||
121 | [&future](const MessageQueue::Error &error) { | ||
122 | Warning() << "Error while getting message from messagequeue: " << error.message; | ||
123 | future.setValue(false); | ||
124 | future.setFinished(); | ||
125 | } | 122 | } |
126 | ); | 123 | ).then<void>([&future](){ |
124 | future.setValue(true); | ||
125 | future.setFinished(); | ||
126 | }, | ||
127 | [&future](int i, QString error) { | ||
128 | Warning() << "Error while getting message from messagequeue: " << error; | ||
129 | future.setValue(false); | ||
130 | future.setFinished(); | ||
131 | }).exec(); | ||
127 | } | 132 | } |
128 | ); | 133 | ); |
129 | } | 134 | } |