summaryrefslogtreecommitdiffstats
path: root/common/synchronizer.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-29 11:27:04 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-29 11:27:04 +0100
commit6a072b2dcf23cbcdb210f2bd5c273ea0f425b188 (patch)
treecc2789c59e04018743aa0d575ee51a6f10869ffc /common/synchronizer.cpp
parent81b459c0f013704e95fb5933525c82a6ca46f13f (diff)
downloadsink-6a072b2dcf23cbcdb210f2bd5c273ea0f425b188.tar.gz
sink-6a072b2dcf23cbcdb210f2bd5c273ea0f425b188.zip
The synchronization call can be sync.
... because we really just enqueue the request and then wait for the notification.
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r--common/synchronizer.cpp43
1 files changed, 36 insertions, 7 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 11c7caf..6483cdf 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -38,7 +38,8 @@ Synchronizer::Synchronizer(const Sink::ResourceContext &context)
38 : ChangeReplay(context), 38 : ChangeReplay(context),
39 mResourceContext(context), 39 mResourceContext(context),
40 mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)), 40 mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)),
41 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite) 41 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite),
42 mSyncInProgress(false)
42{ 43{
43 SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); 44 SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId();
44} 45}
@@ -254,15 +255,15 @@ void Synchronizer::modify(const DomainType &entity)
254QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::QueryBase &query) 255QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::QueryBase &query)
255{ 256{
256 QList<Synchronizer::SyncRequest> list; 257 QList<Synchronizer::SyncRequest> list;
257 list << Synchronizer::SyncRequest{query}; 258 list << Synchronizer::SyncRequest{query, "sync"};
258 return list; 259 return list;
259} 260}
260 261
261KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query) 262void Synchronizer::synchronize(const Sink::QueryBase &query)
262{ 263{
263 SinkTrace() << "Synchronizing"; 264 SinkTrace() << "Synchronizing";
264 mSyncRequestQueue << getSyncRequests(query); 265 mSyncRequestQueue << getSyncRequests(query);
265 return processSyncQueue(); 266 processSyncQueue().exec();
266} 267}
267 268
268void Synchronizer::flush(int commandId, const QByteArray &flushId) 269void Synchronizer::flush(int commandId, const QByteArray &flushId)
@@ -284,20 +285,48 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
284 while (!mSyncRequestQueue.isEmpty()) { 285 while (!mSyncRequestQueue.isEmpty()) {
285 auto request = mSyncRequestQueue.takeFirst(); 286 auto request = mSyncRequestQueue.takeFirst();
286 if (request.requestType == Synchronizer::SyncRequest::Synchronization) { 287 if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
287 job = job.then(synchronizeWithSource(request.query)).syncThen<void>([this] { 288 job = job.syncThen<void>([this, request] {
289 Sink::Notification n;
290 n.id = request.requestId;
291 n.type = Notification::Status;
292 n.message = "Synchronization has started.";
293 n.code = ApplicationDomain::BusyStatus;
294 emit notify(n);
295 }).then(synchronizeWithSource(request.query)).syncThen<void>([this] {
288 //Commit after every request, so implementations only have to commit more if they add a lot of data. 296 //Commit after every request, so implementations only have to commit more if they add a lot of data.
289 commit(); 297 commit();
298 }).then<void>([this, request](const KAsync::Error &error) {
299 if (error) {
300 //Emit notification with error
301 SinkWarning() << "Synchronization failed: " << error.errorMessage;
302 Sink::Notification n;
303 n.id = request.requestId;
304 n.type = Notification::Status;
305 n.message = "Synchronization has ended.";
306 n.code = ApplicationDomain::ErrorStatus;
307 emit notify(n);
308 return KAsync::error(error);
309 } else {
310 SinkLog() << "Done Synchronizing";
311 Sink::Notification n;
312 n.id = request.requestId;
313 n.type = Notification::Status;
314 n.message = "Synchronization has ended.";
315 n.code = ApplicationDomain::ConnectedStatus;
316 emit notify(n);
317 return KAsync::null();
318 }
290 }); 319 });
291 } else if (request.requestType == Synchronizer::SyncRequest::Flush) { 320 } else if (request.requestType == Synchronizer::SyncRequest::Flush) {
292 if (request.flushType == Flush::FlushReplayQueue) { 321 if (request.flushType == Flush::FlushReplayQueue) {
293 SinkTrace() << "Emitting flush completion."; 322 SinkTrace() << "Emitting flush completion.";
294 Sink::Notification n; 323 Sink::Notification n;
295 n.type = Sink::Notification::FlushCompletion; 324 n.type = Sink::Notification::FlushCompletion;
296 n.id = request.flushId; 325 n.id = request.requestId;
297 emit notify(n); 326 emit notify(n);
298 } else { 327 } else {
299 flatbuffers::FlatBufferBuilder fbb; 328 flatbuffers::FlatBufferBuilder fbb;
300 auto flushId = fbb.CreateString(request.flushId); 329 auto flushId = fbb.CreateString(request.requestId);
301 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); 330 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization));
302 Sink::Commands::FinishFlushBuffer(fbb, location); 331 Sink::Commands::FinishFlushBuffer(fbb, location);
303 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); 332 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb));