summaryrefslogtreecommitdiffstats
path: root/common/synchronizer.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-11 11:55:29 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-11 11:55:29 +0200
commit3a3118e768e1447dc7524328e84b8d7faef81fe1 (patch)
treeaf5582170ed6164fffc9365f34b17bf449c0db40 /common/synchronizer.cpp
parentf9379318d801df204cc50385c5eca1f28e91755e (diff)
parentce2fd2666f084eebe443598f6f3740a02913091e (diff)
downloadsink-3a3118e768e1447dc7524328e84b8d7faef81fe1.tar.gz
sink-3a3118e768e1447dc7524328e84b8d7faef81fe1.zip
Merge branch 'feature/notifications' into develop
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r--common/synchronizer.cpp38
1 files changed, 20 insertions, 18 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 1374d00..2d4fb8d 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -29,6 +29,8 @@
29#include "modifyentity_generated.h" 29#include "modifyentity_generated.h"
30#include "deleteentity_generated.h" 30#include "deleteentity_generated.h"
31 31
32SINK_DEBUG_AREA("synchronizer")
33
32using namespace Sink; 34using namespace Sink;
33 35
34Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) 36Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier)
@@ -37,7 +39,7 @@ Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &res
37 mResourceType(resourceType), 39 mResourceType(resourceType),
38 mResourceInstanceIdentifier(resourceInstanceIdentifier) 40 mResourceInstanceIdentifier(resourceInstanceIdentifier)
39{ 41{
40 Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; 42 SinkTrace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier;
41} 43}
42 44
43Synchronizer::~Synchronizer() 45Synchronizer::~Synchronizer()
@@ -129,11 +131,11 @@ void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::func
129 entryGenerator([this, bufferType, &exists](const QByteArray &key) { 131 entryGenerator([this, bufferType, &exists](const QByteArray &key) {
130 auto sinkId = Sink::Storage::uidFromKey(key); 132 auto sinkId = Sink::Storage::uidFromKey(key);
131 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); 133 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId);
132 Trace() << "Checking for removal " << key << remoteId; 134 SinkTrace() << "Checking for removal " << key << remoteId;
133 // If we have no remoteId, the entity hasn't been replayed to the source yet 135 // If we have no remoteId, the entity hasn't been replayed to the source yet
134 if (!remoteId.isEmpty()) { 136 if (!remoteId.isEmpty()) {
135 if (!exists(remoteId)) { 137 if (!exists(remoteId)) {
136 Trace() << "Found a removed entity: " << sinkId; 138 SinkTrace() << "Found a removed entity: " << sinkId;
137 deleteEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, 139 deleteEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType,
138 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); 140 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); });
139 } 141 }
@@ -143,14 +145,14 @@ void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::func
143 145
144void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) 146void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
145{ 147{
146 Trace() << "Create or modify" << bufferType << remoteId; 148 SinkTrace() << "Create or modify" << bufferType << remoteId;
147 auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); 149 auto mainDatabase = Storage::mainDatabase(transaction(), bufferType);
148 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); 150 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
149 const auto found = mainDatabase.contains(sinkId); 151 const auto found = mainDatabase.contains(sinkId);
150 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); 152 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType);
151 Q_ASSERT(adaptorFactory); 153 Q_ASSERT(adaptorFactory);
152 if (!found) { 154 if (!found) {
153 Trace() << "Found a new entity: " << remoteId; 155 SinkTrace() << "Found a new entity: " << remoteId;
154 createEntity( 156 createEntity(
155 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); 157 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
156 } else { // modification 158 } else { // modification
@@ -159,17 +161,17 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
159 bool changed = false; 161 bool changed = false;
160 for (const auto &property : entity.changedProperties()) { 162 for (const auto &property : entity.changedProperties()) {
161 if (entity.getProperty(property) != current->getProperty(property)) { 163 if (entity.getProperty(property) != current->getProperty(property)) {
162 Trace() << "Property changed " << sinkId << property; 164 SinkTrace() << "Property changed " << sinkId << property;
163 changed = true; 165 changed = true;
164 } 166 }
165 } 167 }
166 if (changed) { 168 if (changed) {
167 Trace() << "Found a modified entity: " << remoteId; 169 SinkTrace() << "Found a modified entity: " << remoteId;
168 modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory, 170 modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory,
169 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); 171 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); });
170 } 172 }
171 } else { 173 } else {
172 Warning() << "Failed to get current entity"; 174 SinkWarning() << "Failed to get current entity";
173 } 175 }
174 } 176 }
175} 177}
@@ -178,7 +180,7 @@ template<typename DomainType>
178void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria) 180void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria)
179{ 181{
180 182
181 Trace() << "Create or modify" << bufferType << remoteId; 183 SinkTrace() << "Create or modify" << bufferType << remoteId;
182 auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); 184 auto mainDatabase = Storage::mainDatabase(transaction(), bufferType);
183 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); 185 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
184 const auto found = mainDatabase.contains(sinkId); 186 const auto found = mainDatabase.contains(sinkId);
@@ -192,17 +194,17 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
192 reader.query(query, 194 reader.query(query,
193 [this, bufferType, remoteId, &merge](const DomainType &o) -> bool{ 195 [this, bufferType, remoteId, &merge](const DomainType &o) -> bool{
194 merge = true; 196 merge = true;
195 Trace() << "Merging local entity with remote entity: " << o.identifier() << remoteId; 197 SinkTrace() << "Merging local entity with remote entity: " << o.identifier() << remoteId;
196 syncStore().recordRemoteId(bufferType, o.identifier(), remoteId); 198 syncStore().recordRemoteId(bufferType, o.identifier(), remoteId);
197 return false; 199 return false;
198 }); 200 });
199 if (!merge) { 201 if (!merge) {
200 Trace() << "Found a new entity: " << remoteId; 202 SinkTrace() << "Found a new entity: " << remoteId;
201 createEntity( 203 createEntity(
202 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); 204 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
203 } 205 }
204 } else { 206 } else {
205 Trace() << "Found a new entity: " << remoteId; 207 SinkTrace() << "Found a new entity: " << remoteId;
206 createEntity( 208 createEntity(
207 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); 209 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
208 } 210 }
@@ -212,17 +214,17 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
212 bool changed = false; 214 bool changed = false;
213 for (const auto &property : entity.changedProperties()) { 215 for (const auto &property : entity.changedProperties()) {
214 if (entity.getProperty(property) != current->getProperty(property)) { 216 if (entity.getProperty(property) != current->getProperty(property)) {
215 Trace() << "Property changed " << sinkId << property; 217 SinkTrace() << "Property changed " << sinkId << property;
216 changed = true; 218 changed = true;
217 } 219 }
218 } 220 }
219 if (changed) { 221 if (changed) {
220 Trace() << "Found a modified entity: " << remoteId; 222 SinkTrace() << "Found a modified entity: " << remoteId;
221 modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory, 223 modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory,
222 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); 224 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); });
223 } 225 }
224 } else { 226 } else {
225 Warning() << "Failed to get current entity"; 227 SinkWarning() << "Failed to get current entity";
226 } 228 }
227 } 229 }
228} 230}
@@ -239,7 +241,7 @@ void Synchronizer::modify(const DomainType &entity)
239 241
240KAsync::Job<void> Synchronizer::synchronize() 242KAsync::Job<void> Synchronizer::synchronize()
241{ 243{
242 Trace() << "Synchronizing"; 244 SinkTrace() << "Synchronizing";
243 mSyncInProgress = true; 245 mSyncInProgress = true;
244 mMessageQueue->startTransaction(); 246 mMessageQueue->startTransaction();
245 return synchronizeWithSource().then<void>([this]() { 247 return synchronizeWithSource().then<void>([this]() {
@@ -265,7 +267,7 @@ void Synchronizer::commit()
265Sink::Storage::Transaction &Synchronizer::transaction() 267Sink::Storage::Transaction &Synchronizer::transaction()
266{ 268{
267 if (!mTransaction) { 269 if (!mTransaction) {
268 Trace() << "Starting transaction"; 270 SinkTrace() << "Starting transaction";
269 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); 271 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly);
270 } 272 }
271 return mTransaction; 273 return mTransaction;
@@ -274,7 +276,7 @@ Sink::Storage::Transaction &Synchronizer::transaction()
274Sink::Storage::Transaction &Synchronizer::syncTransaction() 276Sink::Storage::Transaction &Synchronizer::syncTransaction()
275{ 277{
276 if (!mSyncTransaction) { 278 if (!mSyncTransaction) {
277 Trace() << "Starting transaction"; 279 SinkTrace() << "Starting transaction";
278 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); 280 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite);
279 } 281 }
280 return mSyncTransaction; 282 return mSyncTransaction;