summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/changereplay.cpp58
-rw-r--r--common/genericresource.cpp134
-rw-r--r--common/listener.cpp18
-rw-r--r--common/messagequeue.cpp37
-rw-r--r--common/pipeline.cpp6
-rw-r--r--common/queryrunner.cpp4
-rw-r--r--common/resourceaccess.cpp117
-rw-r--r--common/resourcecontrol.cpp29
-rw-r--r--common/resourcefacade.cpp6
-rw-r--r--common/sourcewriteback.cpp14
-rw-r--r--common/store.cpp68
-rw-r--r--common/synchronizer.cpp2
-rw-r--r--common/test.cpp2
13 files changed, 233 insertions, 262 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index fbd556f..e3b7158 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -72,7 +72,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
72{ 72{
73 auto lastReplayedRevision = QSharedPointer<qint64>::create(0); 73 auto lastReplayedRevision = QSharedPointer<qint64>::create(0);
74 auto topRevision = QSharedPointer<qint64>::create(0); 74 auto topRevision = QSharedPointer<qint64>::create(0);
75 return KAsync::start<void>([this, lastReplayedRevision, topRevision]() { 75 return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() {
76 mReplayInProgress = true; 76 mReplayInProgress = true;
77 mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 77 mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
78 SinkWarning() << error.message; 78 SinkWarning() << error.message;
@@ -90,11 +90,9 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
90 SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; 90 SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision;
91 }) 91 })
92 .then(KAsync::dowhile( 92 .then(KAsync::dowhile(
93 [this, lastReplayedRevision, topRevision](KAsync::Future<bool> &future) { 93 [this, lastReplayedRevision, topRevision]() -> KAsync::Job<KAsync::ControlFlowFlag> {
94 if (*lastReplayedRevision >= *topRevision) { 94 if (*lastReplayedRevision >= *topRevision) {
95 future.setValue(false); 95 return KAsync::value(KAsync::Break);
96 future.setFinished();
97 return;
98 } 96 }
99 97
100 qint64 revision = *lastReplayedRevision + 1; 98 qint64 revision = *lastReplayedRevision + 1;
@@ -109,12 +107,15 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
109 [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { 107 [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool {
110 SinkTrace() << "Replaying " << key; 108 SinkTrace() << "Replaying " << key;
111 if (canReplay(type, key, value)) { 109 if (canReplay(type, key, value)) {
112 replayJob = replay(type, key, value).then<void>([this, revision, lastReplayedRevision]() { 110 replayJob = replay(type, key, value).then<void>([this, revision, lastReplayedRevision](const KAsync::Error &error) {
113 recordReplayedRevision(revision); 111 if (error) {
114 *lastReplayedRevision = revision; 112 SinkTrace() << "Change replay failed" << revision;
115 }, 113 return KAsync::error(error);
116 [revision](int, QString) { 114 } else {
117 SinkTrace() << "Change replay failed" << revision; 115 recordReplayedRevision(revision);
116 *lastReplayedRevision = revision;
117 }
118 return KAsync::null();
118 }); 119 });
119 exitLoop = true; 120 exitLoop = true;
120 } else { 121 } else {
@@ -128,23 +129,26 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
128 } 129 }
129 revision++; 130 revision++;
130 } 131 }
131 replayJob.then<void>([this, revision, lastReplayedRevision, topRevision, &future]() { 132 return replayJob.then<KAsync::ControlFlowFlag>([this, revision, lastReplayedRevision, topRevision](const KAsync::Error &error) ->KAsync::Job<KAsync::ControlFlowFlag> {
132 SinkTrace() << "Replayed until " << revision; 133 if (error) {
133 recordReplayedRevision(*lastReplayedRevision); 134 SinkTrace() << "Change replay failed" << revision;
134 QTimer::singleShot(0, [&future, lastReplayedRevision, topRevision]() { 135 //We're probably not online or so, so postpone retrying
135 future.setValue((*lastReplayedRevision < *topRevision)); 136 return KAsync::value(KAsync::Break);
136 future.setFinished(); 137 } else {
137 }); 138 SinkTrace() << "Replayed until " << revision;
138 }, 139 recordReplayedRevision(*lastReplayedRevision);
139 [this, revision, &future](int, QString) { 140 if (*lastReplayedRevision < *topRevision) {
140 SinkTrace() << "Change replay failed" << revision; 141 return KAsync::wait(0).then(KAsync::value(KAsync::Continue));
141 //We're probably not online or so, so postpone retrying 142 } else {
142 future.setValue(false); 143 return KAsync::value(KAsync::Break);
143 future.setFinished(); 144 }
144 }).exec(); 145 }
145 146 //We shouldn't ever get here
147 Q_ASSERT(false);
148 return KAsync::value(KAsync::Break);
149 });
146 })) 150 }))
147 .then<void>([this, lastReplayedRevision]() { 151 .syncThen<void>([this, lastReplayedRevision]() {
148 recordReplayedRevision(*lastReplayedRevision); 152 recordReplayedRevision(*lastReplayedRevision);
149 mMainStoreTransaction.abort(); 153 mMainStoreTransaction.abort();
150 if (allChangesReplayed()) { 154 if (allChangesReplayed()) {
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 7136882..f5b1775 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -100,7 +100,7 @@ private slots:
100 } 100 }
101 mProcessingLock = true; 101 mProcessingLock = true;
102 auto job = processPipeline() 102 auto job = processPipeline()
103 .then<void>([this]() { 103 .syncThen<void>([this]() {
104 mProcessingLock = false; 104 mProcessingLock = false;
105 if (messagesToProcessAvailable()) { 105 if (messagesToProcessAvailable()) {
106 process(); 106 process();
@@ -122,7 +122,8 @@ private slots:
122 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); 122 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
123 case Sink::Commands::InspectionCommand: 123 case Sink::Commands::InspectionCommand:
124 if (mInspect) { 124 if (mInspect) {
125 return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<qint64>([]() { return -1; }); 125 return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size())
126 .syncThen<qint64>([]() { return -1; });
126 } else { 127 } else {
127 return KAsync::error<qint64>(-1, "Missing inspection command."); 128 return KAsync::error<qint64>(-1, "Missing inspection command.");
128 } 129 }
@@ -131,7 +132,7 @@ private slots:
131 } 132 }
132 } 133 }
133 134
134 KAsync::Job<qint64, qint64> processQueuedCommand(const QByteArray &data) 135 KAsync::Job<qint64> processQueuedCommand(const QByteArray &data)
135 { 136 {
136 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); 137 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size());
137 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { 138 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) {
@@ -143,13 +144,13 @@ private slots:
143 SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId); 144 SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId);
144 return processQueuedCommand(queuedCommand) 145 return processQueuedCommand(queuedCommand)
145 .then<qint64, qint64>( 146 .then<qint64, qint64>(
146 [this, commandId](qint64 createdRevision) -> qint64 { 147 [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job<qint64> {
148 if (error) {
149 SinkWarning() << "Error while processing queue command: " << error.errorMessage;
150 return KAsync::error<qint64>(error);
151 }
147 SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId); 152 SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId);
148 return createdRevision; 153 return KAsync::value<qint64>(createdRevision);
149 },
150 [](int errorCode, QString errorMessage) {
151 // FIXME propagate error, we didn't handle it
152 SinkWarning() << "Error while processing queue command: " << errorMessage;
153 }); 154 });
154 } 155 }
155 156
@@ -157,31 +158,31 @@ private slots:
157 KAsync::Job<void> processQueue(MessageQueue *queue) 158 KAsync::Job<void> processQueue(MessageQueue *queue)
158 { 159 {
159 auto time = QSharedPointer<QTime>::create(); 160 auto time = QSharedPointer<QTime>::create();
160 return KAsync::start<void>([this]() { mPipeline->startTransaction(); }) 161 return KAsync::syncStart<void>([this]() { mPipeline->startTransaction(); })
161 .then(KAsync::dowhile([queue]() { return !queue->isEmpty(); }, 162 .then(KAsync::dowhile(
162 [this, queue, time](KAsync::Future<void> &future) { 163 [this, queue, time]() -> KAsync::Job<KAsync::ControlFlowFlag> {
163 queue->dequeueBatch(sBatchSize, 164 return queue->dequeueBatch(sBatchSize,
164 [this, time](const QByteArray &data) { 165 [this, time](const QByteArray &data) -> KAsync::Job<void> {
165 time->start(); 166 time->start();
166 return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) { 167 return processQueuedCommand(data)
167 processQueuedCommand(data) 168 .syncThen<void, qint64>([this, time](qint64 createdRevision) {
168 .then<void, qint64>([&future, this, time](qint64 createdRevision) { 169 SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed());
169 SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); 170 });
170 future.setFinished(); 171 })
171 }) 172 .then<KAsync::ControlFlowFlag>([queue](const KAsync::Error &error) {
172 .exec(); 173 if (error) {
173 }); 174 if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) {
174 }) 175 SinkWarning() << "Error while getting message from messagequeue: " << error.errorMessage;
175 .then<void>([&future, queue]() { future.setFinished(); }, 176 }
176 [&future](int i, QString error) {
177 if (i != MessageQueue::ErrorCodes::NoMessageFound) {
178 SinkWarning() << "Error while getting message from messagequeue: " << error;
179 } 177 }
180 future.setFinished(); 178 if (queue->isEmpty()) {
181 }) 179 return KAsync::value<KAsync::ControlFlowFlag>(KAsync::Break);
182 .exec(); 180 } else {
181 return KAsync::value<KAsync::ControlFlowFlag>(KAsync::Continue);
182 }
183 });
183 })) 184 }))
184 .then<void>([this]() { mPipeline->commit(); }); 185 .syncThen<void>([this](const KAsync::Error &) { mPipeline->commit(); });
185 } 186 }
186 187
187 KAsync::Job<void> processPipeline() 188 KAsync::Job<void> processPipeline()
@@ -198,18 +199,20 @@ private slots:
198 199
199 // Go through all message queues 200 // Go through all message queues
200 auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues); 201 auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues);
201 return KAsync::dowhile([it]() { return it->hasNext(); }, 202 return KAsync::dowhile(
202 [it, this](KAsync::Future<void> &future) { 203 [it, this]() {
203 auto time = QSharedPointer<QTime>::create(); 204 auto time = QSharedPointer<QTime>::create();
204 time->start(); 205 time->start();
205 206
206 auto queue = it->next(); 207 auto queue = it->next();
207 processQueue(queue) 208 return processQueue(queue)
208 .then<void>([this, &future, time]() { 209 .syncThen<KAsync::ControlFlowFlag>([this, time, it]() {
209 SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed()); 210 SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed());
210 future.setFinished(); 211 if (it->hasNext()) {
211 }) 212 return KAsync::Continue;
212 .exec(); 213 }
214 return KAsync::Break;
215 });
213 }); 216 });
214 } 217 }
215 218
@@ -251,22 +254,19 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra
251 s >> expectedValue; 254 s >> expectedValue;
252 inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue) 255 inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue)
253 .then<void>( 256 .then<void>(
254 [=]() { 257 [=](const KAsync::Error &error) {
255 Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId;
256 Sink::Notification n;
257 n.type = Sink::Notification::Inspection;
258 n.id = inspectionId;
259 n.code = Sink::Notification::Success;
260 emit notify(n);
261 },
262 [=](int code, const QString &message) {
263 Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << message;
264 Sink::Notification n; 258 Sink::Notification n;
265 n.type = Sink::Notification::Inspection; 259 n.type = Sink::Notification::Inspection;
266 n.message = message;
267 n.id = inspectionId; 260 n.id = inspectionId;
268 n.code = Sink::Notification::Failure; 261 if (error) {
262 Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << error.errorMessage;
263 n.code = Sink::Notification::Failure;
264 } else {
265 Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId;
266 n.code = Sink::Notification::Success;
267 }
269 emit notify(n); 268 emit notify(n);
269 return KAsync::null();
270 }) 270 })
271 .exec(); 271 .exec();
272 return KAsync::null<void>(); 272 return KAsync::null<void>();
@@ -420,7 +420,7 @@ void GenericResource::processCommand(int commandId, const QByteArray &data)
420 420
421KAsync::Job<void> GenericResource::synchronizeWithSource() 421KAsync::Job<void> GenericResource::synchronizeWithSource()
422{ 422{
423 return KAsync::start<void>([this](KAsync::Future<void> &future) { 423 return KAsync::start<void>([this]() {
424 424
425 Sink::Notification n; 425 Sink::Notification n;
426 n.id = "sync"; 426 n.id = "sync";
@@ -432,23 +432,21 @@ KAsync::Job<void> GenericResource::synchronizeWithSource()
432 SinkLog() << " Synchronizing"; 432 SinkLog() << " Synchronizing";
433 // Changereplay would deadlock otherwise when trying to open the synchronization store 433 // Changereplay would deadlock otherwise when trying to open the synchronization store
434 enableChangeReplay(false); 434 enableChangeReplay(false);
435 mSynchronizer->synchronize() 435 return mSynchronizer->synchronize()
436 .then<void>([this, &future]() { 436 .then<void>([this](const KAsync::Error &error) {
437 SinkLog() << "Done Synchronizing";
438 Sink::Notification n;
439 n.id = "sync";
440 n.type = Sink::Notification::Status;
441 n.message = "Synchronization has ended.";
442 n.code = Sink::ApplicationDomain::ConnectedStatus;
443 emit notify(n);
444
445 enableChangeReplay(true);
446 future.setFinished();
447 }, [this, &future](int errorCode, const QString &error) {
448 enableChangeReplay(true); 437 enableChangeReplay(true);
449 future.setError(errorCode, error); 438 if (!error) {
450 }) 439 SinkLog() << "Done Synchronizing";
451 .exec(); 440 Sink::Notification n;
441 n.id = "sync";
442 n.type = Sink::Notification::Status;
443 n.message = "Synchronization has ended.";
444 n.code = Sink::ApplicationDomain::ConnectedStatus;
445 emit notify(n);
446 return KAsync::null();
447 }
448 return KAsync::error(error);
449 });
452 }); 450 });
453} 451}
454 452
diff --git a/common/listener.cpp b/common/listener.cpp
index a051293..027d9ae 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -248,13 +248,17 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
248 if (buffer->localSync()) { 248 if (buffer->localSync()) {
249 job = job.then<void>(loadResource().processAllMessages()); 249 job = job.then<void>(loadResource().processAllMessages());
250 } 250 }
251 job.then<void>([callback, timer]() { 251 job.then<void>([callback, timer](const KAsync::Error &error) {
252 SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); 252 if (error) {
253 callback(true); 253 SinkWarning() << "Sync failed: " << error.errorMessage;
254 }, [callback](int errorCode, const QString &msg) { 254 callback(false);
255 SinkWarning() << "Sync failed: " << msg; 255 return KAsync::error(error);
256 callback(false); 256 } else {
257 }) 257 SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed());
258 callback(true);
259 return KAsync::null();
260 }
261 })
258 .exec(); 262 .exec();
259 return; 263 return;
260 } else { 264 } else {
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
index 3567a10..28eacb7 100644
--- a/common/messagequeue.cpp
+++ b/common/messagequeue.cpp
@@ -5,37 +5,6 @@
5 5
6SINK_DEBUG_AREA("messagequeue") 6SINK_DEBUG_AREA("messagequeue")
7 7
8static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void>> &futures)
9{
10 auto context = new QObject;
11 return KAsync::start<void>([futures, context](KAsync::Future<void> &future) {
12 const auto total = futures.size();
13 auto count = QSharedPointer<int>::create();
14 int i = 0;
15 for (KAsync::Future<void> subFuture : futures) {
16 i++;
17 if (subFuture.isFinished()) {
18 *count += 1;
19 continue;
20 }
21 // FIXME bind lifetime all watcher to future (repectively the main job
22 auto watcher = QSharedPointer<KAsync::FutureWatcher<void>>::create();
23 QObject::connect(watcher.data(), &KAsync::FutureWatcher<void>::futureReady, [count, total, &future]() {
24 *count += 1;
25 if (*count == total) {
26 future.setFinished();
27 }
28 });
29 watcher->setFuture(subFuture);
30 context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher));
31 }
32 if (*count == total) {
33 future.setFinished();
34 }
35 })
36 .then<void>([context]() { delete context; });
37}
38
39MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::ReadWrite) 8MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::ReadWrite)
40{ 9{
41} 10}
@@ -101,7 +70,7 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu
101 return KAsync::start<void>([&value, resultHandler](KAsync::Future<void> &future) { 70 return KAsync::start<void>([&value, resultHandler](KAsync::Future<void> &future) {
102 resultHandler(const_cast<void *>(static_cast<const void *>(value.data())), value.size(), [&future](bool success) { future.setFinished(); }); 71 resultHandler(const_cast<void *>(static_cast<const void *>(value.data())), value.size(), [&future](bool success) { future.setFinished(); });
103 }); 72 });
104 }).then<void>([]() {}, [errorHandler](int error, const QString &errorString) { errorHandler(Error("messagequeue", error, errorString.toLatin1())); }).exec(); 73 }).onError([errorHandler](const KAsync::Error &error) { errorHandler(Error("messagequeue", error.errorCode, error.errorMessage.toLatin1())); }).exec();
105} 74}
106 75
107KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler) 76KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler)
@@ -135,8 +104,8 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi
135 }); 104 });
136 105
137 // Trace() << "Waiting on " << waitCondition.size() << " results"; 106 // Trace() << "Waiting on " << waitCondition.size() << " results";
138 ::waitForCompletion(waitCondition) 107 KAsync::waitForCompletion(waitCondition)
139 .then<void>([this, resultCount, &future]() { 108 .syncThen<void>([this, resultCount, &future]() {
140 processRemovals(); 109 processRemovals();
141 if (*resultCount == 0) { 110 if (*resultCount == 0) {
142 future.setError(static_cast<int>(ErrorCodes::NoMessageFound), "No message found"); 111 future.setError(static_cast<int>(ErrorCodes::NoMessageFound), "No message found");
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 1d45340..ce864f7 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -233,7 +233,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
233 233
234 d->storeNewRevision(newRevision, fbb, bufferType, key); 234 d->storeNewRevision(newRevision, fbb, bufferType, key);
235 235
236 return KAsync::start<qint64>([newRevision]() { return newRevision; }); 236 return KAsync::value(newRevision);
237} 237}
238 238
239KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) 239KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
@@ -346,7 +346,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
346 adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 346 adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
347 347
348 d->storeNewRevision(newRevision, fbb, bufferType, key); 348 d->storeNewRevision(newRevision, fbb, bufferType, key);
349 return KAsync::start<qint64>([newRevision]() { return newRevision; }); 349 return KAsync::value(newRevision);
350} 350}
351 351
352KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) 352KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
@@ -433,7 +433,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
433 processor->deletedEntity(key, newRevision, *current, d->transaction); 433 processor->deletedEntity(key, newRevision, *current, d->transaction);
434 } 434 }
435 435
436 return KAsync::start<qint64>([newRevision]() { return newRevision; }); 436 return KAsync::value(newRevision);
437} 437}
438 438
439void Pipeline::cleanupRevision(qint64 revision) 439void Pipeline::cleanupRevision(qint64 revision)
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 2e2e96d..052db39 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -86,7 +86,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
86 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); 86 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize);
87 return newRevisionAndReplayedEntities; 87 return newRevisionAndReplayedEntities;
88 }) 88 })
89 .template then<void, QPair<qint64, qint64>>([=](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { 89 .template syncThen<void, QPair<qint64, qint64>>([=](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) {
90 mOffset[parentId] += newRevisionAndReplayedEntities.second; 90 mOffset[parentId] += newRevisionAndReplayedEntities.second;
91 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 91 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
92 if (query.liveQuery) { 92 if (query.liveQuery) {
@@ -110,7 +110,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
110 const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); 110 const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider);
111 return newRevisionAndReplayedEntities; 111 return newRevisionAndReplayedEntities;
112 }) 112 })
113 .template then<void, QPair<qint64, qint64> >([query, this, resultProvider](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { 113 .template syncThen<void, QPair<qint64, qint64> >([query, this, resultProvider](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) {
114 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 114 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
115 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); 115 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first);
116 resultProvider->setRevision(newRevisionAndReplayedEntities.first); 116 resultProvider->setRevision(newRevisionAndReplayedEntities.first);
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 7b4d839..364616c 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -159,67 +159,65 @@ KAsync::Job<void> ResourceAccess::Private::tryToConnect()
159 // We may have a socket from the last connection leftover 159 // We may have a socket from the last connection leftover
160 socket.reset(); 160 socket.reset();
161 auto counter = QSharedPointer<int>::create(0); 161 auto counter = QSharedPointer<int>::create(0);
162 return KAsync::dowhile([this]() -> bool { return !socket; }, 162 return KAsync::dowhile(
163 [this, counter](KAsync::Future<void> &future) { 163 [this, counter]() {
164 SinkTrace() << "Loop"; 164 SinkTrace() << "Loop";
165 connectToServer(resourceInstanceIdentifier) 165 return connectToServer(resourceInstanceIdentifier)
166 .then<void, QSharedPointer<QLocalSocket>>( 166 .then<KAsync::ControlFlowFlag, QSharedPointer<QLocalSocket>>(
167 [this, &future](const QSharedPointer<QLocalSocket> &s) { 167 [this, counter](const KAsync::Error &error, const QSharedPointer<QLocalSocket> &s) {
168 Q_ASSERT(s); 168 if (error) {
169 socket = s; 169 static int waitTime = 10;
170 future.setFinished(); 170 static int timeout = 500;
171 }, 171 static int maxRetries = timeout / waitTime;
172 [&future, counter, this](int errorCode, const QString &errorString) { 172 if (*counter > maxRetries) {
173 static int waitTime = 10; 173 SinkTrace() << "Giving up";
174 static int timeout = 500; 174 return KAsync::error<KAsync::ControlFlowFlag>("Failed to connect to socket");
175 static int maxRetries = timeout / waitTime; 175 } else {
176 if (*counter > maxRetries) { 176 *counter = *counter + 1;
177 SinkTrace() << "Giving up"; 177 return KAsync::wait(waitTime).then(KAsync::value(KAsync::Continue));
178 future.setError(-1, "Failed to connect to socket"); 178 }
179 } else { 179 } else {
180 KAsync::wait(waitTime).then<void>([&future]() { future.setFinished(); }).exec(); 180 Q_ASSERT(s);
181 socket = s;
182 return KAsync::value(KAsync::Break);
181 } 183 }
182 *counter = *counter + 1; 184 });
183 })
184 .exec();
185 }); 185 });
186} 186}
187 187
188KAsync::Job<void> ResourceAccess::Private::initializeSocket() 188KAsync::Job<void> ResourceAccess::Private::initializeSocket()
189{ 189{
190 return KAsync::start<void>([this](KAsync::Future<void> &future) { 190 return KAsync::start<void>([this] {
191 SinkTrace() << "Trying to connect"; 191 SinkTrace() << "Trying to connect";
192 connectToServer(resourceInstanceIdentifier) 192 return connectToServer(resourceInstanceIdentifier)
193 .then<void, QSharedPointer<QLocalSocket>>( 193 .then<void, QSharedPointer<QLocalSocket>>(
194 [this, &future](const QSharedPointer<QLocalSocket> &s) { 194 [this](const KAsync::Error &error, const QSharedPointer<QLocalSocket> &s) {
195 SinkTrace() << "Connected to resource, without having to start it."; 195 if (error) {
196 Q_ASSERT(s); 196 SinkTrace() << "Failed to connect, starting resource";
197 socket = s; 197 // We failed to connect, so let's start the resource
198 future.setFinished(); 198 QStringList args;
199 }, 199 if (Sink::Test::testModeEnabled()) {
200 [this, &future](int errorCode, const QString &errorString) { 200 args << "--test";
201 SinkTrace() << "Failed to connect, starting resource"; 201 }
202 // We failed to connect, so let's start the resource 202 args << resourceInstanceIdentifier << resourceName;
203 QStringList args; 203 qint64 pid = 0;
204 if (Sink::Test::testModeEnabled()) { 204 if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) {
205 args << "--test"; 205 SinkTrace() << "Started resource " << pid;
206 } 206 return tryToConnect()
207 args << resourceInstanceIdentifier << resourceName; 207 .onError([this](const KAsync::Error &error) {
208 qint64 pid = 0; 208 SinkWarning() << "Failed to connect to started resource";
209 if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { 209 });
210 SinkTrace() << "Started resource " << pid; 210 } else {
211 tryToConnect() 211 SinkWarning() << "Failed to start resource";
212 .then<void>([&future]() { future.setFinished(); }, 212 }
213 [this, &future](int errorCode, const QString &errorString) { 213 return KAsync::null();
214 SinkWarning() << "Failed to connect to started resource";
215 future.setError(errorCode, errorString);
216 })
217 .exec();
218 } else { 214 } else {
219 SinkWarning() << "Failed to start resource"; 215 SinkTrace() << "Connected to resource, without having to start it.";
216 Q_ASSERT(s);
217 socket = s;
218 return KAsync::null();
220 } 219 }
221 }) 220 });
222 .exec();
223 }); 221 });
224} 222}
225 223
@@ -383,17 +381,18 @@ void ResourceAccess::open()
383 d->openingSocket = true; 381 d->openingSocket = true;
384 d->initializeSocket() 382 d->initializeSocket()
385 .then<void>( 383 .then<void>(
386 [this, time]() { 384 [this, time](const KAsync::Error &error) {
387 SinkTrace() << "Socket is initialized." << Log::TraceTime(time->elapsed());
388 d->openingSocket = false; 385 d->openingSocket = false;
389 QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected); 386 if (error) {
390 QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError))); 387 SinkWarning() << "Failed to initialize socket " << error.errorMessage;
391 QObject::connect(d->socket.data(), &QIODevice::readyRead, this, &ResourceAccess::readResourceMessage); 388 } else {
392 connected(); 389 SinkTrace() << "Socket is initialized." << Log::TraceTime(time->elapsed());
393 }, 390 QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected);
394 [this](int error, const QString &errorString) { 391 QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError)));
395 d->openingSocket = false; 392 QObject::connect(d->socket.data(), &QIODevice::readyRead, this, &ResourceAccess::readResourceMessage);
396 SinkWarning() << "Failed to initialize socket " << errorString; 393 connected();
394 }
395 return KAsync::null();
397 }) 396 })
398 .exec(); 397 .exec();
399} 398}
diff --git a/common/resourcecontrol.cpp b/common/resourcecontrol.cpp
index 7d092a4..f509318 100644
--- a/common/resourcecontrol.cpp
+++ b/common/resourcecontrol.cpp
@@ -41,22 +41,22 @@ KAsync::Job<void> ResourceControl::shutdown(const QByteArray &identifier)
41 time->start(); 41 time->start();
42 return ResourceAccess::connectToServer(identifier) 42 return ResourceAccess::connectToServer(identifier)
43 .then<void, QSharedPointer<QLocalSocket>>( 43 .then<void, QSharedPointer<QLocalSocket>>(
44 [identifier, time](QSharedPointer<QLocalSocket> socket, KAsync::Future<void> &future) { 44 [identifier, time](const KAsync::Error &error, QSharedPointer<QLocalSocket> socket) {
45 if (error) {
46 SinkTrace() << "Resource is already closed.";
47 // Resource isn't started, nothing to shutdown
48 return KAsync::null();
49 }
45 // We can't currently reuse the socket 50 // We can't currently reuse the socket
46 socket->close(); 51 socket->close();
47 auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); 52 auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier));
48 resourceAccess->open(); 53 resourceAccess->open();
49 resourceAccess->sendCommand(Sink::Commands::ShutdownCommand) 54 return resourceAccess->sendCommand(Sink::Commands::ShutdownCommand)
50 .then<void>([&future, resourceAccess, time]() { 55 .addToContext(resourceAccess)
56 .syncThen<void>([resourceAccess, time]() {
51 resourceAccess->close(); 57 resourceAccess->close();
52 SinkTrace() << "Shutdown complete." << Log::TraceTime(time->elapsed()); 58 SinkTrace() << "Shutdown complete." << Log::TraceTime(time->elapsed());
53 future.setFinished(); 59 });
54 })
55 .exec();
56 },
57 [](int, const QString &) {
58 SinkTrace() << "Resource is already closed.";
59 // Resource isn't started, nothing to shutdown
60 }); 60 });
61} 61}
62 62
@@ -67,18 +67,19 @@ KAsync::Job<void> ResourceControl::start(const QByteArray &identifier)
67 time->start(); 67 time->start();
68 auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); 68 auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier));
69 resourceAccess->open(); 69 resourceAccess->open();
70 return resourceAccess->sendCommand(Sink::Commands::PingCommand).then<void>([resourceAccess, time]() { SinkTrace() << "Start complete." << Log::TraceTime(time->elapsed()); }); 70 return resourceAccess->sendCommand(Sink::Commands::PingCommand).addToContext(resourceAccess).syncThen<void>([time]() { SinkTrace() << "Start complete." << Log::TraceTime(time->elapsed()); });
71} 71}
72 72
73KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArrayList &resourceIdentifier) 73KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArrayList &resourceIdentifier)
74{ 74{
75 SinkTrace() << "flushMessageQueue" << resourceIdentifier; 75 SinkTrace() << "flushMessageQueue" << resourceIdentifier;
76 return KAsync::iterate(resourceIdentifier) 76 return KAsync::value(resourceIdentifier)
77 .template each<void, QByteArray>([](const QByteArray &resource, KAsync::Future<void> &future) { 77 .template each([](const QByteArray &resource) {
78 SinkTrace() << "Flushing message queue " << resource; 78 SinkTrace() << "Flushing message queue " << resource;
79 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); 79 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource));
80 resourceAccess->open(); 80 resourceAccess->open();
81 resourceAccess->synchronizeResource(false, true).then<void>([&future, resourceAccess]() { future.setFinished(); }).exec(); 81 return resourceAccess->synchronizeResource(false, true)
82 .addToContext(resourceAccess);
82 }); 83 });
83} 84}
84 85
diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp
index bf4239d..1c56fe5 100644
--- a/common/resourcefacade.cpp
+++ b/common/resourcefacade.cpp
@@ -167,7 +167,7 @@ template <typename DomainType>
167KAsync::Job<void> LocalStorageFacade<DomainType>::create(const DomainType &domainObject) 167KAsync::Job<void> LocalStorageFacade<DomainType>::create(const DomainType &domainObject)
168{ 168{
169 auto configStoreIdentifier = mIdentifier; 169 auto configStoreIdentifier = mIdentifier;
170 return KAsync::start<void>([domainObject, configStoreIdentifier]() { 170 return KAsync::syncStart<void>([domainObject, configStoreIdentifier]() {
171 const QByteArray type = domainObject.getProperty("type").toByteArray(); 171 const QByteArray type = domainObject.getProperty("type").toByteArray();
172 const QByteArray providedIdentifier = domainObject.identifier().isEmpty() ? domainObject.getProperty("identifier").toByteArray() : domainObject.identifier(); 172 const QByteArray providedIdentifier = domainObject.identifier().isEmpty() ? domainObject.getProperty("identifier").toByteArray() : domainObject.identifier();
173 const QByteArray identifier = providedIdentifier.isEmpty() ? ResourceConfig::newIdentifier(type) : providedIdentifier; 173 const QByteArray identifier = providedIdentifier.isEmpty() ? ResourceConfig::newIdentifier(type) : providedIdentifier;
@@ -192,7 +192,7 @@ template <typename DomainType>
192KAsync::Job<void> LocalStorageFacade<DomainType>::modify(const DomainType &domainObject) 192KAsync::Job<void> LocalStorageFacade<DomainType>::modify(const DomainType &domainObject)
193{ 193{
194 auto configStoreIdentifier = mIdentifier; 194 auto configStoreIdentifier = mIdentifier;
195 return KAsync::start<void>([domainObject, configStoreIdentifier]() { 195 return KAsync::syncStart<void>([domainObject, configStoreIdentifier]() {
196 const QByteArray identifier = domainObject.identifier(); 196 const QByteArray identifier = domainObject.identifier();
197 if (identifier.isEmpty()) { 197 if (identifier.isEmpty()) {
198 SinkWarning() << "We need an \"identifier\" property to identify the entity to configure."; 198 SinkWarning() << "We need an \"identifier\" property to identify the entity to configure.";
@@ -220,7 +220,7 @@ template <typename DomainType>
220KAsync::Job<void> LocalStorageFacade<DomainType>::remove(const DomainType &domainObject) 220KAsync::Job<void> LocalStorageFacade<DomainType>::remove(const DomainType &domainObject)
221{ 221{
222 auto configStoreIdentifier = mIdentifier; 222 auto configStoreIdentifier = mIdentifier;
223 return KAsync::start<void>([domainObject, configStoreIdentifier]() { 223 return KAsync::syncStart<void>([domainObject, configStoreIdentifier]() {
224 const QByteArray identifier = domainObject.identifier(); 224 const QByteArray identifier = domainObject.identifier();
225 if (identifier.isEmpty()) { 225 if (identifier.isEmpty()) {
226 SinkWarning() << "We need an \"identifier\" property to identify the entity to configure"; 226 SinkWarning() << "We need an \"identifier\" property to identify the entity to configure";
diff --git a/common/sourcewriteback.cpp b/common/sourcewriteback.cpp
index fe996cb..702d8e3 100644
--- a/common/sourcewriteback.cpp
+++ b/common/sourcewriteback.cpp
@@ -106,7 +106,7 @@ KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArr
106 job = replay(mail, operation, oldRemoteId, modifiedProperties); 106 job = replay(mail, operation, oldRemoteId, modifiedProperties);
107 } 107 }
108 108
109 return job.then<void, QByteArray>([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { 109 return job.syncThen<void, QByteArray>([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) {
110 if (operation == Sink::Operation_Creation) { 110 if (operation == Sink::Operation_Creation) {
111 SinkTrace() << "Replayed creation with remote id: " << remoteId; 111 SinkTrace() << "Replayed creation with remote id: " << remoteId;
112 if (remoteId.isEmpty()) { 112 if (remoteId.isEmpty()) {
@@ -127,13 +127,11 @@ KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArr
127 } else { 127 } else {
128 SinkError() << "Unkown operation" << operation; 128 SinkError() << "Unkown operation" << operation;
129 } 129 }
130 130 })
131 mSyncStore.clear(); 131 .syncThen<void>([this](const KAsync::Error &error) {
132 mEntityStore.clear(); 132 if (error) {
133 mTransaction.abort(); 133 SinkWarning() << "Failed to replay change: " << error.errorMessage;
134 mSyncTransaction.commit(); 134 }
135 }, [this](int errorCode, const QString &errorMessage) {
136 SinkWarning() << "Failed to replay change: " << errorMessage;
137 mSyncStore.clear(); 135 mSyncStore.clear();
138 mEntityStore.clear(); 136 mEntityStore.clear();
139 mTransaction.abort(); 137 mTransaction.abort();
diff --git a/common/store.cpp b/common/store.cpp
index 07f41f8..c01d220 100644
--- a/common/store.cpp
+++ b/common/store.cpp
@@ -39,6 +39,8 @@
39SINK_DEBUG_AREA("store") 39SINK_DEBUG_AREA("store")
40 40
41Q_DECLARE_METATYPE(QSharedPointer<Sink::ResultEmitter<Sink::ApplicationDomain::SinkResource::Ptr>>) 41Q_DECLARE_METATYPE(QSharedPointer<Sink::ResultEmitter<Sink::ApplicationDomain::SinkResource::Ptr>>)
42Q_DECLARE_METATYPE(QSharedPointer<Sink::ResourceAccessInterface>);
43Q_DECLARE_METATYPE(std::shared_ptr<void>);
42 44
43namespace Sink { 45namespace Sink {
44 46
@@ -169,12 +171,10 @@ QSharedPointer<QAbstractItemModel> Store::loadModel(Query query)
169 result.first.exec(); 171 result.first.exec();
170 } 172 }
171 173
172 KAsync::iterate(resources.keys()) 174 KAsync::value(resources.keys())
173 .template each<void, QByteArray>([query, aggregatingEmitter, resources](const QByteArray &resourceInstanceIdentifier, KAsync::Future<void> &future) { 175 .template each([query, aggregatingEmitter, resources](const QByteArray &resourceInstanceIdentifier) {
174 const auto resourceType = resources.value(resourceInstanceIdentifier); 176 const auto resourceType = resources.value(resourceInstanceIdentifier);
175 queryResource<DomainType>(resourceType, resourceInstanceIdentifier, query, aggregatingEmitter).template then<void>([&future]() { 177 return queryResource<DomainType>(resourceType, resourceInstanceIdentifier, query, aggregatingEmitter);
176 future.setFinished();
177 }).exec();
178 }) 178 })
179 .exec(); 179 .exec();
180 model->fetchMore(QModelIndex()); 180 model->fetchMore(QModelIndex());
@@ -201,7 +201,7 @@ KAsync::Job<void> Store::create(const DomainType &domainObject)
201{ 201{
202 // Potentially move to separate thread as well 202 // Potentially move to separate thread as well
203 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); 203 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
204 return facade->create(domainObject).template then<void>([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to create"; }); 204 return facade->create(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to create"; });
205} 205}
206 206
207template <class DomainType> 207template <class DomainType>
@@ -209,7 +209,7 @@ KAsync::Job<void> Store::modify(const DomainType &domainObject)
209{ 209{
210 // Potentially move to separate thread as well 210 // Potentially move to separate thread as well
211 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); 211 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
212 return facade->modify(domainObject).template then<void>([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to modify"; }); 212 return facade->modify(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to modify"; });
213} 213}
214 214
215template <class DomainType> 215template <class DomainType>
@@ -217,7 +217,7 @@ KAsync::Job<void> Store::remove(const DomainType &domainObject)
217{ 217{
218 // Potentially move to separate thread as well 218 // Potentially move to separate thread as well
219 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); 219 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
220 return facade->remove(domainObject).template then<void>([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to remove"; }); 220 return facade->remove(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to remove"; });
221} 221}
222 222
223KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier) 223KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier)
@@ -231,6 +231,7 @@ KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier)
231 auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); 231 auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier));
232 resourceAccess->open(); 232 resourceAccess->open();
233 return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand) 233 return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand)
234 .addToContext(resourceAccess)
234 .then<void>([resourceAccess](KAsync::Future<void> &future) { 235 .then<void>([resourceAccess](KAsync::Future<void> &future) {
235 if (resourceAccess->isReady()) { 236 if (resourceAccess->isReady()) {
236 //Wait for the resource shutdown 237 //Wait for the resource shutdown
@@ -243,8 +244,8 @@ KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier)
243 future.setFinished(); 244 future.setFinished();
244 } 245 }
245 }) 246 })
246 .then<void>([resourceAccess, time]() { 247 .syncThen<void>([time]() {
247 SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed()); 248 SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed());
248 }); 249 });
249} 250}
250 251
@@ -253,41 +254,38 @@ KAsync::Job<void> Store::synchronize(const Sink::Query &query)
253 SinkTrace() << "synchronize" << query.resources; 254 SinkTrace() << "synchronize" << query.resources;
254 auto resources = getResources(query.resources, query.accounts).keys(); 255 auto resources = getResources(query.resources, query.accounts).keys();
255 //FIXME only necessary because each doesn't propagate errors 256 //FIXME only necessary because each doesn't propagate errors
256 auto error = new bool; 257 auto errorFlag = new bool;
257 return KAsync::iterate(resources) 258 return KAsync::value(resources)
258 .template each<void, QByteArray>([query, error](const QByteArray &resource, KAsync::Future<void> &future) { 259 .template each([query, errorFlag](const QByteArray &resource) {
259 SinkTrace() << "Synchronizing " << resource; 260 SinkTrace() << "Synchronizing " << resource;
260 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); 261 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource));
261 resourceAccess->open(); 262 resourceAccess->open();
262 resourceAccess->synchronizeResource(true, false).then<void>([resourceAccess, &future]() {SinkTrace() << "synced."; future.setFinished(); }, 263 return resourceAccess->synchronizeResource(true, false)
263 [&future, error](int errorCode, QString msg) { *error = true; SinkWarning() << "Error during sync."; future.setError(errorCode, msg); }).exec(); 264 .addToContext(resourceAccess)
264 }).then<void>([error](KAsync::Future<void> &future) { 265 .then<void>([errorFlag](const KAsync::Error &error) {
265 if (*error) { 266 if (error) {
266 future.setError(1, "Error during sync."); 267 *errorFlag = true;
267 } else { 268 SinkWarning() << "Error during sync.";
268 future.setFinished(); 269 return KAsync::error<void>(error);
270 }
271 SinkTrace() << "synced.";
272 return KAsync::null<void>();
273 });
274 })
275 .then<void>([errorFlag]() {
276 if (*errorFlag) {
277 return KAsync::error<void>("Error during sync.");
269 } 278 }
270 delete error; 279 delete errorFlag;
280 return KAsync::null<void>();
271 }); 281 });
272} 282}
273 283
274template <class DomainType> 284template <class DomainType>
275KAsync::Job<DomainType> Store::fetchOne(const Sink::Query &query) 285KAsync::Job<DomainType> Store::fetchOne(const Sink::Query &query)
276{ 286{
277 return KAsync::start<DomainType>([query](KAsync::Future<DomainType> &future) { 287 return fetch<DomainType>(query, 1).template then<DomainType, QList<typename DomainType::Ptr>>([](const QList<typename DomainType::Ptr> &list) {
278 // FIXME We could do this more elegantly if composed jobs would have the correct type (In that case we'd simply return the value from then continuation, and could avoid the 288 return KAsync::value(*list.first());
279 // outer job entirely)
280 fetch<DomainType>(query, 1)
281 .template then<void, QList<typename DomainType::Ptr>>(
282 [&future](const QList<typename DomainType::Ptr> &list) {
283 future.setValue(*list.first());
284 future.setFinished();
285 },
286 [&future](int errorCode, const QString &errorMessage) {
287 future.setError(errorCode, errorMessage);
288 future.setFinished();
289 })
290 .exec();
291 }); 289 });
292} 290}
293 291
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 2d4fb8d..15a06e7 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -244,7 +244,7 @@ KAsync::Job<void> Synchronizer::synchronize()
244 SinkTrace() << "Synchronizing"; 244 SinkTrace() << "Synchronizing";
245 mSyncInProgress = true; 245 mSyncInProgress = true;
246 mMessageQueue->startTransaction(); 246 mMessageQueue->startTransaction();
247 return synchronizeWithSource().then<void>([this]() { 247 return synchronizeWithSource().syncThen<void>([this]() {
248 mSyncStore.clear(); 248 mSyncStore.clear();
249 mEntityStore.clear(); 249 mEntityStore.clear();
250 mMessageQueue->commit(); 250 mMessageQueue->commit();
diff --git a/common/test.cpp b/common/test.cpp
index 5b4c899..1a8e11d 100644
--- a/common/test.cpp
+++ b/common/test.cpp
@@ -156,7 +156,7 @@ public:
156 } 156 }
157 resultProvider->initialResultSetComplete(parent); 157 resultProvider->initialResultSetComplete(parent);
158 }); 158 });
159 auto job = KAsync::start<void>([query, resultProvider]() {}); 159 auto job = KAsync::syncStart<void>([query, resultProvider]() {});
160 return qMakePair(job, emitter); 160 return qMakePair(job, emitter);
161 } 161 }
162 162