diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-29 11:27:04 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-29 11:27:04 +0100 |
commit | 6a072b2dcf23cbcdb210f2bd5c273ea0f425b188 (patch) | |
tree | cc2789c59e04018743aa0d575ee51a6f10869ffc /common/synchronizer.cpp | |
parent | 81b459c0f013704e95fb5933525c82a6ca46f13f (diff) | |
download | sink-6a072b2dcf23cbcdb210f2bd5c273ea0f425b188.tar.gz sink-6a072b2dcf23cbcdb210f2bd5c273ea0f425b188.zip |
The synchronization call can be sync.
... because we really just enqueue the request and then wait for the
notification.
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r-- | common/synchronizer.cpp | 43 |
1 files changed, 36 insertions, 7 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 11c7caf..6483cdf 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -38,7 +38,8 @@ Synchronizer::Synchronizer(const Sink::ResourceContext &context) | |||
38 | : ChangeReplay(context), | 38 | : ChangeReplay(context), |
39 | mResourceContext(context), | 39 | mResourceContext(context), |
40 | mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)), | 40 | mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)), |
41 | mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite) | 41 | mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), |
42 | mSyncInProgress(false) | ||
42 | { | 43 | { |
43 | SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); | 44 | SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); |
44 | } | 45 | } |
@@ -254,15 +255,15 @@ void Synchronizer::modify(const DomainType &entity) | |||
254 | QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::QueryBase &query) | 255 | QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::QueryBase &query) |
255 | { | 256 | { |
256 | QList<Synchronizer::SyncRequest> list; | 257 | QList<Synchronizer::SyncRequest> list; |
257 | list << Synchronizer::SyncRequest{query}; | 258 | list << Synchronizer::SyncRequest{query, "sync"}; |
258 | return list; | 259 | return list; |
259 | } | 260 | } |
260 | 261 | ||
261 | KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query) | 262 | void Synchronizer::synchronize(const Sink::QueryBase &query) |
262 | { | 263 | { |
263 | SinkTrace() << "Synchronizing"; | 264 | SinkTrace() << "Synchronizing"; |
264 | mSyncRequestQueue << getSyncRequests(query); | 265 | mSyncRequestQueue << getSyncRequests(query); |
265 | return processSyncQueue(); | 266 | processSyncQueue().exec(); |
266 | } | 267 | } |
267 | 268 | ||
268 | void Synchronizer::flush(int commandId, const QByteArray &flushId) | 269 | void Synchronizer::flush(int commandId, const QByteArray &flushId) |
@@ -284,20 +285,48 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
284 | while (!mSyncRequestQueue.isEmpty()) { | 285 | while (!mSyncRequestQueue.isEmpty()) { |
285 | auto request = mSyncRequestQueue.takeFirst(); | 286 | auto request = mSyncRequestQueue.takeFirst(); |
286 | if (request.requestType == Synchronizer::SyncRequest::Synchronization) { | 287 | if (request.requestType == Synchronizer::SyncRequest::Synchronization) { |
287 | job = job.then(synchronizeWithSource(request.query)).syncThen<void>([this] { | 288 | job = job.syncThen<void>([this, request] { |
289 | Sink::Notification n; | ||
290 | n.id = request.requestId; | ||
291 | n.type = Notification::Status; | ||
292 | n.message = "Synchronization has started."; | ||
293 | n.code = ApplicationDomain::BusyStatus; | ||
294 | emit notify(n); | ||
295 | }).then(synchronizeWithSource(request.query)).syncThen<void>([this] { | ||
288 | //Commit after every request, so implementations only have to commit more if they add a lot of data. | 296 | //Commit after every request, so implementations only have to commit more if they add a lot of data. |
289 | commit(); | 297 | commit(); |
298 | }).then<void>([this, request](const KAsync::Error &error) { | ||
299 | if (error) { | ||
300 | //Emit notification with error | ||
301 | SinkWarning() << "Synchronization failed: " << error.errorMessage; | ||
302 | Sink::Notification n; | ||
303 | n.id = request.requestId; | ||
304 | n.type = Notification::Status; | ||
305 | n.message = "Synchronization has ended."; | ||
306 | n.code = ApplicationDomain::ErrorStatus; | ||
307 | emit notify(n); | ||
308 | return KAsync::error(error); | ||
309 | } else { | ||
310 | SinkLog() << "Done Synchronizing"; | ||
311 | Sink::Notification n; | ||
312 | n.id = request.requestId; | ||
313 | n.type = Notification::Status; | ||
314 | n.message = "Synchronization has ended."; | ||
315 | n.code = ApplicationDomain::ConnectedStatus; | ||
316 | emit notify(n); | ||
317 | return KAsync::null(); | ||
318 | } | ||
290 | }); | 319 | }); |
291 | } else if (request.requestType == Synchronizer::SyncRequest::Flush) { | 320 | } else if (request.requestType == Synchronizer::SyncRequest::Flush) { |
292 | if (request.flushType == Flush::FlushReplayQueue) { | 321 | if (request.flushType == Flush::FlushReplayQueue) { |
293 | SinkTrace() << "Emitting flush completion."; | 322 | SinkTrace() << "Emitting flush completion."; |
294 | Sink::Notification n; | 323 | Sink::Notification n; |
295 | n.type = Sink::Notification::FlushCompletion; | 324 | n.type = Sink::Notification::FlushCompletion; |
296 | n.id = request.flushId; | 325 | n.id = request.requestId; |
297 | emit notify(n); | 326 | emit notify(n); |
298 | } else { | 327 | } else { |
299 | flatbuffers::FlatBufferBuilder fbb; | 328 | flatbuffers::FlatBufferBuilder fbb; |
300 | auto flushId = fbb.CreateString(request.flushId); | 329 | auto flushId = fbb.CreateString(request.requestId); |
301 | auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); | 330 | auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); |
302 | Sink::Commands::FinishFlushBuffer(fbb, location); | 331 | Sink::Commands::FinishFlushBuffer(fbb, location); |
303 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); | 332 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); |