diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-07-27 02:26:47 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-09-15 16:14:19 +0200 |
commit | 26816c21f60450e461a5b6ef4ef740f6070ce278 (patch) | |
tree | 55e8aee03e094abf702438e6cd26233047345e70 /common/genericresource.cpp | |
parent | 9a9bb39f7641a818434cafa0dae0c8aa47124c0b (diff) | |
download | sink-26816c21f60450e461a5b6ef4ef740f6070ce278.tar.gz sink-26816c21f60450e461a5b6ef4ef740f6070ce278.zip |
Ported to the kasync revamp
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 134 |
1 files changed, 66 insertions, 68 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 7136882..f5b1775 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -100,7 +100,7 @@ private slots: | |||
100 | } | 100 | } |
101 | mProcessingLock = true; | 101 | mProcessingLock = true; |
102 | auto job = processPipeline() | 102 | auto job = processPipeline() |
103 | .then<void>([this]() { | 103 | .syncThen<void>([this]() { |
104 | mProcessingLock = false; | 104 | mProcessingLock = false; |
105 | if (messagesToProcessAvailable()) { | 105 | if (messagesToProcessAvailable()) { |
106 | process(); | 106 | process(); |
@@ -122,7 +122,8 @@ private slots: | |||
122 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); | 122 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); |
123 | case Sink::Commands::InspectionCommand: | 123 | case Sink::Commands::InspectionCommand: |
124 | if (mInspect) { | 124 | if (mInspect) { |
125 | return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<qint64>([]() { return -1; }); | 125 | return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()) |
126 | .syncThen<qint64>([]() { return -1; }); | ||
126 | } else { | 127 | } else { |
127 | return KAsync::error<qint64>(-1, "Missing inspection command."); | 128 | return KAsync::error<qint64>(-1, "Missing inspection command."); |
128 | } | 129 | } |
@@ -131,7 +132,7 @@ private slots: | |||
131 | } | 132 | } |
132 | } | 133 | } |
133 | 134 | ||
134 | KAsync::Job<qint64, qint64> processQueuedCommand(const QByteArray &data) | 135 | KAsync::Job<qint64> processQueuedCommand(const QByteArray &data) |
135 | { | 136 | { |
136 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); | 137 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); |
137 | if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { | 138 | if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { |
@@ -143,13 +144,13 @@ private slots: | |||
143 | SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId); | 144 | SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId); |
144 | return processQueuedCommand(queuedCommand) | 145 | return processQueuedCommand(queuedCommand) |
145 | .then<qint64, qint64>( | 146 | .then<qint64, qint64>( |
146 | [this, commandId](qint64 createdRevision) -> qint64 { | 147 | [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job<qint64> { |
148 | if (error) { | ||
149 | SinkWarning() << "Error while processing queue command: " << error.errorMessage; | ||
150 | return KAsync::error<qint64>(error); | ||
151 | } | ||
147 | SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId); | 152 | SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId); |
148 | return createdRevision; | 153 | return KAsync::value<qint64>(createdRevision); |
149 | }, | ||
150 | [](int errorCode, QString errorMessage) { | ||
151 | // FIXME propagate error, we didn't handle it | ||
152 | SinkWarning() << "Error while processing queue command: " << errorMessage; | ||
153 | }); | 154 | }); |
154 | } | 155 | } |
155 | 156 | ||
@@ -157,31 +158,31 @@ private slots: | |||
157 | KAsync::Job<void> processQueue(MessageQueue *queue) | 158 | KAsync::Job<void> processQueue(MessageQueue *queue) |
158 | { | 159 | { |
159 | auto time = QSharedPointer<QTime>::create(); | 160 | auto time = QSharedPointer<QTime>::create(); |
160 | return KAsync::start<void>([this]() { mPipeline->startTransaction(); }) | 161 | return KAsync::syncStart<void>([this]() { mPipeline->startTransaction(); }) |
161 | .then(KAsync::dowhile([queue]() { return !queue->isEmpty(); }, | 162 | .then(KAsync::dowhile( |
162 | [this, queue, time](KAsync::Future<void> &future) { | 163 | [this, queue, time]() -> KAsync::Job<KAsync::ControlFlowFlag> { |
163 | queue->dequeueBatch(sBatchSize, | 164 | return queue->dequeueBatch(sBatchSize, |
164 | [this, time](const QByteArray &data) { | 165 | [this, time](const QByteArray &data) -> KAsync::Job<void> { |
165 | time->start(); | 166 | time->start(); |
166 | return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) { | 167 | return processQueuedCommand(data) |
167 | processQueuedCommand(data) | 168 | .syncThen<void, qint64>([this, time](qint64 createdRevision) { |
168 | .then<void, qint64>([&future, this, time](qint64 createdRevision) { | 169 | SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); |
169 | SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); | 170 | }); |
170 | future.setFinished(); | 171 | }) |
171 | }) | 172 | .then<KAsync::ControlFlowFlag>([queue](const KAsync::Error &error) { |
172 | .exec(); | 173 | if (error) { |
173 | }); | 174 | if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) { |
174 | }) | 175 | SinkWarning() << "Error while getting message from messagequeue: " << error.errorMessage; |
175 | .then<void>([&future, queue]() { future.setFinished(); }, | 176 | } |
176 | [&future](int i, QString error) { | ||
177 | if (i != MessageQueue::ErrorCodes::NoMessageFound) { | ||
178 | SinkWarning() << "Error while getting message from messagequeue: " << error; | ||
179 | } | 177 | } |
180 | future.setFinished(); | 178 | if (queue->isEmpty()) { |
181 | }) | 179 | return KAsync::value<KAsync::ControlFlowFlag>(KAsync::Break); |
182 | .exec(); | 180 | } else { |
181 | return KAsync::value<KAsync::ControlFlowFlag>(KAsync::Continue); | ||
182 | } | ||
183 | }); | ||
183 | })) | 184 | })) |
184 | .then<void>([this]() { mPipeline->commit(); }); | 185 | .syncThen<void>([this](const KAsync::Error &) { mPipeline->commit(); }); |
185 | } | 186 | } |
186 | 187 | ||
187 | KAsync::Job<void> processPipeline() | 188 | KAsync::Job<void> processPipeline() |
@@ -198,18 +199,20 @@ private slots: | |||
198 | 199 | ||
199 | // Go through all message queues | 200 | // Go through all message queues |
200 | auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues); | 201 | auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues); |
201 | return KAsync::dowhile([it]() { return it->hasNext(); }, | 202 | return KAsync::dowhile( |
202 | [it, this](KAsync::Future<void> &future) { | 203 | [it, this]() { |
203 | auto time = QSharedPointer<QTime>::create(); | 204 | auto time = QSharedPointer<QTime>::create(); |
204 | time->start(); | 205 | time->start(); |
205 | 206 | ||
206 | auto queue = it->next(); | 207 | auto queue = it->next(); |
207 | processQueue(queue) | 208 | return processQueue(queue) |
208 | .then<void>([this, &future, time]() { | 209 | .syncThen<KAsync::ControlFlowFlag>([this, time, it]() { |
209 | SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed()); | 210 | SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed()); |
210 | future.setFinished(); | 211 | if (it->hasNext()) { |
211 | }) | 212 | return KAsync::Continue; |
212 | .exec(); | 213 | } |
214 | return KAsync::Break; | ||
215 | }); | ||
213 | }); | 216 | }); |
214 | } | 217 | } |
215 | 218 | ||
@@ -251,22 +254,19 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra | |||
251 | s >> expectedValue; | 254 | s >> expectedValue; |
252 | inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue) | 255 | inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue) |
253 | .then<void>( | 256 | .then<void>( |
254 | [=]() { | 257 | [=](const KAsync::Error &error) { |
255 | Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; | ||
256 | Sink::Notification n; | ||
257 | n.type = Sink::Notification::Inspection; | ||
258 | n.id = inspectionId; | ||
259 | n.code = Sink::Notification::Success; | ||
260 | emit notify(n); | ||
261 | }, | ||
262 | [=](int code, const QString &message) { | ||
263 | Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << message; | ||
264 | Sink::Notification n; | 258 | Sink::Notification n; |
265 | n.type = Sink::Notification::Inspection; | 259 | n.type = Sink::Notification::Inspection; |
266 | n.message = message; | ||
267 | n.id = inspectionId; | 260 | n.id = inspectionId; |
268 | n.code = Sink::Notification::Failure; | 261 | if (error) { |
262 | Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << error.errorMessage; | ||
263 | n.code = Sink::Notification::Failure; | ||
264 | } else { | ||
265 | Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; | ||
266 | n.code = Sink::Notification::Success; | ||
267 | } | ||
269 | emit notify(n); | 268 | emit notify(n); |
269 | return KAsync::null(); | ||
270 | }) | 270 | }) |
271 | .exec(); | 271 | .exec(); |
272 | return KAsync::null<void>(); | 272 | return KAsync::null<void>(); |
@@ -420,7 +420,7 @@ void GenericResource::processCommand(int commandId, const QByteArray &data) | |||
420 | 420 | ||
421 | KAsync::Job<void> GenericResource::synchronizeWithSource() | 421 | KAsync::Job<void> GenericResource::synchronizeWithSource() |
422 | { | 422 | { |
423 | return KAsync::start<void>([this](KAsync::Future<void> &future) { | 423 | return KAsync::start<void>([this]() { |
424 | 424 | ||
425 | Sink::Notification n; | 425 | Sink::Notification n; |
426 | n.id = "sync"; | 426 | n.id = "sync"; |
@@ -432,23 +432,21 @@ KAsync::Job<void> GenericResource::synchronizeWithSource() | |||
432 | SinkLog() << " Synchronizing"; | 432 | SinkLog() << " Synchronizing"; |
433 | // Changereplay would deadlock otherwise when trying to open the synchronization store | 433 | // Changereplay would deadlock otherwise when trying to open the synchronization store |
434 | enableChangeReplay(false); | 434 | enableChangeReplay(false); |
435 | mSynchronizer->synchronize() | 435 | return mSynchronizer->synchronize() |
436 | .then<void>([this, &future]() { | 436 | .then<void>([this](const KAsync::Error &error) { |
437 | SinkLog() << "Done Synchronizing"; | ||
438 | Sink::Notification n; | ||
439 | n.id = "sync"; | ||
440 | n.type = Sink::Notification::Status; | ||
441 | n.message = "Synchronization has ended."; | ||
442 | n.code = Sink::ApplicationDomain::ConnectedStatus; | ||
443 | emit notify(n); | ||
444 | |||
445 | enableChangeReplay(true); | ||
446 | future.setFinished(); | ||
447 | }, [this, &future](int errorCode, const QString &error) { | ||
448 | enableChangeReplay(true); | 437 | enableChangeReplay(true); |
449 | future.setError(errorCode, error); | 438 | if (!error) { |
450 | }) | 439 | SinkLog() << "Done Synchronizing"; |
451 | .exec(); | 440 | Sink::Notification n; |
441 | n.id = "sync"; | ||
442 | n.type = Sink::Notification::Status; | ||
443 | n.message = "Synchronization has ended."; | ||
444 | n.code = Sink::ApplicationDomain::ConnectedStatus; | ||
445 | emit notify(n); | ||
446 | return KAsync::null(); | ||
447 | } | ||
448 | return KAsync::error(error); | ||
449 | }); | ||
452 | }); | 450 | }); |
453 | } | 451 | } |
454 | 452 | ||