diff options
-rw-r--r-- | common/synchronizer.cpp | 35 | ||||
-rw-r--r-- | common/synchronizerstore.cpp | 9 |
2 files changed, 31 insertions, 13 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index a341f63..cfe1f09 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -279,10 +279,11 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
279 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { | 279 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { |
280 | return KAsync::null<void>(); | 280 | return KAsync::null<void>(); |
281 | } | 281 | } |
282 | mSyncInProgress = true; | ||
283 | mMessageQueue->startTransaction(); | ||
284 | 282 | ||
285 | auto job = KAsync::null<void>(); | 283 | auto job = KAsync::syncStart<void>([this] { |
284 | mMessageQueue->startTransaction(); | ||
285 | mSyncInProgress = true; | ||
286 | }); | ||
286 | while (!mSyncRequestQueue.isEmpty()) { | 287 | while (!mSyncRequestQueue.isEmpty()) { |
287 | auto request = mSyncRequestQueue.takeFirst(); | 288 | auto request = mSyncRequestQueue.takeFirst(); |
288 | if (request.requestType == Synchronizer::SyncRequest::Synchronization) { | 289 | if (request.requestType == Synchronizer::SyncRequest::Synchronization) { |
@@ -334,13 +335,17 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
334 | Sink::Commands::FinishFlushBuffer(fbb, location); | 335 | Sink::Commands::FinishFlushBuffer(fbb, location); |
335 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); | 336 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); |
336 | } | 337 | } |
337 | } else { | 338 | } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { |
338 | job = replayNextRevision(); | 339 | job = replayNextRevision(); |
340 | } else { | ||
341 | SinkWarning() << "Unknown request type: " << request.requestType; | ||
342 | return KAsync::error(KAsync::Error{"Unknown request type."}); | ||
339 | } | 343 | } |
340 | } | 344 | } |
341 | return job.then<void>([this](const KAsync::Error &error) { | 345 | return job.then<void>([this](const KAsync::Error &error) { |
342 | mSyncStore.clear(); | 346 | mSyncTransaction.abort(); |
343 | mMessageQueue->commit(); | 347 | mMessageQueue->commit(); |
348 | mSyncStore.clear(); | ||
344 | mSyncInProgress = false; | 349 | mSyncInProgress = false; |
345 | if (allChangesReplayed()) { | 350 | if (allChangesReplayed()) { |
346 | emit changesReplayed(); | 351 | emit changesReplayed(); |
@@ -399,6 +404,18 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray | |||
399 | Sink::EntityBuffer buffer(value); | 404 | Sink::EntityBuffer buffer(value); |
400 | const Sink::Entity &entity = buffer.entity(); | 405 | const Sink::Entity &entity = buffer.entity(); |
401 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | 406 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); |
407 | if (!metadataBuffer) { | ||
408 | SinkError() << "No metadata buffer available."; | ||
409 | return KAsync::error("No metadata buffer"); | ||
410 | } | ||
411 | if (mSyncTransaction) { | ||
412 | SinkError() << "Leftover sync transaction."; | ||
413 | mSyncTransaction.abort(); | ||
414 | } | ||
415 | if (mSyncStore) { | ||
416 | SinkError() << "Leftover sync store."; | ||
417 | mSyncStore.clear(); | ||
418 | } | ||
402 | Q_ASSERT(metadataBuffer); | 419 | Q_ASSERT(metadataBuffer); |
403 | Q_ASSERT(!mSyncStore); | 420 | Q_ASSERT(!mSyncStore); |
404 | Q_ASSERT(!mSyncTransaction); | 421 | Q_ASSERT(!mSyncTransaction); |
@@ -411,13 +428,7 @@ KAsync::Job<void> Synchronizer::replay(const QByteArray &type, const QByteArray | |||
411 | 428 | ||
412 | if (operation != Sink::Operation_Creation) { | 429 | if (operation != Sink::Operation_Creation) { |
413 | oldRemoteId = syncStore().resolveLocalId(type, uid); | 430 | oldRemoteId = syncStore().resolveLocalId(type, uid); |
414 | if (oldRemoteId.isEmpty()) { | 431 | //oldRemoteId can be empty if the resource implementation didn't return a remoteid |
415 | SinkWarning() << "Couldn't find the remote id for: " << type << uid; | ||
416 | mSyncStore.clear(); | ||
417 | mSyncTransaction.abort(); | ||
418 | mEntityStore->abortTransaction(); | ||
419 | return KAsync::error<void>(1, "Couldn't find the remote id."); | ||
420 | } | ||
421 | } | 432 | } |
422 | SinkTrace() << "Replaying " << key << type << uid << oldRemoteId; | 433 | SinkTrace() << "Replaying " << key << type << uid << oldRemoteId; |
423 | 434 | ||
diff --git a/common/synchronizerstore.cpp b/common/synchronizerstore.cpp index a9da2fe..31c1391 100644 --- a/common/synchronizerstore.cpp +++ b/common/synchronizerstore.cpp | |||
@@ -117,13 +117,20 @@ void SynchronizerStore::writeValue(const QByteArray &prefix, const QByteArray &k | |||
117 | 117 | ||
118 | void SynchronizerStore::removeValue(const QByteArray &prefix, const QByteArray &key) | 118 | void SynchronizerStore::removeValue(const QByteArray &prefix, const QByteArray &key) |
119 | { | 119 | { |
120 | mTransaction.openDatabase("values").remove(prefix + key, [&](const Sink::Storage::DataStore::Error &error) { | 120 | auto assembled = prefix + key; |
121 | if (assembled.isEmpty()) { | ||
122 | return; | ||
123 | } | ||
124 | mTransaction.openDatabase("values").remove(assembled, [&](const Sink::Storage::DataStore::Error &error) { | ||
121 | SinkWarning() << "Failed to remove the value: " << prefix + key << error; | 125 | SinkWarning() << "Failed to remove the value: " << prefix + key << error; |
122 | }); | 126 | }); |
123 | } | 127 | } |
124 | 128 | ||
125 | void SynchronizerStore::removePrefix(const QByteArray &prefix) | 129 | void SynchronizerStore::removePrefix(const QByteArray &prefix) |
126 | { | 130 | { |
131 | if (prefix.isEmpty()) { | ||
132 | return; | ||
133 | } | ||
127 | //FIXME remove all values matching prefix | 134 | //FIXME remove all values matching prefix |
128 | // mTransaction.openDatabase("values").remove(prefix, [](const Sink::Storage::DataStore::Error &) { | 135 | // mTransaction.openDatabase("values").remove(prefix, [](const Sink::Storage::DataStore::Error &) { |
129 | // //Ignore errors because we may not find the value | 136 | // //Ignore errors because we may not find the value |