diff options
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r-- | common/synchronizer.cpp | 77 |
1 files changed, 59 insertions, 18 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index d94083b..3863cc4 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -274,12 +274,32 @@ void Synchronizer::flush(int commandId, const QByteArray &flushId) | |||
274 | processSyncQueue().exec(); | 274 | processSyncQueue().exec(); |
275 | } | 275 | } |
276 | 276 | ||
277 | void Synchronizer::flushComplete(const QByteArray &flushId) | ||
278 | { | ||
279 | SinkTraceCtx(mLogCtx) << "Flush complete: " << flushId; | ||
280 | if (mPendingSyncRequests.contains(flushId)) { | ||
281 | const auto requests = mPendingSyncRequests.values(flushId); | ||
282 | for (const auto &r : requests) { | ||
283 | //We want to process the pending request before any others in the queue | ||
284 | mSyncRequestQueue.prepend(r); | ||
285 | } | ||
286 | mPendingSyncRequests.remove(flushId); | ||
287 | processSyncQueue().exec(); | ||
288 | } | ||
289 | } | ||
290 | |||
277 | KAsync::Job<void> Synchronizer::processSyncQueue() | 291 | KAsync::Job<void> Synchronizer::processSyncQueue() |
278 | { | 292 | { |
279 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { | 293 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { |
280 | SinkTrace() << "Sync still in progress or nothing to do."; | 294 | SinkTrace() << "Sync still in progress or nothing to do."; |
281 | return KAsync::null<void>(); | 295 | return KAsync::null<void>(); |
282 | } | 296 | } |
297 | //Don't process any new requests until we're done with the pending ones. | ||
298 | //Otherwise we might process a flush before the previous request actually completed. | ||
299 | if (!mPendingSyncRequests.isEmpty()) { | ||
300 | SinkTrace() << "We still have pending sync requests. Not executing next request."; | ||
301 | return KAsync::null<void>(); | ||
302 | } | ||
283 | 303 | ||
284 | auto job = KAsync::syncStart<void>([this] { | 304 | auto job = KAsync::syncStart<void>([this] { |
285 | mMessageQueue->startTransaction(); | 305 | mMessageQueue->startTransaction(); |
@@ -287,7 +307,26 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
287 | }); | 307 | }); |
288 | while (!mSyncRequestQueue.isEmpty()) { | 308 | while (!mSyncRequestQueue.isEmpty()) { |
289 | const auto request = mSyncRequestQueue.takeFirst(); | 309 | const auto request = mSyncRequestQueue.takeFirst(); |
290 | if (request.requestType == Synchronizer::SyncRequest::Synchronization) { | 310 | if (request.options & SyncRequest::RequestFlush) { |
311 | job = job.then([=] { | ||
312 | //Trigger a flush and record original request without flush option | ||
313 | auto modifiedRequest = request; | ||
314 | modifiedRequest.options = SyncRequest::NoOptions; | ||
315 | //Normally we won't have a requestId here | ||
316 | if (modifiedRequest.requestId.isEmpty()) { | ||
317 | modifiedRequest.requestId = QUuid::createUuid().toRfc4122(); | ||
318 | } | ||
319 | |||
320 | //The sync request will be executed once the flush has completed | ||
321 | mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest); | ||
322 | |||
323 | flatbuffers::FlatBufferBuilder fbb; | ||
324 | auto flushId = fbb.CreateString(modifiedRequest.requestId.toStdString()); | ||
325 | auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); | ||
326 | Sink::Commands::FinishFlushBuffer(fbb, location); | ||
327 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); | ||
328 | }); | ||
329 | } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { | ||
291 | job = job.then([this, request] { | 330 | job = job.then([this, request] { |
292 | Sink::Notification n; | 331 | Sink::Notification n; |
293 | n.id = request.requestId; | 332 | n.id = request.requestId; |
@@ -295,14 +334,14 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
295 | n.message = "Synchronization has started."; | 334 | n.message = "Synchronization has started."; |
296 | n.code = ApplicationDomain::BusyStatus; | 335 | n.code = ApplicationDomain::BusyStatus; |
297 | emit notify(n); | 336 | emit notify(n); |
298 | SinkLogCtx(mLogCtx) << "Synchronizing " << request.query; | 337 | SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; |
299 | }).then(synchronizeWithSource(request.query)).then([this] { | 338 | }).then(synchronizeWithSource(request.query)).then([this] { |
300 | //Commit after every request, so implementations only have to commit more if they add a lot of data. | 339 | //Commit after every request, so implementations only have to commit more if they add a lot of data. |
301 | commit(); | 340 | commit(); |
302 | }).then<void>([this, request](const KAsync::Error &error) { | 341 | }).then<void>([this, request](const KAsync::Error &error) { |
303 | if (error) { | 342 | if (error) { |
304 | //Emit notification with error | 343 | //Emit notification with error |
305 | SinkWarning() << "Synchronization failed: " << error.errorMessage; | 344 | SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error.errorMessage; |
306 | Sink::Notification n; | 345 | Sink::Notification n; |
307 | n.id = request.requestId; | 346 | n.id = request.requestId; |
308 | n.type = Notification::Status; | 347 | n.type = Notification::Status; |
@@ -311,7 +350,7 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
311 | emit notify(n); | 350 | emit notify(n); |
312 | return KAsync::error(error); | 351 | return KAsync::error(error); |
313 | } else { | 352 | } else { |
314 | SinkLog() << "Done Synchronizing"; | 353 | SinkLogCtx(mLogCtx) << "Done Synchronizing"; |
315 | Sink::Notification n; | 354 | Sink::Notification n; |
316 | n.id = request.requestId; | 355 | n.id = request.requestId; |
317 | n.type = Notification::Status; | 356 | n.type = Notification::Status; |
@@ -322,20 +361,22 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
322 | } | 361 | } |
323 | }); | 362 | }); |
324 | } else if (request.requestType == Synchronizer::SyncRequest::Flush) { | 363 | } else if (request.requestType == Synchronizer::SyncRequest::Flush) { |
325 | Q_ASSERT(!request.requestId.isEmpty()); | 364 | job = job.then([=] { |
326 | if (request.flushType == Flush::FlushReplayQueue) { | 365 | Q_ASSERT(!request.requestId.isEmpty()); |
327 | SinkTrace() << "Emitting flush completion."; | 366 | if (request.flushType == Flush::FlushReplayQueue) { |
328 | Sink::Notification n; | 367 | SinkTrace() << "Emitting flush completion."; |
329 | n.type = Sink::Notification::FlushCompletion; | 368 | Sink::Notification n; |
330 | n.id = request.requestId; | 369 | n.type = Sink::Notification::FlushCompletion; |
331 | emit notify(n); | 370 | n.id = request.requestId; |
332 | } else { | 371 | emit notify(n); |
333 | flatbuffers::FlatBufferBuilder fbb; | 372 | } else { |
334 | auto flushId = fbb.CreateString(request.requestId.toStdString()); | 373 | flatbuffers::FlatBufferBuilder fbb; |
335 | auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); | 374 | auto flushId = fbb.CreateString(request.requestId.toStdString()); |
336 | Sink::Commands::FinishFlushBuffer(fbb, location); | 375 | auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); |
337 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); | 376 | Sink::Commands::FinishFlushBuffer(fbb, location); |
338 | } | 377 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); |
378 | } | ||
379 | }); | ||
339 | } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { | 380 | } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { |
340 | job = job.then(replayNextRevision()); | 381 | job = job.then(replayNextRevision()); |
341 | } else { | 382 | } else { |