summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-01-23 10:48:37 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-01-23 10:48:37 +0100
commit752f0907574debe9d7d139a117b2efac80636e93 (patch)
treea2819bca0b883b46e8d139efcf82784fbc7fa78d
parent4c99d9a644d86410a93b683d1a34ab6d499b99f9 (diff)
downloadsink-752f0907574debe9d7d139a117b2efac80636e93.tar.gz
sink-752f0907574debe9d7d139a117b2efac80636e93.zip
Process sync requests one by one
-rw-r--r--common/synchronizer.cpp172
-rw-r--r--common/synchronizer.h1
2 files changed, 90 insertions, 83 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 3863cc4..57e994e 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -288,6 +288,89 @@ void Synchronizer::flushComplete(const QByteArray &flushId)
288 } 288 }
289} 289}
290 290
291KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
292{
293 if (request.options & SyncRequest::RequestFlush) {
294 return KAsync::syncStart<void>([=] {
295 //Trigger a flush and record original request without flush option
296 auto modifiedRequest = request;
297 modifiedRequest.options = SyncRequest::NoOptions;
298 //Normally we won't have a requestId here
299 if (modifiedRequest.requestId.isEmpty()) {
300 modifiedRequest.requestId = QUuid::createUuid().toRfc4122();
301 }
302 SinkWarning() << "Enquing flush request " << modifiedRequest.requestId;
303
304 //The sync request will be executed once the flush has completed
305 mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest);
306
307 flatbuffers::FlatBufferBuilder fbb;
308 auto flushId = fbb.CreateString(modifiedRequest.requestId.toStdString());
309 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization));
310 Sink::Commands::FinishFlushBuffer(fbb, location);
311 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb));
312 });
313 } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
314 return KAsync::syncStart<void>([this, request] {
315 Sink::Notification n;
316 n.id = request.requestId;
317 n.type = Notification::Status;
318 n.message = "Synchronization has started.";
319 n.code = ApplicationDomain::BusyStatus;
320 emit notify(n);
321 SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query;
322 }).then(synchronizeWithSource(request.query)).then([this] {
323 //Commit after every request, so implementations only have to commit more if they add a lot of data.
324 commit();
325 }).then<void>([this, request](const KAsync::Error &error) {
326 if (error) {
327 //Emit notification with error
328 SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error.errorMessage;
329 Sink::Notification n;
330 n.id = request.requestId;
331 n.type = Notification::Status;
332 n.message = "Synchronization has ended.";
333 n.code = ApplicationDomain::ErrorStatus;
334 emit notify(n);
335 return KAsync::error(error);
336 } else {
337 SinkLogCtx(mLogCtx) << "Done Synchronizing";
338 Sink::Notification n;
339 n.id = request.requestId;
340 n.type = Notification::Status;
341 n.message = "Synchronization has ended.";
342 n.code = ApplicationDomain::ConnectedStatus;
343 emit notify(n);
344 return KAsync::null();
345 }
346 });
347 } else if (request.requestType == Synchronizer::SyncRequest::Flush) {
348 return KAsync::syncStart<void>([=] {
349 Q_ASSERT(!request.requestId.isEmpty());
350 //FIXME it looks like this is emitted before the replay actually finishes
351 if (request.flushType == Flush::FlushReplayQueue) {
352 SinkTrace() << "Emitting flush completion.";
353 Sink::Notification n;
354 n.type = Sink::Notification::FlushCompletion;
355 n.id = request.requestId;
356 emit notify(n);
357 } else {
358 flatbuffers::FlatBufferBuilder fbb;
359 auto flushId = fbb.CreateString(request.requestId.toStdString());
360 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization));
361 Sink::Commands::FinishFlushBuffer(fbb, location);
362 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb));
363 }
364 });
365 } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) {
366 return replayNextRevision();
367 } else {
368 SinkWarning() << "Unknown request type: " << request.requestType;
369 return KAsync::error(KAsync::Error{"Unknown request type."});
370 }
371
372}
373
291KAsync::Job<void> Synchronizer::processSyncQueue() 374KAsync::Job<void> Synchronizer::processSyncQueue()
292{ 375{
293 if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { 376 if (mSyncRequestQueue.isEmpty() || mSyncInProgress) {
@@ -301,90 +384,13 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
301 return KAsync::null<void>(); 384 return KAsync::null<void>();
302 } 385 }
303 386
304 auto job = KAsync::syncStart<void>([this] { 387 const auto request = mSyncRequestQueue.takeFirst();
388 return KAsync::syncStart<void>([this] {
305 mMessageQueue->startTransaction(); 389 mMessageQueue->startTransaction();
306 mSyncInProgress = true; 390 mSyncInProgress = true;
307 }); 391 })
308 while (!mSyncRequestQueue.isEmpty()) { 392 .then(processRequest(request))
309 const auto request = mSyncRequestQueue.takeFirst(); 393 .then<void>([this](const KAsync::Error &error) {
310 if (request.options & SyncRequest::RequestFlush) {
311 job = job.then([=] {
312 //Trigger a flush and record original request without flush option
313 auto modifiedRequest = request;
314 modifiedRequest.options = SyncRequest::NoOptions;
315 //Normally we won't have a requestId here
316 if (modifiedRequest.requestId.isEmpty()) {
317 modifiedRequest.requestId = QUuid::createUuid().toRfc4122();
318 }
319
320 //The sync request will be executed once the flush has completed
321 mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest);
322
323 flatbuffers::FlatBufferBuilder fbb;
324 auto flushId = fbb.CreateString(modifiedRequest.requestId.toStdString());
325 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization));
326 Sink::Commands::FinishFlushBuffer(fbb, location);
327 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb));
328 });
329 } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
330 job = job.then([this, request] {
331 Sink::Notification n;
332 n.id = request.requestId;
333 n.type = Notification::Status;
334 n.message = "Synchronization has started.";
335 n.code = ApplicationDomain::BusyStatus;
336 emit notify(n);
337 SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query;
338 }).then(synchronizeWithSource(request.query)).then([this] {
339 //Commit after every request, so implementations only have to commit more if they add a lot of data.
340 commit();
341 }).then<void>([this, request](const KAsync::Error &error) {
342 if (error) {
343 //Emit notification with error
344 SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error.errorMessage;
345 Sink::Notification n;
346 n.id = request.requestId;
347 n.type = Notification::Status;
348 n.message = "Synchronization has ended.";
349 n.code = ApplicationDomain::ErrorStatus;
350 emit notify(n);
351 return KAsync::error(error);
352 } else {
353 SinkLogCtx(mLogCtx) << "Done Synchronizing";
354 Sink::Notification n;
355 n.id = request.requestId;
356 n.type = Notification::Status;
357 n.message = "Synchronization has ended.";
358 n.code = ApplicationDomain::ConnectedStatus;
359 emit notify(n);
360 return KAsync::null();
361 }
362 });
363 } else if (request.requestType == Synchronizer::SyncRequest::Flush) {
364 job = job.then([=] {
365 Q_ASSERT(!request.requestId.isEmpty());
366 if (request.flushType == Flush::FlushReplayQueue) {
367 SinkTrace() << "Emitting flush completion.";
368 Sink::Notification n;
369 n.type = Sink::Notification::FlushCompletion;
370 n.id = request.requestId;
371 emit notify(n);
372 } else {
373 flatbuffers::FlatBufferBuilder fbb;
374 auto flushId = fbb.CreateString(request.requestId.toStdString());
375 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization));
376 Sink::Commands::FinishFlushBuffer(fbb, location);
377 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb));
378 }
379 });
380 } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) {
381 job = job.then(replayNextRevision());
382 } else {
383 SinkWarning() << "Unknown request type: " << request.requestType;
384 return KAsync::error(KAsync::Error{"Unknown request type."});
385 }
386 }
387 return job.then<void>([this](const KAsync::Error &error) {
388 SinkTrace() << "Sync request processed"; 394 SinkTrace() << "Sync request processed";
389 mSyncTransaction.abort(); 395 mSyncTransaction.abort();
390 mMessageQueue->commit(); 396 mMessageQueue->commit();
@@ -504,7 +510,7 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray
504 } else if (operation == Sink::Operation_Modification) { 510 } else if (operation == Sink::Operation_Modification) {
505 SinkTrace() << "Replayed modification with remote id: " << remoteId; 511 SinkTrace() << "Replayed modification with remote id: " << remoteId;
506 if (remoteId.isEmpty()) { 512 if (remoteId.isEmpty()) {
507 SinkWarning() << "Returned an empty remoteId from the creation"; 513 SinkWarning() << "Returned an empty remoteId from the modification";
508 } else { 514 } else {
509 syncStore().updateRemoteId(type, uid, remoteId); 515 syncStore().updateRemoteId(type, uid, remoteId);
510 } 516 }
diff --git a/common/synchronizer.h b/common/synchronizer.h
index be90293..73c91a3 100644
--- a/common/synchronizer.h
+++ b/common/synchronizer.h
@@ -178,6 +178,7 @@ protected:
178 Sink::Log::Context mLogCtx; 178 Sink::Log::Context mLogCtx;
179private: 179private:
180 void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity); 180 void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity);
181 KAsync::Job<void> processRequest(const SyncRequest &request);
181 KAsync::Job<void> processSyncQueue(); 182 KAsync::Job<void> processSyncQueue();
182 183
183 Sink::ResourceContext mResourceContext; 184 Sink::ResourceContext mResourceContext;