diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-01-22 18:25:31 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-01-22 18:25:31 +0100 |
commit | e513ee41adb6061aa72de8bfe49d117f47c1545b (patch) | |
tree | d91fd8abb5d2f47ec24dff497bcf18e218f01641 /common/synchronizer.cpp | |
parent | ec16f3b92d6017462f4bba8354b53d1707850430 (diff) | |
download | sink-e513ee41adb6061aa72de8bfe49d117f47c1545b.tar.gz sink-e513ee41adb6061aa72de8bfe49d117f47c1545b.zip |
Support dependencies between sync requests.
If one sync task depends on the previous sync task we want to flush in
between, so we can query for the results of the previous sync request
locally.
If we detect such a dependency we temporarily halt all processing of
synchronization requests until the flush completes, so we can continue
processing.
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r-- | common/synchronizer.cpp | 77 |
1 files changed, 59 insertions, 18 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index d94083b..3863cc4 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -274,12 +274,32 @@ void Synchronizer::flush(int commandId, const QByteArray &flushId) | |||
274 | processSyncQueue().exec(); | 274 | processSyncQueue().exec(); |
275 | } | 275 | } |
276 | 276 | ||
277 | void Synchronizer::flushComplete(const QByteArray &flushId) | ||
278 | { | ||
279 | SinkTraceCtx(mLogCtx) << "Flush complete: " << flushId; | ||
280 | if (mPendingSyncRequests.contains(flushId)) { | ||
281 | const auto requests = mPendingSyncRequests.values(flushId); | ||
282 | for (const auto &r : requests) { | ||
283 | //We want to process the pending request before any others in the queue | ||
284 | mSyncRequestQueue.prepend(r); | ||
285 | } | ||
286 | mPendingSyncRequests.remove(flushId); | ||
287 | processSyncQueue().exec(); | ||
288 | } | ||
289 | } | ||
290 | |||
277 | KAsync::Job<void> Synchronizer::processSyncQueue() | 291 | KAsync::Job<void> Synchronizer::processSyncQueue() |
278 | { | 292 | { |
279 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { | 293 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { |
280 | SinkTrace() << "Sync still in progress or nothing to do."; | 294 | SinkTrace() << "Sync still in progress or nothing to do."; |
281 | return KAsync::null<void>(); | 295 | return KAsync::null<void>(); |
282 | } | 296 | } |
297 | //Don't process any new requests until we're done with the pending ones. | ||
298 | //Otherwise we might process a flush before the previous request actually completed. | ||
299 | if (!mPendingSyncRequests.isEmpty()) { | ||
300 | SinkTrace() << "We still have pending sync requests. Not executing next request."; | ||
301 | return KAsync::null<void>(); | ||
302 | } | ||
283 | 303 | ||
284 | auto job = KAsync::syncStart<void>([this] { | 304 | auto job = KAsync::syncStart<void>([this] { |
285 | mMessageQueue->startTransaction(); | 305 | mMessageQueue->startTransaction(); |
@@ -287,7 +307,26 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
287 | }); | 307 | }); |
288 | while (!mSyncRequestQueue.isEmpty()) { | 308 | while (!mSyncRequestQueue.isEmpty()) { |
289 | const auto request = mSyncRequestQueue.takeFirst(); | 309 | const auto request = mSyncRequestQueue.takeFirst(); |
290 | if (request.requestType == Synchronizer::SyncRequest::Synchronization) { | 310 | if (request.options & SyncRequest::RequestFlush) { |
311 | job = job.then([=] { | ||
312 | //Trigger a flush and record original request without flush option | ||
313 | auto modifiedRequest = request; | ||
314 | modifiedRequest.options = SyncRequest::NoOptions; | ||
315 | //Normally we won't have a requestId here | ||
316 | if (modifiedRequest.requestId.isEmpty()) { | ||
317 | modifiedRequest.requestId = QUuid::createUuid().toRfc4122(); | ||
318 | } | ||
319 | |||
320 | //The sync request will be executed once the flush has completed | ||
321 | mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest); | ||
322 | |||
323 | flatbuffers::FlatBufferBuilder fbb; | ||
324 | auto flushId = fbb.CreateString(modifiedRequest.requestId.toStdString()); | ||
325 | auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); | ||
326 | Sink::Commands::FinishFlushBuffer(fbb, location); | ||
327 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); | ||
328 | }); | ||
329 | } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { | ||
291 | job = job.then([this, request] { | 330 | job = job.then([this, request] { |
292 | Sink::Notification n; | 331 | Sink::Notification n; |
293 | n.id = request.requestId; | 332 | n.id = request.requestId; |
@@ -295,14 +334,14 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
295 | n.message = "Synchronization has started."; | 334 | n.message = "Synchronization has started."; |
296 | n.code = ApplicationDomain::BusyStatus; | 335 | n.code = ApplicationDomain::BusyStatus; |
297 | emit notify(n); | 336 | emit notify(n); |
298 | SinkLogCtx(mLogCtx) << "Synchronizing " << request.query; | 337 | SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; |
299 | }).then(synchronizeWithSource(request.query)).then([this] { | 338 | }).then(synchronizeWithSource(request.query)).then([this] { |
300 | //Commit after every request, so implementations only have to commit more if they add a lot of data. | 339 | //Commit after every request, so implementations only have to commit more if they add a lot of data. |
301 | commit(); | 340 | commit(); |
302 | }).then<void>([this, request](const KAsync::Error &error) { | 341 | }).then<void>([this, request](const KAsync::Error &error) { |
303 | if (error) { | 342 | if (error) { |
304 | //Emit notification with error | 343 | //Emit notification with error |
305 | SinkWarning() << "Synchronization failed: " << error.errorMessage; | 344 | SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error.errorMessage; |
306 | Sink::Notification n; | 345 | Sink::Notification n; |
307 | n.id = request.requestId; | 346 | n.id = request.requestId; |
308 | n.type = Notification::Status; | 347 | n.type = Notification::Status; |
@@ -311,7 +350,7 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
311 | emit notify(n); | 350 | emit notify(n); |
312 | return KAsync::error(error); | 351 | return KAsync::error(error); |
313 | } else { | 352 | } else { |
314 | SinkLog() << "Done Synchronizing"; | 353 | SinkLogCtx(mLogCtx) << "Done Synchronizing"; |
315 | Sink::Notification n; | 354 | Sink::Notification n; |
316 | n.id = request.requestId; | 355 | n.id = request.requestId; |
317 | n.type = Notification::Status; | 356 | n.type = Notification::Status; |
@@ -322,20 +361,22 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
322 | } | 361 | } |
323 | }); | 362 | }); |
324 | } else if (request.requestType == Synchronizer::SyncRequest::Flush) { | 363 | } else if (request.requestType == Synchronizer::SyncRequest::Flush) { |
325 | Q_ASSERT(!request.requestId.isEmpty()); | 364 | job = job.then([=] { |
326 | if (request.flushType == Flush::FlushReplayQueue) { | 365 | Q_ASSERT(!request.requestId.isEmpty()); |
327 | SinkTrace() << "Emitting flush completion."; | 366 | if (request.flushType == Flush::FlushReplayQueue) { |
328 | Sink::Notification n; | 367 | SinkTrace() << "Emitting flush completion."; |
329 | n.type = Sink::Notification::FlushCompletion; | 368 | Sink::Notification n; |
330 | n.id = request.requestId; | 369 | n.type = Sink::Notification::FlushCompletion; |
331 | emit notify(n); | 370 | n.id = request.requestId; |
332 | } else { | 371 | emit notify(n); |
333 | flatbuffers::FlatBufferBuilder fbb; | 372 | } else { |
334 | auto flushId = fbb.CreateString(request.requestId.toStdString()); | 373 | flatbuffers::FlatBufferBuilder fbb; |
335 | auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); | 374 | auto flushId = fbb.CreateString(request.requestId.toStdString()); |
336 | Sink::Commands::FinishFlushBuffer(fbb, location); | 375 | auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); |
337 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); | 376 | Sink::Commands::FinishFlushBuffer(fbb, location); |
338 | } | 377 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); |
378 | } | ||
379 | }); | ||
339 | } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { | 380 | } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { |
340 | job = job.then(replayNextRevision()); | 381 | job = job.then(replayNextRevision()); |
341 | } else { | 382 | } else { |