diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-01-23 10:48:37 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-01-23 10:48:37 +0100 |
commit | 752f0907574debe9d7d139a117b2efac80636e93 (patch) | |
tree | a2819bca0b883b46e8d139efcf82784fbc7fa78d /common/synchronizer.cpp | |
parent | 4c99d9a644d86410a93b683d1a34ab6d499b99f9 (diff) | |
download | sink-752f0907574debe9d7d139a117b2efac80636e93.tar.gz sink-752f0907574debe9d7d139a117b2efac80636e93.zip |
Process sync requests one by one
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r-- | common/synchronizer.cpp | 172 |
1 files changed, 89 insertions, 83 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 3863cc4..57e994e 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -288,6 +288,89 @@ void Synchronizer::flushComplete(const QByteArray &flushId) | |||
288 | } | 288 | } |
289 | } | 289 | } |
290 | 290 | ||
291 | KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | ||
292 | { | ||
293 | if (request.options & SyncRequest::RequestFlush) { | ||
294 | return KAsync::syncStart<void>([=] { | ||
295 | //Trigger a flush and record original request without flush option | ||
296 | auto modifiedRequest = request; | ||
297 | modifiedRequest.options = SyncRequest::NoOptions; | ||
298 | //Normally we won't have a requestId here | ||
299 | if (modifiedRequest.requestId.isEmpty()) { | ||
300 | modifiedRequest.requestId = QUuid::createUuid().toRfc4122(); | ||
301 | } | ||
302 | SinkWarning() << "Enquing flush request " << modifiedRequest.requestId; | ||
303 | |||
304 | //The sync request will be executed once the flush has completed | ||
305 | mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest); | ||
306 | |||
307 | flatbuffers::FlatBufferBuilder fbb; | ||
308 | auto flushId = fbb.CreateString(modifiedRequest.requestId.toStdString()); | ||
309 | auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); | ||
310 | Sink::Commands::FinishFlushBuffer(fbb, location); | ||
311 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); | ||
312 | }); | ||
313 | } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { | ||
314 | return KAsync::syncStart<void>([this, request] { | ||
315 | Sink::Notification n; | ||
316 | n.id = request.requestId; | ||
317 | n.type = Notification::Status; | ||
318 | n.message = "Synchronization has started."; | ||
319 | n.code = ApplicationDomain::BusyStatus; | ||
320 | emit notify(n); | ||
321 | SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; | ||
322 | }).then(synchronizeWithSource(request.query)).then([this] { | ||
323 | //Commit after every request, so implementations only have to commit more if they add a lot of data. | ||
324 | commit(); | ||
325 | }).then<void>([this, request](const KAsync::Error &error) { | ||
326 | if (error) { | ||
327 | //Emit notification with error | ||
328 | SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error.errorMessage; | ||
329 | Sink::Notification n; | ||
330 | n.id = request.requestId; | ||
331 | n.type = Notification::Status; | ||
332 | n.message = "Synchronization has ended."; | ||
333 | n.code = ApplicationDomain::ErrorStatus; | ||
334 | emit notify(n); | ||
335 | return KAsync::error(error); | ||
336 | } else { | ||
337 | SinkLogCtx(mLogCtx) << "Done Synchronizing"; | ||
338 | Sink::Notification n; | ||
339 | n.id = request.requestId; | ||
340 | n.type = Notification::Status; | ||
341 | n.message = "Synchronization has ended."; | ||
342 | n.code = ApplicationDomain::ConnectedStatus; | ||
343 | emit notify(n); | ||
344 | return KAsync::null(); | ||
345 | } | ||
346 | }); | ||
347 | } else if (request.requestType == Synchronizer::SyncRequest::Flush) { | ||
348 | return KAsync::syncStart<void>([=] { | ||
349 | Q_ASSERT(!request.requestId.isEmpty()); | ||
350 | //FIXME it looks like this is emitted before the replay actually finishes | ||
351 | if (request.flushType == Flush::FlushReplayQueue) { | ||
352 | SinkTrace() << "Emitting flush completion."; | ||
353 | Sink::Notification n; | ||
354 | n.type = Sink::Notification::FlushCompletion; | ||
355 | n.id = request.requestId; | ||
356 | emit notify(n); | ||
357 | } else { | ||
358 | flatbuffers::FlatBufferBuilder fbb; | ||
359 | auto flushId = fbb.CreateString(request.requestId.toStdString()); | ||
360 | auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); | ||
361 | Sink::Commands::FinishFlushBuffer(fbb, location); | ||
362 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); | ||
363 | } | ||
364 | }); | ||
365 | } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { | ||
366 | return replayNextRevision(); | ||
367 | } else { | ||
368 | SinkWarning() << "Unknown request type: " << request.requestType; | ||
369 | return KAsync::error(KAsync::Error{"Unknown request type."}); | ||
370 | } | ||
371 | |||
372 | } | ||
373 | |||
291 | KAsync::Job<void> Synchronizer::processSyncQueue() | 374 | KAsync::Job<void> Synchronizer::processSyncQueue() |
292 | { | 375 | { |
293 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { | 376 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { |
@@ -301,90 +384,13 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
301 | return KAsync::null<void>(); | 384 | return KAsync::null<void>(); |
302 | } | 385 | } |
303 | 386 | ||
304 | auto job = KAsync::syncStart<void>([this] { | 387 | const auto request = mSyncRequestQueue.takeFirst(); |
388 | return KAsync::syncStart<void>([this] { | ||
305 | mMessageQueue->startTransaction(); | 389 | mMessageQueue->startTransaction(); |
306 | mSyncInProgress = true; | 390 | mSyncInProgress = true; |
307 | }); | 391 | }) |
308 | while (!mSyncRequestQueue.isEmpty()) { | 392 | .then(processRequest(request)) |
309 | const auto request = mSyncRequestQueue.takeFirst(); | 393 | .then<void>([this](const KAsync::Error &error) { |
310 | if (request.options & SyncRequest::RequestFlush) { | ||
311 | job = job.then([=] { | ||
312 | //Trigger a flush and record original request without flush option | ||
313 | auto modifiedRequest = request; | ||
314 | modifiedRequest.options = SyncRequest::NoOptions; | ||
315 | //Normally we won't have a requestId here | ||
316 | if (modifiedRequest.requestId.isEmpty()) { | ||
317 | modifiedRequest.requestId = QUuid::createUuid().toRfc4122(); | ||
318 | } | ||
319 | |||
320 | //The sync request will be executed once the flush has completed | ||
321 | mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest); | ||
322 | |||
323 | flatbuffers::FlatBufferBuilder fbb; | ||
324 | auto flushId = fbb.CreateString(modifiedRequest.requestId.toStdString()); | ||
325 | auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); | ||
326 | Sink::Commands::FinishFlushBuffer(fbb, location); | ||
327 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); | ||
328 | }); | ||
329 | } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { | ||
330 | job = job.then([this, request] { | ||
331 | Sink::Notification n; | ||
332 | n.id = request.requestId; | ||
333 | n.type = Notification::Status; | ||
334 | n.message = "Synchronization has started."; | ||
335 | n.code = ApplicationDomain::BusyStatus; | ||
336 | emit notify(n); | ||
337 | SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; | ||
338 | }).then(synchronizeWithSource(request.query)).then([this] { | ||
339 | //Commit after every request, so implementations only have to commit more if they add a lot of data. | ||
340 | commit(); | ||
341 | }).then<void>([this, request](const KAsync::Error &error) { | ||
342 | if (error) { | ||
343 | //Emit notification with error | ||
344 | SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error.errorMessage; | ||
345 | Sink::Notification n; | ||
346 | n.id = request.requestId; | ||
347 | n.type = Notification::Status; | ||
348 | n.message = "Synchronization has ended."; | ||
349 | n.code = ApplicationDomain::ErrorStatus; | ||
350 | emit notify(n); | ||
351 | return KAsync::error(error); | ||
352 | } else { | ||
353 | SinkLogCtx(mLogCtx) << "Done Synchronizing"; | ||
354 | Sink::Notification n; | ||
355 | n.id = request.requestId; | ||
356 | n.type = Notification::Status; | ||
357 | n.message = "Synchronization has ended."; | ||
358 | n.code = ApplicationDomain::ConnectedStatus; | ||
359 | emit notify(n); | ||
360 | return KAsync::null(); | ||
361 | } | ||
362 | }); | ||
363 | } else if (request.requestType == Synchronizer::SyncRequest::Flush) { | ||
364 | job = job.then([=] { | ||
365 | Q_ASSERT(!request.requestId.isEmpty()); | ||
366 | if (request.flushType == Flush::FlushReplayQueue) { | ||
367 | SinkTrace() << "Emitting flush completion."; | ||
368 | Sink::Notification n; | ||
369 | n.type = Sink::Notification::FlushCompletion; | ||
370 | n.id = request.requestId; | ||
371 | emit notify(n); | ||
372 | } else { | ||
373 | flatbuffers::FlatBufferBuilder fbb; | ||
374 | auto flushId = fbb.CreateString(request.requestId.toStdString()); | ||
375 | auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); | ||
376 | Sink::Commands::FinishFlushBuffer(fbb, location); | ||
377 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); | ||
378 | } | ||
379 | }); | ||
380 | } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { | ||
381 | job = job.then(replayNextRevision()); | ||
382 | } else { | ||
383 | SinkWarning() << "Unknown request type: " << request.requestType; | ||
384 | return KAsync::error(KAsync::Error{"Unknown request type."}); | ||
385 | } | ||
386 | } | ||
387 | return job.then<void>([this](const KAsync::Error &error) { | ||
388 | SinkTrace() << "Sync request processed"; | 394 | SinkTrace() << "Sync request processed"; |
389 | mSyncTransaction.abort(); | 395 | mSyncTransaction.abort(); |
390 | mMessageQueue->commit(); | 396 | mMessageQueue->commit(); |
@@ -504,7 +510,7 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray | |||
504 | } else if (operation == Sink::Operation_Modification) { | 510 | } else if (operation == Sink::Operation_Modification) { |
505 | SinkTrace() << "Replayed modification with remote id: " << remoteId; | 511 | SinkTrace() << "Replayed modification with remote id: " << remoteId; |
506 | if (remoteId.isEmpty()) { | 512 | if (remoteId.isEmpty()) { |
507 | SinkWarning() << "Returned an empty remoteId from the creation"; | 513 | SinkWarning() << "Returned an empty remoteId from the modification"; |
508 | } else { | 514 | } else { |
509 | syncStore().updateRemoteId(type, uid, remoteId); | 515 | syncStore().updateRemoteId(type, uid, remoteId); |
510 | } | 516 | } |