summaryrefslogtreecommitdiffstats
path: root/common/synchronizer.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-01-23 11:03:03 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-01-23 11:03:03 +0100
commit6003ae63e45485d1f1c76ea378900bc5242465cd (patch)
treeb03327d733714a0dab7272c0deb9a44075ac29e4 /common/synchronizer.cpp
parent752f0907574debe9d7d139a117b2efac80636e93 (diff)
downloadsink-6003ae63e45485d1f1c76ea378900bc5242465cd.tar.gz
sink-6003ae63e45485d1f1c76ea378900bc5242465cd.zip
Debug output
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r--common/synchronizer.cpp76
1 files changed, 37 insertions, 39 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 57e994e..f731b31 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -30,8 +30,6 @@
30#include "flush_generated.h" 30#include "flush_generated.h"
31#include "notification_generated.h" 31#include "notification_generated.h"
32 32
33SINK_DEBUG_AREA("synchronizer")
34
35using namespace Sink; 33using namespace Sink;
36 34
37Synchronizer::Synchronizer(const Sink::ResourceContext &context) 35Synchronizer::Synchronizer(const Sink::ResourceContext &context)
@@ -42,7 +40,7 @@ Synchronizer::Synchronizer(const Sink::ResourceContext &context)
42 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), 40 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite),
43 mSyncInProgress(false) 41 mSyncInProgress(false)
44{ 42{
45 SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); 43 SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId();
46} 44}
47 45
48Synchronizer::~Synchronizer() 46Synchronizer::~Synchronizer()
@@ -128,11 +126,11 @@ void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::func
128{ 126{
129 entryGenerator([this, bufferType, &exists](const QByteArray &sinkId) { 127 entryGenerator([this, bufferType, &exists](const QByteArray &sinkId) {
130 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); 128 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId);
131 SinkTrace() << "Checking for removal " << sinkId << remoteId; 129 SinkTraceCtx(mLogCtx) << "Checking for removal " << sinkId << remoteId;
132 // If we have no remoteId, the entity hasn't been replayed to the source yet 130 // If we have no remoteId, the entity hasn't been replayed to the source yet
133 if (!remoteId.isEmpty()) { 131 if (!remoteId.isEmpty()) {
134 if (!exists(remoteId)) { 132 if (!exists(remoteId)) {
135 SinkTrace() << "Found a removed entity: " << sinkId; 133 SinkTraceCtx(mLogCtx) << "Found a removed entity: " << sinkId;
136 deleteEntity(sinkId, mEntityStore->maxRevision(), bufferType); 134 deleteEntity(sinkId, mEntityStore->maxRevision(), bufferType);
137 } 135 }
138 } 136 }
@@ -157,12 +155,12 @@ void Synchronizer::modifyIfChanged(Storage::EntityStore &store, const QByteArray
157 bool changed = false; 155 bool changed = false;
158 for (const auto &property : entity.changedProperties()) { 156 for (const auto &property : entity.changedProperties()) {
159 if (entity.getProperty(property) != current.getProperty(property)) { 157 if (entity.getProperty(property) != current.getProperty(property)) {
160 SinkTrace() << "Property changed " << sinkId << property; 158 SinkTraceCtx(mLogCtx) << "Property changed " << sinkId << property;
161 changed = true; 159 changed = true;
162 } 160 }
163 } 161 }
164 if (changed) { 162 if (changed) {
165 SinkTrace() << "Found a modified entity: " << sinkId; 163 SinkTraceCtx(mLogCtx) << "Found a modified entity: " << sinkId;
166 modifyEntity(sinkId, store.maxRevision(), bufferType, entity); 164 modifyEntity(sinkId, store.maxRevision(), bufferType, entity);
167 } 165 }
168 }); 166 });
@@ -177,12 +175,12 @@ void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remote
177 175
178void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) 176void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
179{ 177{
180 SinkTrace() << "Create or modify" << bufferType << remoteId; 178 SinkTraceCtx(mLogCtx) << "Create or modify" << bufferType << remoteId;
181 Storage::EntityStore store(mResourceContext, mLogCtx); 179 Storage::EntityStore store(mResourceContext, mLogCtx);
182 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); 180 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
183 const auto found = store.contains(bufferType, sinkId); 181 const auto found = store.contains(bufferType, sinkId);
184 if (!found) { 182 if (!found) {
185 SinkTrace() << "Found a new entity: " << remoteId; 183 SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId;
186 createEntity(sinkId, bufferType, entity); 184 createEntity(sinkId, bufferType, entity);
187 } else { // modification 185 } else { // modification
188 modify(bufferType, remoteId, entity); 186 modify(bufferType, remoteId, entity);
@@ -193,7 +191,7 @@ template<typename DomainType>
193void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria) 191void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria)
194{ 192{
195 193
196 SinkTrace() << "Create or modify" << bufferType << remoteId; 194 SinkTraceCtx(mLogCtx) << "Create or modify" << bufferType << remoteId;
197 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); 195 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
198 Storage::EntityStore store(mResourceContext, mLogCtx); 196 Storage::EntityStore store(mResourceContext, mLogCtx);
199 const auto found = store.contains(bufferType, sinkId); 197 const auto found = store.contains(bufferType, sinkId);
@@ -209,16 +207,16 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
209 auto resultSet = dataStoreQuery.execute(); 207 auto resultSet = dataStoreQuery.execute();
210 resultSet.replaySet(0, 1, [this, &merge, bufferType, remoteId](const ResultSet::Result &r) { 208 resultSet.replaySet(0, 1, [this, &merge, bufferType, remoteId](const ResultSet::Result &r) {
211 merge = true; 209 merge = true;
212 SinkTrace() << "Merging local entity with remote entity: " << r.entity.identifier() << remoteId; 210 SinkTraceCtx(mLogCtx) << "Merging local entity with remote entity: " << r.entity.identifier() << remoteId;
213 syncStore().recordRemoteId(bufferType, r.entity.identifier(), remoteId); 211 syncStore().recordRemoteId(bufferType, r.entity.identifier(), remoteId);
214 }); 212 });
215 213
216 if (!merge) { 214 if (!merge) {
217 SinkTrace() << "Found a new entity: " << remoteId; 215 SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId;
218 createEntity(sinkId, bufferType, entity); 216 createEntity(sinkId, bufferType, entity);
219 } 217 }
220 } else { 218 } else {
221 SinkTrace() << "Found a new entity: " << remoteId; 219 SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId;
222 createEntity(sinkId, bufferType, entity); 220 createEntity(sinkId, bufferType, entity);
223 } 221 }
224 } else { // modification 222 } else { // modification
@@ -240,7 +238,7 @@ QByteArrayList Synchronizer::resolveFilter(const QueryBase::Comparator &filter)
240 result << r.entity.identifier(); 238 result << r.entity.identifier();
241 }); 239 });
242 } else { 240 } else {
243 SinkWarning() << "unknown filter type: " << filter; 241 SinkWarningCtx(mLogCtx) << "unknown filter type: " << filter;
244 Q_ASSERT(false); 242 Q_ASSERT(false);
245 } 243 }
246 return result; 244 return result;
@@ -261,7 +259,7 @@ QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::Query
261 259
262void Synchronizer::synchronize(const Sink::QueryBase &query) 260void Synchronizer::synchronize(const Sink::QueryBase &query)
263{ 261{
264 SinkTrace() << "Synchronizing"; 262 SinkTraceCtx(mLogCtx) << "Synchronizing";
265 mSyncRequestQueue << getSyncRequests(query); 263 mSyncRequestQueue << getSyncRequests(query);
266 processSyncQueue().exec(); 264 processSyncQueue().exec();
267} 265}
@@ -269,7 +267,7 @@ void Synchronizer::synchronize(const Sink::QueryBase &query)
269void Synchronizer::flush(int commandId, const QByteArray &flushId) 267void Synchronizer::flush(int commandId, const QByteArray &flushId)
270{ 268{
271 Q_ASSERT(!flushId.isEmpty()); 269 Q_ASSERT(!flushId.isEmpty());
272 SinkTrace() << "Flushing the synchronization queue " << flushId; 270 SinkTraceCtx(mLogCtx) << "Flushing the synchronization queue " << flushId;
273 mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId}; 271 mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId};
274 processSyncQueue().exec(); 272 processSyncQueue().exec();
275} 273}
@@ -299,7 +297,7 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
299 if (modifiedRequest.requestId.isEmpty()) { 297 if (modifiedRequest.requestId.isEmpty()) {
300 modifiedRequest.requestId = QUuid::createUuid().toRfc4122(); 298 modifiedRequest.requestId = QUuid::createUuid().toRfc4122();
301 } 299 }
302 SinkWarning() << "Enquing flush request " << modifiedRequest.requestId; 300 SinkWarningCtx(mLogCtx) << "Enquing flush request " << modifiedRequest.requestId;
303 301
304 //The sync request will be executed once the flush has completed 302 //The sync request will be executed once the flush has completed
305 mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest); 303 mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest);
@@ -349,7 +347,7 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
349 Q_ASSERT(!request.requestId.isEmpty()); 347 Q_ASSERT(!request.requestId.isEmpty());
350 //FIXME it looks like this is emitted before the replay actually finishes 348 //FIXME it looks like this is emitted before the replay actually finishes
351 if (request.flushType == Flush::FlushReplayQueue) { 349 if (request.flushType == Flush::FlushReplayQueue) {
352 SinkTrace() << "Emitting flush completion."; 350 SinkTraceCtx(mLogCtx) << "Emitting flush completion.";
353 Sink::Notification n; 351 Sink::Notification n;
354 n.type = Sink::Notification::FlushCompletion; 352 n.type = Sink::Notification::FlushCompletion;
355 n.id = request.requestId; 353 n.id = request.requestId;
@@ -365,7 +363,7 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
365 } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { 363 } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) {
366 return replayNextRevision(); 364 return replayNextRevision();
367 } else { 365 } else {
368 SinkWarning() << "Unknown request type: " << request.requestType; 366 SinkWarningCtx(mLogCtx) << "Unknown request type: " << request.requestType;
369 return KAsync::error(KAsync::Error{"Unknown request type."}); 367 return KAsync::error(KAsync::Error{"Unknown request type."});
370 } 368 }
371 369
@@ -374,13 +372,13 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
374KAsync::Job<void> Synchronizer::processSyncQueue() 372KAsync::Job<void> Synchronizer::processSyncQueue()
375{ 373{
376 if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { 374 if (mSyncRequestQueue.isEmpty() || mSyncInProgress) {
377 SinkTrace() << "Sync still in progress or nothing to do."; 375 SinkTraceCtx(mLogCtx) << "Sync still in progress or nothing to do.";
378 return KAsync::null<void>(); 376 return KAsync::null<void>();
379 } 377 }
380 //Don't process any new requests until we're done with the pending ones. 378 //Don't process any new requests until we're done with the pending ones.
381 //Otherwise we might process a flush before the previous request actually completed. 379 //Otherwise we might process a flush before the previous request actually completed.
382 if (!mPendingSyncRequests.isEmpty()) { 380 if (!mPendingSyncRequests.isEmpty()) {
383 SinkTrace() << "We still have pending sync requests. Not executing next request."; 381 SinkTraceCtx(mLogCtx) << "We still have pending sync requests. Not executing next request.";
384 return KAsync::null<void>(); 382 return KAsync::null<void>();
385 } 383 }
386 384
@@ -391,7 +389,7 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
391 }) 389 })
392 .then(processRequest(request)) 390 .then(processRequest(request))
393 .then<void>([this](const KAsync::Error &error) { 391 .then<void>([this](const KAsync::Error &error) {
394 SinkTrace() << "Sync request processed"; 392 SinkTraceCtx(mLogCtx) << "Sync request processed";
395 mSyncTransaction.abort(); 393 mSyncTransaction.abort();
396 mMessageQueue->commit(); 394 mMessageQueue->commit();
397 mSyncStore.clear(); 395 mSyncStore.clear();
@@ -400,7 +398,7 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
400 emit changesReplayed(); 398 emit changesReplayed();
401 } 399 }
402 if (error) { 400 if (error) {
403 SinkWarning() << "Error during sync: " << error.errorMessage; 401 SinkWarningCtx(mLogCtx) << "Error during sync: " << error.errorMessage;
404 return KAsync::error(error); 402 return KAsync::error(error);
405 } 403 }
406 //In case we got more requests meanwhile. 404 //In case we got more requests meanwhile.
@@ -422,7 +420,7 @@ void Synchronizer::commit()
422Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction() 420Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction()
423{ 421{
424 if (!mSyncTransaction) { 422 if (!mSyncTransaction) {
425 SinkTrace() << "Starting transaction on sync store."; 423 SinkTraceCtx(mLogCtx) << "Starting transaction on sync store.";
426 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::DataStore::ReadWrite); 424 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::DataStore::ReadWrite);
427 } 425 }
428 return mSyncTransaction; 426 return mSyncTransaction;
@@ -447,28 +445,28 @@ bool Synchronizer::canReplay(const QByteArray &type, const QByteArray &key, cons
447 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); 445 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
448 Q_ASSERT(metadataBuffer); 446 Q_ASSERT(metadataBuffer);
449 if (!metadataBuffer->replayToSource()) { 447 if (!metadataBuffer->replayToSource()) {
450 SinkTrace() << "Change is coming from the source"; 448 SinkTraceCtx(mLogCtx) << "Change is coming from the source";
451 } 449 }
452 return metadataBuffer->replayToSource(); 450 return metadataBuffer->replayToSource();
453} 451}
454 452
455KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) 453KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value)
456{ 454{
457 SinkTrace() << "Replaying" << type << key; 455 SinkTraceCtx(mLogCtx) << "Replaying" << type << key;
458 456
459 Sink::EntityBuffer buffer(value); 457 Sink::EntityBuffer buffer(value);
460 const Sink::Entity &entity = buffer.entity(); 458 const Sink::Entity &entity = buffer.entity();
461 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); 459 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
462 if (!metadataBuffer) { 460 if (!metadataBuffer) {
463 SinkError() << "No metadata buffer available."; 461 SinkErrorCtx(mLogCtx) << "No metadata buffer available.";
464 return KAsync::error("No metadata buffer"); 462 return KAsync::error("No metadata buffer");
465 } 463 }
466 if (mSyncTransaction) { 464 if (mSyncTransaction) {
467 SinkError() << "Leftover sync transaction."; 465 SinkErrorCtx(mLogCtx) << "Leftover sync transaction.";
468 mSyncTransaction.abort(); 466 mSyncTransaction.abort();
469 } 467 }
470 if (mSyncStore) { 468 if (mSyncStore) {
471 SinkError() << "Leftover sync store."; 469 SinkErrorCtx(mLogCtx) << "Leftover sync store.";
472 mSyncStore.clear(); 470 mSyncStore.clear();
473 } 471 }
474 Q_ASSERT(metadataBuffer); 472 Q_ASSERT(metadataBuffer);
@@ -485,7 +483,7 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray
485 oldRemoteId = syncStore().resolveLocalId(type, uid); 483 oldRemoteId = syncStore().resolveLocalId(type, uid);
486 //oldRemoteId can be empty if the resource implementation didn't return a remoteid 484 //oldRemoteId can be empty if the resource implementation didn't return a remoteid
487 } 485 }
488 SinkTrace() << "Replaying " << key << type << uid << oldRemoteId; 486 SinkTraceCtx(mLogCtx) << "Replaying " << key << type << uid << oldRemoteId;
489 487
490 KAsync::Job<QByteArray> job = KAsync::null<QByteArray>(); 488 KAsync::Job<QByteArray> job = KAsync::null<QByteArray>();
491 //TODO This requires supporting every domain type here as well. Can we solve this better so we can do the dispatch somewhere centrally? 489 //TODO This requires supporting every domain type here as well. Can we solve this better so we can do the dispatch somewhere centrally?
@@ -496,34 +494,34 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray
496 auto mail = store().readEntity<ApplicationDomain::Mail>(key); 494 auto mail = store().readEntity<ApplicationDomain::Mail>(key);
497 job = replay(mail, operation, oldRemoteId, modifiedProperties); 495 job = replay(mail, operation, oldRemoteId, modifiedProperties);
498 } else { 496 } else {
499 SinkError() << "Replayed unknown type: " << type; 497 SinkErrorCtx(mLogCtx) << "Replayed unknown type: " << type;
500 } 498 }
501 499
502 return job.then([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { 500 return job.then([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) {
503 if (operation == Sink::Operation_Creation) { 501 if (operation == Sink::Operation_Creation) {
504 SinkTrace() << "Replayed creation with remote id: " << remoteId; 502 SinkTraceCtx(mLogCtx) << "Replayed creation with remote id: " << remoteId;
505 if (remoteId.isEmpty()) { 503 if (remoteId.isEmpty()) {
506 SinkWarning() << "Returned an empty remoteId from the creation"; 504 SinkWarningCtx(mLogCtx) << "Returned an empty remoteId from the creation";
507 } else { 505 } else {
508 syncStore().recordRemoteId(type, uid, remoteId); 506 syncStore().recordRemoteId(type, uid, remoteId);
509 } 507 }
510 } else if (operation == Sink::Operation_Modification) { 508 } else if (operation == Sink::Operation_Modification) {
511 SinkTrace() << "Replayed modification with remote id: " << remoteId; 509 SinkTraceCtx(mLogCtx) << "Replayed modification with remote id: " << remoteId;
512 if (remoteId.isEmpty()) { 510 if (remoteId.isEmpty()) {
513 SinkWarning() << "Returned an empty remoteId from the modification"; 511 SinkWarningCtx(mLogCtx) << "Returned an empty remoteId from the modification";
514 } else { 512 } else {
515 syncStore().updateRemoteId(type, uid, remoteId); 513 syncStore().updateRemoteId(type, uid, remoteId);
516 } 514 }
517 } else if (operation == Sink::Operation_Removal) { 515 } else if (operation == Sink::Operation_Removal) {
518 SinkTrace() << "Replayed removal with remote id: " << oldRemoteId; 516 SinkTraceCtx(mLogCtx) << "Replayed removal with remote id: " << oldRemoteId;
519 syncStore().removeRemoteId(type, uid, oldRemoteId); 517 syncStore().removeRemoteId(type, uid, oldRemoteId);
520 } else { 518 } else {
521 SinkError() << "Unkown operation" << operation; 519 SinkErrorCtx(mLogCtx) << "Unkown operation" << operation;
522 } 520 }
523 }) 521 })
524 .then([this](const KAsync::Error &error) { 522 .then([this](const KAsync::Error &error) {
525 if (error) { 523 if (error) {
526 SinkWarning() << "Failed to replay change: " << error.errorMessage; 524 SinkWarningCtx(mLogCtx) << "Failed to replay change: " << error.errorMessage;
527 } 525 }
528 mSyncStore.clear(); 526 mSyncStore.clear();
529 mSyncTransaction.commit(); 527 mSyncTransaction.commit();
@@ -544,7 +542,7 @@ KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Folder &,
544bool Synchronizer::allChangesReplayed() 542bool Synchronizer::allChangesReplayed()
545{ 543{
546 if (!mSyncRequestQueue.isEmpty()) { 544 if (!mSyncRequestQueue.isEmpty()) {
547 SinkTrace() << "Queue is not empty"; 545 SinkTraceCtx(mLogCtx) << "Queue is not empty";
548 return false; 546 return false;
549 } 547 }
550 return ChangeReplay::allChangesReplayed(); 548 return ChangeReplay::allChangesReplayed();