diff options
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r-- | common/synchronizer.cpp | 38 |
1 files changed, 20 insertions, 18 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 1374d00..2d4fb8d 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -29,6 +29,8 @@ | |||
29 | #include "modifyentity_generated.h" | 29 | #include "modifyentity_generated.h" |
30 | #include "deleteentity_generated.h" | 30 | #include "deleteentity_generated.h" |
31 | 31 | ||
32 | SINK_DEBUG_AREA("synchronizer") | ||
33 | |||
32 | using namespace Sink; | 34 | using namespace Sink; |
33 | 35 | ||
34 | Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) | 36 | Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) |
@@ -37,7 +39,7 @@ Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &res | |||
37 | mResourceType(resourceType), | 39 | mResourceType(resourceType), |
38 | mResourceInstanceIdentifier(resourceInstanceIdentifier) | 40 | mResourceInstanceIdentifier(resourceInstanceIdentifier) |
39 | { | 41 | { |
40 | Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; | 42 | SinkTrace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; |
41 | } | 43 | } |
42 | 44 | ||
43 | Synchronizer::~Synchronizer() | 45 | Synchronizer::~Synchronizer() |
@@ -129,11 +131,11 @@ void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::func | |||
129 | entryGenerator([this, bufferType, &exists](const QByteArray &key) { | 131 | entryGenerator([this, bufferType, &exists](const QByteArray &key) { |
130 | auto sinkId = Sink::Storage::uidFromKey(key); | 132 | auto sinkId = Sink::Storage::uidFromKey(key); |
131 | const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); | 133 | const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); |
132 | Trace() << "Checking for removal " << key << remoteId; | 134 | SinkTrace() << "Checking for removal " << key << remoteId; |
133 | // If we have no remoteId, the entity hasn't been replayed to the source yet | 135 | // If we have no remoteId, the entity hasn't been replayed to the source yet |
134 | if (!remoteId.isEmpty()) { | 136 | if (!remoteId.isEmpty()) { |
135 | if (!exists(remoteId)) { | 137 | if (!exists(remoteId)) { |
136 | Trace() << "Found a removed entity: " << sinkId; | 138 | SinkTrace() << "Found a removed entity: " << sinkId; |
137 | deleteEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, | 139 | deleteEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, |
138 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); | 140 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); |
139 | } | 141 | } |
@@ -143,14 +145,14 @@ void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::func | |||
143 | 145 | ||
144 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) | 146 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) |
145 | { | 147 | { |
146 | Trace() << "Create or modify" << bufferType << remoteId; | 148 | SinkTrace() << "Create or modify" << bufferType << remoteId; |
147 | auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); | 149 | auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); |
148 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); | 150 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); |
149 | const auto found = mainDatabase.contains(sinkId); | 151 | const auto found = mainDatabase.contains(sinkId); |
150 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); | 152 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); |
151 | Q_ASSERT(adaptorFactory); | 153 | Q_ASSERT(adaptorFactory); |
152 | if (!found) { | 154 | if (!found) { |
153 | Trace() << "Found a new entity: " << remoteId; | 155 | SinkTrace() << "Found a new entity: " << remoteId; |
154 | createEntity( | 156 | createEntity( |
155 | sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); | 157 | sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); |
156 | } else { // modification | 158 | } else { // modification |
@@ -159,17 +161,17 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray | |||
159 | bool changed = false; | 161 | bool changed = false; |
160 | for (const auto &property : entity.changedProperties()) { | 162 | for (const auto &property : entity.changedProperties()) { |
161 | if (entity.getProperty(property) != current->getProperty(property)) { | 163 | if (entity.getProperty(property) != current->getProperty(property)) { |
162 | Trace() << "Property changed " << sinkId << property; | 164 | SinkTrace() << "Property changed " << sinkId << property; |
163 | changed = true; | 165 | changed = true; |
164 | } | 166 | } |
165 | } | 167 | } |
166 | if (changed) { | 168 | if (changed) { |
167 | Trace() << "Found a modified entity: " << remoteId; | 169 | SinkTrace() << "Found a modified entity: " << remoteId; |
168 | modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory, | 170 | modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory, |
169 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); | 171 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); |
170 | } | 172 | } |
171 | } else { | 173 | } else { |
172 | Warning() << "Failed to get current entity"; | 174 | SinkWarning() << "Failed to get current entity"; |
173 | } | 175 | } |
174 | } | 176 | } |
175 | } | 177 | } |
@@ -178,7 +180,7 @@ template<typename DomainType> | |||
178 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria) | 180 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria) |
179 | { | 181 | { |
180 | 182 | ||
181 | Trace() << "Create or modify" << bufferType << remoteId; | 183 | SinkTrace() << "Create or modify" << bufferType << remoteId; |
182 | auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); | 184 | auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); |
183 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); | 185 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); |
184 | const auto found = mainDatabase.contains(sinkId); | 186 | const auto found = mainDatabase.contains(sinkId); |
@@ -192,17 +194,17 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray | |||
192 | reader.query(query, | 194 | reader.query(query, |
193 | [this, bufferType, remoteId, &merge](const DomainType &o) -> bool{ | 195 | [this, bufferType, remoteId, &merge](const DomainType &o) -> bool{ |
194 | merge = true; | 196 | merge = true; |
195 | Trace() << "Merging local entity with remote entity: " << o.identifier() << remoteId; | 197 | SinkTrace() << "Merging local entity with remote entity: " << o.identifier() << remoteId; |
196 | syncStore().recordRemoteId(bufferType, o.identifier(), remoteId); | 198 | syncStore().recordRemoteId(bufferType, o.identifier(), remoteId); |
197 | return false; | 199 | return false; |
198 | }); | 200 | }); |
199 | if (!merge) { | 201 | if (!merge) { |
200 | Trace() << "Found a new entity: " << remoteId; | 202 | SinkTrace() << "Found a new entity: " << remoteId; |
201 | createEntity( | 203 | createEntity( |
202 | sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); | 204 | sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); |
203 | } | 205 | } |
204 | } else { | 206 | } else { |
205 | Trace() << "Found a new entity: " << remoteId; | 207 | SinkTrace() << "Found a new entity: " << remoteId; |
206 | createEntity( | 208 | createEntity( |
207 | sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); | 209 | sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); |
208 | } | 210 | } |
@@ -212,17 +214,17 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray | |||
212 | bool changed = false; | 214 | bool changed = false; |
213 | for (const auto &property : entity.changedProperties()) { | 215 | for (const auto &property : entity.changedProperties()) { |
214 | if (entity.getProperty(property) != current->getProperty(property)) { | 216 | if (entity.getProperty(property) != current->getProperty(property)) { |
215 | Trace() << "Property changed " << sinkId << property; | 217 | SinkTrace() << "Property changed " << sinkId << property; |
216 | changed = true; | 218 | changed = true; |
217 | } | 219 | } |
218 | } | 220 | } |
219 | if (changed) { | 221 | if (changed) { |
220 | Trace() << "Found a modified entity: " << remoteId; | 222 | SinkTrace() << "Found a modified entity: " << remoteId; |
221 | modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory, | 223 | modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory, |
222 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); | 224 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); |
223 | } | 225 | } |
224 | } else { | 226 | } else { |
225 | Warning() << "Failed to get current entity"; | 227 | SinkWarning() << "Failed to get current entity"; |
226 | } | 228 | } |
227 | } | 229 | } |
228 | } | 230 | } |
@@ -239,7 +241,7 @@ void Synchronizer::modify(const DomainType &entity) | |||
239 | 241 | ||
240 | KAsync::Job<void> Synchronizer::synchronize() | 242 | KAsync::Job<void> Synchronizer::synchronize() |
241 | { | 243 | { |
242 | Trace() << "Synchronizing"; | 244 | SinkTrace() << "Synchronizing"; |
243 | mSyncInProgress = true; | 245 | mSyncInProgress = true; |
244 | mMessageQueue->startTransaction(); | 246 | mMessageQueue->startTransaction(); |
245 | return synchronizeWithSource().then<void>([this]() { | 247 | return synchronizeWithSource().then<void>([this]() { |
@@ -265,7 +267,7 @@ void Synchronizer::commit() | |||
265 | Sink::Storage::Transaction &Synchronizer::transaction() | 267 | Sink::Storage::Transaction &Synchronizer::transaction() |
266 | { | 268 | { |
267 | if (!mTransaction) { | 269 | if (!mTransaction) { |
268 | Trace() << "Starting transaction"; | 270 | SinkTrace() << "Starting transaction"; |
269 | mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); | 271 | mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); |
270 | } | 272 | } |
271 | return mTransaction; | 273 | return mTransaction; |
@@ -274,7 +276,7 @@ Sink::Storage::Transaction &Synchronizer::transaction() | |||
274 | Sink::Storage::Transaction &Synchronizer::syncTransaction() | 276 | Sink::Storage::Transaction &Synchronizer::syncTransaction() |
275 | { | 277 | { |
276 | if (!mSyncTransaction) { | 278 | if (!mSyncTransaction) { |
277 | Trace() << "Starting transaction"; | 279 | SinkTrace() << "Starting transaction"; |
278 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); | 280 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); |
279 | } | 281 | } |
280 | return mSyncTransaction; | 282 | return mSyncTransaction; |