summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-01-23 11:03:03 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-01-23 11:03:03 +0100
commit6003ae63e45485d1f1c76ea378900bc5242465cd (patch)
treeb03327d733714a0dab7272c0deb9a44075ac29e4
parent752f0907574debe9d7d139a117b2efac80636e93 (diff)
downloadsink-6003ae63e45485d1f1c76ea378900bc5242465cd.tar.gz
sink-6003ae63e45485d1f1c76ea378900bc5242465cd.zip
Debug output
-rw-r--r--common/changereplay.cpp2
-rw-r--r--common/storage/entitystore.cpp50
-rw-r--r--common/synchronizer.cpp76
3 files changed, 62 insertions, 66 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index 8ed0532..532cca8 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -30,7 +30,7 @@ using namespace Sink;
30using namespace Sink::Storage; 30using namespace Sink::Storage;
31 31
32ChangeReplay::ChangeReplay(const ResourceContext &resourceContext) 32ChangeReplay::ChangeReplay(const ResourceContext &resourceContext)
33 : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false), mLogCtx{resourceContext.instanceId() + ".changereplay"} 33 : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false), mLogCtx{"changereplay"}
34{ 34{
35 SinkTraceCtx(mLogCtx) << "Created change replay: " << resourceContext.instanceId(); 35 SinkTraceCtx(mLogCtx) << "Created change replay: " << resourceContext.instanceId();
36} 36}
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp
index 96c8ccd..6c341e2 100644
--- a/common/storage/entitystore.cpp
+++ b/common/storage/entitystore.cpp
@@ -37,8 +37,6 @@
37using namespace Sink; 37using namespace Sink;
38using namespace Sink::Storage; 38using namespace Sink::Storage;
39 39
40SINK_DEBUG_AREA("entitystore");
41
42class EntityStore::Private { 40class EntityStore::Private {
43public: 41public:
44 Private(const ResourceContext &context, const Sink::Log::Context &ctx) : resourceContext(context), logCtx(ctx.subContext("entitystore")) {} 42 Private(const ResourceContext &context, const Sink::Log::Context &ctx) : resourceContext(context), logCtx(ctx.subContext("entitystore")) {}
@@ -112,7 +110,7 @@ EntityStore::EntityStore(const ResourceContext &context, const Log::Context &ctx
112 110
113void EntityStore::startTransaction(Sink::Storage::DataStore::AccessMode accessMode) 111void EntityStore::startTransaction(Sink::Storage::DataStore::AccessMode accessMode)
114{ 112{
115 SinkTraceCtx(d->logCtx) << "Starting transaction"; 113 SinkTraceCtx(d->logCtx) << "Starting transaction: " << accessMode;
116 Sink::Storage::DataStore store(Sink::storageLocation(), d->resourceContext.instanceId(), accessMode); 114 Sink::Storage::DataStore store(Sink::storageLocation(), d->resourceContext.instanceId(), accessMode);
117 d->transaction = store.createTransaction(accessMode); 115 d->transaction = store.createTransaction(accessMode);
118 Q_ASSERT(d->transaction.validateNamedDatabases()); 116 Q_ASSERT(d->transaction.validateNamedDatabases());
@@ -136,7 +134,7 @@ void EntityStore::copyBlobs(ApplicationDomain::ApplicationDomainType &entity, qi
136{ 134{
137 const auto directory = d->entityBlobStoragePath(entity.identifier()); 135 const auto directory = d->entityBlobStoragePath(entity.identifier());
138 if (!QDir().mkpath(directory)) { 136 if (!QDir().mkpath(directory)) {
139 SinkWarning() << "Failed to create the directory: " << directory; 137 SinkWarningCtx(d->logCtx) << "Failed to create the directory: " << directory;
140 } 138 }
141 139
142 for (const auto &property : entity.changedProperties()) { 140 for (const auto &property : entity.changedProperties()) {
@@ -151,10 +149,10 @@ void EntityStore::copyBlobs(ApplicationDomain::ApplicationDomainType &entity, qi
151 QFile::remove(filePath); 149 QFile::remove(filePath);
152 QFile origFile(oldPath); 150 QFile origFile(oldPath);
153 if (!origFile.open(QIODevice::ReadWrite)) { 151 if (!origFile.open(QIODevice::ReadWrite)) {
154 SinkWarning() << "Failed to open the original file with write rights: " << origFile.errorString(); 152 SinkWarningCtx(d->logCtx) << "Failed to open the original file with write rights: " << origFile.errorString();
155 } 153 }
156 if (!origFile.rename(filePath)) { 154 if (!origFile.rename(filePath)) {
157 SinkWarning() << "Failed to move the file from: " << oldPath << " to " << filePath << ". " << origFile.errorString(); 155 SinkWarningCtx(d->logCtx) << "Failed to move the file from: " << oldPath << " to " << filePath << ". " << origFile.errorString();
158 } 156 }
159 origFile.close(); 157 origFile.close();
160 entity.setProperty(property, QVariant::fromValue(ApplicationDomain::BLOB{filePath})); 158 entity.setProperty(property, QVariant::fromValue(ApplicationDomain::BLOB{filePath}));
@@ -166,7 +164,7 @@ void EntityStore::copyBlobs(ApplicationDomain::ApplicationDomainType &entity, qi
166bool EntityStore::add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &entity_, bool replayToSource, const PreprocessCreation &preprocess) 164bool EntityStore::add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &entity_, bool replayToSource, const PreprocessCreation &preprocess)
167{ 165{
168 if (entity_.identifier().isEmpty()) { 166 if (entity_.identifier().isEmpty()) {
169 SinkWarning() << "Can't write entity with an empty identifier"; 167 SinkWarningCtx(d->logCtx) << "Can't write entity with an empty identifier";
170 return false; 168 return false;
171 } 169 }
172 170
@@ -197,7 +195,7 @@ bool EntityStore::add(const QByteArray &type, const ApplicationDomain::Applicati
197 195
198 DataStore::mainDatabase(d->transaction, type) 196 DataStore::mainDatabase(d->transaction, type)
199 .write(DataStore::assembleKey(entity.identifier(), newRevision), BufferUtils::extractBuffer(fbb), 197 .write(DataStore::assembleKey(entity.identifier(), newRevision), BufferUtils::extractBuffer(fbb),
200 [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << entity.identifier() << newRevision; }); 198 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << entity.identifier() << newRevision; });
201 DataStore::setMaxRevision(d->transaction, newRevision); 199 DataStore::setMaxRevision(d->transaction, newRevision);
202 DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type); 200 DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type);
203 SinkTraceCtx(d->logCtx) << "Wrote entity: " << entity.identifier() << type << newRevision; 201 SinkTraceCtx(d->logCtx) << "Wrote entity: " << entity.identifier() << type << newRevision;
@@ -209,7 +207,7 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic
209 auto changeset = diff.changedProperties(); 207 auto changeset = diff.changedProperties();
210 const auto current = readLatest(type, diff.identifier()); 208 const auto current = readLatest(type, diff.identifier());
211 if (current.identifier().isEmpty()) { 209 if (current.identifier().isEmpty()) {
212 SinkWarning() << "Failed to read current version: " << diff.identifier(); 210 SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier();
213 return false; 211 return false;
214 } 212 }
215 213
@@ -263,7 +261,7 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic
263 261
264 DataStore::mainDatabase(d->transaction, type) 262 DataStore::mainDatabase(d->transaction, type)
265 .write(DataStore::assembleKey(newEntity.identifier(), newRevision), BufferUtils::extractBuffer(fbb), 263 .write(DataStore::assembleKey(newEntity.identifier(), newRevision), BufferUtils::extractBuffer(fbb),
266 [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << newEntity.identifier() << newRevision; }); 264 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << newEntity.identifier() << newRevision; });
267 DataStore::setMaxRevision(d->transaction, newRevision); 265 DataStore::setMaxRevision(d->transaction, newRevision);
268 DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type); 266 DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type);
269 SinkTraceCtx(d->logCtx) << "Wrote modified entity: " << newEntity.identifier() << type << newRevision; 267 SinkTraceCtx(d->logCtx) << "Wrote modified entity: " << newEntity.identifier() << type << newRevision;
@@ -287,14 +285,14 @@ bool EntityStore::remove(const QByteArray &type, const QByteArray &uid, bool rep
287 } 285 }
288 return false; 286 return false;
289 }, 287 },
290 [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); 288 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; });
291 289
292 if (!found) { 290 if (!found) {
293 SinkWarning() << "Remove: Failed to find entity " << uid; 291 SinkWarningCtx(d->logCtx) << "Remove: Failed to find entity " << uid;
294 return false; 292 return false;
295 } 293 }
296 if (alreadyRemoved) { 294 if (alreadyRemoved) {
297 SinkWarning() << "Remove: Entity is already removed " << uid; 295 SinkWarningCtx(d->logCtx) << "Remove: Entity is already removed " << uid;
298 return false; 296 return false;
299 } 297 }
300 298
@@ -320,7 +318,7 @@ bool EntityStore::remove(const QByteArray &type, const QByteArray &uid, bool rep
320 318
321 DataStore::mainDatabase(d->transaction, type) 319 DataStore::mainDatabase(d->transaction, type)
322 .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), 320 .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
323 [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); 321 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << uid << newRevision; });
324 DataStore::setMaxRevision(d->transaction, newRevision); 322 DataStore::setMaxRevision(d->transaction, newRevision);
325 DataStore::recordRevision(d->transaction, newRevision, uid, type); 323 DataStore::recordRevision(d->transaction, newRevision, uid, type);
326 return true; 324 return true;
@@ -336,7 +334,7 @@ void EntityStore::cleanupEntityRevisionsUntil(qint64 revision)
336 [&](const QByteArray &key, const QByteArray &data) -> bool { 334 [&](const QByteArray &key, const QByteArray &data) -> bool {
337 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 335 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
338 if (!buffer.isValid()) { 336 if (!buffer.isValid()) {
339 SinkWarning() << "Read invalid buffer from disk"; 337 SinkWarningCtx(d->logCtx) << "Read invalid buffer from disk";
340 } else { 338 } else {
341 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); 339 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer());
342 const qint64 rev = metadata->revision(); 340 const qint64 rev = metadata->revision();
@@ -350,7 +348,7 @@ void EntityStore::cleanupEntityRevisionsUntil(qint64 revision)
350 const auto directory = d->entityBlobStoragePath(uid); 348 const auto directory = d->entityBlobStoragePath(uid);
351 QDir dir(directory); 349 QDir dir(directory);
352 if (!dir.removeRecursively()) { 350 if (!dir.removeRecursively()) {
353 SinkError() << "Failed to cleanup: " << directory; 351 SinkErrorCtx(d->logCtx) << "Failed to cleanup: " << directory;
354 } 352 }
355 } 353 }
356 //Don't cleanup more than specified 354 //Don't cleanup more than specified
@@ -361,7 +359,7 @@ void EntityStore::cleanupEntityRevisionsUntil(qint64 revision)
361 359
362 return true; 360 return true;
363 }, 361 },
364 [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); 362 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message; }, true);
365 DataStore::setCleanedUpRevision(d->transaction, revision); 363 DataStore::setCleanedUpRevision(d->transaction, revision);
366} 364}
367 365
@@ -407,7 +405,7 @@ QVector<QByteArray> EntityStore::fullScan(const QByteArray &type)
407 keys << uid; 405 keys << uid;
408 return true; 406 return true;
409 }, 407 },
410 [](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message; }); 408 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during query: " << error.message; });
411 409
412 SinkTraceCtx(d->logCtx) << "Full scan retrieved " << keys.size() << " results."; 410 SinkTraceCtx(d->logCtx) << "Full scan retrieved " << keys.size() << " results.";
413 return keys.toList().toVector(); 411 return keys.toList().toVector();
@@ -446,7 +444,7 @@ void EntityStore::indexLookup(const QByteArray &type, const QByteArray &property
446 /* callback(sinkId); */ 444 /* callback(sinkId); */
447 /* }, */ 445 /* }, */
448 /* [&](const Index::Error &error) { */ 446 /* [&](const Index::Error &error) { */
449 /* SinkWarning() << "Error in index: " << error.message << property; */ 447 /* SinkWarningCtx(d->logCtx) << "Error in index: " << error.message << property; */
450 /* }); */ 448 /* }); */
451} 449}
452 450
@@ -458,7 +456,7 @@ void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, cons
458 callback(DataStore::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size())); 456 callback(DataStore::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size()));
459 return false; 457 return false;
460 }, 458 },
461 [&](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message << uid; }); 459 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during query: " << error.message << uid; });
462} 460}
463 461
464void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomain::ApplicationDomainType &)> callback) 462void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomain::ApplicationDomainType &)> callback)
@@ -494,7 +492,7 @@ void EntityStore::readEntity(const QByteArray &type, const QByteArray &key, cons
494 callback(DataStore::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size())); 492 callback(DataStore::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size()));
495 return false; 493 return false;
496 }, 494 },
497 [&](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); 495 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during query: " << error.message << key; });
498} 496}
499 497
500void EntityStore::readEntity(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomain::ApplicationDomainType &)> callback) 498void EntityStore::readEntity(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomain::ApplicationDomainType &)> callback)
@@ -524,7 +522,7 @@ void EntityStore::readAll(const QByteArray &type, const std::function<void(const
524 callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer)); 522 callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer));
525 return true; 523 return true;
526 }, 524 },
527 [&](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message; }); 525 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during query: " << error.message; });
528} 526}
529 527
530void EntityStore::readRevisions(qint64 baseRevision, const QByteArray &expectedType, const std::function<void(const QByteArray &key)> &callback) 528void EntityStore::readRevisions(qint64 baseRevision, const QByteArray &expectedType, const std::function<void(const QByteArray &key)> &callback)
@@ -561,7 +559,7 @@ void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qi
561 } 559 }
562 return true; 560 return true;
563 }, 561 },
564 [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true); 562 [&](const Sink::Storage::DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read current value from storage: " << error.message; }, true);
565 return readEntity(type, Sink::Storage::DataStore::assembleKey(uid, latestRevision), callback); 563 return readEntity(type, Sink::Storage::DataStore::assembleKey(uid, latestRevision), callback);
566} 564}
567 565
@@ -591,7 +589,7 @@ void EntityStore::readAllUids(const QByteArray &type, const std::function<void(c
591 callback(Sink::Storage::DataStore::uidFromKey(key)); 589 callback(Sink::Storage::DataStore::uidFromKey(key));
592 return true; 590 return true;
593 }, 591 },
594 [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); 592 [&](const Sink::Storage::DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read current value from storage: " << error.message; });
595} 593}
596 594
597bool EntityStore::contains(const QByteArray &type, const QByteArray &uid) 595bool EntityStore::contains(const QByteArray &type, const QByteArray &uid)
@@ -615,10 +613,10 @@ qint64 EntityStore::maxRevision()
615/* Sink::Storage::DataStore storage(Sink::storageLocation(), mResourceInstanceIdentifier); */ 613/* Sink::Storage::DataStore storage(Sink::storageLocation(), mResourceInstanceIdentifier); */
616/* if (!storage.exists()) { */ 614/* if (!storage.exists()) { */
617/* //This is not an error if the resource wasn't started before */ 615/* //This is not an error if the resource wasn't started before */
618/* SinkLog() << "Store doesn't exist: " << mResourceInstanceIdentifier; */ 616/* SinkLogCtx(d->logCtx) << "Store doesn't exist: " << mResourceInstanceIdentifier; */
619/* return Sink::Storage::DataStore::Transaction(); */ 617/* return Sink::Storage::DataStore::Transaction(); */
620/* } */ 618/* } */
621/* storage.setDefaultErrorHandler([this](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Error during query: " << error.store << error.message; }); */ 619/* storage.setDefaultErrorHandler([this](const Sink::Storage::DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during query: " << error.store << error.message; }); */
622/* transaction = storage.createTransaction(Sink::Storage::DataStore::ReadOnly); */ 620/* transaction = storage.createTransaction(Sink::Storage::DataStore::ReadOnly); */
623/* } */ 621/* } */
624 622
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 57e994e..f731b31 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -30,8 +30,6 @@
30#include "flush_generated.h" 30#include "flush_generated.h"
31#include "notification_generated.h" 31#include "notification_generated.h"
32 32
33SINK_DEBUG_AREA("synchronizer")
34
35using namespace Sink; 33using namespace Sink;
36 34
37Synchronizer::Synchronizer(const Sink::ResourceContext &context) 35Synchronizer::Synchronizer(const Sink::ResourceContext &context)
@@ -42,7 +40,7 @@ Synchronizer::Synchronizer(const Sink::ResourceContext &context)
42 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), 40 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite),
43 mSyncInProgress(false) 41 mSyncInProgress(false)
44{ 42{
45 SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); 43 SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId();
46} 44}
47 45
48Synchronizer::~Synchronizer() 46Synchronizer::~Synchronizer()
@@ -128,11 +126,11 @@ void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::func
128{ 126{
129 entryGenerator([this, bufferType, &exists](const QByteArray &sinkId) { 127 entryGenerator([this, bufferType, &exists](const QByteArray &sinkId) {
130 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); 128 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId);
131 SinkTrace() << "Checking for removal " << sinkId << remoteId; 129 SinkTraceCtx(mLogCtx) << "Checking for removal " << sinkId << remoteId;
132 // If we have no remoteId, the entity hasn't been replayed to the source yet 130 // If we have no remoteId, the entity hasn't been replayed to the source yet
133 if (!remoteId.isEmpty()) { 131 if (!remoteId.isEmpty()) {
134 if (!exists(remoteId)) { 132 if (!exists(remoteId)) {
135 SinkTrace() << "Found a removed entity: " << sinkId; 133 SinkTraceCtx(mLogCtx) << "Found a removed entity: " << sinkId;
136 deleteEntity(sinkId, mEntityStore->maxRevision(), bufferType); 134 deleteEntity(sinkId, mEntityStore->maxRevision(), bufferType);
137 } 135 }
138 } 136 }
@@ -157,12 +155,12 @@ void Synchronizer::modifyIfChanged(Storage::EntityStore &store, const QByteArray
157 bool changed = false; 155 bool changed = false;
158 for (const auto &property : entity.changedProperties()) { 156 for (const auto &property : entity.changedProperties()) {
159 if (entity.getProperty(property) != current.getProperty(property)) { 157 if (entity.getProperty(property) != current.getProperty(property)) {
160 SinkTrace() << "Property changed " << sinkId << property; 158 SinkTraceCtx(mLogCtx) << "Property changed " << sinkId << property;
161 changed = true; 159 changed = true;
162 } 160 }
163 } 161 }
164 if (changed) { 162 if (changed) {
165 SinkTrace() << "Found a modified entity: " << sinkId; 163 SinkTraceCtx(mLogCtx) << "Found a modified entity: " << sinkId;
166 modifyEntity(sinkId, store.maxRevision(), bufferType, entity); 164 modifyEntity(sinkId, store.maxRevision(), bufferType, entity);
167 } 165 }
168 }); 166 });
@@ -177,12 +175,12 @@ void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remote
177 175
178void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) 176void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
179{ 177{
180 SinkTrace() << "Create or modify" << bufferType << remoteId; 178 SinkTraceCtx(mLogCtx) << "Create or modify" << bufferType << remoteId;
181 Storage::EntityStore store(mResourceContext, mLogCtx); 179 Storage::EntityStore store(mResourceContext, mLogCtx);
182 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); 180 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
183 const auto found = store.contains(bufferType, sinkId); 181 const auto found = store.contains(bufferType, sinkId);
184 if (!found) { 182 if (!found) {
185 SinkTrace() << "Found a new entity: " << remoteId; 183 SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId;
186 createEntity(sinkId, bufferType, entity); 184 createEntity(sinkId, bufferType, entity);
187 } else { // modification 185 } else { // modification
188 modify(bufferType, remoteId, entity); 186 modify(bufferType, remoteId, entity);
@@ -193,7 +191,7 @@ template<typename DomainType>
193void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria) 191void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria)
194{ 192{
195 193
196 SinkTrace() << "Create or modify" << bufferType << remoteId; 194 SinkTraceCtx(mLogCtx) << "Create or modify" << bufferType << remoteId;
197 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); 195 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
198 Storage::EntityStore store(mResourceContext, mLogCtx); 196 Storage::EntityStore store(mResourceContext, mLogCtx);
199 const auto found = store.contains(bufferType, sinkId); 197 const auto found = store.contains(bufferType, sinkId);
@@ -209,16 +207,16 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
209 auto resultSet = dataStoreQuery.execute(); 207 auto resultSet = dataStoreQuery.execute();
210 resultSet.replaySet(0, 1, [this, &merge, bufferType, remoteId](const ResultSet::Result &r) { 208 resultSet.replaySet(0, 1, [this, &merge, bufferType, remoteId](const ResultSet::Result &r) {
211 merge = true; 209 merge = true;
212 SinkTrace() << "Merging local entity with remote entity: " << r.entity.identifier() << remoteId; 210 SinkTraceCtx(mLogCtx) << "Merging local entity with remote entity: " << r.entity.identifier() << remoteId;
213 syncStore().recordRemoteId(bufferType, r.entity.identifier(), remoteId); 211 syncStore().recordRemoteId(bufferType, r.entity.identifier(), remoteId);
214 }); 212 });
215 213
216 if (!merge) { 214 if (!merge) {
217 SinkTrace() << "Found a new entity: " << remoteId; 215 SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId;
218 createEntity(sinkId, bufferType, entity); 216 createEntity(sinkId, bufferType, entity);
219 } 217 }
220 } else { 218 } else {
221 SinkTrace() << "Found a new entity: " << remoteId; 219 SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId;
222 createEntity(sinkId, bufferType, entity); 220 createEntity(sinkId, bufferType, entity);
223 } 221 }
224 } else { // modification 222 } else { // modification
@@ -240,7 +238,7 @@ QByteArrayList Synchronizer::resolveFilter(const QueryBase::Comparator &filter)
240 result << r.entity.identifier(); 238 result << r.entity.identifier();
241 }); 239 });
242 } else { 240 } else {
243 SinkWarning() << "unknown filter type: " << filter; 241 SinkWarningCtx(mLogCtx) << "unknown filter type: " << filter;
244 Q_ASSERT(false); 242 Q_ASSERT(false);
245 } 243 }
246 return result; 244 return result;
@@ -261,7 +259,7 @@ QList<Synchronizer::SyncRequest> Synchronizer::getSyncRequests(const Sink::Query
261 259
262void Synchronizer::synchronize(const Sink::QueryBase &query) 260void Synchronizer::synchronize(const Sink::QueryBase &query)
263{ 261{
264 SinkTrace() << "Synchronizing"; 262 SinkTraceCtx(mLogCtx) << "Synchronizing";
265 mSyncRequestQueue << getSyncRequests(query); 263 mSyncRequestQueue << getSyncRequests(query);
266 processSyncQueue().exec(); 264 processSyncQueue().exec();
267} 265}
@@ -269,7 +267,7 @@ void Synchronizer::synchronize(const Sink::QueryBase &query)
269void Synchronizer::flush(int commandId, const QByteArray &flushId) 267void Synchronizer::flush(int commandId, const QByteArray &flushId)
270{ 268{
271 Q_ASSERT(!flushId.isEmpty()); 269 Q_ASSERT(!flushId.isEmpty());
272 SinkTrace() << "Flushing the synchronization queue " << flushId; 270 SinkTraceCtx(mLogCtx) << "Flushing the synchronization queue " << flushId;
273 mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId}; 271 mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId};
274 processSyncQueue().exec(); 272 processSyncQueue().exec();
275} 273}
@@ -299,7 +297,7 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
299 if (modifiedRequest.requestId.isEmpty()) { 297 if (modifiedRequest.requestId.isEmpty()) {
300 modifiedRequest.requestId = QUuid::createUuid().toRfc4122(); 298 modifiedRequest.requestId = QUuid::createUuid().toRfc4122();
301 } 299 }
302 SinkWarning() << "Enquing flush request " << modifiedRequest.requestId; 300 SinkWarningCtx(mLogCtx) << "Enquing flush request " << modifiedRequest.requestId;
303 301
304 //The sync request will be executed once the flush has completed 302 //The sync request will be executed once the flush has completed
305 mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest); 303 mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest);
@@ -349,7 +347,7 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
349 Q_ASSERT(!request.requestId.isEmpty()); 347 Q_ASSERT(!request.requestId.isEmpty());
350 //FIXME it looks like this is emitted before the replay actually finishes 348 //FIXME it looks like this is emitted before the replay actually finishes
351 if (request.flushType == Flush::FlushReplayQueue) { 349 if (request.flushType == Flush::FlushReplayQueue) {
352 SinkTrace() << "Emitting flush completion."; 350 SinkTraceCtx(mLogCtx) << "Emitting flush completion.";
353 Sink::Notification n; 351 Sink::Notification n;
354 n.type = Sink::Notification::FlushCompletion; 352 n.type = Sink::Notification::FlushCompletion;
355 n.id = request.requestId; 353 n.id = request.requestId;
@@ -365,7 +363,7 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
365 } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { 363 } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) {
366 return replayNextRevision(); 364 return replayNextRevision();
367 } else { 365 } else {
368 SinkWarning() << "Unknown request type: " << request.requestType; 366 SinkWarningCtx(mLogCtx) << "Unknown request type: " << request.requestType;
369 return KAsync::error(KAsync::Error{"Unknown request type."}); 367 return KAsync::error(KAsync::Error{"Unknown request type."});
370 } 368 }
371 369
@@ -374,13 +372,13 @@ KAsync::Job<void> Synchronizer::processRequest(const SyncRequest &request)
374KAsync::Job<void> Synchronizer::processSyncQueue() 372KAsync::Job<void> Synchronizer::processSyncQueue()
375{ 373{
376 if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { 374 if (mSyncRequestQueue.isEmpty() || mSyncInProgress) {
377 SinkTrace() << "Sync still in progress or nothing to do."; 375 SinkTraceCtx(mLogCtx) << "Sync still in progress or nothing to do.";
378 return KAsync::null<void>(); 376 return KAsync::null<void>();
379 } 377 }
380 //Don't process any new requests until we're done with the pending ones. 378 //Don't process any new requests until we're done with the pending ones.
381 //Otherwise we might process a flush before the previous request actually completed. 379 //Otherwise we might process a flush before the previous request actually completed.
382 if (!mPendingSyncRequests.isEmpty()) { 380 if (!mPendingSyncRequests.isEmpty()) {
383 SinkTrace() << "We still have pending sync requests. Not executing next request."; 381 SinkTraceCtx(mLogCtx) << "We still have pending sync requests. Not executing next request.";
384 return KAsync::null<void>(); 382 return KAsync::null<void>();
385 } 383 }
386 384
@@ -391,7 +389,7 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
391 }) 389 })
392 .then(processRequest(request)) 390 .then(processRequest(request))
393 .then<void>([this](const KAsync::Error &error) { 391 .then<void>([this](const KAsync::Error &error) {
394 SinkTrace() << "Sync request processed"; 392 SinkTraceCtx(mLogCtx) << "Sync request processed";
395 mSyncTransaction.abort(); 393 mSyncTransaction.abort();
396 mMessageQueue->commit(); 394 mMessageQueue->commit();
397 mSyncStore.clear(); 395 mSyncStore.clear();
@@ -400,7 +398,7 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
400 emit changesReplayed(); 398 emit changesReplayed();
401 } 399 }
402 if (error) { 400 if (error) {
403 SinkWarning() << "Error during sync: " << error.errorMessage; 401 SinkWarningCtx(mLogCtx) << "Error during sync: " << error.errorMessage;
404 return KAsync::error(error); 402 return KAsync::error(error);
405 } 403 }
406 //In case we got more requests meanwhile. 404 //In case we got more requests meanwhile.
@@ -422,7 +420,7 @@ void Synchronizer::commit()
422Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction() 420Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction()
423{ 421{
424 if (!mSyncTransaction) { 422 if (!mSyncTransaction) {
425 SinkTrace() << "Starting transaction on sync store."; 423 SinkTraceCtx(mLogCtx) << "Starting transaction on sync store.";
426 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::DataStore::ReadWrite); 424 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::DataStore::ReadWrite);
427 } 425 }
428 return mSyncTransaction; 426 return mSyncTransaction;
@@ -447,28 +445,28 @@ bool Synchronizer::canReplay(const QByteArray &type, const QByteArray &key, cons
447 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); 445 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
448 Q_ASSERT(metadataBuffer); 446 Q_ASSERT(metadataBuffer);
449 if (!metadataBuffer->replayToSource()) { 447 if (!metadataBuffer->replayToSource()) {
450 SinkTrace() << "Change is coming from the source"; 448 SinkTraceCtx(mLogCtx) << "Change is coming from the source";
451 } 449 }
452 return metadataBuffer->replayToSource(); 450 return metadataBuffer->replayToSource();
453} 451}
454 452
455KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) 453KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value)
456{ 454{
457 SinkTrace() << "Replaying" << type << key; 455 SinkTraceCtx(mLogCtx) << "Replaying" << type << key;
458 456
459 Sink::EntityBuffer buffer(value); 457 Sink::EntityBuffer buffer(value);
460 const Sink::Entity &entity = buffer.entity(); 458 const Sink::Entity &entity = buffer.entity();
461 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); 459 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
462 if (!metadataBuffer) { 460 if (!metadataBuffer) {
463 SinkError() << "No metadata buffer available."; 461 SinkErrorCtx(mLogCtx) << "No metadata buffer available.";
464 return KAsync::error("No metadata buffer"); 462 return KAsync::error("No metadata buffer");
465 } 463 }
466 if (mSyncTransaction) { 464 if (mSyncTransaction) {
467 SinkError() << "Leftover sync transaction."; 465 SinkErrorCtx(mLogCtx) << "Leftover sync transaction.";
468 mSyncTransaction.abort(); 466 mSyncTransaction.abort();
469 } 467 }
470 if (mSyncStore) { 468 if (mSyncStore) {
471 SinkError() << "Leftover sync store."; 469 SinkErrorCtx(mLogCtx) << "Leftover sync store.";
472 mSyncStore.clear(); 470 mSyncStore.clear();
473 } 471 }
474 Q_ASSERT(metadataBuffer); 472 Q_ASSERT(metadataBuffer);
@@ -485,7 +483,7 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray
485 oldRemoteId = syncStore().resolveLocalId(type, uid); 483 oldRemoteId = syncStore().resolveLocalId(type, uid);
486 //oldRemoteId can be empty if the resource implementation didn't return a remoteid 484 //oldRemoteId can be empty if the resource implementation didn't return a remoteid
487 } 485 }
488 SinkTrace() << "Replaying " << key << type << uid << oldRemoteId; 486 SinkTraceCtx(mLogCtx) << "Replaying " << key << type << uid << oldRemoteId;
489 487
490 KAsync::Job<QByteArray> job = KAsync::null<QByteArray>(); 488 KAsync::Job<QByteArray> job = KAsync::null<QByteArray>();
491 //TODO This requires supporting every domain type here as well. Can we solve this better so we can do the dispatch somewhere centrally? 489 //TODO This requires supporting every domain type here as well. Can we solve this better so we can do the dispatch somewhere centrally?
@@ -496,34 +494,34 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray
496 auto mail = store().readEntity<ApplicationDomain::Mail>(key); 494 auto mail = store().readEntity<ApplicationDomain::Mail>(key);
497 job = replay(mail, operation, oldRemoteId, modifiedProperties); 495 job = replay(mail, operation, oldRemoteId, modifiedProperties);
498 } else { 496 } else {
499 SinkError() << "Replayed unknown type: " << type; 497 SinkErrorCtx(mLogCtx) << "Replayed unknown type: " << type;
500 } 498 }
501 499
502 return job.then([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { 500 return job.then([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) {
503 if (operation == Sink::Operation_Creation) { 501 if (operation == Sink::Operation_Creation) {
504 SinkTrace() << "Replayed creation with remote id: " << remoteId; 502 SinkTraceCtx(mLogCtx) << "Replayed creation with remote id: " << remoteId;
505 if (remoteId.isEmpty()) { 503 if (remoteId.isEmpty()) {
506 SinkWarning() << "Returned an empty remoteId from the creation"; 504 SinkWarningCtx(mLogCtx) << "Returned an empty remoteId from the creation";
507 } else { 505 } else {
508 syncStore().recordRemoteId(type, uid, remoteId); 506 syncStore().recordRemoteId(type, uid, remoteId);
509 } 507 }
510 } else if (operation == Sink::Operation_Modification) { 508 } else if (operation == Sink::Operation_Modification) {
511 SinkTrace() << "Replayed modification with remote id: " << remoteId; 509 SinkTraceCtx(mLogCtx) << "Replayed modification with remote id: " << remoteId;
512 if (remoteId.isEmpty()) { 510 if (remoteId.isEmpty()) {
513 SinkWarning() << "Returned an empty remoteId from the modification"; 511 SinkWarningCtx(mLogCtx) << "Returned an empty remoteId from the modification";
514 } else { 512 } else {
515 syncStore().updateRemoteId(type, uid, remoteId); 513 syncStore().updateRemoteId(type, uid, remoteId);
516 } 514 }
517 } else if (operation == Sink::Operation_Removal) { 515 } else if (operation == Sink::Operation_Removal) {
518 SinkTrace() << "Replayed removal with remote id: " << oldRemoteId; 516 SinkTraceCtx(mLogCtx) << "Replayed removal with remote id: " << oldRemoteId;
519 syncStore().removeRemoteId(type, uid, oldRemoteId); 517 syncStore().removeRemoteId(type, uid, oldRemoteId);
520 } else { 518 } else {
521 SinkError() << "Unkown operation" << operation; 519 SinkErrorCtx(mLogCtx) << "Unkown operation" << operation;
522 } 520 }
523 }) 521 })
524 .then([this](const KAsync::Error &error) { 522 .then([this](const KAsync::Error &error) {
525 if (error) { 523 if (error) {
526 SinkWarning() << "Failed to replay change: " << error.errorMessage; 524 SinkWarningCtx(mLogCtx) << "Failed to replay change: " << error.errorMessage;
527 } 525 }
528 mSyncStore.clear(); 526 mSyncStore.clear();
529 mSyncTransaction.commit(); 527 mSyncTransaction.commit();
@@ -544,7 +542,7 @@ KAsync::Job<QByteArray> Synchronizer::replay(const ApplicationDomain::Folder &,
544bool Synchronizer::allChangesReplayed() 542bool Synchronizer::allChangesReplayed()
545{ 543{
546 if (!mSyncRequestQueue.isEmpty()) { 544 if (!mSyncRequestQueue.isEmpty()) {
547 SinkTrace() << "Queue is not empty"; 545 SinkTraceCtx(mLogCtx) << "Queue is not empty";
548 return false; 546 return false;
549 } 547 }
550 return ChangeReplay::allChangesReplayed(); 548 return ChangeReplay::allChangesReplayed();