summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/genericresource.cpp8
-rw-r--r--common/listener.cpp3
-rw-r--r--common/resourceaccess.cpp13
3 files changed, 18 insertions, 6 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 8704e5a..b32a22b 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -459,16 +459,20 @@ void GenericResource::processCommand(int commandId, const QByteArray &data)
459 459
460KAsync::Job<void> GenericResource::synchronizeWithSource() 460KAsync::Job<void> GenericResource::synchronizeWithSource()
461{ 461{
462 return KAsync::start<void>([this]() { 462 return KAsync::start<void>([this](KAsync::Future<void> &future) {
463 Log() << " Synchronizing"; 463 Log() << " Synchronizing";
464 // Changereplay would deadlock otherwise when trying to open the synchronization store 464 // Changereplay would deadlock otherwise when trying to open the synchronization store
465 enableChangeReplay(false); 465 enableChangeReplay(false);
466 auto mainStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier, Sink::Storage::ReadOnly); 466 auto mainStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier, Sink::Storage::ReadOnly);
467 auto syncStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); 467 auto syncStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite);
468 synchronizeWithSource(*mainStore, *syncStore) 468 synchronizeWithSource(*mainStore, *syncStore)
469 .then<void>([this, mainStore, syncStore]() { 469 .then<void>([this, mainStore, syncStore, &future]() {
470 Log() << "Done Synchronizing"; 470 Log() << "Done Synchronizing";
471 enableChangeReplay(true); 471 enableChangeReplay(true);
472 future.setFinished();
473 }, [this, &future](int errorCode, const QString &error) {
474 enableChangeReplay(true);
475 future.setError(errorCode, error);
472 }) 476 })
473 .exec(); 477 .exec();
474 }); 478 });
diff --git a/common/listener.cpp b/common/listener.cpp
index de20b2c..6cbf40f 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -252,6 +252,9 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
252 job.then<void>([callback, timer]() { 252 job.then<void>([callback, timer]() {
253 Trace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); 253 Trace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed());
254 callback(true); 254 callback(true);
255 }, [callback](int errorCode, const QString &msg) {
256 Warning() << "Sync failed: " << msg;
257 callback(false);
255 }) 258 })
256 .exec(); 259 .exec();
257 return; 260 return;
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 991c930..b294221 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -96,7 +96,7 @@ public:
96 QVector<QSharedPointer<QueuedCommand>> commandQueue; 96 QVector<QSharedPointer<QueuedCommand>> commandQueue;
97 QMap<uint, QSharedPointer<QueuedCommand>> pendingCommands; 97 QMap<uint, QSharedPointer<QueuedCommand>> pendingCommands;
98 QMultiMap<uint, std::function<void(int error, const QString &errorMessage)>> resultHandler; 98 QMultiMap<uint, std::function<void(int error, const QString &errorMessage)>> resultHandler;
99 QSet<uint> completeCommands; 99 QHash<uint, bool> completeCommands;
100 uint messageId; 100 uint messageId;
101 bool openingSocket; 101 bool openingSocket;
102}; 102};
@@ -121,12 +121,17 @@ void ResourceAccess::Private::abortPendingOperations()
121 121
122void ResourceAccess::Private::callCallbacks() 122void ResourceAccess::Private::callCallbacks()
123{ 123{
124 for (auto id : completeCommands) { 124 for (auto id : completeCommands.keys()) {
125 const bool success = completeCommands.value(id);
125 // We remove the callbacks first because the handler can kill resourceaccess directly 126 // We remove the callbacks first because the handler can kill resourceaccess directly
126 const auto callbacks = resultHandler.values(id); 127 const auto callbacks = resultHandler.values(id);
127 resultHandler.remove(id); 128 resultHandler.remove(id);
128 for (auto handler : callbacks) { 129 for (auto handler : callbacks) {
129 handler(0, QString()); 130 if (success) {
131 handler(0, QString());
132 } else {
133 handler(1, "Command failed.");
134 }
130 } 135 }
131 } 136 }
132} 137}
@@ -536,7 +541,7 @@ bool ResourceAccess::processMessageBuffer()
536 auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); 541 auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize);
537 Log() << QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"); 542 Log() << QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully");
538 543
539 d->completeCommands << buffer->id(); 544 d->completeCommands.insert(buffer->id(), buffer->success());
540 // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first 545 // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
541 queuedInvoke([=]() { d->callCallbacks(); }, this); 546 queuedInvoke([=]() { d->callCallbacks(); }, this);
542 break; 547 break;