summaryrefslogtreecommitdiffstats
path: root/common/synchronizer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r--common/synchronizer.cpp77
1 files changed, 59 insertions, 18 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index d94083b..3863cc4 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -274,12 +274,32 @@ void Synchronizer::flush(int commandId, const QByteArray &flushId)
274 processSyncQueue().exec(); 274 processSyncQueue().exec();
275} 275}
276 276
277void Synchronizer::flushComplete(const QByteArray &flushId)
278{
279 SinkTraceCtx(mLogCtx) << "Flush complete: " << flushId;
280 if (mPendingSyncRequests.contains(flushId)) {
281 const auto requests = mPendingSyncRequests.values(flushId);
282 for (const auto &r : requests) {
283 //We want to process the pending request before any others in the queue
284 mSyncRequestQueue.prepend(r);
285 }
286 mPendingSyncRequests.remove(flushId);
287 processSyncQueue().exec();
288 }
289}
290
277KAsync::Job<void> Synchronizer::processSyncQueue() 291KAsync::Job<void> Synchronizer::processSyncQueue()
278{ 292{
279 if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { 293 if (mSyncRequestQueue.isEmpty() || mSyncInProgress) {
280 SinkTrace() << "Sync still in progress or nothing to do."; 294 SinkTrace() << "Sync still in progress or nothing to do.";
281 return KAsync::null<void>(); 295 return KAsync::null<void>();
282 } 296 }
297 //Don't process any new requests until we're done with the pending ones.
298 //Otherwise we might process a flush before the previous request actually completed.
299 if (!mPendingSyncRequests.isEmpty()) {
300 SinkTrace() << "We still have pending sync requests. Not executing next request.";
301 return KAsync::null<void>();
302 }
283 303
284 auto job = KAsync::syncStart<void>([this] { 304 auto job = KAsync::syncStart<void>([this] {
285 mMessageQueue->startTransaction(); 305 mMessageQueue->startTransaction();
@@ -287,7 +307,26 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
287 }); 307 });
288 while (!mSyncRequestQueue.isEmpty()) { 308 while (!mSyncRequestQueue.isEmpty()) {
289 const auto request = mSyncRequestQueue.takeFirst(); 309 const auto request = mSyncRequestQueue.takeFirst();
290 if (request.requestType == Synchronizer::SyncRequest::Synchronization) { 310 if (request.options & SyncRequest::RequestFlush) {
311 job = job.then([=] {
312 //Trigger a flush and record original request without flush option
313 auto modifiedRequest = request;
314 modifiedRequest.options = SyncRequest::NoOptions;
315 //Normally we won't have a requestId here
316 if (modifiedRequest.requestId.isEmpty()) {
317 modifiedRequest.requestId = QUuid::createUuid().toRfc4122();
318 }
319
320 //The sync request will be executed once the flush has completed
321 mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest);
322
323 flatbuffers::FlatBufferBuilder fbb;
324 auto flushId = fbb.CreateString(modifiedRequest.requestId.toStdString());
325 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization));
326 Sink::Commands::FinishFlushBuffer(fbb, location);
327 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb));
328 });
329 } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
291 job = job.then([this, request] { 330 job = job.then([this, request] {
292 Sink::Notification n; 331 Sink::Notification n;
293 n.id = request.requestId; 332 n.id = request.requestId;
@@ -295,14 +334,14 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
295 n.message = "Synchronization has started."; 334 n.message = "Synchronization has started.";
296 n.code = ApplicationDomain::BusyStatus; 335 n.code = ApplicationDomain::BusyStatus;
297 emit notify(n); 336 emit notify(n);
298 SinkLogCtx(mLogCtx) << "Synchronizing " << request.query; 337 SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query;
299 }).then(synchronizeWithSource(request.query)).then([this] { 338 }).then(synchronizeWithSource(request.query)).then([this] {
300 //Commit after every request, so implementations only have to commit more if they add a lot of data. 339 //Commit after every request, so implementations only have to commit more if they add a lot of data.
301 commit(); 340 commit();
302 }).then<void>([this, request](const KAsync::Error &error) { 341 }).then<void>([this, request](const KAsync::Error &error) {
303 if (error) { 342 if (error) {
304 //Emit notification with error 343 //Emit notification with error
305 SinkWarning() << "Synchronization failed: " << error.errorMessage; 344 SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error.errorMessage;
306 Sink::Notification n; 345 Sink::Notification n;
307 n.id = request.requestId; 346 n.id = request.requestId;
308 n.type = Notification::Status; 347 n.type = Notification::Status;
@@ -311,7 +350,7 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
311 emit notify(n); 350 emit notify(n);
312 return KAsync::error(error); 351 return KAsync::error(error);
313 } else { 352 } else {
314 SinkLog() << "Done Synchronizing"; 353 SinkLogCtx(mLogCtx) << "Done Synchronizing";
315 Sink::Notification n; 354 Sink::Notification n;
316 n.id = request.requestId; 355 n.id = request.requestId;
317 n.type = Notification::Status; 356 n.type = Notification::Status;
@@ -322,20 +361,22 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
322 } 361 }
323 }); 362 });
324 } else if (request.requestType == Synchronizer::SyncRequest::Flush) { 363 } else if (request.requestType == Synchronizer::SyncRequest::Flush) {
325 Q_ASSERT(!request.requestId.isEmpty()); 364 job = job.then([=] {
326 if (request.flushType == Flush::FlushReplayQueue) { 365 Q_ASSERT(!request.requestId.isEmpty());
327 SinkTrace() << "Emitting flush completion."; 366 if (request.flushType == Flush::FlushReplayQueue) {
328 Sink::Notification n; 367 SinkTrace() << "Emitting flush completion.";
329 n.type = Sink::Notification::FlushCompletion; 368 Sink::Notification n;
330 n.id = request.requestId; 369 n.type = Sink::Notification::FlushCompletion;
331 emit notify(n); 370 n.id = request.requestId;
332 } else { 371 emit notify(n);
333 flatbuffers::FlatBufferBuilder fbb; 372 } else {
334 auto flushId = fbb.CreateString(request.requestId.toStdString()); 373 flatbuffers::FlatBufferBuilder fbb;
335 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); 374 auto flushId = fbb.CreateString(request.requestId.toStdString());
336 Sink::Commands::FinishFlushBuffer(fbb, location); 375 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization));
337 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); 376 Sink::Commands::FinishFlushBuffer(fbb, location);
338 } 377 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb));
378 }
379 });
339 } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { 380 } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) {
340 job = job.then(replayNextRevision()); 381 job = job.then(replayNextRevision());
341 } else { 382 } else {