summaryrefslogtreecommitdiffstats
path: root/common/synchronizer.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-22 10:26:11 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-22 10:26:11 +0100
commitb6981d94fb5fb12024738b639f9e389dd04578da (patch)
treef13e16f84ba9cf33782af7a0b0dbd82c5d65afd4 /common/synchronizer.cpp
parent1d713d9e2dbaf27de9da087f9270d260dfc40c31 (diff)
downloadsink-b6981d94fb5fb12024738b639f9e389dd04578da.tar.gz
sink-b6981d94fb5fb12024738b639f9e389dd04578da.zip
Process change replays as part of the synchronization queue
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r--common/synchronizer.cpp22
1 files changed, 17 insertions, 5 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 10acefc..8010689 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -274,10 +274,14 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
274 auto job = KAsync::null<void>(); 274 auto job = KAsync::null<void>();
275 while (!mSyncRequestQueue.isEmpty()) { 275 while (!mSyncRequestQueue.isEmpty()) {
276 auto request = mSyncRequestQueue.takeFirst(); 276 auto request = mSyncRequestQueue.takeFirst();
277 job = job.then(synchronizeWithSource(request.query)).syncThen<void>([this] { 277 if (request.requestType == Synchronizer::SyncRequest::Synchronization) {
278 //Commit after every request, so implementations only have to commit more if they add a lot of data. 278 job = job.then(synchronizeWithSource(request.query)).syncThen<void>([this] {
279 commit(); 279 //Commit after every request, so implementations only have to commit more if they add a lot of data.
280 }); 280 commit();
281 });
282 } else {
283 job = replayNextRevision();
284 }
281 } 285 }
282 return job.then<void>([this](const KAsync::Error &error) { 286 return job.then<void>([this](const KAsync::Error &error) {
283 mSyncStore.clear(); 287 mSyncStore.clear();
@@ -311,6 +315,12 @@ Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction(
311 return mSyncTransaction; 315 return mSyncTransaction;
312} 316}
313 317
318void Synchronizer::revisionChanged()
319{
320 mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::ChangeReplay};
321 processSyncQueue().exec();
322}
323
314bool Synchronizer::canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) 324bool Synchronizer::canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value)
315{ 325{
316 Sink::EntityBuffer buffer(value); 326 Sink::EntityBuffer buffer(value);
@@ -334,7 +344,6 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray
334 Q_ASSERT(!mSyncStore); 344 Q_ASSERT(!mSyncStore);
335 Q_ASSERT(!mSyncTransaction); 345 Q_ASSERT(!mSyncTransaction);
336 mEntityStore->startTransaction(Storage::DataStore::ReadOnly); 346 mEntityStore->startTransaction(Storage::DataStore::ReadOnly);
337 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::ReadWrite);
338 347
339 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; 348 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
340 const auto uid = Sink::Storage::DataStore::uidFromKey(key); 349 const auto uid = Sink::Storage::DataStore::uidFromKey(key);
@@ -345,6 +354,9 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray
345 oldRemoteId = syncStore().resolveLocalId(type, uid); 354 oldRemoteId = syncStore().resolveLocalId(type, uid);
346 if (oldRemoteId.isEmpty()) { 355 if (oldRemoteId.isEmpty()) {
347 SinkWarning() << "Couldn't find the remote id for: " << type << uid; 356 SinkWarning() << "Couldn't find the remote id for: " << type << uid;
357 mSyncStore.clear();
358 mSyncTransaction.abort();
359 mEntityStore->abortTransaction();
348 return KAsync::error<void>(1, "Couldn't find the remote id."); 360 return KAsync::error<void>(1, "Couldn't find the remote id.");
349 } 361 }
350 } 362 }