summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/changereplay.cpp58
-rw-r--r--common/genericresource.cpp134
-rw-r--r--common/listener.cpp18
-rw-r--r--common/messagequeue.cpp37
-rw-r--r--common/pipeline.cpp6
-rw-r--r--common/queryrunner.cpp4
-rw-r--r--common/resourceaccess.cpp117
-rw-r--r--common/resourcecontrol.cpp29
-rw-r--r--common/resourcefacade.cpp6
-rw-r--r--common/sourcewriteback.cpp14
-rw-r--r--common/store.cpp68
-rw-r--r--common/synchronizer.cpp2
-rw-r--r--common/test.cpp2
-rw-r--r--examples/dummyresource/resourcefactory.cpp2
-rw-r--r--examples/imapresource/imapresource.cpp74
-rw-r--r--examples/imapresource/imapserverproxy.cpp35
-rw-r--r--examples/maildirresource/maildirresource.cpp18
-rw-r--r--examples/mailtransportresource/mailtransportresource.cpp7
-rw-r--r--sinksh/syntax_modules/sink_sync.cpp4
-rw-r--r--tests/accountstest.cpp8
-rw-r--r--tests/clientapitest.cpp4
-rw-r--r--tests/dummyresourcebenchmark.cpp30
-rw-r--r--tests/mailsynctest.cpp22
-rw-r--r--tests/mailtest.cpp22
-rw-r--r--tests/resourcecommunicationtest.cpp28
25 files changed, 337 insertions, 412 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index fbd556f..e3b7158 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -72,7 +72,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
72{ 72{
73 auto lastReplayedRevision = QSharedPointer<qint64>::create(0); 73 auto lastReplayedRevision = QSharedPointer<qint64>::create(0);
74 auto topRevision = QSharedPointer<qint64>::create(0); 74 auto topRevision = QSharedPointer<qint64>::create(0);
75 return KAsync::start<void>([this, lastReplayedRevision, topRevision]() { 75 return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() {
76 mReplayInProgress = true; 76 mReplayInProgress = true;
77 mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 77 mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
78 SinkWarning() << error.message; 78 SinkWarning() << error.message;
@@ -90,11 +90,9 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
90 SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; 90 SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision;
91 }) 91 })
92 .then(KAsync::dowhile( 92 .then(KAsync::dowhile(
93 [this, lastReplayedRevision, topRevision](KAsync::Future<bool> &future) { 93 [this, lastReplayedRevision, topRevision]() -> KAsync::Job<KAsync::ControlFlowFlag> {
94 if (*lastReplayedRevision >= *topRevision) { 94 if (*lastReplayedRevision >= *topRevision) {
95 future.setValue(false); 95 return KAsync::value(KAsync::Break);
96 future.setFinished();
97 return;
98 } 96 }
99 97
100 qint64 revision = *lastReplayedRevision + 1; 98 qint64 revision = *lastReplayedRevision + 1;
@@ -109,12 +107,15 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
109 [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { 107 [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool {
110 SinkTrace() << "Replaying " << key; 108 SinkTrace() << "Replaying " << key;
111 if (canReplay(type, key, value)) { 109 if (canReplay(type, key, value)) {
112 replayJob = replay(type, key, value).then<void>([this, revision, lastReplayedRevision]() { 110 replayJob = replay(type, key, value).then<void>([this, revision, lastReplayedRevision](const KAsync::Error &error) {
113 recordReplayedRevision(revision); 111 if (error) {
114 *lastReplayedRevision = revision; 112 SinkTrace() << "Change replay failed" << revision;
115 }, 113 return KAsync::error(error);
116 [revision](int, QString) { 114 } else {
117 SinkTrace() << "Change replay failed" << revision; 115 recordReplayedRevision(revision);
116 *lastReplayedRevision = revision;
117 }
118 return KAsync::null();
118 }); 119 });
119 exitLoop = true; 120 exitLoop = true;
120 } else { 121 } else {
@@ -128,23 +129,26 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
128 } 129 }
129 revision++; 130 revision++;
130 } 131 }
131 replayJob.then<void>([this, revision, lastReplayedRevision, topRevision, &future]() { 132 return replayJob.then<KAsync::ControlFlowFlag>([this, revision, lastReplayedRevision, topRevision](const KAsync::Error &error) ->KAsync::Job<KAsync::ControlFlowFlag> {
132 SinkTrace() << "Replayed until " << revision; 133 if (error) {
133 recordReplayedRevision(*lastReplayedRevision); 134 SinkTrace() << "Change replay failed" << revision;
134 QTimer::singleShot(0, [&future, lastReplayedRevision, topRevision]() { 135 //We're probably not online or so, so postpone retrying
135 future.setValue((*lastReplayedRevision < *topRevision)); 136 return KAsync::value(KAsync::Break);
136 future.setFinished(); 137 } else {
137 }); 138 SinkTrace() << "Replayed until " << revision;
138 }, 139 recordReplayedRevision(*lastReplayedRevision);
139 [this, revision, &future](int, QString) { 140 if (*lastReplayedRevision < *topRevision) {
140 SinkTrace() << "Change replay failed" << revision; 141 return KAsync::wait(0).then(KAsync::value(KAsync::Continue));
141 //We're probably not online or so, so postpone retrying 142 } else {
142 future.setValue(false); 143 return KAsync::value(KAsync::Break);
143 future.setFinished(); 144 }
144 }).exec(); 145 }
145 146 //We shouldn't ever get here
147 Q_ASSERT(false);
148 return KAsync::value(KAsync::Break);
149 });
146 })) 150 }))
147 .then<void>([this, lastReplayedRevision]() { 151 .syncThen<void>([this, lastReplayedRevision]() {
148 recordReplayedRevision(*lastReplayedRevision); 152 recordReplayedRevision(*lastReplayedRevision);
149 mMainStoreTransaction.abort(); 153 mMainStoreTransaction.abort();
150 if (allChangesReplayed()) { 154 if (allChangesReplayed()) {
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 7136882..f5b1775 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -100,7 +100,7 @@ private slots:
100 } 100 }
101 mProcessingLock = true; 101 mProcessingLock = true;
102 auto job = processPipeline() 102 auto job = processPipeline()
103 .then<void>([this]() { 103 .syncThen<void>([this]() {
104 mProcessingLock = false; 104 mProcessingLock = false;
105 if (messagesToProcessAvailable()) { 105 if (messagesToProcessAvailable()) {
106 process(); 106 process();
@@ -122,7 +122,8 @@ private slots:
122 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); 122 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
123 case Sink::Commands::InspectionCommand: 123 case Sink::Commands::InspectionCommand:
124 if (mInspect) { 124 if (mInspect) {
125 return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<qint64>([]() { return -1; }); 125 return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size())
126 .syncThen<qint64>([]() { return -1; });
126 } else { 127 } else {
127 return KAsync::error<qint64>(-1, "Missing inspection command."); 128 return KAsync::error<qint64>(-1, "Missing inspection command.");
128 } 129 }
@@ -131,7 +132,7 @@ private slots:
131 } 132 }
132 } 133 }
133 134
134 KAsync::Job<qint64, qint64> processQueuedCommand(const QByteArray &data) 135 KAsync::Job<qint64> processQueuedCommand(const QByteArray &data)
135 { 136 {
136 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); 137 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size());
137 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { 138 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) {
@@ -143,13 +144,13 @@ private slots:
143 SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId); 144 SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId);
144 return processQueuedCommand(queuedCommand) 145 return processQueuedCommand(queuedCommand)
145 .then<qint64, qint64>( 146 .then<qint64, qint64>(
146 [this, commandId](qint64 createdRevision) -> qint64 { 147 [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job<qint64> {
148 if (error) {
149 SinkWarning() << "Error while processing queue command: " << error.errorMessage;
150 return KAsync::error<qint64>(error);
151 }
147 SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId); 152 SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId);
148 return createdRevision; 153 return KAsync::value<qint64>(createdRevision);
149 },
150 [](int errorCode, QString errorMessage) {
151 // FIXME propagate error, we didn't handle it
152 SinkWarning() << "Error while processing queue command: " << errorMessage;
153 }); 154 });
154 } 155 }
155 156
@@ -157,31 +158,31 @@ private slots:
157 KAsync::Job<void> processQueue(MessageQueue *queue) 158 KAsync::Job<void> processQueue(MessageQueue *queue)
158 { 159 {
159 auto time = QSharedPointer<QTime>::create(); 160 auto time = QSharedPointer<QTime>::create();
160 return KAsync::start<void>([this]() { mPipeline->startTransaction(); }) 161 return KAsync::syncStart<void>([this]() { mPipeline->startTransaction(); })
161 .then(KAsync::dowhile([queue]() { return !queue->isEmpty(); }, 162 .then(KAsync::dowhile(
162 [this, queue, time](KAsync::Future<void> &future) { 163 [this, queue, time]() -> KAsync::Job<KAsync::ControlFlowFlag> {
163 queue->dequeueBatch(sBatchSize, 164 return queue->dequeueBatch(sBatchSize,
164 [this, time](const QByteArray &data) { 165 [this, time](const QByteArray &data) -> KAsync::Job<void> {
165 time->start(); 166 time->start();
166 return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) { 167 return processQueuedCommand(data)
167 processQueuedCommand(data) 168 .syncThen<void, qint64>([this, time](qint64 createdRevision) {
168 .then<void, qint64>([&future, this, time](qint64 createdRevision) { 169 SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed());
169 SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); 170 });
170 future.setFinished(); 171 })
171 }) 172 .then<KAsync::ControlFlowFlag>([queue](const KAsync::Error &error) {
172 .exec(); 173 if (error) {
173 }); 174 if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) {
174 }) 175 SinkWarning() << "Error while getting message from messagequeue: " << error.errorMessage;
175 .then<void>([&future, queue]() { future.setFinished(); }, 176 }
176 [&future](int i, QString error) {
177 if (i != MessageQueue::ErrorCodes::NoMessageFound) {
178 SinkWarning() << "Error while getting message from messagequeue: " << error;
179 } 177 }
180 future.setFinished(); 178 if (queue->isEmpty()) {
181 }) 179 return KAsync::value<KAsync::ControlFlowFlag>(KAsync::Break);
182 .exec(); 180 } else {
181 return KAsync::value<KAsync::ControlFlowFlag>(KAsync::Continue);
182 }
183 });
183 })) 184 }))
184 .then<void>([this]() { mPipeline->commit(); }); 185 .syncThen<void>([this](const KAsync::Error &) { mPipeline->commit(); });
185 } 186 }
186 187
187 KAsync::Job<void> processPipeline() 188 KAsync::Job<void> processPipeline()
@@ -198,18 +199,20 @@ private slots:
198 199
199 // Go through all message queues 200 // Go through all message queues
200 auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues); 201 auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues);
201 return KAsync::dowhile([it]() { return it->hasNext(); }, 202 return KAsync::dowhile(
202 [it, this](KAsync::Future<void> &future) { 203 [it, this]() {
203 auto time = QSharedPointer<QTime>::create(); 204 auto time = QSharedPointer<QTime>::create();
204 time->start(); 205 time->start();
205 206
206 auto queue = it->next(); 207 auto queue = it->next();
207 processQueue(queue) 208 return processQueue(queue)
208 .then<void>([this, &future, time]() { 209 .syncThen<KAsync::ControlFlowFlag>([this, time, it]() {
209 SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed()); 210 SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed());
210 future.setFinished(); 211 if (it->hasNext()) {
211 }) 212 return KAsync::Continue;
212 .exec(); 213 }
214 return KAsync::Break;
215 });
213 }); 216 });
214 } 217 }
215 218
@@ -251,22 +254,19 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra
251 s >> expectedValue; 254 s >> expectedValue;
252 inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue) 255 inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue)
253 .then<void>( 256 .then<void>(
254 [=]() { 257 [=](const KAsync::Error &error) {
255 Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId;
256 Sink::Notification n;
257 n.type = Sink::Notification::Inspection;
258 n.id = inspectionId;
259 n.code = Sink::Notification::Success;
260 emit notify(n);
261 },
262 [=](int code, const QString &message) {
263 Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << message;
264 Sink::Notification n; 258 Sink::Notification n;
265 n.type = Sink::Notification::Inspection; 259 n.type = Sink::Notification::Inspection;
266 n.message = message;
267 n.id = inspectionId; 260 n.id = inspectionId;
268 n.code = Sink::Notification::Failure; 261 if (error) {
262 Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << error.errorMessage;
263 n.code = Sink::Notification::Failure;
264 } else {
265 Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId;
266 n.code = Sink::Notification::Success;
267 }
269 emit notify(n); 268 emit notify(n);
269 return KAsync::null();
270 }) 270 })
271 .exec(); 271 .exec();
272 return KAsync::null<void>(); 272 return KAsync::null<void>();
@@ -420,7 +420,7 @@ void GenericResource::processCommand(int commandId, const QByteArray &data)
420 420
421KAsync::Job<void> GenericResource::synchronizeWithSource() 421KAsync::Job<void> GenericResource::synchronizeWithSource()
422{ 422{
423 return KAsync::start<void>([this](KAsync::Future<void> &future) { 423 return KAsync::start<void>([this]() {
424 424
425 Sink::Notification n; 425 Sink::Notification n;
426 n.id = "sync"; 426 n.id = "sync";
@@ -432,23 +432,21 @@ KAsync::Job<void> GenericResource::synchronizeWithSource()
432 SinkLog() << " Synchronizing"; 432 SinkLog() << " Synchronizing";
433 // Changereplay would deadlock otherwise when trying to open the synchronization store 433 // Changereplay would deadlock otherwise when trying to open the synchronization store
434 enableChangeReplay(false); 434 enableChangeReplay(false);
435 mSynchronizer->synchronize() 435 return mSynchronizer->synchronize()
436 .then<void>([this, &future]() { 436 .then<void>([this](const KAsync::Error &error) {
437 SinkLog() << "Done Synchronizing";
438 Sink::Notification n;
439 n.id = "sync";
440 n.type = Sink::Notification::Status;
441 n.message = "Synchronization has ended.";
442 n.code = Sink::ApplicationDomain::ConnectedStatus;
443 emit notify(n);
444
445 enableChangeReplay(true);
446 future.setFinished();
447 }, [this, &future](int errorCode, const QString &error) {
448 enableChangeReplay(true); 437 enableChangeReplay(true);
449 future.setError(errorCode, error); 438 if (!error) {
450 }) 439 SinkLog() << "Done Synchronizing";
451 .exec(); 440 Sink::Notification n;
441 n.id = "sync";
442 n.type = Sink::Notification::Status;
443 n.message = "Synchronization has ended.";
444 n.code = Sink::ApplicationDomain::ConnectedStatus;
445 emit notify(n);
446 return KAsync::null();
447 }
448 return KAsync::error(error);
449 });
452 }); 450 });
453} 451}
454 452
diff --git a/common/listener.cpp b/common/listener.cpp
index a051293..027d9ae 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -248,13 +248,17 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
248 if (buffer->localSync()) { 248 if (buffer->localSync()) {
249 job = job.then<void>(loadResource().processAllMessages()); 249 job = job.then<void>(loadResource().processAllMessages());
250 } 250 }
251 job.then<void>([callback, timer]() { 251 job.then<void>([callback, timer](const KAsync::Error &error) {
252 SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); 252 if (error) {
253 callback(true); 253 SinkWarning() << "Sync failed: " << error.errorMessage;
254 }, [callback](int errorCode, const QString &msg) { 254 callback(false);
255 SinkWarning() << "Sync failed: " << msg; 255 return KAsync::error(error);
256 callback(false); 256 } else {
257 }) 257 SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed());
258 callback(true);
259 return KAsync::null();
260 }
261 })
258 .exec(); 262 .exec();
259 return; 263 return;
260 } else { 264 } else {
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
index 3567a10..28eacb7 100644
--- a/common/messagequeue.cpp
+++ b/common/messagequeue.cpp
@@ -5,37 +5,6 @@
5 5
6SINK_DEBUG_AREA("messagequeue") 6SINK_DEBUG_AREA("messagequeue")
7 7
8static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void>> &futures)
9{
10 auto context = new QObject;
11 return KAsync::start<void>([futures, context](KAsync::Future<void> &future) {
12 const auto total = futures.size();
13 auto count = QSharedPointer<int>::create();
14 int i = 0;
15 for (KAsync::Future<void> subFuture : futures) {
16 i++;
17 if (subFuture.isFinished()) {
18 *count += 1;
19 continue;
20 }
21 // FIXME bind lifetime all watcher to future (repectively the main job
22 auto watcher = QSharedPointer<KAsync::FutureWatcher<void>>::create();
23 QObject::connect(watcher.data(), &KAsync::FutureWatcher<void>::futureReady, [count, total, &future]() {
24 *count += 1;
25 if (*count == total) {
26 future.setFinished();
27 }
28 });
29 watcher->setFuture(subFuture);
30 context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher));
31 }
32 if (*count == total) {
33 future.setFinished();
34 }
35 })
36 .then<void>([context]() { delete context; });
37}
38
39MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::ReadWrite) 8MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::ReadWrite)
40{ 9{
41} 10}
@@ -101,7 +70,7 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu
101 return KAsync::start<void>([&value, resultHandler](KAsync::Future<void> &future) { 70 return KAsync::start<void>([&value, resultHandler](KAsync::Future<void> &future) {
102 resultHandler(const_cast<void *>(static_cast<const void *>(value.data())), value.size(), [&future](bool success) { future.setFinished(); }); 71 resultHandler(const_cast<void *>(static_cast<const void *>(value.data())), value.size(), [&future](bool success) { future.setFinished(); });
103 }); 72 });
104 }).then<void>([]() {}, [errorHandler](int error, const QString &errorString) { errorHandler(Error("messagequeue", error, errorString.toLatin1())); }).exec(); 73 }).onError([errorHandler](const KAsync::Error &error) { errorHandler(Error("messagequeue", error.errorCode, error.errorMessage.toLatin1())); }).exec();
105} 74}
106 75
107KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler) 76KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler)
@@ -135,8 +104,8 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi
135 }); 104 });
136 105
137 // Trace() << "Waiting on " << waitCondition.size() << " results"; 106 // Trace() << "Waiting on " << waitCondition.size() << " results";
138 ::waitForCompletion(waitCondition) 107 KAsync::waitForCompletion(waitCondition)
139 .then<void>([this, resultCount, &future]() { 108 .syncThen<void>([this, resultCount, &future]() {
140 processRemovals(); 109 processRemovals();
141 if (*resultCount == 0) { 110 if (*resultCount == 0) {
142 future.setError(static_cast<int>(ErrorCodes::NoMessageFound), "No message found"); 111 future.setError(static_cast<int>(ErrorCodes::NoMessageFound), "No message found");
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 1d45340..ce864f7 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -233,7 +233,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
233 233
234 d->storeNewRevision(newRevision, fbb, bufferType, key); 234 d->storeNewRevision(newRevision, fbb, bufferType, key);
235 235
236 return KAsync::start<qint64>([newRevision]() { return newRevision; }); 236 return KAsync::value(newRevision);
237} 237}
238 238
239KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) 239KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
@@ -346,7 +346,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
346 adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 346 adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
347 347
348 d->storeNewRevision(newRevision, fbb, bufferType, key); 348 d->storeNewRevision(newRevision, fbb, bufferType, key);
349 return KAsync::start<qint64>([newRevision]() { return newRevision; }); 349 return KAsync::value(newRevision);
350} 350}
351 351
352KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) 352KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
@@ -433,7 +433,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
433 processor->deletedEntity(key, newRevision, *current, d->transaction); 433 processor->deletedEntity(key, newRevision, *current, d->transaction);
434 } 434 }
435 435
436 return KAsync::start<qint64>([newRevision]() { return newRevision; }); 436 return KAsync::value(newRevision);
437} 437}
438 438
439void Pipeline::cleanupRevision(qint64 revision) 439void Pipeline::cleanupRevision(qint64 revision)
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 2e2e96d..052db39 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -86,7 +86,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
86 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); 86 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize);
87 return newRevisionAndReplayedEntities; 87 return newRevisionAndReplayedEntities;
88 }) 88 })
89 .template then<void, QPair<qint64, qint64>>([=](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { 89 .template syncThen<void, QPair<qint64, qint64>>([=](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) {
90 mOffset[parentId] += newRevisionAndReplayedEntities.second; 90 mOffset[parentId] += newRevisionAndReplayedEntities.second;
91 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 91 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
92 if (query.liveQuery) { 92 if (query.liveQuery) {
@@ -110,7 +110,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
110 const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); 110 const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider);
111 return newRevisionAndReplayedEntities; 111 return newRevisionAndReplayedEntities;
112 }) 112 })
113 .template then<void, QPair<qint64, qint64> >([query, this, resultProvider](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { 113 .template syncThen<void, QPair<qint64, qint64> >([query, this, resultProvider](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) {
114 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 114 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
115 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); 115 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first);
116 resultProvider->setRevision(newRevisionAndReplayedEntities.first); 116 resultProvider->setRevision(newRevisionAndReplayedEntities.first);
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 7b4d839..364616c 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -159,67 +159,65 @@ KAsync::Job<void> ResourceAccess::Private::tryToConnect()
159 // We may have a socket from the last connection leftover 159 // We may have a socket from the last connection leftover
160 socket.reset(); 160 socket.reset();
161 auto counter = QSharedPointer<int>::create(0); 161 auto counter = QSharedPointer<int>::create(0);
162 return KAsync::dowhile([this]() -> bool { return !socket; }, 162 return KAsync::dowhile(
163 [this, counter](KAsync::Future<void> &future) { 163 [this, counter]() {
164 SinkTrace() << "Loop"; 164 SinkTrace() << "Loop";
165 connectToServer(resourceInstanceIdentifier) 165 return connectToServer(resourceInstanceIdentifier)
166 .then<void, QSharedPointer<QLocalSocket>>( 166 .then<KAsync::ControlFlowFlag, QSharedPointer<QLocalSocket>>(
167 [this, &future](const QSharedPointer<QLocalSocket> &s) { 167 [this, counter](const KAsync::Error &error, const QSharedPointer<QLocalSocket> &s) {
168 Q_ASSERT(s); 168 if (error) {
169 socket = s; 169 static int waitTime = 10;
170 future.setFinished(); 170 static int timeout = 500;
171 }, 171 static int maxRetries = timeout / waitTime;
172 [&future, counter, this](int errorCode, const QString &errorString) { 172 if (*counter > maxRetries) {
173 static int waitTime = 10; 173 SinkTrace() << "Giving up";
174 static int timeout = 500; 174 return KAsync::error<KAsync::ControlFlowFlag>("Failed to connect to socket");
175 static int maxRetries = timeout / waitTime; 175 } else {
176 if (*counter > maxRetries) { 176 *counter = *counter + 1;
177 SinkTrace() << "Giving up"; 177 return KAsync::wait(waitTime).then(KAsync::value(KAsync::Continue));
178 future.setError(-1, "Failed to connect to socket"); 178 }
179 } else { 179 } else {
180 KAsync::wait(waitTime).then<void>([&future]() { future.setFinished(); }).exec(); 180 Q_ASSERT(s);
181 socket = s;
182 return KAsync::value(KAsync::Break);
181 } 183 }
182 *counter = *counter + 1; 184 });
183 })
184 .exec();
185 }); 185 });
186} 186}
187 187
188KAsync::Job<void> ResourceAccess::Private::initializeSocket() 188KAsync::Job<void> ResourceAccess::Private::initializeSocket()
189{ 189{
190 return KAsync::start<void>([this](KAsync::Future<void> &future) { 190 return KAsync::start<void>([this] {
191 SinkTrace() << "Trying to connect"; 191 SinkTrace() << "Trying to connect";
192 connectToServer(resourceInstanceIdentifier) 192 return connectToServer(resourceInstanceIdentifier)
193 .then<void, QSharedPointer<QLocalSocket>>( 193 .then<void, QSharedPointer<QLocalSocket>>(
194 [this, &future](const QSharedPointer<QLocalSocket> &s) { 194 [this](const KAsync::Error &error, const QSharedPointer<QLocalSocket> &s) {
195 SinkTrace() << "Connected to resource, without having to start it."; 195 if (error) {
196 Q_ASSERT(s); 196 SinkTrace() << "Failed to connect, starting resource";
197 socket = s; 197 // We failed to connect, so let's start the resource
198 future.setFinished(); 198 QStringList args;
199 }, 199 if (Sink::Test::testModeEnabled()) {
200 [this, &future](int errorCode, const QString &errorString) { 200 args << "--test";
201 SinkTrace() << "Failed to connect, starting resource"; 201 }
202 // We failed to connect, so let's start the resource 202 args << resourceInstanceIdentifier << resourceName;
203 QStringList args; 203 qint64 pid = 0;
204 if (Sink::Test::testModeEnabled()) { 204 if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) {
205 args << "--test"; 205 SinkTrace() << "Started resource " << pid;
206 } 206 return tryToConnect()
207 args << resourceInstanceIdentifier << resourceName; 207 .onError([this](const KAsync::Error &error) {
208 qint64 pid = 0; 208 SinkWarning() << "Failed to connect to started resource";
209 if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { 209 });
210 SinkTrace() << "Started resource " << pid; 210 } else {
211 tryToConnect() 211 SinkWarning() << "Failed to start resource";
212 .then<void>([&future]() { future.setFinished(); }, 212 }
213 [this, &future](int errorCode, const QString &errorString) { 213 return KAsync::null();
214 SinkWarning() << "Failed to connect to started resource";
215 future.setError(errorCode, errorString);
216 })
217 .exec();
218 } else { 214 } else {
219 SinkWarning() << "Failed to start resource"; 215 SinkTrace() << "Connected to resource, without having to start it.";
216 Q_ASSERT(s);
217 socket = s;
218 return KAsync::null();
220 } 219 }
221 }) 220 });
222 .exec();
223 }); 221 });
224} 222}
225 223
@@ -383,17 +381,18 @@ void ResourceAccess::open()
383 d->openingSocket = true; 381 d->openingSocket = true;
384 d->initializeSocket() 382 d->initializeSocket()
385 .then<void>( 383 .then<void>(
386 [this, time]() { 384 [this, time](const KAsync::Error &error) {
387 SinkTrace() << "Socket is initialized." << Log::TraceTime(time->elapsed());
388 d->openingSocket = false; 385 d->openingSocket = false;
389 QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected); 386 if (error) {
390 QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError))); 387 SinkWarning() << "Failed to initialize socket " << error.errorMessage;
391 QObject::connect(d->socket.data(), &QIODevice::readyRead, this, &ResourceAccess::readResourceMessage); 388 } else {
392 connected(); 389 SinkTrace() << "Socket is initialized." << Log::TraceTime(time->elapsed());
393 }, 390 QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected);
394 [this](int error, const QString &errorString) { 391 QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError)));
395 d->openingSocket = false; 392 QObject::connect(d->socket.data(), &QIODevice::readyRead, this, &ResourceAccess::readResourceMessage);
396 SinkWarning() << "Failed to initialize socket " << errorString; 393 connected();
394 }
395 return KAsync::null();
397 }) 396 })
398 .exec(); 397 .exec();
399} 398}
diff --git a/common/resourcecontrol.cpp b/common/resourcecontrol.cpp
index 7d092a4..f509318 100644
--- a/common/resourcecontrol.cpp
+++ b/common/resourcecontrol.cpp
@@ -41,22 +41,22 @@ KAsync::Job<void> ResourceControl::shutdown(const QByteArray &identifier)
41 time->start(); 41 time->start();
42 return ResourceAccess::connectToServer(identifier) 42 return ResourceAccess::connectToServer(identifier)
43 .then<void, QSharedPointer<QLocalSocket>>( 43 .then<void, QSharedPointer<QLocalSocket>>(
44 [identifier, time](QSharedPointer<QLocalSocket> socket, KAsync::Future<void> &future) { 44 [identifier, time](const KAsync::Error &error, QSharedPointer<QLocalSocket> socket) {
45 if (error) {
46 SinkTrace() << "Resource is already closed.";
47 // Resource isn't started, nothing to shutdown
48 return KAsync::null();
49 }
45 // We can't currently reuse the socket 50 // We can't currently reuse the socket
46 socket->close(); 51 socket->close();
47 auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); 52 auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier));
48 resourceAccess->open(); 53 resourceAccess->open();
49 resourceAccess->sendCommand(Sink::Commands::ShutdownCommand) 54 return resourceAccess->sendCommand(Sink::Commands::ShutdownCommand)
50 .then<void>([&future, resourceAccess, time]() { 55 .addToContext(resourceAccess)
56 .syncThen<void>([resourceAccess, time]() {
51 resourceAccess->close(); 57 resourceAccess->close();
52 SinkTrace() << "Shutdown complete." << Log::TraceTime(time->elapsed()); 58 SinkTrace() << "Shutdown complete." << Log::TraceTime(time->elapsed());
53 future.setFinished(); 59 });
54 })
55 .exec();
56 },
57 [](int, const QString &) {
58 SinkTrace() << "Resource is already closed.";
59 // Resource isn't started, nothing to shutdown
60 }); 60 });
61} 61}
62 62
@@ -67,18 +67,19 @@ KAsync::Job<void> ResourceControl::start(const QByteArray &identifier)
67 time->start(); 67 time->start();
68 auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); 68 auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier));
69 resourceAccess->open(); 69 resourceAccess->open();
70 return resourceAccess->sendCommand(Sink::Commands::PingCommand).then<void>([resourceAccess, time]() { SinkTrace() << "Start complete." << Log::TraceTime(time->elapsed()); }); 70 return resourceAccess->sendCommand(Sink::Commands::PingCommand).addToContext(resourceAccess).syncThen<void>([time]() { SinkTrace() << "Start complete." << Log::TraceTime(time->elapsed()); });
71} 71}
72 72
73KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArrayList &resourceIdentifier) 73KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArrayList &resourceIdentifier)
74{ 74{
75 SinkTrace() << "flushMessageQueue" << resourceIdentifier; 75 SinkTrace() << "flushMessageQueue" << resourceIdentifier;
76 return KAsync::iterate(resourceIdentifier) 76 return KAsync::value(resourceIdentifier)
77 .template each<void, QByteArray>([](const QByteArray &resource, KAsync::Future<void> &future) { 77 .template each([](const QByteArray &resource) {
78 SinkTrace() << "Flushing message queue " << resource; 78 SinkTrace() << "Flushing message queue " << resource;
79 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); 79 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource));
80 resourceAccess->open(); 80 resourceAccess->open();
81 resourceAccess->synchronizeResource(false, true).then<void>([&future, resourceAccess]() { future.setFinished(); }).exec(); 81 return resourceAccess->synchronizeResource(false, true)
82 .addToContext(resourceAccess);
82 }); 83 });
83} 84}
84 85
diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp
index bf4239d..1c56fe5 100644
--- a/common/resourcefacade.cpp
+++ b/common/resourcefacade.cpp
@@ -167,7 +167,7 @@ template <typename DomainType>
167KAsync::Job<void> LocalStorageFacade<DomainType>::create(const DomainType &domainObject) 167KAsync::Job<void> LocalStorageFacade<DomainType>::create(const DomainType &domainObject)
168{ 168{
169 auto configStoreIdentifier = mIdentifier; 169 auto configStoreIdentifier = mIdentifier;
170 return KAsync::start<void>([domainObject, configStoreIdentifier]() { 170 return KAsync::syncStart<void>([domainObject, configStoreIdentifier]() {
171 const QByteArray type = domainObject.getProperty("type").toByteArray(); 171 const QByteArray type = domainObject.getProperty("type").toByteArray();
172 const QByteArray providedIdentifier = domainObject.identifier().isEmpty() ? domainObject.getProperty("identifier").toByteArray() : domainObject.identifier(); 172 const QByteArray providedIdentifier = domainObject.identifier().isEmpty() ? domainObject.getProperty("identifier").toByteArray() : domainObject.identifier();
173 const QByteArray identifier = providedIdentifier.isEmpty() ? ResourceConfig::newIdentifier(type) : providedIdentifier; 173 const QByteArray identifier = providedIdentifier.isEmpty() ? ResourceConfig::newIdentifier(type) : providedIdentifier;
@@ -192,7 +192,7 @@ template <typename DomainType>
192KAsync::Job<void> LocalStorageFacade<DomainType>::modify(const DomainType &domainObject) 192KAsync::Job<void> LocalStorageFacade<DomainType>::modify(const DomainType &domainObject)
193{ 193{
194 auto configStoreIdentifier = mIdentifier; 194 auto configStoreIdentifier = mIdentifier;
195 return KAsync::start<void>([domainObject, configStoreIdentifier]() { 195 return KAsync::syncStart<void>([domainObject, configStoreIdentifier]() {
196 const QByteArray identifier = domainObject.identifier(); 196 const QByteArray identifier = domainObject.identifier();
197 if (identifier.isEmpty()) { 197 if (identifier.isEmpty()) {
198 SinkWarning() << "We need an \"identifier\" property to identify the entity to configure."; 198 SinkWarning() << "We need an \"identifier\" property to identify the entity to configure.";
@@ -220,7 +220,7 @@ template <typename DomainType>
220KAsync::Job<void> LocalStorageFacade<DomainType>::remove(const DomainType &domainObject) 220KAsync::Job<void> LocalStorageFacade<DomainType>::remove(const DomainType &domainObject)
221{ 221{
222 auto configStoreIdentifier = mIdentifier; 222 auto configStoreIdentifier = mIdentifier;
223 return KAsync::start<void>([domainObject, configStoreIdentifier]() { 223 return KAsync::syncStart<void>([domainObject, configStoreIdentifier]() {
224 const QByteArray identifier = domainObject.identifier(); 224 const QByteArray identifier = domainObject.identifier();
225 if (identifier.isEmpty()) { 225 if (identifier.isEmpty()) {
226 SinkWarning() << "We need an \"identifier\" property to identify the entity to configure"; 226 SinkWarning() << "We need an \"identifier\" property to identify the entity to configure";
diff --git a/common/sourcewriteback.cpp b/common/sourcewriteback.cpp
index fe996cb..702d8e3 100644
--- a/common/sourcewriteback.cpp
+++ b/common/sourcewriteback.cpp
@@ -106,7 +106,7 @@ KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArr
106 job = replay(mail, operation, oldRemoteId, modifiedProperties); 106 job = replay(mail, operation, oldRemoteId, modifiedProperties);
107 } 107 }
108 108
109 return job.then<void, QByteArray>([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { 109 return job.syncThen<void, QByteArray>([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) {
110 if (operation == Sink::Operation_Creation) { 110 if (operation == Sink::Operation_Creation) {
111 SinkTrace() << "Replayed creation with remote id: " << remoteId; 111 SinkTrace() << "Replayed creation with remote id: " << remoteId;
112 if (remoteId.isEmpty()) { 112 if (remoteId.isEmpty()) {
@@ -127,13 +127,11 @@ KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArr
127 } else { 127 } else {
128 SinkError() << "Unkown operation" << operation; 128 SinkError() << "Unkown operation" << operation;
129 } 129 }
130 130 })
131 mSyncStore.clear(); 131 .syncThen<void>([this](const KAsync::Error &error) {
132 mEntityStore.clear(); 132 if (error) {
133 mTransaction.abort(); 133 SinkWarning() << "Failed to replay change: " << error.errorMessage;
134 mSyncTransaction.commit(); 134 }
135 }, [this](int errorCode, const QString &errorMessage) {
136 SinkWarning() << "Failed to replay change: " << errorMessage;
137 mSyncStore.clear(); 135 mSyncStore.clear();
138 mEntityStore.clear(); 136 mEntityStore.clear();
139 mTransaction.abort(); 137 mTransaction.abort();
diff --git a/common/store.cpp b/common/store.cpp
index 07f41f8..c01d220 100644
--- a/common/store.cpp
+++ b/common/store.cpp
@@ -39,6 +39,8 @@
39SINK_DEBUG_AREA("store") 39SINK_DEBUG_AREA("store")
40 40
41Q_DECLARE_METATYPE(QSharedPointer<Sink::ResultEmitter<Sink::ApplicationDomain::SinkResource::Ptr>>) 41Q_DECLARE_METATYPE(QSharedPointer<Sink::ResultEmitter<Sink::ApplicationDomain::SinkResource::Ptr>>)
42Q_DECLARE_METATYPE(QSharedPointer<Sink::ResourceAccessInterface>);
43Q_DECLARE_METATYPE(std::shared_ptr<void>);
42 44
43namespace Sink { 45namespace Sink {
44 46
@@ -169,12 +171,10 @@ QSharedPointer<QAbstractItemModel> Store::loadModel(Query query)
169 result.first.exec(); 171 result.first.exec();
170 } 172 }
171 173
172 KAsync::iterate(resources.keys()) 174 KAsync::value(resources.keys())
173 .template each<void, QByteArray>([query, aggregatingEmitter, resources](const QByteArray &resourceInstanceIdentifier, KAsync::Future<void> &future) { 175 .template each([query, aggregatingEmitter, resources](const QByteArray &resourceInstanceIdentifier) {
174 const auto resourceType = resources.value(resourceInstanceIdentifier); 176 const auto resourceType = resources.value(resourceInstanceIdentifier);
175 queryResource<DomainType>(resourceType, resourceInstanceIdentifier, query, aggregatingEmitter).template then<void>([&future]() { 177 return queryResource<DomainType>(resourceType, resourceInstanceIdentifier, query, aggregatingEmitter);
176 future.setFinished();
177 }).exec();
178 }) 178 })
179 .exec(); 179 .exec();
180 model->fetchMore(QModelIndex()); 180 model->fetchMore(QModelIndex());
@@ -201,7 +201,7 @@ KAsync::Job<void> Store::create(const DomainType &domainObject)
201{ 201{
202 // Potentially move to separate thread as well 202 // Potentially move to separate thread as well
203 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); 203 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
204 return facade->create(domainObject).template then<void>([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to create"; }); 204 return facade->create(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to create"; });
205} 205}
206 206
207template <class DomainType> 207template <class DomainType>
@@ -209,7 +209,7 @@ KAsync::Job<void> Store::modify(const DomainType &domainObject)
209{ 209{
210 // Potentially move to separate thread as well 210 // Potentially move to separate thread as well
211 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); 211 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
212 return facade->modify(domainObject).template then<void>([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to modify"; }); 212 return facade->modify(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to modify"; });
213} 213}
214 214
215template <class DomainType> 215template <class DomainType>
@@ -217,7 +217,7 @@ KAsync::Job<void> Store::remove(const DomainType &domainObject)
217{ 217{
218 // Potentially move to separate thread as well 218 // Potentially move to separate thread as well
219 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); 219 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
220 return facade->remove(domainObject).template then<void>([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to remove"; }); 220 return facade->remove(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to remove"; });
221} 221}
222 222
223KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier) 223KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier)
@@ -231,6 +231,7 @@ KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier)
231 auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); 231 auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier));
232 resourceAccess->open(); 232 resourceAccess->open();
233 return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand) 233 return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand)
234 .addToContext(resourceAccess)
234 .then<void>([resourceAccess](KAsync::Future<void> &future) { 235 .then<void>([resourceAccess](KAsync::Future<void> &future) {
235 if (resourceAccess->isReady()) { 236 if (resourceAccess->isReady()) {
236 //Wait for the resource shutdown 237 //Wait for the resource shutdown
@@ -243,8 +244,8 @@ KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier)
243 future.setFinished(); 244 future.setFinished();
244 } 245 }
245 }) 246 })
246 .then<void>([resourceAccess, time]() { 247 .syncThen<void>([time]() {
247 SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed()); 248 SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed());
248 }); 249 });
249} 250}
250 251
@@ -253,41 +254,38 @@ KAsync::Job<void> Store::synchronize(const Sink::Query &query)
253 SinkTrace() << "synchronize" << query.resources; 254 SinkTrace() << "synchronize" << query.resources;
254 auto resources = getResources(query.resources, query.accounts).keys(); 255 auto resources = getResources(query.resources, query.accounts).keys();
255 //FIXME only necessary because each doesn't propagate errors 256 //FIXME only necessary because each doesn't propagate errors
256 auto error = new bool; 257 auto errorFlag = new bool;
257 return KAsync::iterate(resources) 258 return KAsync::value(resources)
258 .template each<void, QByteArray>([query, error](const QByteArray &resource, KAsync::Future<void> &future) { 259 .template each([query, errorFlag](const QByteArray &resource) {
259 SinkTrace() << "Synchronizing " << resource; 260 SinkTrace() << "Synchronizing " << resource;
260 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); 261 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource));
261 resourceAccess->open(); 262 resourceAccess->open();
262 resourceAccess->synchronizeResource(true, false).then<void>([resourceAccess, &future]() {SinkTrace() << "synced."; future.setFinished(); }, 263 return resourceAccess->synchronizeResource(true, false)
263 [&future, error](int errorCode, QString msg) { *error = true; SinkWarning() << "Error during sync."; future.setError(errorCode, msg); }).exec(); 264 .addToContext(resourceAccess)
264 }).then<void>([error](KAsync::Future<void> &future) { 265 .then<void>([errorFlag](const KAsync::Error &error) {
265 if (*error) { 266 if (error) {
266 future.setError(1, "Error during sync."); 267 *errorFlag = true;
267 } else { 268 SinkWarning() << "Error during sync.";
268 future.setFinished(); 269 return KAsync::error<void>(error);
270 }
271 SinkTrace() << "synced.";
272 return KAsync::null<void>();
273 });
274 })
275 .then<void>([errorFlag]() {
276 if (*errorFlag) {
277 return KAsync::error<void>("Error during sync.");
269 } 278 }
270 delete error; 279 delete errorFlag;
280 return KAsync::null<void>();
271 }); 281 });
272} 282}
273 283
274template <class DomainType> 284template <class DomainType>
275KAsync::Job<DomainType> Store::fetchOne(const Sink::Query &query) 285KAsync::Job<DomainType> Store::fetchOne(const Sink::Query &query)
276{ 286{
277 return KAsync::start<DomainType>([query](KAsync::Future<DomainType> &future) { 287 return fetch<DomainType>(query, 1).template then<DomainType, QList<typename DomainType::Ptr>>([](const QList<typename DomainType::Ptr> &list) {
278 // FIXME We could do this more elegantly if composed jobs would have the correct type (In that case we'd simply return the value from then continuation, and could avoid the 288 return KAsync::value(*list.first());
279 // outer job entirely)
280 fetch<DomainType>(query, 1)
281 .template then<void, QList<typename DomainType::Ptr>>(
282 [&future](const QList<typename DomainType::Ptr> &list) {
283 future.setValue(*list.first());
284 future.setFinished();
285 },
286 [&future](int errorCode, const QString &errorMessage) {
287 future.setError(errorCode, errorMessage);
288 future.setFinished();
289 })
290 .exec();
291 }); 289 });
292} 290}
293 291
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 2d4fb8d..15a06e7 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -244,7 +244,7 @@ KAsync::Job<void> Synchronizer::synchronize()
244 SinkTrace() << "Synchronizing"; 244 SinkTrace() << "Synchronizing";
245 mSyncInProgress = true; 245 mSyncInProgress = true;
246 mMessageQueue->startTransaction(); 246 mMessageQueue->startTransaction();
247 return synchronizeWithSource().then<void>([this]() { 247 return synchronizeWithSource().syncThen<void>([this]() {
248 mSyncStore.clear(); 248 mSyncStore.clear();
249 mEntityStore.clear(); 249 mEntityStore.clear();
250 mMessageQueue->commit(); 250 mMessageQueue->commit();
diff --git a/common/test.cpp b/common/test.cpp
index 5b4c899..1a8e11d 100644
--- a/common/test.cpp
+++ b/common/test.cpp
@@ -156,7 +156,7 @@ public:
156 } 156 }
157 resultProvider->initialResultSetComplete(parent); 157 resultProvider->initialResultSetComplete(parent);
158 }); 158 });
159 auto job = KAsync::start<void>([query, resultProvider]() {}); 159 auto job = KAsync::syncStart<void>([query, resultProvider]() {});
160 return qMakePair(job, emitter); 160 return qMakePair(job, emitter);
161 } 161 }
162 162
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp
index 0f7463f..221e20d 100644
--- a/examples/dummyresource/resourcefactory.cpp
+++ b/examples/dummyresource/resourcefactory.cpp
@@ -113,7 +113,7 @@ class DummySynchronizer : public Sink::Synchronizer {
113 KAsync::Job<void> synchronizeWithSource() Q_DECL_OVERRIDE 113 KAsync::Job<void> synchronizeWithSource() Q_DECL_OVERRIDE
114 { 114 {
115 SinkLog() << " Synchronizing with the source"; 115 SinkLog() << " Synchronizing with the source";
116 return KAsync::start<void>([this]() { 116 return KAsync::syncStart<void>([this]() {
117 synchronize(ENTITY_TYPE_EVENT, DummyStore::instance().events(), [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data) { 117 synchronize(ENTITY_TYPE_EVENT, DummyStore::instance().events(), [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data) {
118 return createEvent(ridBuffer, data); 118 return createEvent(ridBuffer, data);
119 }); 119 });
diff --git a/examples/imapresource/imapresource.cpp b/examples/imapresource/imapresource.cpp
index e199ea1..f78376a 100644
--- a/examples/imapresource/imapresource.cpp
+++ b/examples/imapresource/imapresource.cpp
@@ -56,6 +56,8 @@
56 56
57SINK_DEBUG_AREA("imapresource") 57SINK_DEBUG_AREA("imapresource")
58 58
59Q_DECLARE_METATYPE(QSharedPointer<Imap::ImapServerProxy>)
60
59using namespace Imap; 61using namespace Imap;
60using namespace Sink; 62using namespace Sink;
61 63
@@ -217,22 +219,22 @@ public:
217 SinkLog() << "Synchronizing mails" << folder.normalizedPath(); 219 SinkLog() << "Synchronizing mails" << folder.normalizedPath();
218 auto capabilities = imap->getCapabilities(); 220 auto capabilities = imap->getCapabilities();
219 bool canDoIncrementalRemovals = false; 221 bool canDoIncrementalRemovals = false;
220 return KAsync::start<void>([=]() { 222 return KAsync::syncStart<void>([=]() {
221 //TODO update flags 223 //TODO update flags
222 }) 224 })
223 .then<void, KAsync::Job<void>>([=]() { 225 .then<void>([=]() {
224 //TODO Remove what's no longer existing 226 //TODO Remove what's no longer existing
225 if (canDoIncrementalRemovals) { 227 if (canDoIncrementalRemovals) {
226 } else { 228 } else {
227 return imap->fetchUids(folder).then<void, QVector<qint64>>([this, folder](const QVector<qint64> &uids) { 229 return imap->fetchUids(folder).syncThen<void, QVector<qint64>>([this, folder](const QVector<qint64> &uids) {
228 SinkTrace() << "Syncing removals"; 230 SinkTrace() << "Syncing removals";
229 synchronizeRemovals(folder.normalizedPath(), uids.toList().toSet()); 231 synchronizeRemovals(folder.normalizedPath(), uids.toList().toSet());
230 commit(); 232 commit();
231 }).then<void>([](){}); 233 });
232 } 234 }
233 return KAsync::null<void>(); 235 return KAsync::null<void>();
234 }) 236 })
235 .then<void, KAsync::Job<void>>([this, folder, imap]() { 237 .then<void>([this, folder, imap]() {
236 SinkTrace() << "About to fetch mail"; 238 SinkTrace() << "About to fetch mail";
237 const auto uidNext = syncStore().readValue(folder.normalizedPath().toUtf8() + "uidnext").toLongLong(); 239 const auto uidNext = syncStore().readValue(folder.normalizedPath().toUtf8() + "uidnext").toLongLong();
238 auto maxUid = QSharedPointer<qint64>::create(0); 240 auto maxUid = QSharedPointer<qint64>::create(0);
@@ -248,7 +250,7 @@ public:
248 [](int progress, int total) { 250 [](int progress, int total) {
249 SinkTrace() << "Progress: " << progress << " out of " << total; 251 SinkTrace() << "Progress: " << progress << " out of " << total;
250 }) 252 })
251 .then<void>([this, maxUid, folder]() { 253 .syncThen<void>([this, maxUid, folder]() {
252 syncStore().writeValue(folder.normalizedPath().toUtf8() + "uidnext", QByteArray::number(*maxUid)); 254 syncStore().writeValue(folder.normalizedPath().toUtf8() + "uidnext", QByteArray::number(*maxUid));
253 }); 255 });
254 }); 256 });
@@ -330,15 +332,12 @@ public:
330 flags << Imap::Flags::Flagged; 332 flags << Imap::Flags::Flagged;
331 } 333 }
332 QDateTime internalDate = mail.getDate(); 334 QDateTime internalDate = mail.getDate();
333 auto rid = QSharedPointer<QByteArray>::create();
334 return login.then(imap->append(mailbox, content, flags, internalDate)) 335 return login.then(imap->append(mailbox, content, flags, internalDate))
335 .then<void, qint64>([imap, mailbox, rid, mail](qint64 uid) { 336 .addToContext(imap)
337 .syncThen<QByteArray, qint64>([mail](qint64 uid) {
336 const auto remoteId = assembleMailRid(mail, uid); 338 const auto remoteId = assembleMailRid(mail, uid);
337 //FIXME this get's called after the final error handler? WTF?
338 SinkTrace() << "Finished creating a new mail: " << remoteId; 339 SinkTrace() << "Finished creating a new mail: " << remoteId;
339 *rid = remoteId; 340 return remoteId;
340 }).then<QByteArray>([rid, imap]() { //FIXME fix KJob so we don't need this extra clause
341 return *rid;
342 }); 341 });
343 } else if (operation == Sink::Operation_Removal) { 342 } else if (operation == Sink::Operation_Removal) {
344 const auto folderId = folderIdFromMailRid(oldRemoteId); 343 const auto folderId = folderIdFromMailRid(oldRemoteId);
@@ -348,7 +347,7 @@ public:
348 KIMAP::ImapSet set; 347 KIMAP::ImapSet set;
349 set.add(uid); 348 set.add(uid);
350 return login.then(imap->remove(mailbox, set)) 349 return login.then(imap->remove(mailbox, set))
351 .then<QByteArray>([imap, oldRemoteId]() { 350 .syncThen<QByteArray>([imap, oldRemoteId] {
352 SinkTrace() << "Finished removing a mail: " << oldRemoteId; 351 SinkTrace() << "Finished removing a mail: " << oldRemoteId;
353 return QByteArray(); 352 return QByteArray();
354 }); 353 });
@@ -374,29 +373,24 @@ public:
374 const QString oldMailbox = syncStore().resolveLocalId(ENTITY_TYPE_FOLDER, folderId); 373 const QString oldMailbox = syncStore().resolveLocalId(ENTITY_TYPE_FOLDER, folderId);
375 QByteArray content = KMime::LFtoCRLF(mail.getMimeMessage()); 374 QByteArray content = KMime::LFtoCRLF(mail.getMimeMessage());
376 QDateTime internalDate = mail.getDate(); 375 QDateTime internalDate = mail.getDate();
377 auto rid = QSharedPointer<QByteArray>::create();
378 KIMAP::ImapSet set; 376 KIMAP::ImapSet set;
379 set.add(uid); 377 set.add(uid);
380 return login.then(imap->append(mailbox, content, flags, internalDate)) 378 return login.then(imap->append(mailbox, content, flags, internalDate))
381 .then<void, qint64>([imap, mailbox, rid, mail](qint64 uid) { 379 .addToContext(imap)
380 .then<QByteArray, qint64>([=](qint64 uid) {
382 const auto remoteId = assembleMailRid(mail, uid); 381 const auto remoteId = assembleMailRid(mail, uid);
383 SinkTrace() << "Finished creating a modified mail: " << remoteId; 382 SinkTrace() << "Finished creating a modified mail: " << remoteId;
384 *rid = remoteId; 383 return imap->remove(oldMailbox, set).then(KAsync::value(remoteId));
385 })
386 .then(imap->remove(oldMailbox, set))
387 .then<QByteArray>([rid, imap]() {
388 return *rid;
389 }); 384 });
390 } else { 385 } else {
391 SinkTrace() << "Updating flags only."; 386 SinkTrace() << "Updating flags only.";
392 KIMAP::ImapSet set; 387 KIMAP::ImapSet set;
393 set.add(uid); 388 set.add(uid);
394 return login.then(imap->select(mailbox)) 389 return login.then(imap->select(mailbox))
390 .addToContext(imap)
395 .then(imap->storeFlags(set, flags)) 391 .then(imap->storeFlags(set, flags))
396 .then<void>([imap, mailbox]() { 392 .syncThen<QByteArray>([=] {
397 SinkTrace() << "Finished modifying mail"; 393 SinkTrace() << "Finished modifying mail";
398 })
399 .then<QByteArray>([oldRemoteId, imap]() {
400 return oldRemoteId; 394 return oldRemoteId;
401 }); 395 });
402 } 396 }
@@ -416,13 +410,13 @@ public:
416 SinkTrace() << "Creating a new folder: " << parentFolder << folder.getName(); 410 SinkTrace() << "Creating a new folder: " << parentFolder << folder.getName();
417 auto rid = QSharedPointer<QByteArray>::create(); 411 auto rid = QSharedPointer<QByteArray>::create();
418 auto createFolder = login.then<QString>(imap->createSubfolder(parentFolder, folder.getName())) 412 auto createFolder = login.then<QString>(imap->createSubfolder(parentFolder, folder.getName()))
419 .then<void, QString>([imap, rid](const QString &createdFolder) { 413 .syncThen<void, QString>([imap, rid](const QString &createdFolder) {
420 SinkTrace() << "Finished creating a new folder: " << createdFolder; 414 SinkTrace() << "Finished creating a new folder: " << createdFolder;
421 *rid = createdFolder.toUtf8(); 415 *rid = createdFolder.toUtf8();
422 }); 416 });
423 if (folder.getSpecialPurpose().isEmpty()) { 417 if (folder.getSpecialPurpose().isEmpty()) {
424 return createFolder 418 return createFolder
425 .then<QByteArray>([rid](){ 419 .syncThen<QByteArray>([rid](){
426 return *rid; 420 return *rid;
427 }); 421 });
428 } else { //We try to merge special purpose folders first 422 } else { //We try to merge special purpose folders first
@@ -435,7 +429,7 @@ public:
435 }; 429 };
436 } 430 }
437 })) 431 }))
438 .then<void, KAsync::Job<void>>([specialPurposeFolders, folder, imap, parentFolder, rid]() -> KAsync::Job<void> { 432 .then<void>([specialPurposeFolders, folder, imap, parentFolder, rid]() -> KAsync::Job<void> {
439 for (const auto &purpose : folder.getSpecialPurpose()) { 433 for (const auto &purpose : folder.getSpecialPurpose()) {
440 if (specialPurposeFolders->contains(purpose)) { 434 if (specialPurposeFolders->contains(purpose)) {
441 auto f = specialPurposeFolders->value(purpose); 435 auto f = specialPurposeFolders->value(purpose);
@@ -446,13 +440,13 @@ public:
446 } 440 }
447 SinkTrace() << "No match found for merging, creating a new folder"; 441 SinkTrace() << "No match found for merging, creating a new folder";
448 return imap->createSubfolder(parentFolder, folder.getName()) 442 return imap->createSubfolder(parentFolder, folder.getName())
449 .then<void, QString>([imap, rid](const QString &createdFolder) { 443 .syncThen<void, QString>([imap, rid](const QString &createdFolder) {
450 SinkTrace() << "Finished creating a new folder: " << createdFolder; 444 SinkTrace() << "Finished creating a new folder: " << createdFolder;
451 *rid = createdFolder.toUtf8(); 445 *rid = createdFolder.toUtf8();
452 }); 446 });
453 447
454 }) 448 })
455 .then<QByteArray>([rid](){ 449 .syncThen<QByteArray>([rid](){
456 return *rid; 450 return *rid;
457 }); 451 });
458 return mergeJob; 452 return mergeJob;
@@ -460,7 +454,7 @@ public:
460 } else if (operation == Sink::Operation_Removal) { 454 } else if (operation == Sink::Operation_Removal) {
461 SinkTrace() << "Removing a folder: " << oldRemoteId; 455 SinkTrace() << "Removing a folder: " << oldRemoteId;
462 return login.then<void>(imap->remove(oldRemoteId)) 456 return login.then<void>(imap->remove(oldRemoteId))
463 .then<QByteArray>([oldRemoteId, imap]() { 457 .syncThen<QByteArray>([oldRemoteId, imap]() {
464 SinkTrace() << "Finished removing a folder: " << oldRemoteId; 458 SinkTrace() << "Finished removing a folder: " << oldRemoteId;
465 return QByteArray(); 459 return QByteArray();
466 }); 460 });
@@ -468,11 +462,11 @@ public:
468 SinkTrace() << "Renaming a folder: " << oldRemoteId << folder.getName(); 462 SinkTrace() << "Renaming a folder: " << oldRemoteId << folder.getName();
469 auto rid = QSharedPointer<QByteArray>::create(); 463 auto rid = QSharedPointer<QByteArray>::create();
470 return login.then<QString>(imap->renameSubfolder(oldRemoteId, folder.getName())) 464 return login.then<QString>(imap->renameSubfolder(oldRemoteId, folder.getName()))
471 .then<void, QString>([imap, rid](const QString &createdFolder) { 465 .syncThen<void, QString>([imap, rid](const QString &createdFolder) {
472 SinkTrace() << "Finished renaming a folder: " << createdFolder; 466 SinkTrace() << "Finished renaming a folder: " << createdFolder;
473 *rid = createdFolder.toUtf8(); 467 *rid = createdFolder.toUtf8();
474 }) 468 })
475 .then<QByteArray>([rid](){ 469 .syncThen<QByteArray>([rid](){
476 return *rid; 470 return *rid;
477 }); 471 });
478 } 472 }
@@ -566,7 +560,8 @@ KAsync::Job<void> ImapResource::inspect(int inspectionType, const QByteArray &in
566 SinkTrace() << "Connecting to:" << mServer << mPort; 560 SinkTrace() << "Connecting to:" << mServer << mPort;
567 SinkTrace() << "as:" << mUser; 561 SinkTrace() << "as:" << mUser;
568 auto inspectionJob = imap->login(mUser, mPassword) 562 auto inspectionJob = imap->login(mUser, mPassword)
569 .then<void>(imap->select(folderRemoteId).then<void>([](){})) 563 .then<Imap::SelectResult>(imap->select(folderRemoteId))
564 .syncThen<void, Imap::SelectResult>([](Imap::SelectResult){})
570 .then<void>(imap->fetch(set, scope, [imap, messageByUid](const QVector<Imap::Message> &messages) { 565 .then<void>(imap->fetch(set, scope, [imap, messageByUid](const QVector<Imap::Message> &messages) {
571 for (const auto &m : messages) { 566 for (const auto &m : messages) {
572 messageByUid->insert(m.uid, m); 567 messageByUid->insert(m.uid, m);
@@ -575,7 +570,7 @@ KAsync::Job<void> ImapResource::inspect(int inspectionType, const QByteArray &in
575 570
576 if (inspectionType == Sink::ResourceControl::Inspection::PropertyInspectionType) { 571 if (inspectionType == Sink::ResourceControl::Inspection::PropertyInspectionType) {
577 if (property == "unread") { 572 if (property == "unread") {
578 return inspectionJob.then<void, KAsync::Job<void>>([=]() { 573 return inspectionJob.then<void>([=]() {
579 auto msg = messageByUid->value(uid); 574 auto msg = messageByUid->value(uid);
580 if (expectedValue.toBool() && msg.flags.contains(Imap::Flags::Seen)) { 575 if (expectedValue.toBool() && msg.flags.contains(Imap::Flags::Seen)) {
581 return KAsync::error<void>(1, "Expected unread but couldn't find it."); 576 return KAsync::error<void>(1, "Expected unread but couldn't find it.");
@@ -587,7 +582,7 @@ KAsync::Job<void> ImapResource::inspect(int inspectionType, const QByteArray &in
587 }); 582 });
588 } 583 }
589 if (property == "subject") { 584 if (property == "subject") {
590 return inspectionJob.then<void, KAsync::Job<void>>([=]() { 585 return inspectionJob.then<void>([=]() {
591 auto msg = messageByUid->value(uid); 586 auto msg = messageByUid->value(uid);
592 if (msg.msg->subject(true)->asUnicodeString() != expectedValue.toString()) { 587 if (msg.msg->subject(true)->asUnicodeString() != expectedValue.toString()) {
593 return KAsync::error<void>(1, "Subject not as expected: " + msg.msg->subject(true)->asUnicodeString()); 588 return KAsync::error<void>(1, "Subject not as expected: " + msg.msg->subject(true)->asUnicodeString());
@@ -597,7 +592,7 @@ KAsync::Job<void> ImapResource::inspect(int inspectionType, const QByteArray &in
597 } 592 }
598 } 593 }
599 if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { 594 if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) {
600 return inspectionJob.then<void, KAsync::Job<void>>([=]() { 595 return inspectionJob.then<void>([=]() {
601 if (!messageByUid->contains(uid)) { 596 if (!messageByUid->contains(uid)) {
602 SinkWarning() << "Existing messages are: " << messageByUid->keys(); 597 SinkWarning() << "Existing messages are: " << messageByUid->keys();
603 SinkWarning() << "We're looking for: " << uid; 598 SinkWarning() << "We're looking for: " << uid;
@@ -628,20 +623,19 @@ KAsync::Job<void> ImapResource::inspect(int inspectionType, const QByteArray &in
628 scope.mode = KIMAP::FetchJob::FetchScope::Headers; 623 scope.mode = KIMAP::FetchJob::FetchScope::Headers;
629 auto imap = QSharedPointer<ImapServerProxy>::create(mServer, mPort); 624 auto imap = QSharedPointer<ImapServerProxy>::create(mServer, mPort);
630 auto messageByUid = QSharedPointer<QHash<qint64, Imap::Message>>::create(); 625 auto messageByUid = QSharedPointer<QHash<qint64, Imap::Message>>::create();
631 auto inspectionJob = imap->login(mUser, mPassword) 626 return imap->login(mUser, mPassword)
632 .then<void>(imap->select(remoteId).then<void>([](){})) 627 .then<void>(imap->select(remoteId).syncThen<void>([](){}))
633 .then<void>(imap->fetch(set, scope, [=](const QVector<Imap::Message> &messages) { 628 .then<void>(imap->fetch(set, scope, [=](const QVector<Imap::Message> &messages) {
634 for (const auto &m : messages) { 629 for (const auto &m : messages) {
635 messageByUid->insert(m.uid, m); 630 messageByUid->insert(m.uid, m);
636 } 631 }
637 })) 632 }))
638 .then<void, KAsync::Job<void>>([imap, messageByUid, expectedCount]() { 633 .then<void>([imap, messageByUid, expectedCount]() {
639 if (messageByUid->size() != expectedCount) { 634 if (messageByUid->size() != expectedCount) {
640 return KAsync::error<void>(1, QString("Wrong number of messages on the server; found %1 instead of %2.").arg(messageByUid->size()).arg(expectedCount)); 635 return KAsync::error<void>(1, QString("Wrong number of messages on the server; found %1 instead of %2.").arg(messageByUid->size()).arg(expectedCount));
641 } 636 }
642 return KAsync::null<void>(); 637 return KAsync::null<void>();
643 }); 638 });
644 return inspectionJob;
645 } 639 }
646 if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { 640 if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) {
647 auto folderByPath = QSharedPointer<QSet<QString>>::create(); 641 auto folderByPath = QSharedPointer<QSet<QString>>::create();
@@ -655,7 +649,7 @@ KAsync::Job<void> ImapResource::inspect(int inspectionType, const QByteArray &in
655 *folderByName << f.pathParts.last(); 649 *folderByName << f.pathParts.last();
656 } 650 }
657 })) 651 }))
658 .then<void, KAsync::Job<void>>([this, folderByName, folderByPath, folder, remoteId, imap]() { 652 .then<void>([this, folderByName, folderByPath, folder, remoteId, imap]() {
659 if (!folderByName->contains(folder.getName())) { 653 if (!folderByName->contains(folder.getName())) {
660 SinkWarning() << "Existing folders are: " << *folderByPath; 654 SinkWarning() << "Existing folders are: " << *folderByPath;
661 SinkWarning() << "We're looking for: " << folder.getName(); 655 SinkWarning() << "We're looking for: " << folder.getName();
diff --git a/examples/imapresource/imapserverproxy.cpp b/examples/imapresource/imapserverproxy.cpp
index a3d8d16..94367d8 100644
--- a/examples/imapresource/imapserverproxy.cpp
+++ b/examples/imapresource/imapserverproxy.cpp
@@ -161,7 +161,7 @@ KAsync::Job<void> ImapServerProxy::login(const QString &username, const QString
161 auto namespaceJob = new KIMAP::NamespaceJob(mSession); 161 auto namespaceJob = new KIMAP::NamespaceJob(mSession);
162 162
163 //FIXME The ping is only required because the login job doesn't fail after the configured timeout 163 //FIXME The ping is only required because the login job doesn't fail after the configured timeout
164 return ping().then(runJob(loginJob)).then(runJob(capabilitiesJob)).then<void>([this](){ 164 return ping().then(runJob(loginJob)).then(runJob(capabilitiesJob)).syncThen<void>([this](){
165 SinkTrace() << "Supported capabilities: " << mCapabilities; 165 SinkTrace() << "Supported capabilities: " << mCapabilities;
166 QStringList requiredExtensions = QStringList() << "UIDPLUS" << "NAMESPACE"; 166 QStringList requiredExtensions = QStringList() << "UIDPLUS" << "NAMESPACE";
167 for (const auto &requiredExtension : requiredExtensions) { 167 for (const auto &requiredExtension : requiredExtensions) {
@@ -170,7 +170,7 @@ KAsync::Job<void> ImapServerProxy::login(const QString &username, const QString
170 //TODO fail the job 170 //TODO fail the job
171 } 171 }
172 } 172 }
173 }).then(runJob(namespaceJob)).then<void>([this, namespaceJob](){ 173 }).then(runJob(namespaceJob)).syncThen<void>([this, namespaceJob] {
174 for (const auto &ns :namespaceJob->personalNamespaces()) { 174 for (const auto &ns :namespaceJob->personalNamespaces()) {
175 mPersonalNamespaces << ns.name; 175 mPersonalNamespaces << ns.name;
176 mPersonalNamespaceSeparator = ns.separator; 176 mPersonalNamespaceSeparator = ns.separator;
@@ -363,7 +363,7 @@ KAsync::Job<QList<qint64>> ImapServerProxy::fetchHeaders(const QString &mailbox,
363 list->append(uids.value(id)); 363 list->append(uids.value(id));
364 } 364 }
365 }) 365 })
366 .then<QList<qint64>>([list](){ 366 .syncThen<QList<qint64>>([list](){
367 return *list; 367 return *list;
368 }); 368 });
369} 369}
@@ -402,35 +402,34 @@ KAsync::Job<void> ImapServerProxy::move(const QString &mailbox, const KIMAP::Ima
402 402
403KAsync::Job<QString> ImapServerProxy::createSubfolder(const QString &parentMailbox, const QString &folderName) 403KAsync::Job<QString> ImapServerProxy::createSubfolder(const QString &parentMailbox, const QString &folderName)
404{ 404{
405 auto folder = QSharedPointer<QString>::create(); 405 return KAsync::start<QString>([this, parentMailbox, folderName]() {
406 return KAsync::start<void, KAsync::Job<void>>([this, parentMailbox, folderName, folder]() {
407 Q_ASSERT(!mPersonalNamespaceSeparator.isNull()); 406 Q_ASSERT(!mPersonalNamespaceSeparator.isNull());
407 auto folder = QSharedPointer<QString>::create();
408 if (parentMailbox.isEmpty()) { 408 if (parentMailbox.isEmpty()) {
409 *folder = mPersonalNamespaces.toList().first() + folderName; 409 *folder = mPersonalNamespaces.toList().first() + folderName;
410 } else { 410 } else {
411 *folder = parentMailbox + mPersonalNamespaceSeparator + folderName; 411 *folder = parentMailbox + mPersonalNamespaceSeparator + folderName;
412 } 412 }
413 SinkTrace() << "Creating subfolder: " << *folder; 413 SinkTrace() << "Creating subfolder: " << *folder;
414 return create(*folder); 414 return create(*folder)
415 }) 415 .syncThen<QString>([=]() {
416 .then<QString>([=]() { 416 return *folder;
417 return *folder; 417 });
418 }); 418 });
419} 419}
420 420
421KAsync::Job<QString> ImapServerProxy::renameSubfolder(const QString &oldMailbox, const QString &newName) 421KAsync::Job<QString> ImapServerProxy::renameSubfolder(const QString &oldMailbox, const QString &newName)
422{ 422{
423 auto folder = QSharedPointer<QString>::create(); 423 return KAsync::start<QString>([this, oldMailbox, newName] {
424 return KAsync::start<void, KAsync::Job<void>>([this, oldMailbox, newName, folder]() {
425 Q_ASSERT(!mPersonalNamespaceSeparator.isNull()); 424 Q_ASSERT(!mPersonalNamespaceSeparator.isNull());
426 auto parts = oldMailbox.split(mPersonalNamespaceSeparator); 425 auto parts = oldMailbox.split(mPersonalNamespaceSeparator);
427 parts.removeLast(); 426 parts.removeLast();
428 *folder = parts.join(mPersonalNamespaceSeparator) + mPersonalNamespaceSeparator + newName; 427 auto folder = QSharedPointer<QString>::create(parts.join(mPersonalNamespaceSeparator) + mPersonalNamespaceSeparator + newName);
429 SinkTrace() << "Renaming subfolder: " << oldMailbox << *folder; 428 SinkTrace() << "Renaming subfolder: " << oldMailbox << *folder;
430 return rename(oldMailbox, *folder); 429 return rename(oldMailbox, *folder)
431 }) 430 .syncThen<QString>([=]() {
432 .then<QString>([=]() { 431 return *folder;
433 return *folder; 432 });
434 }); 433 });
435} 434}
436 435
@@ -461,7 +460,7 @@ KAsync::Job<void> ImapServerProxy::fetchMessages(const Folder &folder, qint64 ui
461 auto time = QSharedPointer<QTime>::create(); 460 auto time = QSharedPointer<QTime>::create();
462 time->start(); 461 time->start();
463 Q_ASSERT(!mPersonalNamespaceSeparator.isNull()); 462 Q_ASSERT(!mPersonalNamespaceSeparator.isNull());
464 return select(mailboxFromFolder(folder)).then<void, KAsync::Job<void>, SelectResult>([this, callback, folder, time, progress, uidNext](const SelectResult &selectResult) -> KAsync::Job<void> { 463 return select(mailboxFromFolder(folder)).then<void, SelectResult>([this, callback, folder, time, progress, uidNext](const SelectResult &selectResult) -> KAsync::Job<void> {
465 464
466 SinkLog() << "UIDNEXT " << selectResult.uidNext << uidNext; 465 SinkLog() << "UIDNEXT " << selectResult.uidNext << uidNext;
467 if (selectResult.uidNext == (uidNext + 1)) { 466 if (selectResult.uidNext == (uidNext + 1)) {
@@ -469,7 +468,7 @@ KAsync::Job<void> ImapServerProxy::fetchMessages(const Folder &folder, qint64 ui
469 return KAsync::null<void>(); 468 return KAsync::null<void>();
470 } 469 }
471 470
472 return fetchHeaders(mailboxFromFolder(folder), (uidNext + 1)).then<void, KAsync::Job<void>, QList<qint64>>([this, callback, time, progress](const QList<qint64> &uidsToFetch){ 471 return fetchHeaders(mailboxFromFolder(folder), (uidNext + 1)).then<void, QList<qint64>>([this, callback, time, progress](const QList<qint64> &uidsToFetch){
473 SinkTrace() << "Fetched headers"; 472 SinkTrace() << "Fetched headers";
474 SinkTrace() << " Total: " << uidsToFetch.size(); 473 SinkTrace() << " Total: " << uidsToFetch.size();
475 SinkTrace() << " Uids to fetch: " << uidsToFetch; 474 SinkTrace() << " Uids to fetch: " << uidsToFetch;
diff --git a/examples/maildirresource/maildirresource.cpp b/examples/maildirresource/maildirresource.cpp
index 392b422..e69d822 100644
--- a/examples/maildirresource/maildirresource.cpp
+++ b/examples/maildirresource/maildirresource.cpp
@@ -367,7 +367,7 @@ public:
367 KAsync::Job<void> synchronizeWithSource() Q_DECL_OVERRIDE 367 KAsync::Job<void> synchronizeWithSource() Q_DECL_OVERRIDE
368 { 368 {
369 SinkLog() << " Synchronizing"; 369 SinkLog() << " Synchronizing";
370 return KAsync::start<void, KAsync::Job<void> >([this]() { 370 return KAsync::start<void>([this]() {
371 KPIM::Maildir maildir(mMaildirPath, true); 371 KPIM::Maildir maildir(mMaildirPath, true);
372 if (!maildir.isValid(false)) { 372 if (!maildir.isValid(false)) {
373 return KAsync::error<void>(1, "Maildir path doesn't point to a valid maildir: " + mMaildirPath); 373 return KAsync::error<void>(1, "Maildir path doesn't point to a valid maildir: " + mMaildirPath);
@@ -402,18 +402,14 @@ public:
402 if (operation == Sink::Operation_Creation) { 402 if (operation == Sink::Operation_Creation) {
403 const auto remoteId = getFilePathFromMimeMessagePath(mail.getMimeMessagePath()); 403 const auto remoteId = getFilePathFromMimeMessagePath(mail.getMimeMessagePath());
404 SinkTrace() << "Mail created: " << remoteId; 404 SinkTrace() << "Mail created: " << remoteId;
405 return KAsync::start<QByteArray>([=]() -> QByteArray { 405 return KAsync::value(remoteId.toUtf8());
406 return remoteId.toUtf8();
407 });
408 } else if (operation == Sink::Operation_Removal) { 406 } else if (operation == Sink::Operation_Removal) {
409 SinkTrace() << "Removing a mail: " << oldRemoteId; 407 SinkTrace() << "Removing a mail: " << oldRemoteId;
410 return KAsync::null<QByteArray>(); 408 return KAsync::null<QByteArray>();
411 } else if (operation == Sink::Operation_Modification) { 409 } else if (operation == Sink::Operation_Modification) {
412 SinkTrace() << "Modifying a mail: " << oldRemoteId; 410 SinkTrace() << "Modifying a mail: " << oldRemoteId;
413 const auto remoteId = getFilePathFromMimeMessagePath(mail.getMimeMessagePath()); 411 const auto remoteId = getFilePathFromMimeMessagePath(mail.getMimeMessagePath());
414 return KAsync::start<QByteArray>([=]() -> QByteArray { 412 return KAsync::value(remoteId.toUtf8());
415 return remoteId.toUtf8();
416 });
417 } 413 }
418 return KAsync::null<QByteArray>(); 414 return KAsync::null<QByteArray>();
419 } 415 }
@@ -427,9 +423,7 @@ public:
427 SinkTrace() << "Creating a new folder: " << path; 423 SinkTrace() << "Creating a new folder: " << path;
428 KPIM::Maildir maildir(path, false); 424 KPIM::Maildir maildir(path, false);
429 maildir.create(); 425 maildir.create();
430 return KAsync::start<QByteArray>([=]() -> QByteArray { 426 return KAsync::value(path.toUtf8());
431 return path.toUtf8();
432 });
433 } else if (operation == Sink::Operation_Removal) { 427 } else if (operation == Sink::Operation_Removal) {
434 const auto path = oldRemoteId; 428 const auto path = oldRemoteId;
435 SinkTrace() << "Removing a folder: " << path; 429 SinkTrace() << "Removing a folder: " << path;
@@ -438,9 +432,7 @@ public:
438 return KAsync::null<QByteArray>(); 432 return KAsync::null<QByteArray>();
439 } else if (operation == Sink::Operation_Modification) { 433 } else if (operation == Sink::Operation_Modification) {
440 SinkWarning() << "Folder modifications are not implemented"; 434 SinkWarning() << "Folder modifications are not implemented";
441 return KAsync::start<QByteArray>([=]() -> QByteArray { 435 return KAsync::value(oldRemoteId);
442 return oldRemoteId;
443 });
444 } 436 }
445 return KAsync::null<QByteArray>(); 437 return KAsync::null<QByteArray>();
446 } 438 }
diff --git a/examples/mailtransportresource/mailtransportresource.cpp b/examples/mailtransportresource/mailtransportresource.cpp
index a729d4d..be4e4e0 100644
--- a/examples/mailtransportresource/mailtransportresource.cpp
+++ b/examples/mailtransportresource/mailtransportresource.cpp
@@ -124,17 +124,14 @@ public:
124 }); 124 });
125 auto job = KAsync::null<void>(); 125 auto job = KAsync::null<void>();
126 for (const auto &m : toSend) { 126 for (const auto &m : toSend) {
127 job = job.then(send(m, mSettings)).then<void>([this, m]() { 127 job = job.then(send(m, mSettings)).syncThen<void>([this, m] {
128 auto modifiedMail = ApplicationDomain::Mail(mResourceInstanceIdentifier, m.identifier(), m.revision(), QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create()); 128 auto modifiedMail = ApplicationDomain::Mail(mResourceInstanceIdentifier, m.identifier(), m.revision(), QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create());
129 modifiedMail.setSent(true); 129 modifiedMail.setSent(true);
130 modify(modifiedMail); 130 modify(modifiedMail);
131 //TODO copy to a sent mail folder as well 131 //TODO copy to a sent mail folder as well
132 }); 132 });
133 } 133 }
134 job = job.then<void>([&future]() { 134 job = job.syncThen<void>([&future](const KAsync::Error &) {
135 future.setFinished();
136 },
137 [&future](int errorCode, const QString &errorString) {
138 future.setFinished(); 135 future.setFinished();
139 }); 136 });
140 job.exec(); 137 job.exec();
diff --git a/sinksh/syntax_modules/sink_sync.cpp b/sinksh/syntax_modules/sink_sync.cpp
index 208b869..2ed4cf7 100644
--- a/sinksh/syntax_modules/sink_sync.cpp
+++ b/sinksh/syntax_modules/sink_sync.cpp
@@ -45,10 +45,10 @@ bool sync(const QStringList &args, State &state)
45 } 45 }
46 46
47 QTimer::singleShot(0, [query, state]() { 47 QTimer::singleShot(0, [query, state]() {
48 Sink::Store::synchronize(query).then<void>([state]() { 48 Sink::Store::synchronize(query).syncThen<void>([state]() {
49 state.printLine("Synchronization complete!"); 49 state.printLine("Synchronization complete!");
50 state.commandFinished(); 50 state.commandFinished();
51 }).exec(); 51 }).exec();
52 }); 52 });
53 53
54 return true; 54 return true;
diff --git a/tests/accountstest.cpp b/tests/accountstest.cpp
index 4be8bd6..e0a99c2 100644
--- a/tests/accountstest.cpp
+++ b/tests/accountstest.cpp
@@ -39,7 +39,7 @@ private slots:
39 account.setProperty("icon", accountIcon); 39 account.setProperty("icon", accountIcon);
40 Store::create(account).exec().waitForFinished(); 40 Store::create(account).exec().waitForFinished();
41 41
42 Store::fetchAll<SinkAccount>(Query()).then<void, QList<SinkAccount::Ptr>>([&](const QList<SinkAccount::Ptr> &accounts) { 42 Store::fetchAll<SinkAccount>(Query()).syncThen<void, QList<SinkAccount::Ptr>>([&](const QList<SinkAccount::Ptr> &accounts) {
43 QCOMPARE(accounts.size(), 1); 43 QCOMPARE(accounts.size(), 1);
44 auto account = accounts.first(); 44 auto account = accounts.first();
45 QCOMPARE(account->getProperty("type").toString(), QString("maildir")); 45 QCOMPARE(account->getProperty("type").toString(), QString("maildir"));
@@ -60,7 +60,7 @@ private slots:
60 Store::create(resource).exec().waitForFinished(); 60 Store::create(resource).exec().waitForFinished();
61 61
62 62
63 Store::fetchAll<SinkResource>(Query()).then<void, QList<SinkResource::Ptr>>([&](const QList<SinkResource::Ptr> &resources) { 63 Store::fetchAll<SinkResource>(Query()).syncThen<void, QList<SinkResource::Ptr>>([&](const QList<SinkResource::Ptr> &resources) {
64 QCOMPARE(resources.size(), 1); 64 QCOMPARE(resources.size(), 1);
65 auto resource = resources.first(); 65 auto resource = resources.first();
66 QCOMPARE(resource->getProperty("type").toString(), QString("sink.mailtransport")); 66 QCOMPARE(resource->getProperty("type").toString(), QString("sink.mailtransport"));
@@ -74,7 +74,7 @@ private slots:
74 identity.setProperty("account", account.identifier()); 74 identity.setProperty("account", account.identifier());
75 Store::create(identity).exec().waitForFinished(); 75 Store::create(identity).exec().waitForFinished();
76 76
77 Store::fetchAll<Identity>(Query()).then<void, QList<Identity::Ptr>>([&](const QList<Identity::Ptr> &identities) { 77 Store::fetchAll<Identity>(Query()).syncThen<void, QList<Identity::Ptr>>([&](const QList<Identity::Ptr> &identities) {
78 QCOMPARE(identities.size(), 1); 78 QCOMPARE(identities.size(), 1);
79 }) 79 })
80 .exec().waitForFinished(); 80 .exec().waitForFinished();
@@ -82,7 +82,7 @@ private slots:
82 82
83 Store::remove(resource).exec().waitForFinished(); 83 Store::remove(resource).exec().waitForFinished();
84 84
85 Store::fetchAll<SinkResource>(Query()).then<void, QList<SinkResource::Ptr>>([](const QList<SinkResource::Ptr> &resources) { 85 Store::fetchAll<SinkResource>(Query()).syncThen<void, QList<SinkResource::Ptr>>([](const QList<SinkResource::Ptr> &resources) {
86 QCOMPARE(resources.size(), 0); 86 QCOMPARE(resources.size(), 0);
87 }) 87 })
88 .exec().waitForFinished(); 88 .exec().waitForFinished();
diff --git a/tests/clientapitest.cpp b/tests/clientapitest.cpp
index b1e49c4..b5405cf 100644
--- a/tests/clientapitest.cpp
+++ b/tests/clientapitest.cpp
@@ -70,7 +70,7 @@ public:
70 } 70 }
71 resultProvider->initialResultSetComplete(parent); 71 resultProvider->initialResultSetComplete(parent);
72 }); 72 });
73 auto job = KAsync::start<void>([query, resultProvider]() {}); 73 auto job = KAsync::syncStart<void>([query, resultProvider]() {});
74 mResultProvider = resultProvider; 74 mResultProvider = resultProvider;
75 return qMakePair(job, emitter); 75 return qMakePair(job, emitter);
76 } 76 }
@@ -273,7 +273,7 @@ private slots:
273 273
274 bool gotValue = false; 274 bool gotValue = false;
275 auto result = Sink::Store::fetchOne<Sink::ApplicationDomain::Event>(query) 275 auto result = Sink::Store::fetchOne<Sink::ApplicationDomain::Event>(query)
276 .then<void, Sink::ApplicationDomain::Event>([&gotValue](const Sink::ApplicationDomain::Event &event) { gotValue = true; }) 276 .syncThen<void, Sink::ApplicationDomain::Event>([&gotValue](const Sink::ApplicationDomain::Event &event) { gotValue = true; })
277 .exec(); 277 .exec();
278 result.waitForFinished(); 278 result.waitForFinished();
279 QVERIFY(!result.errorCode()); 279 QVERIFY(!result.errorCode());
diff --git a/tests/dummyresourcebenchmark.cpp b/tests/dummyresourcebenchmark.cpp
index 72562c3..8636bf6 100644
--- a/tests/dummyresourcebenchmark.cpp
+++ b/tests/dummyresourcebenchmark.cpp
@@ -44,34 +44,6 @@ private slots:
44 { 44 {
45 } 45 }
46 46
47 static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void>> &futures)
48 {
49 auto context = new QObject;
50 return KAsync::start<void>([futures, context](KAsync::Future<void> &future) {
51 const auto total = futures.size();
52 auto count = QSharedPointer<int>::create();
53 int i = 0;
54 for (KAsync::Future<void> subFuture : futures) {
55 i++;
56 if (subFuture.isFinished()) {
57 *count += 1;
58 continue;
59 }
60 // FIXME bind lifetime all watcher to future (repectively the main job
61 auto watcher = QSharedPointer<KAsync::FutureWatcher<void>>::create();
62 QObject::connect(watcher.data(), &KAsync::FutureWatcher<void>::futureReady, [count, total, &future]() {
63 *count += 1;
64 if (*count == total) {
65 future.setFinished();
66 }
67 });
68 watcher->setFuture(subFuture);
69 context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher));
70 }
71 })
72 .then<void>([context]() { delete context; });
73 }
74
75 // Ensure we can process a command in less than 0.1s 47 // Ensure we can process a command in less than 0.1s
76 void testCommandResponsiveness() 48 void testCommandResponsiveness()
77 { 49 {
@@ -120,7 +92,7 @@ private slots:
120 event.setProperty("summary", "summaryValue"); 92 event.setProperty("summary", "summaryValue");
121 waitCondition << Sink::Store::create<Sink::ApplicationDomain::Event>(event).exec(); 93 waitCondition << Sink::Store::create<Sink::ApplicationDomain::Event>(event).exec();
122 } 94 }
123 waitForCompletion(waitCondition).exec().waitForFinished(); 95 KAsync::waitForCompletion(waitCondition).exec().waitForFinished();
124 auto appendTime = time.elapsed(); 96 auto appendTime = time.elapsed();
125 97
126 // Ensure everything is processed 98 // Ensure everything is processed
diff --git a/tests/mailsynctest.cpp b/tests/mailsynctest.cpp
index 4b797c8..9c57f0a 100644
--- a/tests/mailsynctest.cpp
+++ b/tests/mailsynctest.cpp
@@ -69,7 +69,7 @@ void MailSyncTest::testListFolders()
69 //First figure out how many folders we have by default 69 //First figure out how many folders we have by default
70 { 70 {
71 auto job = Store::fetchAll<Folder>(Query()) 71 auto job = Store::fetchAll<Folder>(Query())
72 .then<void, QList<Folder::Ptr>>([&](const QList<Folder::Ptr> &folders) { 72 .syncThen<void, QList<Folder::Ptr>>([&](const QList<Folder::Ptr> &folders) {
73 QStringList names; 73 QStringList names;
74 for (const auto &folder : folders) { 74 for (const auto &folder : folders) {
75 names << folder->getName(); 75 names << folder->getName();
@@ -88,7 +88,7 @@ void MailSyncTest::testListFolders()
88 VERIFYEXEC(Store::synchronize(query)); 88 VERIFYEXEC(Store::synchronize(query));
89 ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); 89 ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished();
90 90
91 auto job = Store::fetchAll<Folder>(query).then<void, QList<Folder::Ptr>>([=](const QList<Folder::Ptr> &folders) { 91 auto job = Store::fetchAll<Folder>(query).syncThen<void, QList<Folder::Ptr>>([=](const QList<Folder::Ptr> &folders) {
92 QStringList names; 92 QStringList names;
93 QHash<QByteArray, QByteArray> specialPurposeFolders; 93 QHash<QByteArray, QByteArray> specialPurposeFolders;
94 for (const auto &folder : folders) { 94 for (const auto &folder : folders) {
@@ -130,7 +130,7 @@ void MailSyncTest::testListNewFolder()
130 VERIFYEXEC(Store::synchronize(query)); 130 VERIFYEXEC(Store::synchronize(query));
131 ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); 131 ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished();
132 132
133 auto job = Store::fetchAll<Folder>(query).then<void, QList<Folder::Ptr>>([](const QList<Folder::Ptr> &folders) { 133 auto job = Store::fetchAll<Folder>(query).syncThen<void, QList<Folder::Ptr>>([](const QList<Folder::Ptr> &folders) {
134 QStringList names; 134 QStringList names;
135 for (const auto &folder : folders) { 135 for (const auto &folder : folders) {
136 names << folder->getName(); 136 names << folder->getName();
@@ -155,7 +155,7 @@ void MailSyncTest::testListRemovedFolder()
155 VERIFYEXEC(Store::synchronize(query)); 155 VERIFYEXEC(Store::synchronize(query));
156 ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); 156 ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished();
157 157
158 auto job = Store::fetchAll<Folder>(query).then<void, QList<Folder::Ptr>>([](const QList<Folder::Ptr> &folders) { 158 auto job = Store::fetchAll<Folder>(query).syncThen<void, QList<Folder::Ptr>>([](const QList<Folder::Ptr> &folders) {
159 QStringList names; 159 QStringList names;
160 for (const auto &folder : folders) { 160 for (const auto &folder : folders) {
161 names << folder->getName(); 161 names << folder->getName();
@@ -180,7 +180,7 @@ void MailSyncTest::testListFolderHierarchy()
180 VERIFYEXEC(Store::synchronize(query)); 180 VERIFYEXEC(Store::synchronize(query));
181 ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); 181 ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished();
182 182
183 auto job = Store::fetchAll<Folder>(query).then<void, QList<Folder::Ptr>>([=](const QList<Folder::Ptr> &folders) { 183 auto job = Store::fetchAll<Folder>(query).syncThen<void, QList<Folder::Ptr>>([=](const QList<Folder::Ptr> &folders) {
184 QHash<QString, Folder::Ptr> map; 184 QHash<QString, Folder::Ptr> map;
185 for (const auto &folder : folders) { 185 for (const auto &folder : folders) {
186 map.insert(folder->getName(), folder); 186 map.insert(folder->getName(), folder);
@@ -223,7 +223,7 @@ void MailSyncTest::testListNewSubFolder()
223 VERIFYEXEC(Store::synchronize(query)); 223 VERIFYEXEC(Store::synchronize(query));
224 ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); 224 ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished();
225 225
226 auto job = Store::fetchAll<Folder>(query).then<void, QList<Folder::Ptr>>([](const QList<Folder::Ptr> &folders) { 226 auto job = Store::fetchAll<Folder>(query).syncThen<void, QList<Folder::Ptr>>([](const QList<Folder::Ptr> &folders) {
227 QStringList names; 227 QStringList names;
228 for (const auto &folder : folders) { 228 for (const auto &folder : folders) {
229 names << folder->getName(); 229 names << folder->getName();
@@ -251,7 +251,7 @@ void MailSyncTest::testListRemovedSubFolder()
251 VERIFYEXEC(Store::synchronize(query)); 251 VERIFYEXEC(Store::synchronize(query));
252 ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); 252 ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished();
253 253
254 auto job = Store::fetchAll<Folder>(query).then<void, QList<Folder::Ptr>>([](const QList<Folder::Ptr> &folders) { 254 auto job = Store::fetchAll<Folder>(query).syncThen<void, QList<Folder::Ptr>>([](const QList<Folder::Ptr> &folders) {
255 QStringList names; 255 QStringList names;
256 for (const auto &folder : folders) { 256 for (const auto &folder : folders) {
257 names << folder->getName(); 257 names << folder->getName();
@@ -271,7 +271,7 @@ void MailSyncTest::testListMails()
271 VERIFYEXEC(Store::synchronize(query)); 271 VERIFYEXEC(Store::synchronize(query));
272 ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); 272 ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished();
273 273
274 auto job = Store::fetchAll<Mail>(query).then<void, QList<Mail::Ptr>>([](const QList<Mail::Ptr> &mails) { 274 auto job = Store::fetchAll<Mail>(query).syncThen<void, QList<Mail::Ptr>>([](const QList<Mail::Ptr> &mails) {
275 QCOMPARE(mails.size(), 1); 275 QCOMPARE(mails.size(), 1);
276 QVERIFY(mails.first()->getSubject().startsWith(QString("[Nepomuk] Jenkins build is still unstable"))); 276 QVERIFY(mails.first()->getSubject().startsWith(QString("[Nepomuk] Jenkins build is still unstable")));
277 const auto data = mails.first()->getMimeMessage(); 277 const auto data = mails.first()->getMimeMessage();
@@ -300,7 +300,7 @@ void MailSyncTest::testResyncMails()
300 VERIFYEXEC(Store::synchronize(query)); 300 VERIFYEXEC(Store::synchronize(query));
301 ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); 301 ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished();
302 302
303 auto job = Store::fetchAll<Mail>(query).then<void, QList<Mail::Ptr>>([](const QList<Mail::Ptr> &mails) { 303 auto job = Store::fetchAll<Mail>(query).syncThen<void, QList<Mail::Ptr>>([](const QList<Mail::Ptr> &mails) {
304 QCOMPARE(mails.size(), 1); 304 QCOMPARE(mails.size(), 1);
305 }); 305 });
306 VERIFYEXEC(job); 306 VERIFYEXEC(job);
@@ -325,7 +325,7 @@ void MailSyncTest::testFetchNewRemovedMessages()
325 ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); 325 ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished();
326 326
327 { 327 {
328 auto job = Store::fetchAll<Mail>(query).then<void, QList<Mail::Ptr>>([](const QList<Mail::Ptr> &mails) { 328 auto job = Store::fetchAll<Mail>(query).syncThen<void, QList<Mail::Ptr>>([](const QList<Mail::Ptr> &mails) {
329 QCOMPARE(mails.size(), 2); 329 QCOMPARE(mails.size(), 2);
330 }); 330 });
331 VERIFYEXEC(job); 331 VERIFYEXEC(job);
@@ -337,7 +337,7 @@ void MailSyncTest::testFetchNewRemovedMessages()
337 ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); 337 ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished();
338 338
339 { 339 {
340 auto job = Store::fetchAll<Mail>(query).then<void, QList<Mail::Ptr>>([](const QList<Mail::Ptr> &mails) { 340 auto job = Store::fetchAll<Mail>(query).syncThen<void, QList<Mail::Ptr>>([](const QList<Mail::Ptr> &mails) {
341 QCOMPARE(mails.size(), 1); 341 QCOMPARE(mails.size(), 1);
342 }); 342 });
343 VERIFYEXEC(job); 343 VERIFYEXEC(job);
diff --git a/tests/mailtest.cpp b/tests/mailtest.cpp
index 908fb84..925fb70 100644
--- a/tests/mailtest.cpp
+++ b/tests/mailtest.cpp
@@ -66,7 +66,7 @@ void MailTest::testCreateModifyDeleteFolder()
66 //First figure out how many folders we have by default 66 //First figure out how many folders we have by default
67 { 67 {
68 auto job = Store::fetchAll<Folder>(Query()) 68 auto job = Store::fetchAll<Folder>(Query())
69 .then<void, QList<Folder::Ptr>>([&](const QList<Folder::Ptr> &folders) { 69 .syncThen<void, QList<Folder::Ptr>>([&](const QList<Folder::Ptr> &folders) {
70 baseCount = folders.size(); 70 baseCount = folders.size();
71 }); 71 });
72 VERIFYEXEC(job); 72 VERIFYEXEC(job);
@@ -83,7 +83,7 @@ void MailTest::testCreateModifyDeleteFolder()
83 VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); 83 VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier));
84 { 84 {
85 auto job = Store::fetchAll<Folder>(Query::RequestedProperties(QByteArrayList() << Folder::Name::name << Folder::Icon::name)) 85 auto job = Store::fetchAll<Folder>(Query::RequestedProperties(QByteArrayList() << Folder::Name::name << Folder::Icon::name))
86 .then<void, QList<Folder::Ptr>>([=](const QList<Folder::Ptr> &folders) { 86 .syncThen<void, QList<Folder::Ptr>>([=](const QList<Folder::Ptr> &folders) {
87 QCOMPARE(folders.size(), baseCount + 1); 87 QCOMPARE(folders.size(), baseCount + 1);
88 QHash<QString, Folder::Ptr> foldersByName; 88 QHash<QString, Folder::Ptr> foldersByName;
89 for (const auto &folder : folders) { 89 for (const auto &folder : folders) {
@@ -109,7 +109,7 @@ void MailTest::testCreateModifyDeleteFolder()
109 VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); 109 VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier));
110 { 110 {
111 auto job = Store::fetchAll<Folder>(Query::RequestedProperties(QByteArrayList() << Folder::Name::name << Folder::Icon::name)) 111 auto job = Store::fetchAll<Folder>(Query::RequestedProperties(QByteArrayList() << Folder::Name::name << Folder::Icon::name))
112 .then<void, QList<Folder::Ptr>>([=](const QList<Folder::Ptr> &folders) { 112 .syncThen<void, QList<Folder::Ptr>>([=](const QList<Folder::Ptr> &folders) {
113 QCOMPARE(folders.size(), baseCount + 1); 113 QCOMPARE(folders.size(), baseCount + 1);
114 QHash<QString, Folder::Ptr> foldersByName; 114 QHash<QString, Folder::Ptr> foldersByName;
115 for (const auto &folder : folders) { 115 for (const auto &folder : folders) {
@@ -130,7 +130,7 @@ void MailTest::testCreateModifyDeleteFolder()
130 VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); 130 VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier));
131 { 131 {
132 auto job = Store::fetchAll<Folder>(Query::RequestedProperties(QByteArrayList() << Folder::Name::name << Folder::Icon::name)) 132 auto job = Store::fetchAll<Folder>(Query::RequestedProperties(QByteArrayList() << Folder::Name::name << Folder::Icon::name))
133 .then<void, QList<Folder::Ptr>>([=](const QList<Folder::Ptr> &folders) { 133 .syncThen<void, QList<Folder::Ptr>>([=](const QList<Folder::Ptr> &folders) {
134 QCOMPARE(folders.size(), baseCount); 134 QCOMPARE(folders.size(), baseCount);
135 }); 135 });
136 VERIFYEXEC(job); 136 VERIFYEXEC(job);
@@ -160,7 +160,7 @@ void MailTest::testCreateModifyDeleteMail()
160 VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); 160 VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier));
161 { 161 {
162 auto job = Store::fetchAll<Mail>(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name << Mail::MimeMessage::name)) 162 auto job = Store::fetchAll<Mail>(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name << Mail::MimeMessage::name))
163 .then<void, QList<Mail::Ptr>>([=](const QList<Mail::Ptr> &mails) { 163 .syncThen<void, QList<Mail::Ptr>>([=](const QList<Mail::Ptr> &mails) {
164 QCOMPARE(mails.size(), 1); 164 QCOMPARE(mails.size(), 1);
165 auto mail = *mails.first(); 165 auto mail = *mails.first();
166 QCOMPARE(mail.getSubject(), subject); 166 QCOMPARE(mail.getSubject(), subject);
@@ -189,7 +189,7 @@ void MailTest::testCreateModifyDeleteMail()
189 VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); 189 VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier));
190 { 190 {
191 auto job = Store::fetchAll<Mail>(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name << Mail::MimeMessage::name)) 191 auto job = Store::fetchAll<Mail>(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name << Mail::MimeMessage::name))
192 .then<void, QList<Mail::Ptr>>([=](const QList<Mail::Ptr> &mails) { 192 .syncThen<void, QList<Mail::Ptr>>([=](const QList<Mail::Ptr> &mails) {
193 QCOMPARE(mails.size(), 1); 193 QCOMPARE(mails.size(), 1);
194 auto mail = *mails.first(); 194 auto mail = *mails.first();
195 QCOMPARE(mail.getSubject(), subject2); 195 QCOMPARE(mail.getSubject(), subject2);
@@ -211,7 +211,7 @@ void MailTest::testCreateModifyDeleteMail()
211 VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); 211 VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier));
212 { 212 {
213 auto job = Store::fetchAll<Mail>(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name)) 213 auto job = Store::fetchAll<Mail>(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name))
214 .then<void, QList<Mail::Ptr>>([=](const QList<Mail::Ptr> &mails) { 214 .syncThen<void, QList<Mail::Ptr>>([=](const QList<Mail::Ptr> &mails) {
215 QCOMPARE(mails.size(), 0); 215 QCOMPARE(mails.size(), 0);
216 }); 216 });
217 VERIFYEXEC(job); 217 VERIFYEXEC(job);
@@ -247,7 +247,7 @@ void MailTest::testMoveMail()
247 Mail modifiedMail; 247 Mail modifiedMail;
248 { 248 {
249 auto job = Store::fetchAll<Mail>(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name << Mail::MimeMessage::name)) 249 auto job = Store::fetchAll<Mail>(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name << Mail::MimeMessage::name))
250 .then<void, QList<Mail::Ptr>>([=, &modifiedMail](const QList<Mail::Ptr> &mails) { 250 .syncThen<void, QList<Mail::Ptr>>([=, &modifiedMail](const QList<Mail::Ptr> &mails) {
251 QCOMPARE(mails.size(), 1); 251 QCOMPARE(mails.size(), 1);
252 auto mail = *mails.first(); 252 auto mail = *mails.first();
253 modifiedMail = mail; 253 modifiedMail = mail;
@@ -266,7 +266,7 @@ void MailTest::testMoveMail()
266 VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); 266 VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier));
267 { 267 {
268 auto job = Store::fetchAll<Mail>(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name << Mail::MimeMessage::name)) 268 auto job = Store::fetchAll<Mail>(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name << Mail::MimeMessage::name))
269 .then<void, QList<Mail::Ptr>>([=](const QList<Mail::Ptr> &mails) { 269 .syncThen<void, QList<Mail::Ptr>>([=](const QList<Mail::Ptr> &mails) {
270 QCOMPARE(mails.size(), 1); 270 QCOMPARE(mails.size(), 1);
271 auto mail = *mails.first(); 271 auto mail = *mails.first();
272 QCOMPARE(mail.getFolder(), folder1.identifier()); 272 QCOMPARE(mail.getFolder(), folder1.identifier());
@@ -299,7 +299,7 @@ void MailTest::testMarkMailAsRead()
299 auto job = Store::fetchAll<Mail>(Query::ResourceFilter(mResourceInstanceIdentifier) + 299 auto job = Store::fetchAll<Mail>(Query::ResourceFilter(mResourceInstanceIdentifier) +
300 Query::RequestedProperties(QByteArrayList() << Mail::Folder::name 300 Query::RequestedProperties(QByteArrayList() << Mail::Folder::name
301 << Mail::Subject::name)) 301 << Mail::Subject::name))
302 .then<void, KAsync::Job<void>, QList<Mail::Ptr>>([this](const QList<Mail::Ptr> &mails) { 302 .then<void, QList<Mail::Ptr>>([this](const QList<Mail::Ptr> &mails) {
303 ASYNCCOMPARE(mails.size(), 1); 303 ASYNCCOMPARE(mails.size(), 1);
304 auto mail = mails.first(); 304 auto mail = mails.first();
305 mail->setUnread(false); 305 mail->setUnread(false);
@@ -316,7 +316,7 @@ void MailTest::testMarkMailAsRead()
316 << Mail::Subject::name 316 << Mail::Subject::name
317 << Mail::MimeMessage::name 317 << Mail::MimeMessage::name
318 << Mail::Unread::name)) 318 << Mail::Unread::name))
319 .then<void, KAsync::Job<void>, QList<Mail::Ptr>>([](const QList<Mail::Ptr> &mails) { 319 .then<void, QList<Mail::Ptr>>([](const QList<Mail::Ptr> &mails) {
320 ASYNCCOMPARE(mails.size(), 1); 320 ASYNCCOMPARE(mails.size(), 1);
321 auto mail = mails.first(); 321 auto mail = mails.first();
322 ASYNCVERIFY(!mail->getSubject().isEmpty()); 322 ASYNCVERIFY(!mail->getSubject().isEmpty());
diff --git a/tests/resourcecommunicationtest.cpp b/tests/resourcecommunicationtest.cpp
index 1530f63..201db53 100644
--- a/tests/resourcecommunicationtest.cpp
+++ b/tests/resourcecommunicationtest.cpp
@@ -51,13 +51,14 @@ private slots:
51 int errors = 0; 51 int errors = 0;
52 for (int i = 0; i < count; i++) { 52 for (int i = 0; i < count; i++) {
53 auto result = resourceAccess.sendCommand(Sink::Commands::PingCommand) 53 auto result = resourceAccess.sendCommand(Sink::Commands::PingCommand)
54 .then<void>([&complete]() { complete++; }, 54 .syncThen<void>([&resourceAccess, &errors, &complete](const KAsync::Error &error) {
55 [&errors, &complete](int error, const QString &msg) { 55 complete++;
56 qWarning() << msg; 56 if (error) {
57 errors++; 57 qWarning() << error.errorMessage;
58 complete++; 58 errors++;
59 }) 59 }
60 .exec(); 60 })
61 .exec();
61 } 62 }
62 QTRY_COMPARE(complete, count); 63 QTRY_COMPARE(complete, count);
63 QVERIFY(!errors); 64 QVERIFY(!errors);
@@ -76,13 +77,12 @@ private slots:
76 int errors = 0; 77 int errors = 0;
77 for (int i = 0; i < count; i++) { 78 for (int i = 0; i < count; i++) {
78 resourceAccess.sendCommand(Sink::Commands::PingCommand) 79 resourceAccess.sendCommand(Sink::Commands::PingCommand)
79 .then<void>([&complete]() { complete++; }, 80 .syncThen<void>([&resourceAccess, &errors, &complete](const KAsync::Error &error) {
80 [&errors, &complete](int error, const QString &msg) { 81 complete++;
81 qWarning() << msg; 82 if (error) {
83 qWarning() << error.errorMessage;
82 errors++; 84 errors++;
83 complete++; 85 }
84 })
85 .then<void>([&resourceAccess]() {
86 resourceAccess.close(); 86 resourceAccess.close();
87 resourceAccess.open(); 87 resourceAccess.open();
88 }) 88 })
@@ -104,7 +104,7 @@ private slots:
104 auto resourceAccess = Sink::ResourceAccessFactory::instance().getAccess(resourceIdentifier, ""); 104 auto resourceAccess = Sink::ResourceAccessFactory::instance().getAccess(resourceIdentifier, "");
105 weakRef = resourceAccess.toWeakRef(); 105 weakRef = resourceAccess.toWeakRef();
106 resourceAccess->open(); 106 resourceAccess->open();
107 resourceAccess->sendCommand(Sink::Commands::PingCommand).then<void>([resourceAccess]() { qDebug() << "Pind complete"; }).exec(); 107 resourceAccess->sendCommand(Sink::Commands::PingCommand).syncThen<void>([resourceAccess]() { qDebug() << "Ping complete"; }).exec();
108 } 108 }
109 QVERIFY(weakRef.toStrongRef()); 109 QVERIFY(weakRef.toStrongRef());
110 QTRY_VERIFY(!weakRef.toStrongRef()); 110 QTRY_VERIFY(!weakRef.toStrongRef());