diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-01-23 11:03:03 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-01-23 11:03:03 +0100 |
commit | 6003ae63e45485d1f1c76ea378900bc5242465cd (patch) | |
tree | b03327d733714a0dab7272c0deb9a44075ac29e4 | |
parent | 752f0907574debe9d7d139a117b2efac80636e93 (diff) | |
download | sink-6003ae63e45485d1f1c76ea378900bc5242465cd.tar.gz sink-6003ae63e45485d1f1c76ea378900bc5242465cd.zip |
Debug output
-rw-r--r-- | common/changereplay.cpp | 2 | ||||
-rw-r--r-- | common/storage/entitystore.cpp | 50 | ||||
-rw-r--r-- | common/synchronizer.cpp | 76 |
3 files changed, 62 insertions, 66 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp index 8ed0532..532cca8 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp | |||
@@ -30,7 +30,7 @@ using namespace Sink; | |||
30 | using namespace Sink::Storage; | 30 | using namespace Sink::Storage; |
31 | 31 | ||
32 | ChangeReplay::ChangeReplay(const ResourceContext &resourceContext) | 32 | ChangeReplay::ChangeReplay(const ResourceContext &resourceContext) |
33 | : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false), mLogCtx{resourceContext.instanceId() + ".changereplay"} | 33 | : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false), mLogCtx{"changereplay"} |
34 | { | 34 | { |
35 | SinkTraceCtx(mLogCtx) << "Created change replay: " << resourceContext.instanceId(); | 35 | SinkTraceCtx(mLogCtx) << "Created change replay: " << resourceContext.instanceId(); |
36 | } | 36 | } |
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index 96c8ccd..6c341e2 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp | |||
@@ -37,8 +37,6 @@ | |||
37 | using namespace Sink; | 37 | using namespace Sink; |
38 | using namespace Sink::Storage; | 38 | using namespace Sink::Storage; |
39 | 39 | ||
40 | SINK_DEBUG_AREA("entitystore"); | ||
41 | |||
42 | class EntityStore::Private { | 40 | class EntityStore::Private { |
43 | public: | 41 | public: |
44 | Private(const ResourceContext &context, const Sink::Log::Context &ctx) : resourceContext(context), logCtx(ctx.subContext("entitystore")) {} | 42 | Private(const ResourceContext &context, const Sink::Log::Context &ctx) : resourceContext(context), logCtx(ctx.subContext("entitystore")) {} |
@@ -112,7 +110,7 @@ EntityStore::EntityStore(const ResourceContext &context, const Log::Context &ctx | |||
112 | 110 | ||
113 | void EntityStore::startTransaction(Sink::Storage::DataStore::AccessMode accessMode) | 111 | void EntityStore::startTransaction(Sink::Storage::DataStore::AccessMode accessMode) |
114 | { | 112 | { |
115 | SinkTraceCtx(d->logCtx) << "Starting transaction"; | 113 | SinkTraceCtx(d->logCtx) << "Starting transaction: " << accessMode; |
116 | Sink::Storage::DataStore store(Sink::storageLocation(), d->resourceContext.instanceId(), accessMode); | 114 | Sink::Storage::DataStore store(Sink::storageLocation(), d->resourceContext.instanceId(), accessMode); |
117 | d->transaction = store.createTransaction(accessMode); | 115 | d->transaction = store.createTransaction(accessMode); |
118 | Q_ASSERT(d->transaction.validateNamedDatabases()); | 116 | Q_ASSERT(d->transaction.validateNamedDatabases()); |
@@ -136,7 +134,7 @@ void EntityStore::copyBlobs(ApplicationDomain::ApplicationDomainType &entity, qi | |||
136 | { | 134 | { |
137 | const auto directory = d->entityBlobStoragePath(entity.identifier()); | 135 | const auto directory = d->entityBlobStoragePath(entity.identifier()); |
138 | if (!QDir().mkpath(directory)) { | 136 | if (!QDir().mkpath(directory)) { |
139 | SinkWarning() << "Failed to create the directory: " << directory; | 137 | SinkWarningCtx(d->logCtx) << "Failed to create the directory: " << directory; |
140 | } | 138 | } |
141 | 139 | ||
142 | for (const auto &property : entity.changedProperties()) { | 140 | for (const auto &property : entity.changedProperties()) { |
@@ -151,10 +149,10 @@ void EntityStore::copyBlobs(ApplicationDomain::ApplicationDomainType &entity, qi | |||
151 | QFile::remove(filePath); | 149 | QFile::remove(filePath); |
152 | QFile origFile(oldPath); | 150 | QFile origFile(oldPath); |
153 | if (!origFile.open(QIODevice::ReadWrite)) { | 151 | if (!origFile.open(QIODevice::ReadWrite)) { |
154 | SinkWarning() << "Failed to open the original file with write rights: " << origFile.errorString(); | 152 | SinkWarningCtx(d->logCtx) << "Failed to open the original file with write rights: " << origFile.errorString(); |
155 | } | 153 | } |
156 | if (!origFile.rename(filePath)) { | 154 | if (!origFile.rename(filePath)) { |
157 | SinkWarning() << "Failed to move the file from: " << oldPath << " to " << filePath << ". " << origFile.errorString(); | 155 | SinkWarningCtx(d->logCtx) << "Failed to move the file from: " << oldPath << " to " << filePath << ". " << origFile.errorString(); |
158 | } | 156 | } |
159 | origFile.close(); | 157 | origFile.close(); |
160 | entity.setProperty(property, QVariant::fromValue(ApplicationDomain::BLOB{filePath})); | 158 | entity.setProperty(property, QVariant::fromValue(ApplicationDomain::BLOB{filePath})); |
@@ -166,7 +164,7 @@ void EntityStore::copyBlobs(ApplicationDomain::ApplicationDomainType &entity, qi | |||
166 | bool EntityStore::add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &entity_, bool replayToSource, const PreprocessCreation &preprocess) | 164 | bool EntityStore::add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &entity_, bool replayToSource, const PreprocessCreation &preprocess) |
167 | { | 165 | { |
168 | if (entity_.identifier().isEmpty()) { | 166 | if (entity_.identifier().isEmpty()) { |
169 | SinkWarning() << "Can't write entity with an empty identifier"; | 167 | SinkWarningCtx(d->logCtx) << "Can't write entity with an empty identifier"; |
170 | return false; | 168 | return false; |
171 | } | 169 | } |
172 | 170 | ||
@@ -197,7 +195,7 @@ bool EntityStore::add(const QByteArray &type, const ApplicationDomain::Applicati | |||
197 | 195 | ||
198 | DataStore::mainDatabase(d->transaction, type) | 196 | DataStore::mainDatabase(d->transaction, type) |
199 | .write(DataStore::assembleKey(entity.identifier(), newRevision), BufferUtils::extractBuffer(fbb), | 197 | .write(DataStore::assembleKey(entity.identifier(), newRevision), BufferUtils::extractBuffer(fbb), |
200 | [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << entity.identifier() << newRevision; }); | 198 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << entity.identifier() << newRevision; }); |
201 | DataStore::setMaxRevision(d->transaction, newRevision); | 199 | DataStore::setMaxRevision(d->transaction, newRevision); |
202 | DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type); | 200 | DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type); |
203 | SinkTraceCtx(d->logCtx) << "Wrote entity: " << entity.identifier() << type << newRevision; | 201 | SinkTraceCtx(d->logCtx) << "Wrote entity: " << entity.identifier() << type << newRevision; |
@@ -209,7 +207,7 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic | |||
209 | auto changeset = diff.changedProperties(); | 207 | auto changeset = diff.changedProperties(); |
210 | const auto current = readLatest(type, diff.identifier()); | 208 | const auto current = readLatest(type, diff.identifier()); |
211 | if (current.identifier().isEmpty()) { | 209 | if (current.identifier().isEmpty()) { |
212 | SinkWarning() << "Failed to read current version: " << diff.identifier(); | 210 | SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); |
213 | return false; | 211 | return false; |
214 | } | 212 | } |
215 | 213 | ||
@@ -263,7 +261,7 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic | |||
263 | 261 | ||
264 | DataStore::mainDatabase(d->transaction, type) | 262 | DataStore::mainDatabase(d->transaction, type) |
265 | .write(DataStore::assembleKey(newEntity.identifier(), newRevision), BufferUtils::extractBuffer(fbb), | 263 | .write(DataStore::assembleKey(newEntity.identifier(), newRevision), BufferUtils::extractBuffer(fbb), |
266 | [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << newEntity.identifier() << newRevision; }); | 264 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << newEntity.identifier() << newRevision; }); |
267 | DataStore::setMaxRevision(d->transaction, newRevision); | 265 | DataStore::setMaxRevision(d->transaction, newRevision); |
268 | DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type); | 266 | DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type); |
269 | SinkTraceCtx(d->logCtx) << "Wrote modified entity: " << newEntity.identifier() << type << newRevision; | 267 | SinkTraceCtx(d->logCtx) << "Wrote modified entity: " << newEntity.identifier() << type << newRevision; |
@@ -287,14 +285,14 @@ bool EntityStore::remove(const QByteArray &type, const QByteArray &uid, bool rep | |||
287 | } | 285 | } |
288 | return false; | 286 | return false; |
289 | }, | 287 | }, |
290 | [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); | 288 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; }); |
291 | 289 | ||
292 | if (!found) { | 290 | if (!found) { |
293 | SinkWarning() << "Remove: Failed to find entity " << uid; | 291 | SinkWarningCtx(d->logCtx) << "Remove: Failed to find entity " << uid; |
294 | return false; | 292 | return false; |
295 | } | 293 | } |
296 | if (alreadyRemoved) { | 294 | if (alreadyRemoved) { |
297 | SinkWarning() << "Remove: Entity is already removed " << uid; | 295 | SinkWarningCtx(d->logCtx) << "Remove: Entity is already removed " << uid; |
298 | return false; | 296 | return false; |
299 | } | 297 | } |
300 | 298 | ||
@@ -320,7 +318,7 @@ bool EntityStore::remove(const QByteArray &type, const QByteArray &uid, bool rep | |||
320 | 318 | ||
321 | DataStore::mainDatabase(d->transaction, type) | 319 | DataStore::mainDatabase(d->transaction, type) |
322 | .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), | 320 | .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), |
323 | [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); | 321 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << uid << newRevision; }); |
324 | DataStore::setMaxRevision(d->transaction, newRevision); | 322 | DataStore::setMaxRevision(d->transaction, newRevision); |
325 | DataStore::recordRevision(d->transaction, newRevision, uid, type); | 323 | DataStore::recordRevision(d->transaction, newRevision, uid, type); |
326 | return true; | 324 | return true; |
@@ -336,7 +334,7 @@ void EntityStore::cleanupEntityRevisionsUntil(qint64 revision) | |||
336 | [&](const QByteArray &key, const QByteArray &data) -> bool { | 334 | [&](const QByteArray &key, const QByteArray &data) -> bool { |
337 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 335 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
338 | if (!buffer.isValid()) { | 336 | if (!buffer.isValid()) { |
339 | SinkWarning() << "Read invalid buffer from disk"; | 337 | SinkWarningCtx(d->logCtx) << "Read invalid buffer from disk"; |
340 | } else { | 338 | } else { |
341 | const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); | 339 | const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); |
342 | const qint64 rev = metadata->revision(); | 340 | const qint64 rev = metadata->revision(); |
@@ -350,7 +348,7 @@ void EntityStore::cleanupEntityRevisionsUntil(qint64 revision) | |||
350 | const auto directory = d->entityBlobStoragePath(uid); | 348 | const auto directory = d->entityBlobStoragePath(uid); |
351 | QDir dir(directory); | 349 | QDir dir(directory); |
352 | if (!dir.removeRecursively()) { | 350 | if (!dir.removeRecursively()) { |
353 | SinkError() << "Failed to cleanup: " << directory; | 351 | SinkErrorCtx(d->logCtx) << "Failed to cleanup: " << directory; |
354 | } | 352 | } |
355 | } | 353 | } |
356 | //Don't cleanup more than specified | 354 | //Don't cleanup more than specified |
@@ -361,7 +359,7 @@ void EntityStore::cleanupEntityRevisionsUntil(qint64 revision) | |||
361 | 359 | ||
362 | return true; | 360 | return true; |
363 | }, | 361 | }, |
364 | [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); | 362 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message; }, true); |
365 | DataStore::setCleanedUpRevision(d->transaction, revision); | 363 | DataStore::setCleanedUpRevision(d->transaction, revision); |
366 | } | 364 | } |
367 | 365 | ||
@@ -407,7 +405,7 @@ QVector<QByteArray> EntityStore::fullScan(const QByteArray &type) | |||
407 | keys << uid; | 405 | keys << uid; |
408 | return true; | 406 | return true; |
409 | }, | 407 | }, |
410 | [](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message; }); | 408 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during query: " << error.message; }); |
411 | 409 | ||
412 | SinkTraceCtx(d->logCtx) << "Full scan retrieved " << keys.size() << " results."; | 410 | SinkTraceCtx(d->logCtx) << "Full scan retrieved " << keys.size() << " results."; |
413 | return keys.toList().toVector(); | 411 | return keys.toList().toVector(); |
@@ -446,7 +444,7 @@ void EntityStore::indexLookup(const QByteArray &type, const QByteArray &property | |||
446 | /* callback(sinkId); */ | 444 | /* callback(sinkId); */ |
447 | /* }, */ | 445 | /* }, */ |
448 | /* [&](const Index::Error &error) { */ | 446 | /* [&](const Index::Error &error) { */ |
449 | /* SinkWarning() << "Error in index: " << error.message << property; */ | 447 | /* SinkWarningCtx(d->logCtx) << "Error in index: " << error.message << property; */ |
450 | /* }); */ | 448 | /* }); */ |
451 | } | 449 | } |
452 | 450 | ||
@@ -458,7 +456,7 @@ void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, cons | |||
458 | callback(DataStore::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size())); | 456 | callback(DataStore::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size())); |
459 | return false; | 457 | return false; |
460 | }, | 458 | }, |
461 | [&](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message << uid; }); | 459 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during query: " << error.message << uid; }); |
462 | } | 460 | } |
463 | 461 | ||
464 | void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomain::ApplicationDomainType &)> callback) | 462 | void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomain::ApplicationDomainType &)> callback) |
@@ -494,7 +492,7 @@ void EntityStore::readEntity(const QByteArray &type, const QByteArray &key, cons | |||
494 | callback(DataStore::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size())); | 492 | callback(DataStore::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size())); |
495 | return false; | 493 | return false; |
496 | }, | 494 | }, |
497 | [&](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); | 495 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during query: " << error.message << key; }); |
498 | } | 496 | } |
499 | 497 | ||
500 | void EntityStore::readEntity(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomain::ApplicationDomainType &)> callback) | 498 | void EntityStore::readEntity(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomain::ApplicationDomainType &)> callback) |
@@ -524,7 +522,7 @@ void EntityStore::readAll(const QByteArray &type, const std::function<void(const | |||
524 | callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer)); | 522 | callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer)); |
525 | return true; | 523 | return true; |
526 | }, | 524 | }, |
527 | [&](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message; }); | 525 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during query: " << error.message; }); |
528 | } | 526 | } |
529 | 527 | ||
530 | void EntityStore::readRevisions(qint64 baseRevision, const QByteArray &expectedType, const std::function<void(const QByteArray &key)> &callback) | 528 | void EntityStore::readRevisions(qint64 baseRevision, const QByteArray &expectedType, const std::function<void(const QByteArray &key)> &callback) |
@@ -561,7 +559,7 @@ void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qi | |||
561 | } | 559 | } |
562 | return true; | 560 | return true; |
563 | }, | 561 | }, |
564 | [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true); | 562 | [&](const Sink::Storage::DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read current value from storage: " << error.message; }, true); |
565 | return readEntity(type, Sink::Storage::DataStore::assembleKey(uid, latestRevision), callback); | 563 | return readEntity(type, Sink::Storage::DataStore::assembleKey(uid, latestRevision), callback); |
566 | } | 564 | } |
567 | 565 | ||
@@ -591,7 +589,7 @@ void EntityStore::readAllUids(const QByteArray &type, const std::function<void(c | |||
591 | callback(Sink::Storage::DataStore::uidFromKey(key)); | 589 | callback(Sink::Storage::DataStore::uidFromKey(key)); |
592 | return true; | 590 | return true; |
593 | }, | 591 | }, |
594 | [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); | 592 | [&](const Sink::Storage::DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read current value from storage: " << error.message; }); |
595 | } | 593 | } |
596 | 594 | ||
597 | bool EntityStore::contains(const QByteArray &type, const QByteArray &uid) | 595 | bool EntityStore::contains(const QByteArray &type, const QByteArray &uid) |
@@ -615,10 +613,10 @@ qint64 EntityStore::maxRevision() | |||
615 | /* Sink::Storage::DataStore storage(Sink::storageLocation(), mResourceInstanceIdentifier); */ | 613 | /* Sink::Storage::DataStore storage(Sink::storageLocation(), mResourceInstanceIdentifier); */ |
616 | /* if (!storage.exists()) { */ | 614 | /* if (!storage.exists()) { */ |
617 | /* //This is not an error if the resource wasn't started before */ | 615 | /* //This is not an error if the resource wasn't started before */ |
618 | /* SinkLog() << "Store doesn't exist: " << mResourceInstanceIdentifier; */ | 616 | /* SinkLogCtx(d->logCtx) << "Store doesn't exist: " << mResourceInstanceIdentifier; */ |
619 | /* return Sink::Storage::DataStore::Transaction(); */ | 617 | /* return Sink::Storage::DataStore::Transaction(); */ |
620 | /* } */ | 618 | /* } */ |
621 | /* storage.setDefaultErrorHandler([this](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Error during query: " << error.store << error.message; }); */ | 619 | /* storage.setDefaultErrorHandler([this](const Sink::Storage::DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during query: " << error.store << error.message; }); */ |
622 | /* transaction = storage.createTransaction(Sink::Storage::DataStore::ReadOnly); */ | 620 | /* transaction = storage.createTransaction(Sink::Storage::DataStore::ReadOnly); */ |
623 | /* } */ | 621 | /* } */ |
624 | 622 | ||
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 57e994e..f731b31 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -30,8 +30,6 @@ | |||
30 | #include "flush_generated.h" | 30 | #include "flush_generated.h" |
31 | #include "notification_generated.h" | 31 | #include "notification_generated.h" |
32 | 32 | ||
33 | SINK_DEBUG_AREA("synchronizer") | ||
34 | |||
35 | using namespace Sink; | 33 | using namespace Sink; |
36 | 34 | ||
37 | Synchronizer::Synchronizer(const Sink::ResourceContext &context) | 35 | Synchronizer::Synchronizer(const Sink::ResourceContext &context) |
@@ -42,7 +40,7 @@ Synchronizer::Synchronizer(const Sink::ResourceContext &context) | |||
42 | mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), | 40 | mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), |
43 | mSyncInProgress(false) | 41 | mSyncInProgress(false) |
44 | { | 42 | { |
45 | SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); | 43 | SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); |
46 | } | 44 | } |
47 | 45 | ||
48 | Synchronizer::~Synchronizer() | 46 | Synchronizer::~Synchronizer() |
@@ -128,11 +126,11 @@ void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::func | |||
128 | { | 126 | { |
129 | entryGenerator([this, bufferType, &exists](const QByteArray &sinkId) { | 127 | entryGenerator([this, bufferType, &exists](const QByteArray &sinkId) { |
130 | const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); | 128 | const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); |
131 | SinkTrace() << "Checking for removal " << sinkId << remoteId; | 129 | SinkTraceCtx(mLogCtx) << "Checking for removal " << sinkId << remoteId; |
132 | // If we have no remoteId, the entity hasn't been replayed to the source yet | 130 | // If we have no remoteId, the entity hasn't been replayed to the source yet |
133 | if (!remoteId.isEmpty()) { | 131 | if (!remoteId.isEmpty()) { |
134 | if (!exists(remoteId)) { | 132 | if (!exists(remoteId)) { |
135 | SinkTrace() << "Found a removed entity: " << sinkId; | 133 | SinkTraceCtx(mLogCtx) << "Found a removed entity: " << sinkId; |
136 | deleteEntity(sinkId, mEntityStore->maxRevision(), bufferType); | 134 | deleteEntity(sinkId, mEntityStore->maxRevision(), bufferType); |
137 | } | 135 | } |
138 | } | 136 | } |
@@ -157,12 +155,12 @@ void Synchronizer::modifyIfChanged(Storage::EntityStore &store, const QByteArray | |||
157 | bool changed = false; | 155 | bool changed = false; |
158 | for (const auto &property : entity.changedProperties()) { | 156 | for (const auto &property : entity.changedProperties()) { |
159 | if (entity.getProperty(property) != current.getProperty(property)) { | 157 | if (entity.getProperty(property) != current.getProperty(property)) { |
160 | SinkTrace() << "Property changed " << sinkId << property; | 158 | SinkTraceCtx(mLogCtx) << "Property changed " << sinkId << property; |
161 | changed = true; | 159 | changed = true; |
162 | } | 160 | } |
163 | } | 161 | } |
164 | if (changed) { | 162 | if (changed) { |
165 | SinkTrace() << "Found a modified entity: " << sinkId; | 163 | SinkTraceCtx(mLogCtx) << "Found a modified entity: " << sinkId; |
166 | modifyEntity(sinkId, store.maxRevision(), bufferType, entity); | 164 | modifyEntity(sinkId, store.maxRevision(), bufferType, entity); |
167 | } | 165 | } |
168 | }); | 166 | }); |
@@ -177,12 +175,12 @@ void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remote | |||
177 | 175 | ||
178 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) | 176 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) |
179 | { | 177 | { |
180 | SinkTrace() << "Create or modify" << bufferType << remoteId; | 178 | SinkTraceCtx(mLogCtx) << "Create or modify" << bufferType << remoteId; |
181 | Storage::EntityStore store(mResourceContext, mLogCtx); | 179 | Storage::EntityStore store(mResourceContext, mLogCtx); |
182 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); | 180 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); |
183 | const auto found = store.contains(bufferType, sinkId); | 181 | const auto found = store.contains(bufferType, sinkId); |
184 | if (!found) { | 182 | if (!found) { |
185 | SinkTrace() << "Found a new entity: " << remoteId; | 183 | SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId; |
186 | createEntity(sinkId, bufferType, entity); | 184 | createEntity(sinkId, bufferType, entity); |
187 | } else { // modification | 185 | } else { // modification |
188 | modify(bufferType, remoteId, entity); | 186 | modify(bufferType, remoteId, entity); |
@@ -193,7 +191,7 @@ template<typename DomainType> | |||
193 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria) | 191 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria) |
194 | { | 192 | { |
195 | 193 | ||
196 | SinkTrace() << "Create or modify" << bufferType << remoteId; | 194 | SinkTraceCtx(mLogCtx) << "Create or modify" << bufferType << remoteId; |
197 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); | 195 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); |
198 | Storage::EntityStore store(mResourceContext, mLogCtx); | 196 | Storage::EntityStore store(mResourceContext, mLogCtx); |
199 | const auto found = store.contains(bufferType, sinkId); | 197 | const auto found = store.contains(bufferType, sinkId); |
@@ -209,16 +207,16 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray | |||
209 | auto resultSet = dataStoreQuery.execute(); | 207 | auto resultSet = dataStoreQuery.execute(); |
210 | resultSet.replaySet(0, 1, [this, &merge, bufferType, remoteId](const ResultSet::Result &r) { | 208 | resultSet.replaySet(0, 1, [this, &merge, bufferType, remoteId](const ResultSet::Result &r) { |
211 | merge = true; | 209 | merge = true; |
212 | SinkTrace() << "Merging local entity with remote entity: " << r.entity.identifier() << remoteId; | 210 | SinkTraceCtx(mLogCtx) << "Merging local entity with remote entity: " << r.entity.identifier() << remoteId; |
213 | syncStore().recordRemoteId(bufferType, r.entity.identifier(), remoteId); | 211 | syncStore().recordRemoteId(bufferType, r.entity.identifier(), remoteId); |
214 | }); | 212 | }); |
215 | 213 | ||
216 | if (!merge) { | 214 | if (!merge) { |
217 | SinkTrace() << "Found a new entity: " << remoteId; | 215 | SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId; |
218 | createEntity(sinkId, bufferType, entity); | 216 | createEntity(sinkId, bufferType, entity); |
219 | } | 217 | } |
220 | } else { | 218 | } else { |
221 | SinkTrace() << "Found a new entity: " << remoteId; | 219 | SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId; |
222 | createEntity(sinkId, bufferType, entity); | 220 | createEntity(sinkId, bufferType, entity); |
223 | } | 221 | } |
224 | } else { // modification | 222 | } else { // modification |
@@ -240,7 +238,7 @@ QByteArrayList Synchronizer::resolveFilter(const QueryBase::Comparator &filter) | |||
240 | result << r.entity.identifier(); | 238 | result << r.entity.identifier(); |
241 | }); | 239 | }); |
242 | } else { | 240 | } else { |
243 | SinkWarning() << "unknown filter type: " << filter; | 241 | SinkWarningCtx(mLogCtx) << "unknown filter type: " << filter; |
244 | Q_ASSERT(false); | 242 | Q_ASSERT(false); |
245 | } | 243 | } |
246 | return result; | 244 | return result; |
@@ -261,7 +259,7 @@ QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::Query | |||
261 | 259 | ||
262 | void Synchronizer::synchronize(const Sink::QueryBase &query) | 260 | void Synchronizer::synchronize(const Sink::QueryBase &query) |
263 | { | 261 | { |
264 | SinkTrace() << "Synchronizing"; | 262 | SinkTraceCtx(mLogCtx) << "Synchronizing"; |
265 | mSyncRequestQueue << getSyncRequests(query); | 263 | mSyncRequestQueue << getSyncRequests(query); |
266 | processSyncQueue().exec(); | 264 | processSyncQueue().exec(); |
267 | } | 265 | } |
@@ -269,7 +267,7 @@ void Synchronizer::synchronize(const Sink::QueryBase &query) | |||
269 | void Synchronizer::flush(int commandId, const QByteArray &flushId) | 267 | void Synchronizer::flush(int commandId, const QByteArray &flushId) |
270 | { | 268 | { |
271 | Q_ASSERT(!flushId.isEmpty()); | 269 | Q_ASSERT(!flushId.isEmpty()); |
272 | SinkTrace() << "Flushing the synchronization queue " << flushId; | 270 | SinkTraceCtx(mLogCtx) << "Flushing the synchronization queue " << flushId; |
273 | mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId}; | 271 | mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId}; |
274 | processSyncQueue().exec(); | 272 | processSyncQueue().exec(); |
275 | } | 273 | } |
@@ -299,7 +297,7 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | |||
299 | if (modifiedRequest.requestId.isEmpty()) { | 297 | if (modifiedRequest.requestId.isEmpty()) { |
300 | modifiedRequest.requestId = QUuid::createUuid().toRfc4122(); | 298 | modifiedRequest.requestId = QUuid::createUuid().toRfc4122(); |
301 | } | 299 | } |
302 | SinkWarning() << "Enquing flush request " << modifiedRequest.requestId; | 300 | SinkWarningCtx(mLogCtx) << "Enquing flush request " << modifiedRequest.requestId; |
303 | 301 | ||
304 | //The sync request will be executed once the flush has completed | 302 | //The sync request will be executed once the flush has completed |
305 | mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest); | 303 | mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest); |
@@ -349,7 +347,7 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | |||
349 | Q_ASSERT(!request.requestId.isEmpty()); | 347 | Q_ASSERT(!request.requestId.isEmpty()); |
350 | //FIXME it looks like this is emitted before the replay actually finishes | 348 | //FIXME it looks like this is emitted before the replay actually finishes |
351 | if (request.flushType == Flush::FlushReplayQueue) { | 349 | if (request.flushType == Flush::FlushReplayQueue) { |
352 | SinkTrace() << "Emitting flush completion."; | 350 | SinkTraceCtx(mLogCtx) << "Emitting flush completion."; |
353 | Sink::Notification n; | 351 | Sink::Notification n; |
354 | n.type = Sink::Notification::FlushCompletion; | 352 | n.type = Sink::Notification::FlushCompletion; |
355 | n.id = request.requestId; | 353 | n.id = request.requestId; |
@@ -365,7 +363,7 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | |||
365 | } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { | 363 | } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { |
366 | return replayNextRevision(); | 364 | return replayNextRevision(); |
367 | } else { | 365 | } else { |
368 | SinkWarning() << "Unknown request type: " << request.requestType; | 366 | SinkWarningCtx(mLogCtx) << "Unknown request type: " << request.requestType; |
369 | return KAsync::error(KAsync::Error{"Unknown request type."}); | 367 | return KAsync::error(KAsync::Error{"Unknown request type."}); |
370 | } | 368 | } |
371 | 369 | ||
@@ -374,13 +372,13 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request) | |||
374 | KAsync::Job<void> Synchronizer::processSyncQueue() | 372 | KAsync::Job<void> Synchronizer::processSyncQueue() |
375 | { | 373 | { |
376 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { | 374 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { |
377 | SinkTrace() << "Sync still in progress or nothing to do."; | 375 | SinkTraceCtx(mLogCtx) << "Sync still in progress or nothing to do."; |
378 | return KAsync::null<void>(); | 376 | return KAsync::null<void>(); |
379 | } | 377 | } |
380 | //Don't process any new requests until we're done with the pending ones. | 378 | //Don't process any new requests until we're done with the pending ones. |
381 | //Otherwise we might process a flush before the previous request actually completed. | 379 | //Otherwise we might process a flush before the previous request actually completed. |
382 | if (!mPendingSyncRequests.isEmpty()) { | 380 | if (!mPendingSyncRequests.isEmpty()) { |
383 | SinkTrace() << "We still have pending sync requests. Not executing next request."; | 381 | SinkTraceCtx(mLogCtx) << "We still have pending sync requests. Not executing next request."; |
384 | return KAsync::null<void>(); | 382 | return KAsync::null<void>(); |
385 | } | 383 | } |
386 | 384 | ||
@@ -391,7 +389,7 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
391 | }) | 389 | }) |
392 | .then(processRequest(request)) | 390 | .then(processRequest(request)) |
393 | .then<void>([this](const KAsync::Error &error) { | 391 | .then<void>([this](const KAsync::Error &error) { |
394 | SinkTrace() << "Sync request processed"; | 392 | SinkTraceCtx(mLogCtx) << "Sync request processed"; |
395 | mSyncTransaction.abort(); | 393 | mSyncTransaction.abort(); |
396 | mMessageQueue->commit(); | 394 | mMessageQueue->commit(); |
397 | mSyncStore.clear(); | 395 | mSyncStore.clear(); |
@@ -400,7 +398,7 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
400 | emit changesReplayed(); | 398 | emit changesReplayed(); |
401 | } | 399 | } |
402 | if (error) { | 400 | if (error) { |
403 | SinkWarning() << "Error during sync: " << error.errorMessage; | 401 | SinkWarningCtx(mLogCtx) << "Error during sync: " << error.errorMessage; |
404 | return KAsync::error(error); | 402 | return KAsync::error(error); |
405 | } | 403 | } |
406 | //In case we got more requests meanwhile. | 404 | //In case we got more requests meanwhile. |
@@ -422,7 +420,7 @@ void Synchronizer::commit() | |||
422 | Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction() | 420 | Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction() |
423 | { | 421 | { |
424 | if (!mSyncTransaction) { | 422 | if (!mSyncTransaction) { |
425 | SinkTrace() << "Starting transaction on sync store."; | 423 | SinkTraceCtx(mLogCtx) << "Starting transaction on sync store."; |
426 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::DataStore::ReadWrite); | 424 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::DataStore::ReadWrite); |
427 | } | 425 | } |
428 | return mSyncTransaction; | 426 | return mSyncTransaction; |
@@ -447,28 +445,28 @@ bool Synchronizer::canReplay(const QByteArray &type, const QByteArray &key, cons | |||
447 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | 445 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); |
448 | Q_ASSERT(metadataBuffer); | 446 | Q_ASSERT(metadataBuffer); |
449 | if (!metadataBuffer->replayToSource()) { | 447 | if (!metadataBuffer->replayToSource()) { |
450 | SinkTrace() << "Change is coming from the source"; | 448 | SinkTraceCtx(mLogCtx) << "Change is coming from the source"; |
451 | } | 449 | } |
452 | return metadataBuffer->replayToSource(); | 450 | return metadataBuffer->replayToSource(); |
453 | } | 451 | } |
454 | 452 | ||
455 | KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) | 453 | KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) |
456 | { | 454 | { |
457 | SinkTrace() << "Replaying" << type << key; | 455 | SinkTraceCtx(mLogCtx) << "Replaying" << type << key; |
458 | 456 | ||
459 | Sink::EntityBuffer buffer(value); | 457 | Sink::EntityBuffer buffer(value); |
460 | const Sink::Entity &entity = buffer.entity(); | 458 | const Sink::Entity &entity = buffer.entity(); |
461 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | 459 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); |
462 | if (!metadataBuffer) { | 460 | if (!metadataBuffer) { |
463 | SinkError() << "No metadata buffer available."; | 461 | SinkErrorCtx(mLogCtx) << "No metadata buffer available."; |
464 | return KAsync::error("No metadata buffer"); | 462 | return KAsync::error("No metadata buffer"); |
465 | } | 463 | } |
466 | if (mSyncTransaction) { | 464 | if (mSyncTransaction) { |
467 | SinkError() << "Leftover sync transaction."; | 465 | SinkErrorCtx(mLogCtx) << "Leftover sync transaction."; |
468 | mSyncTransaction.abort(); | 466 | mSyncTransaction.abort(); |
469 | } | 467 | } |
470 | if (mSyncStore) { | 468 | if (mSyncStore) { |
471 | SinkError() << "Leftover sync store."; | 469 | SinkErrorCtx(mLogCtx) << "Leftover sync store."; |
472 | mSyncStore.clear(); | 470 | mSyncStore.clear(); |
473 | } | 471 | } |
474 | Q_ASSERT(metadataBuffer); | 472 | Q_ASSERT(metadataBuffer); |
@@ -485,7 +483,7 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray | |||
485 | oldRemoteId = syncStore().resolveLocalId(type, uid); | 483 | oldRemoteId = syncStore().resolveLocalId(type, uid); |
486 | //oldRemoteId can be empty if the resource implementation didn't return a remoteid | 484 | //oldRemoteId can be empty if the resource implementation didn't return a remoteid |
487 | } | 485 | } |
488 | SinkTrace() << "Replaying " << key << type << uid << oldRemoteId; | 486 | SinkTraceCtx(mLogCtx) << "Replaying " << key << type << uid << oldRemoteId; |
489 | 487 | ||
490 | KAsync::Job<QByteArray> job = KAsync::null<QByteArray>(); | 488 | KAsync::Job<QByteArray> job = KAsync::null<QByteArray>(); |
491 | //TODO This requires supporting every domain type here as well. Can we solve this better so we can do the dispatch somewhere centrally? | 489 | //TODO This requires supporting every domain type here as well. Can we solve this better so we can do the dispatch somewhere centrally? |
@@ -496,34 +494,34 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray | |||
496 | auto mail = store().readEntity<ApplicationDomain::Mail>(key); | 494 | auto mail = store().readEntity<ApplicationDomain::Mail>(key); |
497 | job = replay(mail, operation, oldRemoteId, modifiedProperties); | 495 | job = replay(mail, operation, oldRemoteId, modifiedProperties); |
498 | } else { | 496 | } else { |
499 | SinkError() << "Replayed unknown type: " << type; | 497 | SinkErrorCtx(mLogCtx) << "Replayed unknown type: " << type; |
500 | } | 498 | } |
501 | 499 | ||
502 | return job.then([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { | 500 | return job.then([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { |
503 | if (operation == Sink::Operation_Creation) { | 501 | if (operation == Sink::Operation_Creation) { |
504 | SinkTrace() << "Replayed creation with remote id: " << remoteId; | 502 | SinkTraceCtx(mLogCtx) << "Replayed creation with remote id: " << remoteId; |
505 | if (remoteId.isEmpty()) { | 503 | if (remoteId.isEmpty()) { |
506 | SinkWarning() << "Returned an empty remoteId from the creation"; | 504 | SinkWarningCtx(mLogCtx) << "Returned an empty remoteId from the creation"; |
507 | } else { | 505 | } else { |
508 | syncStore().recordRemoteId(type, uid, remoteId); | 506 | syncStore().recordRemoteId(type, uid, remoteId); |
509 | } | 507 | } |
510 | } else if (operation == Sink::Operation_Modification) { | 508 | } else if (operation == Sink::Operation_Modification) { |
511 | SinkTrace() << "Replayed modification with remote id: " << remoteId; | 509 | SinkTraceCtx(mLogCtx) << "Replayed modification with remote id: " << remoteId; |
512 | if (remoteId.isEmpty()) { | 510 | if (remoteId.isEmpty()) { |
513 | SinkWarning() << "Returned an empty remoteId from the modification"; | 511 | SinkWarningCtx(mLogCtx) << "Returned an empty remoteId from the modification"; |
514 | } else { | 512 | } else { |
515 | syncStore().updateRemoteId(type, uid, remoteId); | 513 | syncStore().updateRemoteId(type, uid, remoteId); |
516 | } | 514 | } |
517 | } else if (operation == Sink::Operation_Removal) { | 515 | } else if (operation == Sink::Operation_Removal) { |
518 | SinkTrace() << "Replayed removal with remote id: " << oldRemoteId; | 516 | SinkTraceCtx(mLogCtx) << "Replayed removal with remote id: " << oldRemoteId; |
519 | syncStore().removeRemoteId(type, uid, oldRemoteId); | 517 | syncStore().removeRemoteId(type, uid, oldRemoteId); |
520 | } else { | 518 | } else { |
521 | SinkError() << "Unkown operation" << operation; | 519 | SinkErrorCtx(mLogCtx) << "Unkown operation" << operation; |
522 | } | 520 | } |
523 | }) | 521 | }) |
524 | .then([this](const KAsync::Error &error) { | 522 | .then([this](const KAsync::Error &error) { |
525 | if (error) { | 523 | if (error) { |
526 | SinkWarning() << "Failed to replay change: " << error.errorMessage; | 524 | SinkWarningCtx(mLogCtx) << "Failed to replay change: " << error.errorMessage; |
527 | } | 525 | } |
528 | mSyncStore.clear(); | 526 | mSyncStore.clear(); |
529 | mSyncTransaction.commit(); | 527 | mSyncTransaction.commit(); |
@@ -544,7 +542,7 @@ KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Folder &, | |||
544 | bool Synchronizer::allChangesReplayed() | 542 | bool Synchronizer::allChangesReplayed() |
545 | { | 543 | { |
546 | if (!mSyncRequestQueue.isEmpty()) { | 544 | if (!mSyncRequestQueue.isEmpty()) { |
547 | SinkTrace() << "Queue is not empty"; | 545 | SinkTraceCtx(mLogCtx) << "Queue is not empty"; |
548 | return false; | 546 | return false; |
549 | } | 547 | } |
550 | return ChangeReplay::allChangesReplayed(); | 548 | return ChangeReplay::allChangesReplayed(); |