summaryrefslogtreecommitdiffstats
path: root/common/synchronizer.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-01-22 18:25:31 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-01-22 18:25:31 +0100
commite513ee41adb6061aa72de8bfe49d117f47c1545b (patch)
treed91fd8abb5d2f47ec24dff497bcf18e218f01641 /common/synchronizer.cpp
parentec16f3b92d6017462f4bba8354b53d1707850430 (diff)
downloadsink-e513ee41adb6061aa72de8bfe49d117f47c1545b.tar.gz
sink-e513ee41adb6061aa72de8bfe49d117f47c1545b.zip
Support dependencies between sync requests.
If one sync task depends on the previous sync task we want to flush in between, so we can query for the results of the previous sync request locally. If we detect such a dependency we temporarily halt all processing of synchronization requests until the flush completes, so we can continue processing.
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r--common/synchronizer.cpp77
1 files changed, 59 insertions, 18 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index d94083b..3863cc4 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -274,12 +274,32 @@ void Synchronizer::flush(int commandId, const QByteArray &flushId)
274 processSyncQueue().exec(); 274 processSyncQueue().exec();
275} 275}
276 276
277void Synchronizer::flushComplete(const QByteArray &flushId)
278{
279 SinkTraceCtx(mLogCtx) << "Flush complete: " << flushId;
280 if (mPendingSyncRequests.contains(flushId)) {
281 const auto requests = mPendingSyncRequests.values(flushId);
282 for (const auto &r : requests) {
283 //We want to process the pending request before any others in the queue
284 mSyncRequestQueue.prepend(r);
285 }
286 mPendingSyncRequests.remove(flushId);
287 processSyncQueue().exec();
288 }
289}
290
277KAsync::Job<void> Synchronizer::processSyncQueue() 291KAsync::Job<void> Synchronizer::processSyncQueue()
278{ 292{
279 if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { 293 if (mSyncRequestQueue.isEmpty() || mSyncInProgress) {
280 SinkTrace() << "Sync still in progress or nothing to do."; 294 SinkTrace() << "Sync still in progress or nothing to do.";
281 return KAsync::null<void>(); 295 return KAsync::null<void>();
282 } 296 }
297 //Don't process any new requests until we're done with the pending ones.
298 //Otherwise we might process a flush before the previous request actually completed.
299 if (!mPendingSyncRequests.isEmpty()) {
300 SinkTrace() << "We still have pending sync requests. Not executing next request.";
301 return KAsync::null<void>();
302 }
283 303
284 auto job = KAsync::syncStart<void>([this] { 304 auto job = KAsync::syncStart<void>([this] {
285 mMessageQueue->startTransaction(); 305 mMessageQueue->startTransaction();
@@ -287,7 +307,26 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
287 }); 307 });
288 while (!mSyncRequestQueue.isEmpty()) { 308 while (!mSyncRequestQueue.isEmpty()) {
289 const auto request = mSyncRequestQueue.takeFirst(); 309 const auto request = mSyncRequestQueue.takeFirst();
290 if (request.requestType == Synchronizer::SyncRequest::Synchronization) { 310 if (request.options & SyncRequest::RequestFlush) {
311 job = job.then([=] {
312 //Trigger a flush and record original request without flush option
313 auto modifiedRequest = request;
314 modifiedRequest.options = SyncRequest::NoOptions;
315 //Normally we won't have a requestId here
316 if (modifiedRequest.requestId.isEmpty()) {
317 modifiedRequest.requestId = QUuid::createUuid().toRfc4122();
318 }
319
320 //The sync request will be executed once the flush has completed
321 mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest);
322
323 flatbuffers::FlatBufferBuilder fbb;
324 auto flushId = fbb.CreateString(modifiedRequest.requestId.toStdString());
325 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization));
326 Sink::Commands::FinishFlushBuffer(fbb, location);
327 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb));
328 });
329 } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
291 job = job.then([this, request] { 330 job = job.then([this, request] {
292 Sink::Notification n; 331 Sink::Notification n;
293 n.id = request.requestId; 332 n.id = request.requestId;
@@ -295,14 +334,14 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
295 n.message = "Synchronization has started."; 334 n.message = "Synchronization has started.";
296 n.code = ApplicationDomain::BusyStatus; 335 n.code = ApplicationDomain::BusyStatus;
297 emit notify(n); 336 emit notify(n);
298 SinkLogCtx(mLogCtx) << "Synchronizing " << request.query; 337 SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query;
299 }).then(synchronizeWithSource(request.query)).then([this] { 338 }).then(synchronizeWithSource(request.query)).then([this] {
300 //Commit after every request, so implementations only have to commit more if they add a lot of data. 339 //Commit after every request, so implementations only have to commit more if they add a lot of data.
301 commit(); 340 commit();
302 }).then<void>([this, request](const KAsync::Error &error) { 341 }).then<void>([this, request](const KAsync::Error &error) {
303 if (error) { 342 if (error) {
304 //Emit notification with error 343 //Emit notification with error
305 SinkWarning() << "Synchronization failed: " << error.errorMessage; 344 SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error.errorMessage;
306 Sink::Notification n; 345 Sink::Notification n;
307 n.id = request.requestId; 346 n.id = request.requestId;
308 n.type = Notification::Status; 347 n.type = Notification::Status;
@@ -311,7 +350,7 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
311 emit notify(n); 350 emit notify(n);
312 return KAsync::error(error); 351 return KAsync::error(error);
313 } else { 352 } else {
314 SinkLog() << "Done Synchronizing"; 353 SinkLogCtx(mLogCtx) << "Done Synchronizing";
315 Sink::Notification n; 354 Sink::Notification n;
316 n.id = request.requestId; 355 n.id = request.requestId;
317 n.type = Notification::Status; 356 n.type = Notification::Status;
@@ -322,20 +361,22 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
322 } 361 }
323 }); 362 });
324 } else if (request.requestType == Synchronizer::SyncRequest::Flush) { 363 } else if (request.requestType == Synchronizer::SyncRequest::Flush) {
325 Q_ASSERT(!request.requestId.isEmpty()); 364 job = job.then([=] {
326 if (request.flushType == Flush::FlushReplayQueue) { 365 Q_ASSERT(!request.requestId.isEmpty());
327 SinkTrace() << "Emitting flush completion."; 366 if (request.flushType == Flush::FlushReplayQueue) {
328 Sink::Notification n; 367 SinkTrace() << "Emitting flush completion.";
329 n.type = Sink::Notification::FlushCompletion; 368 Sink::Notification n;
330 n.id = request.requestId; 369 n.type = Sink::Notification::FlushCompletion;
331 emit notify(n); 370 n.id = request.requestId;
332 } else { 371 emit notify(n);
333 flatbuffers::FlatBufferBuilder fbb; 372 } else {
334 auto flushId = fbb.CreateString(request.requestId.toStdString()); 373 flatbuffers::FlatBufferBuilder fbb;
335 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); 374 auto flushId = fbb.CreateString(request.requestId.toStdString());
336 Sink::Commands::FinishFlushBuffer(fbb, location); 375 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization));
337 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); 376 Sink::Commands::FinishFlushBuffer(fbb, location);
338 } 377 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb));
378 }
379 });
339 } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { 380 } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) {
340 job = job.then(replayNextRevision()); 381 job = job.then(replayNextRevision());
341 } else { 382 } else {