summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-27 02:26:47 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-09-15 16:14:19 +0200
commit26816c21f60450e461a5b6ef4ef740f6070ce278 (patch)
tree55e8aee03e094abf702438e6cd26233047345e70 /common/genericresource.cpp
parent9a9bb39f7641a818434cafa0dae0c8aa47124c0b (diff)
downloadsink-26816c21f60450e461a5b6ef4ef740f6070ce278.tar.gz
sink-26816c21f60450e461a5b6ef4ef740f6070ce278.zip
Ported to the kasync revamp
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp134
1 files changed, 66 insertions, 68 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 7136882..f5b1775 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -100,7 +100,7 @@ private slots:
100 } 100 }
101 mProcessingLock = true; 101 mProcessingLock = true;
102 auto job = processPipeline() 102 auto job = processPipeline()
103 .then<void>([this]() { 103 .syncThen<void>([this]() {
104 mProcessingLock = false; 104 mProcessingLock = false;
105 if (messagesToProcessAvailable()) { 105 if (messagesToProcessAvailable()) {
106 process(); 106 process();
@@ -122,7 +122,8 @@ private slots:
122 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); 122 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
123 case Sink::Commands::InspectionCommand: 123 case Sink::Commands::InspectionCommand:
124 if (mInspect) { 124 if (mInspect) {
125 return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<qint64>([]() { return -1; }); 125 return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size())
126 .syncThen<qint64>([]() { return -1; });
126 } else { 127 } else {
127 return KAsync::error<qint64>(-1, "Missing inspection command."); 128 return KAsync::error<qint64>(-1, "Missing inspection command.");
128 } 129 }
@@ -131,7 +132,7 @@ private slots:
131 } 132 }
132 } 133 }
133 134
134 KAsync::Job<qint64, qint64> processQueuedCommand(const QByteArray &data) 135 KAsync::Job<qint64> processQueuedCommand(const QByteArray &data)
135 { 136 {
136 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); 137 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size());
137 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { 138 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) {
@@ -143,13 +144,13 @@ private slots:
143 SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId); 144 SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId);
144 return processQueuedCommand(queuedCommand) 145 return processQueuedCommand(queuedCommand)
145 .then<qint64, qint64>( 146 .then<qint64, qint64>(
146 [this, commandId](qint64 createdRevision) -> qint64 { 147 [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job<qint64> {
148 if (error) {
149 SinkWarning() << "Error while processing queue command: " << error.errorMessage;
150 return KAsync::error<qint64>(error);
151 }
147 SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId); 152 SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId);
148 return createdRevision; 153 return KAsync::value<qint64>(createdRevision);
149 },
150 [](int errorCode, QString errorMessage) {
151 // FIXME propagate error, we didn't handle it
152 SinkWarning() << "Error while processing queue command: " << errorMessage;
153 }); 154 });
154 } 155 }
155 156
@@ -157,31 +158,31 @@ private slots:
157 KAsync::Job<void> processQueue(MessageQueue *queue) 158 KAsync::Job<void> processQueue(MessageQueue *queue)
158 { 159 {
159 auto time = QSharedPointer<QTime>::create(); 160 auto time = QSharedPointer<QTime>::create();
160 return KAsync::start<void>([this]() { mPipeline->startTransaction(); }) 161 return KAsync::syncStart<void>([this]() { mPipeline->startTransaction(); })
161 .then(KAsync::dowhile([queue]() { return !queue->isEmpty(); }, 162 .then(KAsync::dowhile(
162 [this, queue, time](KAsync::Future<void> &future) { 163 [this, queue, time]() -> KAsync::Job<KAsync::ControlFlowFlag> {
163 queue->dequeueBatch(sBatchSize, 164 return queue->dequeueBatch(sBatchSize,
164 [this, time](const QByteArray &data) { 165 [this, time](const QByteArray &data) -> KAsync::Job<void> {
165 time->start(); 166 time->start();
166 return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) { 167 return processQueuedCommand(data)
167 processQueuedCommand(data) 168 .syncThen<void, qint64>([this, time](qint64 createdRevision) {
168 .then<void, qint64>([&future, this, time](qint64 createdRevision) { 169 SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed());
169 SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); 170 });
170 future.setFinished(); 171 })
171 }) 172 .then<KAsync::ControlFlowFlag>([queue](const KAsync::Error &error) {
172 .exec(); 173 if (error) {
173 }); 174 if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) {
174 }) 175 SinkWarning() << "Error while getting message from messagequeue: " << error.errorMessage;
175 .then<void>([&future, queue]() { future.setFinished(); }, 176 }
176 [&future](int i, QString error) {
177 if (i != MessageQueue::ErrorCodes::NoMessageFound) {
178 SinkWarning() << "Error while getting message from messagequeue: " << error;
179 } 177 }
180 future.setFinished(); 178 if (queue->isEmpty()) {
181 }) 179 return KAsync::value<KAsync::ControlFlowFlag>(KAsync::Break);
182 .exec(); 180 } else {
181 return KAsync::value<KAsync::ControlFlowFlag>(KAsync::Continue);
182 }
183 });
183 })) 184 }))
184 .then<void>([this]() { mPipeline->commit(); }); 185 .syncThen<void>([this](const KAsync::Error &) { mPipeline->commit(); });
185 } 186 }
186 187
187 KAsync::Job<void> processPipeline() 188 KAsync::Job<void> processPipeline()
@@ -198,18 +199,20 @@ private slots:
198 199
199 // Go through all message queues 200 // Go through all message queues
200 auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues); 201 auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues);
201 return KAsync::dowhile([it]() { return it->hasNext(); }, 202 return KAsync::dowhile(
202 [it, this](KAsync::Future<void> &future) { 203 [it, this]() {
203 auto time = QSharedPointer<QTime>::create(); 204 auto time = QSharedPointer<QTime>::create();
204 time->start(); 205 time->start();
205 206
206 auto queue = it->next(); 207 auto queue = it->next();
207 processQueue(queue) 208 return processQueue(queue)
208 .then<void>([this, &future, time]() { 209 .syncThen<KAsync::ControlFlowFlag>([this, time, it]() {
209 SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed()); 210 SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed());
210 future.setFinished(); 211 if (it->hasNext()) {
211 }) 212 return KAsync::Continue;
212 .exec(); 213 }
214 return KAsync::Break;
215 });
213 }); 216 });
214 } 217 }
215 218
@@ -251,22 +254,19 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra
251 s >> expectedValue; 254 s >> expectedValue;
252 inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue) 255 inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue)
253 .then<void>( 256 .then<void>(
254 [=]() { 257 [=](const KAsync::Error &error) {
255 Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId;
256 Sink::Notification n;
257 n.type = Sink::Notification::Inspection;
258 n.id = inspectionId;
259 n.code = Sink::Notification::Success;
260 emit notify(n);
261 },
262 [=](int code, const QString &message) {
263 Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << message;
264 Sink::Notification n; 258 Sink::Notification n;
265 n.type = Sink::Notification::Inspection; 259 n.type = Sink::Notification::Inspection;
266 n.message = message;
267 n.id = inspectionId; 260 n.id = inspectionId;
268 n.code = Sink::Notification::Failure; 261 if (error) {
262 Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << error.errorMessage;
263 n.code = Sink::Notification::Failure;
264 } else {
265 Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId;
266 n.code = Sink::Notification::Success;
267 }
269 emit notify(n); 268 emit notify(n);
269 return KAsync::null();
270 }) 270 })
271 .exec(); 271 .exec();
272 return KAsync::null<void>(); 272 return KAsync::null<void>();
@@ -420,7 +420,7 @@ void GenericResource::processCommand(int commandId, const QByteArray &data)
420 420
421KAsync::Job<void> GenericResource::synchronizeWithSource() 421KAsync::Job<void> GenericResource::synchronizeWithSource()
422{ 422{
423 return KAsync::start<void>([this](KAsync::Future<void> &future) { 423 return KAsync::start<void>([this]() {
424 424
425 Sink::Notification n; 425 Sink::Notification n;
426 n.id = "sync"; 426 n.id = "sync";
@@ -432,23 +432,21 @@ KAsync::Job<void> GenericResource::synchronizeWithSource()
432 SinkLog() << " Synchronizing"; 432 SinkLog() << " Synchronizing";
433 // Changereplay would deadlock otherwise when trying to open the synchronization store 433 // Changereplay would deadlock otherwise when trying to open the synchronization store
434 enableChangeReplay(false); 434 enableChangeReplay(false);
435 mSynchronizer->synchronize() 435 return mSynchronizer->synchronize()
436 .then<void>([this, &future]() { 436 .then<void>([this](const KAsync::Error &error) {
437 SinkLog() << "Done Synchronizing";
438 Sink::Notification n;
439 n.id = "sync";
440 n.type = Sink::Notification::Status;
441 n.message = "Synchronization has ended.";
442 n.code = Sink::ApplicationDomain::ConnectedStatus;
443 emit notify(n);
444
445 enableChangeReplay(true);
446 future.setFinished();
447 }, [this, &future](int errorCode, const QString &error) {
448 enableChangeReplay(true); 437 enableChangeReplay(true);
449 future.setError(errorCode, error); 438 if (!error) {
450 }) 439 SinkLog() << "Done Synchronizing";
451 .exec(); 440 Sink::Notification n;
441 n.id = "sync";
442 n.type = Sink::Notification::Status;
443 n.message = "Synchronization has ended.";
444 n.code = Sink::ApplicationDomain::ConnectedStatus;
445 emit notify(n);
446 return KAsync::null();
447 }
448 return KAsync::error(error);
449 });
452 }); 450 });
453} 451}
454 452