diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-01-23 11:03:03 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-01-23 11:03:03 +0100 |
commit | 6003ae63e45485d1f1c76ea378900bc5242465cd (patch) | |
tree | b03327d733714a0dab7272c0deb9a44075ac29e4 /common/synchronizer.cpp | |
parent | 752f0907574debe9d7d139a117b2efac80636e93 (diff) | |
download | sink-6003ae63e45485d1f1c76ea378900bc5242465cd.tar.gz sink-6003ae63e45485d1f1c76ea378900bc5242465cd.zip |
Debug output
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r-- | common/synchronizer.cpp | 76 |
1 files changed, 37 insertions, 39 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 57e994e..f731b31 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -30,8 +30,6 @@ | |||
30 | #include "flush_generated.h" | 30 | #include "flush_generated.h" |
31 | #include "notification_generated.h" | 31 | #include "notification_generated.h" |
32 | 32 | ||
33 | SINK_DEBUG_AREA("synchronizer") | ||
34 | |||
35 | using namespace Sink; | 33 | using namespace Sink; |
36 | 34 | ||
37 | Synchronizer::Synchronizer(const Sink::ResourceContext &context) | 35 | Synchronizer::Synchronizer(const Sink::ResourceContext &context) |
@@ -42,7 +40,7 @@ Synchronizer::Synchronizer(const Sink::ResourceContext &context) | |||
42 | mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), | 40 | mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), |
43 | mSyncInProgress(false) | 41 | mSyncInProgress(false) |
44 | { | 42 | { |
45 | SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); | 43 | SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); |
46 | } | 44 | } |
47 | 45 | ||
48 | Synchronizer::~Synchronizer() | 46 | Synchronizer::~Synchronizer() |
@@ -128,11 +126,11 @@ void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::func | |||
128 | { | 126 | { |
129 | entryGenerator([this, bufferType, &exists](const QByteArray &sinkId) { | 127 | entryGenerator([this, bufferType, &exists](const QByteArray &sinkId) { |
130 | const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); | 128 | const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); |
131 | SinkTrace() << "Checking for removal " << sinkId << remoteId; | 129 | SinkTraceCtx(mLogCtx) << "Checking for removal " << sinkId << remoteId; |
132 | // If we have no remoteId, the entity hasn't been replayed to the source yet | 130 | // If we have no remoteId, the entity hasn't been replayed to the source yet |
133 | if (!remoteId.isEmpty()) { | 131 | if (!remoteId.isEmpty()) { |
134 | if (!exists(remoteId)) { | 132 | if (!exists(remoteId)) { |
135 | SinkTrace() << "Found a removed entity: " << sinkId; | 133 | SinkTraceCtx(mLogCtx) << "Found a removed entity: " << sinkId; |
136 | deleteEntity(sinkId, mEntityStore->maxRevision(), bufferType); | 134 | deleteEntity(sinkId, mEntityStore->maxRevision(), bufferType); |
137 | } | 135 | } |
138 | } | 136 | } |
@@ -157,12 +155,12 @@ void Synchronizer::modifyIfChanged(Storage::EntityStore &store, const QByteArray | |||
157 | bool changed = false; | 155 | bool changed = false; |
158 | for (const auto &property : entity.changedProperties()) { | 156 | for (const auto &property : entity.changedProperties()) { |
159 | if (entity.getProperty(property) != current.getProperty(property)) { | 157 | if (entity.getProperty(property) != current.getProperty(property)) { |
160 | SinkTrace() << "Property changed " << sinkId << property; | 158 | SinkTraceCtx(mLogCtx) << "Property changed " << sinkId << property; |
161 | changed = true; | 159 | changed = true; |
162 | } | 160 | } |
163 | } | 161 | } |
164 | if (changed) { | 162 | if (changed) { |
165 | SinkTrace() << "Found a modified entity: " << sinkId; | 163 | SinkTraceCtx(mLogCtx) << "Found a modified entity: " << sinkId; |
166 | modifyEntity(sinkId, store.maxRevision(), bufferType, entity); | 164 | modifyEntity(sinkId, store.maxRevision(), bufferType, entity); |
167 | } | 165 | } |
168 | }); | 166 | }); |
@@ -177,12 +175,12 @@ void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remote | |||
177 | 175 | ||
178 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) | 176 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) |
179 | { | 177 | { |
180 | SinkTrace() << "Create or modify" << bufferType << remoteId; | 178 | SinkTraceCtx(mLogCtx) << "Create or modify" << bufferType << remoteId; |
181 | Storage::EntityStore store(mResourceContext, mLogCtx); | 179 | Storage::EntityStore store(mResourceContext, mLogCtx); |
182 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); | 180 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); |
183 | const auto found = store.contains(bufferType, sinkId); | 181 | const auto found = store.contains(bufferType, sinkId); |
184 | if (!found) { | 182 | if (!found) { |
185 | SinkTrace() << "Found a new entity: " << remoteId; | 183 | SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId; |
186 | createEntity(sinkId, bufferType, entity); | 184 | createEntity(sinkId, bufferType, entity); |
187 | } else { // modification | 185 | } else { // modification |
188 | modify(bufferType, remoteId, entity); | 186 | modify(bufferType, remoteId, entity); |
@@ -193,7 +191,7 @@ template<typename DomainType> | |||
193 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria) | 191 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria) |
194 | { | 192 | { |
195 | 193 | ||
196 | SinkTrace() << "Create or modify" << bufferType << remoteId; | 194 | SinkTraceCtx(mLogCtx) << "Create or modify" << bufferType << remoteId; |
197 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); | 195 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); |
198 | Storage::EntityStore store(mResourceContext, mLogCtx); | 196 | Storage::EntityStore store(mResourceContext, mLogCtx); |
199 | const auto found = store.contains(bufferType, sinkId); | 197 | const auto found = store.contains(bufferType, sinkId); |
@@ -209,16 +207,16 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray | |||
209 | auto resultSet = dataStoreQuery.execute(); | 207 | auto resultSet = dataStoreQuery.execute(); |
210 | resultSet.replaySet(0, 1, [this, &merge, bufferType, remoteId](const ResultSet::Result &r) { | 208 | resultSet.replaySet(0, 1, [this, &merge, bufferType, remoteId](const ResultSet::Result &r) { |
211 | merge = true; | 209 | merge = true; |
212 | SinkTrace() << "Merging local entity with remote entity: " << r.entity.identifier() << remoteId; | 210 | SinkTraceCtx(mLogCtx) << "Merging local entity with remote entity: " << r.entity.identifier() << remoteId; |
213 | syncStore().recordRemoteId(bufferType, r.entity.identifier(), remoteId); | 211 | syncStore().recordRemoteId(bufferType, r.entity.identifier(), remoteId); |
214 | }); | 212 | }); |
215 | 213 | ||
216 | if (!merge) { | 214 | if (!merge) { |
217 | SinkTrace() << "Found a new entity: " << remoteId; | 215 | SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId; |
218 | createEntity(sinkId, bufferType, entity); | 216 | createEntity(sinkId, bufferType, entity); |
219 | } | 217 | } |
220 | } else { | 218 | } else { |
221 | SinkTrace() << "Found a new entity: " << remoteId; | 219 | SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId; |
222 | createEntity(sinkId, bufferType, entity); | 220 | createEntity(sinkId, bufferType, entity); |
223 | } | 221 | } |
224 | } else { // modification | 222 | } else { // modification |
@@ -240,7 +238,7 @@ QByteArrayList Synchronizer::resolveFilter(const QueryBase::Comparator &filter) | |||
240 | result << r.entity.identifier(); | 238 | result << r.entity.identifier(); |
241 | }); | 239 | }); |
242 | } else { | 240 | } else { |
243 | SinkWarning() << "unknown filter type: " << filter; | 241 | SinkWarningCtx(mLogCtx) << "unknown filter type: " << filter; |
244 | Q_ASSERT(false); | 242 | Q_ASSERT(false); |
245 | } | 243 | } |
246 | return result; | 244 | return result; |
@@ -261,7 +259,7 @@ QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::Query | |||
261 | 259 | ||
262 | void Synchronizer::synchronize(const Sink::QueryBase &query) | 260 | void Synchronizer::synchronize(const Sink::QueryBase &query) |
263 | { | 261 | { |
264 | SinkTrace() << "Synchronizing"; | 262 | SinkTraceCtx(mLogCtx) << "Synchronizing"; |
265 | mSyncRequestQueue << getSyncRequests(query); | 263 | mSyncRequestQueue << getSyncRequests(query); |
266 | processSyncQueue().exec(); | 264 | processSyncQueue().exec(); |
267 | } | 265 | } |
@@ -269,7 +267,7 @@ void Synchronizer::synchronize(const Sink::QueryBase &query) | |||
269 | void Synchronizer::flush(int commandId, const QByteArray &flushId) | 267 | void Synchronizer::flush(int commandId, const QByteArray &flushId) |
270 | { | 268 | { |
271 | Q_ASSERT(!flushId.isEmpty()); | 269 | Q_ASSERT(!flushId.isEmpty()); |
272 | SinkTrace() << "Flushing the synchronization queue " << flushId; | 270 | SinkTraceCtx(mLogCtx) << "Flushing the synchronization queue " << flushId; |
273 | mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId}; | 271 | mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId}; |
274 | processSyncQueue().exec(); | 272 | processSyncQueue().exec(); |
275 | } | 273 | } |
@@ -299,7 +297,7 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | |||
299 | if (modifiedRequest.requestId.isEmpty()) { | 297 | if (modifiedRequest.requestId.isEmpty()) { |
300 | modifiedRequest.requestId = QUuid::createUuid().toRfc4122(); | 298 | modifiedRequest.requestId = QUuid::createUuid().toRfc4122(); |
301 | } | 299 | } |
302 | SinkWarning() << "Enquing flush request " << modifiedRequest.requestId; | 300 | SinkWarningCtx(mLogCtx) << "Enquing flush request " << modifiedRequest.requestId; |
303 | 301 | ||
304 | //The sync request will be executed once the flush has completed | 302 | //The sync request will be executed once the flush has completed |
305 | mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest); | 303 | mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest); |
@@ -349,7 +347,7 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | |||
349 | Q_ASSERT(!request.requestId.isEmpty()); | 347 | Q_ASSERT(!request.requestId.isEmpty()); |
350 | //FIXME it looks like this is emitted before the replay actually finishes | 348 | //FIXME it looks like this is emitted before the replay actually finishes |
351 | if (request.flushType == Flush::FlushReplayQueue) { | 349 | if (request.flushType == Flush::FlushReplayQueue) { |
352 | SinkTrace() << "Emitting flush completion."; | 350 | SinkTraceCtx(mLogCtx) << "Emitting flush completion."; |
353 | Sink::Notification n; | 351 | Sink::Notification n; |
354 | n.type = Sink::Notification::FlushCompletion; | 352 | n.type = Sink::Notification::FlushCompletion; |
355 | n.id = request.requestId; | 353 | n.id = request.requestId; |
@@ -365,7 +363,7 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | |||
365 | } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { | 363 | } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { |
366 | return replayNextRevision(); | 364 | return replayNextRevision(); |
367 | } else { | 365 | } else { |
368 | SinkWarning() << "Unknown request type: " << request.requestType; | 366 | SinkWarningCtx(mLogCtx) << "Unknown request type: " << request.requestType; |
369 | return KAsync::error(KAsync::Error{"Unknown request type."}); | 367 | return KAsync::error(KAsync::Error{"Unknown request type."}); |
370 | } | 368 | } |
371 | 369 | ||
@@ -374,13 +372,13 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | |||
374 | KAsync::Job<void> Synchronizer::processSyncQueue() | 372 | KAsync::Job<void> Synchronizer::processSyncQueue() |
375 | { | 373 | { |
376 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { | 374 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { |
377 | SinkTrace() << "Sync still in progress or nothing to do."; | 375 | SinkTraceCtx(mLogCtx) << "Sync still in progress or nothing to do."; |
378 | return KAsync::null<void>(); | 376 | return KAsync::null<void>(); |
379 | } | 377 | } |
380 | //Don't process any new requests until we're done with the pending ones. | 378 | //Don't process any new requests until we're done with the pending ones. |
381 | //Otherwise we might process a flush before the previous request actually completed. | 379 | //Otherwise we might process a flush before the previous request actually completed. |
382 | if (!mPendingSyncRequests.isEmpty()) { | 380 | if (!mPendingSyncRequests.isEmpty()) { |
383 | SinkTrace() << "We still have pending sync requests. Not executing next request."; | 381 | SinkTraceCtx(mLogCtx) << "We still have pending sync requests. Not executing next request."; |
384 | return KAsync::null<void>(); | 382 | return KAsync::null<void>(); |
385 | } | 383 | } |
386 | 384 | ||
@@ -391,7 +389,7 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
391 | }) | 389 | }) |
392 | .then(processRequest(request)) | 390 | .then(processRequest(request)) |
393 | .then<void>([this](const KAsync::Error &error) { | 391 | .then<void>([this](const KAsync::Error &error) { |
394 | SinkTrace() << "Sync request processed"; | 392 | SinkTraceCtx(mLogCtx) << "Sync request processed"; |
395 | mSyncTransaction.abort(); | 393 | mSyncTransaction.abort(); |
396 | mMessageQueue->commit(); | 394 | mMessageQueue->commit(); |
397 | mSyncStore.clear(); | 395 | mSyncStore.clear(); |
@@ -400,7 +398,7 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
400 | emit changesReplayed(); | 398 | emit changesReplayed(); |
401 | } | 399 | } |
402 | if (error) { | 400 | if (error) { |
403 | SinkWarning() << "Error during sync: " << error.errorMessage; | 401 | SinkWarningCtx(mLogCtx) << "Error during sync: " << error.errorMessage; |
404 | return KAsync::error(error); | 402 | return KAsync::error(error); |
405 | } | 403 | } |
406 | //In case we got more requests meanwhile. | 404 | //In case we got more requests meanwhile. |
@@ -422,7 +420,7 @@ void Synchronizer::commit() | |||
422 | Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction() | 420 | Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction() |
423 | { | 421 | { |
424 | if (!mSyncTransaction) { | 422 | if (!mSyncTransaction) { |
425 | SinkTrace() << "Starting transaction on sync store."; | 423 | SinkTraceCtx(mLogCtx) << "Starting transaction on sync store."; |
426 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::DataStore::ReadWrite); | 424 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::DataStore::ReadWrite); |
427 | } | 425 | } |
428 | return mSyncTransaction; | 426 | return mSyncTransaction; |
@@ -447,28 +445,28 @@ bool Synchronizer::canReplay(const QByteArray &type, const QByteArray &key, cons | |||
447 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | 445 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); |
448 | Q_ASSERT(metadataBuffer); | 446 | Q_ASSERT(metadataBuffer); |
449 | if (!metadataBuffer->replayToSource()) { | 447 | if (!metadataBuffer->replayToSource()) { |
450 | SinkTrace() << "Change is coming from the source"; | 448 | SinkTraceCtx(mLogCtx) << "Change is coming from the source"; |
451 | } | 449 | } |
452 | return metadataBuffer->replayToSource(); | 450 | return metadataBuffer->replayToSource(); |
453 | } | 451 | } |
454 | 452 | ||
455 | KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) | 453 | KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) |
456 | { | 454 | { |
457 | SinkTrace() << "Replaying" << type << key; | 455 | SinkTraceCtx(mLogCtx) << "Replaying" << type << key; |
458 | 456 | ||
459 | Sink::EntityBuffer buffer(value); | 457 | Sink::EntityBuffer buffer(value); |
460 | const Sink::Entity &entity = buffer.entity(); | 458 | const Sink::Entity &entity = buffer.entity(); |
461 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | 459 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); |
462 | if (!metadataBuffer) { | 460 | if (!metadataBuffer) { |
463 | SinkError() << "No metadata buffer available."; | 461 | SinkErrorCtx(mLogCtx) << "No metadata buffer available."; |
464 | return KAsync::error("No metadata buffer"); | 462 | return KAsync::error("No metadata buffer"); |
465 | } | 463 | } |
466 | if (mSyncTransaction) { | 464 | if (mSyncTransaction) { |
467 | SinkError() << "Leftover sync transaction."; | 465 | SinkErrorCtx(mLogCtx) << "Leftover sync transaction."; |
468 | mSyncTransaction.abort(); | 466 | mSyncTransaction.abort(); |
469 | } | 467 | } |
470 | if (mSyncStore) { | 468 | if (mSyncStore) { |
471 | SinkError() << "Leftover sync store."; | 469 | SinkErrorCtx(mLogCtx) << "Leftover sync store."; |
472 | mSyncStore.clear(); | 470 | mSyncStore.clear(); |
473 | } | 471 | } |
474 | Q_ASSERT(metadataBuffer); | 472 | Q_ASSERT(metadataBuffer); |
@@ -485,7 +483,7 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray | |||
485 | oldRemoteId = syncStore().resolveLocalId(type, uid); | 483 | oldRemoteId = syncStore().resolveLocalId(type, uid); |
486 | //oldRemoteId can be empty if the resource implementation didn't return a remoteid | 484 | //oldRemoteId can be empty if the resource implementation didn't return a remoteid |
487 | } | 485 | } |
488 | SinkTrace() << "Replaying " << key << type << uid << oldRemoteId; | 486 | SinkTraceCtx(mLogCtx) << "Replaying " << key << type << uid << oldRemoteId; |
489 | 487 | ||
490 | KAsync::Job<QByteArray> job = KAsync::null<QByteArray>(); | 488 | KAsync::Job<QByteArray> job = KAsync::null<QByteArray>(); |
491 | //TODO This requires supporting every domain type here as well. Can we solve this better so we can do the dispatch somewhere centrally? | 489 | //TODO This requires supporting every domain type here as well. Can we solve this better so we can do the dispatch somewhere centrally? |
@@ -496,34 +494,34 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray | |||
496 | auto mail = store().readEntity<ApplicationDomain::Mail>(key); | 494 | auto mail = store().readEntity<ApplicationDomain::Mail>(key); |
497 | job = replay(mail, operation, oldRemoteId, modifiedProperties); | 495 | job = replay(mail, operation, oldRemoteId, modifiedProperties); |
498 | } else { | 496 | } else { |
499 | SinkError() << "Replayed unknown type: " << type; | 497 | SinkErrorCtx(mLogCtx) << "Replayed unknown type: " << type; |
500 | } | 498 | } |
501 | 499 | ||
502 | return job.then([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { | 500 | return job.then([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { |
503 | if (operation == Sink::Operation_Creation) { | 501 | if (operation == Sink::Operation_Creation) { |
504 | SinkTrace() << "Replayed creation with remote id: " << remoteId; | 502 | SinkTraceCtx(mLogCtx) << "Replayed creation with remote id: " << remoteId; |
505 | if (remoteId.isEmpty()) { | 503 | if (remoteId.isEmpty()) { |
506 | SinkWarning() << "Returned an empty remoteId from the creation"; | 504 | SinkWarningCtx(mLogCtx) << "Returned an empty remoteId from the creation"; |
507 | } else { | 505 | } else { |
508 | syncStore().recordRemoteId(type, uid, remoteId); | 506 | syncStore().recordRemoteId(type, uid, remoteId); |
509 | } | 507 | } |
510 | } else if (operation == Sink::Operation_Modification) { | 508 | } else if (operation == Sink::Operation_Modification) { |
511 | SinkTrace() << "Replayed modification with remote id: " << remoteId; | 509 | SinkTraceCtx(mLogCtx) << "Replayed modification with remote id: " << remoteId; |
512 | if (remoteId.isEmpty()) { | 510 | if (remoteId.isEmpty()) { |
513 | SinkWarning() << "Returned an empty remoteId from the modification"; | 511 | SinkWarningCtx(mLogCtx) << "Returned an empty remoteId from the modification"; |
514 | } else { | 512 | } else { |
515 | syncStore().updateRemoteId(type, uid, remoteId); | 513 | syncStore().updateRemoteId(type, uid, remoteId); |
516 | } | 514 | } |
517 | } else if (operation == Sink::Operation_Removal) { | 515 | } else if (operation == Sink::Operation_Removal) { |
518 | SinkTrace() << "Replayed removal with remote id: " << oldRemoteId; | 516 | SinkTraceCtx(mLogCtx) << "Replayed removal with remote id: " << oldRemoteId; |
519 | syncStore().removeRemoteId(type, uid, oldRemoteId); | 517 | syncStore().removeRemoteId(type, uid, oldRemoteId); |
520 | } else { | 518 | } else { |
521 | SinkError() << "Unkown operation" << operation; | 519 | SinkErrorCtx(mLogCtx) << "Unkown operation" << operation; |
522 | } | 520 | } |
523 | }) | 521 | }) |
524 | .then([this](const KAsync::Error &error) { | 522 | .then([this](const KAsync::Error &error) { |
525 | if (error) { | 523 | if (error) { |
526 | SinkWarning() << "Failed to replay change: " << error.errorMessage; | 524 | SinkWarningCtx(mLogCtx) << "Failed to replay change: " << error.errorMessage; |
527 | } | 525 | } |
528 | mSyncStore.clear(); | 526 | mSyncStore.clear(); |
529 | mSyncTransaction.commit(); | 527 | mSyncTransaction.commit(); |
@@ -544,7 +542,7 @@ KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Folder &, | |||
544 | bool Synchronizer::allChangesReplayed() | 542 | bool Synchronizer::allChangesReplayed() |
545 | { | 543 | { |
546 | if (!mSyncRequestQueue.isEmpty()) { | 544 | if (!mSyncRequestQueue.isEmpty()) { |
547 | SinkTrace() << "Queue is not empty"; | 545 | SinkTraceCtx(mLogCtx) << "Queue is not empty"; |
548 | return false; | 546 | return false; |
549 | } | 547 | } |
550 | return ChangeReplay::allChangesReplayed(); | 548 | return ChangeReplay::allChangesReplayed(); |