diff options
25 files changed, 337 insertions, 412 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp index fbd556f..e3b7158 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp | |||
@@ -72,7 +72,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
72 | { | 72 | { |
73 | auto lastReplayedRevision = QSharedPointer<qint64>::create(0); | 73 | auto lastReplayedRevision = QSharedPointer<qint64>::create(0); |
74 | auto topRevision = QSharedPointer<qint64>::create(0); | 74 | auto topRevision = QSharedPointer<qint64>::create(0); |
75 | return KAsync::start<void>([this, lastReplayedRevision, topRevision]() { | 75 | return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() { |
76 | mReplayInProgress = true; | 76 | mReplayInProgress = true; |
77 | mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | 77 | mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
78 | SinkWarning() << error.message; | 78 | SinkWarning() << error.message; |
@@ -90,11 +90,9 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
90 | SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; | 90 | SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; |
91 | }) | 91 | }) |
92 | .then(KAsync::dowhile( | 92 | .then(KAsync::dowhile( |
93 | [this, lastReplayedRevision, topRevision](KAsync::Future<bool> &future) { | 93 | [this, lastReplayedRevision, topRevision]() -> KAsync::Job<KAsync::ControlFlowFlag> { |
94 | if (*lastReplayedRevision >= *topRevision) { | 94 | if (*lastReplayedRevision >= *topRevision) { |
95 | future.setValue(false); | 95 | return KAsync::value(KAsync::Break); |
96 | future.setFinished(); | ||
97 | return; | ||
98 | } | 96 | } |
99 | 97 | ||
100 | qint64 revision = *lastReplayedRevision + 1; | 98 | qint64 revision = *lastReplayedRevision + 1; |
@@ -109,12 +107,15 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
109 | [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { | 107 | [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { |
110 | SinkTrace() << "Replaying " << key; | 108 | SinkTrace() << "Replaying " << key; |
111 | if (canReplay(type, key, value)) { | 109 | if (canReplay(type, key, value)) { |
112 | replayJob = replay(type, key, value).then<void>([this, revision, lastReplayedRevision]() { | 110 | replayJob = replay(type, key, value).then<void>([this, revision, lastReplayedRevision](const KAsync::Error &error) { |
113 | recordReplayedRevision(revision); | 111 | if (error) { |
114 | *lastReplayedRevision = revision; | 112 | SinkTrace() << "Change replay failed" << revision; |
115 | }, | 113 | return KAsync::error(error); |
116 | [revision](int, QString) { | 114 | } else { |
117 | SinkTrace() << "Change replay failed" << revision; | 115 | recordReplayedRevision(revision); |
116 | *lastReplayedRevision = revision; | ||
117 | } | ||
118 | return KAsync::null(); | ||
118 | }); | 119 | }); |
119 | exitLoop = true; | 120 | exitLoop = true; |
120 | } else { | 121 | } else { |
@@ -128,23 +129,26 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
128 | } | 129 | } |
129 | revision++; | 130 | revision++; |
130 | } | 131 | } |
131 | replayJob.then<void>([this, revision, lastReplayedRevision, topRevision, &future]() { | 132 | return replayJob.then<KAsync::ControlFlowFlag>([this, revision, lastReplayedRevision, topRevision](const KAsync::Error &error) ->KAsync::Job<KAsync::ControlFlowFlag> { |
132 | SinkTrace() << "Replayed until " << revision; | 133 | if (error) { |
133 | recordReplayedRevision(*lastReplayedRevision); | 134 | SinkTrace() << "Change replay failed" << revision; |
134 | QTimer::singleShot(0, [&future, lastReplayedRevision, topRevision]() { | 135 | //We're probably not online or so, so postpone retrying |
135 | future.setValue((*lastReplayedRevision < *topRevision)); | 136 | return KAsync::value(KAsync::Break); |
136 | future.setFinished(); | 137 | } else { |
137 | }); | 138 | SinkTrace() << "Replayed until " << revision; |
138 | }, | 139 | recordReplayedRevision(*lastReplayedRevision); |
139 | [this, revision, &future](int, QString) { | 140 | if (*lastReplayedRevision < *topRevision) { |
140 | SinkTrace() << "Change replay failed" << revision; | 141 | return KAsync::wait(0).then(KAsync::value(KAsync::Continue)); |
141 | //We're probably not online or so, so postpone retrying | 142 | } else { |
142 | future.setValue(false); | 143 | return KAsync::value(KAsync::Break); |
143 | future.setFinished(); | 144 | } |
144 | }).exec(); | 145 | } |
145 | 146 | //We shouldn't ever get here | |
147 | Q_ASSERT(false); | ||
148 | return KAsync::value(KAsync::Break); | ||
149 | }); | ||
146 | })) | 150 | })) |
147 | .then<void>([this, lastReplayedRevision]() { | 151 | .syncThen<void>([this, lastReplayedRevision]() { |
148 | recordReplayedRevision(*lastReplayedRevision); | 152 | recordReplayedRevision(*lastReplayedRevision); |
149 | mMainStoreTransaction.abort(); | 153 | mMainStoreTransaction.abort(); |
150 | if (allChangesReplayed()) { | 154 | if (allChangesReplayed()) { |
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 7136882..f5b1775 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -100,7 +100,7 @@ private slots: | |||
100 | } | 100 | } |
101 | mProcessingLock = true; | 101 | mProcessingLock = true; |
102 | auto job = processPipeline() | 102 | auto job = processPipeline() |
103 | .then<void>([this]() { | 103 | .syncThen<void>([this]() { |
104 | mProcessingLock = false; | 104 | mProcessingLock = false; |
105 | if (messagesToProcessAvailable()) { | 105 | if (messagesToProcessAvailable()) { |
106 | process(); | 106 | process(); |
@@ -122,7 +122,8 @@ private slots: | |||
122 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); | 122 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); |
123 | case Sink::Commands::InspectionCommand: | 123 | case Sink::Commands::InspectionCommand: |
124 | if (mInspect) { | 124 | if (mInspect) { |
125 | return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<qint64>([]() { return -1; }); | 125 | return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()) |
126 | .syncThen<qint64>([]() { return -1; }); | ||
126 | } else { | 127 | } else { |
127 | return KAsync::error<qint64>(-1, "Missing inspection command."); | 128 | return KAsync::error<qint64>(-1, "Missing inspection command."); |
128 | } | 129 | } |
@@ -131,7 +132,7 @@ private slots: | |||
131 | } | 132 | } |
132 | } | 133 | } |
133 | 134 | ||
134 | KAsync::Job<qint64, qint64> processQueuedCommand(const QByteArray &data) | 135 | KAsync::Job<qint64> processQueuedCommand(const QByteArray &data) |
135 | { | 136 | { |
136 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); | 137 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); |
137 | if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { | 138 | if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { |
@@ -143,13 +144,13 @@ private slots: | |||
143 | SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId); | 144 | SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId); |
144 | return processQueuedCommand(queuedCommand) | 145 | return processQueuedCommand(queuedCommand) |
145 | .then<qint64, qint64>( | 146 | .then<qint64, qint64>( |
146 | [this, commandId](qint64 createdRevision) -> qint64 { | 147 | [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job<qint64> { |
148 | if (error) { | ||
149 | SinkWarning() << "Error while processing queue command: " << error.errorMessage; | ||
150 | return KAsync::error<qint64>(error); | ||
151 | } | ||
147 | SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId); | 152 | SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId); |
148 | return createdRevision; | 153 | return KAsync::value<qint64>(createdRevision); |
149 | }, | ||
150 | [](int errorCode, QString errorMessage) { | ||
151 | // FIXME propagate error, we didn't handle it | ||
152 | SinkWarning() << "Error while processing queue command: " << errorMessage; | ||
153 | }); | 154 | }); |
154 | } | 155 | } |
155 | 156 | ||
@@ -157,31 +158,31 @@ private slots: | |||
157 | KAsync::Job<void> processQueue(MessageQueue *queue) | 158 | KAsync::Job<void> processQueue(MessageQueue *queue) |
158 | { | 159 | { |
159 | auto time = QSharedPointer<QTime>::create(); | 160 | auto time = QSharedPointer<QTime>::create(); |
160 | return KAsync::start<void>([this]() { mPipeline->startTransaction(); }) | 161 | return KAsync::syncStart<void>([this]() { mPipeline->startTransaction(); }) |
161 | .then(KAsync::dowhile([queue]() { return !queue->isEmpty(); }, | 162 | .then(KAsync::dowhile( |
162 | [this, queue, time](KAsync::Future<void> &future) { | 163 | [this, queue, time]() -> KAsync::Job<KAsync::ControlFlowFlag> { |
163 | queue->dequeueBatch(sBatchSize, | 164 | return queue->dequeueBatch(sBatchSize, |
164 | [this, time](const QByteArray &data) { | 165 | [this, time](const QByteArray &data) -> KAsync::Job<void> { |
165 | time->start(); | 166 | time->start(); |
166 | return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) { | 167 | return processQueuedCommand(data) |
167 | processQueuedCommand(data) | 168 | .syncThen<void, qint64>([this, time](qint64 createdRevision) { |
168 | .then<void, qint64>([&future, this, time](qint64 createdRevision) { | 169 | SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); |
169 | SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); | 170 | }); |
170 | future.setFinished(); | 171 | }) |
171 | }) | 172 | .then<KAsync::ControlFlowFlag>([queue](const KAsync::Error &error) { |
172 | .exec(); | 173 | if (error) { |
173 | }); | 174 | if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) { |
174 | }) | 175 | SinkWarning() << "Error while getting message from messagequeue: " << error.errorMessage; |
175 | .then<void>([&future, queue]() { future.setFinished(); }, | 176 | } |
176 | [&future](int i, QString error) { | ||
177 | if (i != MessageQueue::ErrorCodes::NoMessageFound) { | ||
178 | SinkWarning() << "Error while getting message from messagequeue: " << error; | ||
179 | } | 177 | } |
180 | future.setFinished(); | 178 | if (queue->isEmpty()) { |
181 | }) | 179 | return KAsync::value<KAsync::ControlFlowFlag>(KAsync::Break); |
182 | .exec(); | 180 | } else { |
181 | return KAsync::value<KAsync::ControlFlowFlag>(KAsync::Continue); | ||
182 | } | ||
183 | }); | ||
183 | })) | 184 | })) |
184 | .then<void>([this]() { mPipeline->commit(); }); | 185 | .syncThen<void>([this](const KAsync::Error &) { mPipeline->commit(); }); |
185 | } | 186 | } |
186 | 187 | ||
187 | KAsync::Job<void> processPipeline() | 188 | KAsync::Job<void> processPipeline() |
@@ -198,18 +199,20 @@ private slots: | |||
198 | 199 | ||
199 | // Go through all message queues | 200 | // Go through all message queues |
200 | auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues); | 201 | auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues); |
201 | return KAsync::dowhile([it]() { return it->hasNext(); }, | 202 | return KAsync::dowhile( |
202 | [it, this](KAsync::Future<void> &future) { | 203 | [it, this]() { |
203 | auto time = QSharedPointer<QTime>::create(); | 204 | auto time = QSharedPointer<QTime>::create(); |
204 | time->start(); | 205 | time->start(); |
205 | 206 | ||
206 | auto queue = it->next(); | 207 | auto queue = it->next(); |
207 | processQueue(queue) | 208 | return processQueue(queue) |
208 | .then<void>([this, &future, time]() { | 209 | .syncThen<KAsync::ControlFlowFlag>([this, time, it]() { |
209 | SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed()); | 210 | SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed()); |
210 | future.setFinished(); | 211 | if (it->hasNext()) { |
211 | }) | 212 | return KAsync::Continue; |
212 | .exec(); | 213 | } |
214 | return KAsync::Break; | ||
215 | }); | ||
213 | }); | 216 | }); |
214 | } | 217 | } |
215 | 218 | ||
@@ -251,22 +254,19 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra | |||
251 | s >> expectedValue; | 254 | s >> expectedValue; |
252 | inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue) | 255 | inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue) |
253 | .then<void>( | 256 | .then<void>( |
254 | [=]() { | 257 | [=](const KAsync::Error &error) { |
255 | Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; | ||
256 | Sink::Notification n; | ||
257 | n.type = Sink::Notification::Inspection; | ||
258 | n.id = inspectionId; | ||
259 | n.code = Sink::Notification::Success; | ||
260 | emit notify(n); | ||
261 | }, | ||
262 | [=](int code, const QString &message) { | ||
263 | Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << message; | ||
264 | Sink::Notification n; | 258 | Sink::Notification n; |
265 | n.type = Sink::Notification::Inspection; | 259 | n.type = Sink::Notification::Inspection; |
266 | n.message = message; | ||
267 | n.id = inspectionId; | 260 | n.id = inspectionId; |
268 | n.code = Sink::Notification::Failure; | 261 | if (error) { |
262 | Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << error.errorMessage; | ||
263 | n.code = Sink::Notification::Failure; | ||
264 | } else { | ||
265 | Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; | ||
266 | n.code = Sink::Notification::Success; | ||
267 | } | ||
269 | emit notify(n); | 268 | emit notify(n); |
269 | return KAsync::null(); | ||
270 | }) | 270 | }) |
271 | .exec(); | 271 | .exec(); |
272 | return KAsync::null<void>(); | 272 | return KAsync::null<void>(); |
@@ -420,7 +420,7 @@ void GenericResource::processCommand(int commandId, const QByteArray &data) | |||
420 | 420 | ||
421 | KAsync::Job<void> GenericResource::synchronizeWithSource() | 421 | KAsync::Job<void> GenericResource::synchronizeWithSource() |
422 | { | 422 | { |
423 | return KAsync::start<void>([this](KAsync::Future<void> &future) { | 423 | return KAsync::start<void>([this]() { |
424 | 424 | ||
425 | Sink::Notification n; | 425 | Sink::Notification n; |
426 | n.id = "sync"; | 426 | n.id = "sync"; |
@@ -432,23 +432,21 @@ KAsync::Job<void> GenericResource::synchronizeWithSource() | |||
432 | SinkLog() << " Synchronizing"; | 432 | SinkLog() << " Synchronizing"; |
433 | // Changereplay would deadlock otherwise when trying to open the synchronization store | 433 | // Changereplay would deadlock otherwise when trying to open the synchronization store |
434 | enableChangeReplay(false); | 434 | enableChangeReplay(false); |
435 | mSynchronizer->synchronize() | 435 | return mSynchronizer->synchronize() |
436 | .then<void>([this, &future]() { | 436 | .then<void>([this](const KAsync::Error &error) { |
437 | SinkLog() << "Done Synchronizing"; | ||
438 | Sink::Notification n; | ||
439 | n.id = "sync"; | ||
440 | n.type = Sink::Notification::Status; | ||
441 | n.message = "Synchronization has ended."; | ||
442 | n.code = Sink::ApplicationDomain::ConnectedStatus; | ||
443 | emit notify(n); | ||
444 | |||
445 | enableChangeReplay(true); | ||
446 | future.setFinished(); | ||
447 | }, [this, &future](int errorCode, const QString &error) { | ||
448 | enableChangeReplay(true); | 437 | enableChangeReplay(true); |
449 | future.setError(errorCode, error); | 438 | if (!error) { |
450 | }) | 439 | SinkLog() << "Done Synchronizing"; |
451 | .exec(); | 440 | Sink::Notification n; |
441 | n.id = "sync"; | ||
442 | n.type = Sink::Notification::Status; | ||
443 | n.message = "Synchronization has ended."; | ||
444 | n.code = Sink::ApplicationDomain::ConnectedStatus; | ||
445 | emit notify(n); | ||
446 | return KAsync::null(); | ||
447 | } | ||
448 | return KAsync::error(error); | ||
449 | }); | ||
452 | }); | 450 | }); |
453 | } | 451 | } |
454 | 452 | ||
diff --git a/common/listener.cpp b/common/listener.cpp index a051293..027d9ae 100644 --- a/common/listener.cpp +++ b/common/listener.cpp | |||
@@ -248,13 +248,17 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
248 | if (buffer->localSync()) { | 248 | if (buffer->localSync()) { |
249 | job = job.then<void>(loadResource().processAllMessages()); | 249 | job = job.then<void>(loadResource().processAllMessages()); |
250 | } | 250 | } |
251 | job.then<void>([callback, timer]() { | 251 | job.then<void>([callback, timer](const KAsync::Error &error) { |
252 | SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); | 252 | if (error) { |
253 | callback(true); | 253 | SinkWarning() << "Sync failed: " << error.errorMessage; |
254 | }, [callback](int errorCode, const QString &msg) { | 254 | callback(false); |
255 | SinkWarning() << "Sync failed: " << msg; | 255 | return KAsync::error(error); |
256 | callback(false); | 256 | } else { |
257 | }) | 257 | SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); |
258 | callback(true); | ||
259 | return KAsync::null(); | ||
260 | } | ||
261 | }) | ||
258 | .exec(); | 262 | .exec(); |
259 | return; | 263 | return; |
260 | } else { | 264 | } else { |
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 3567a10..28eacb7 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp | |||
@@ -5,37 +5,6 @@ | |||
5 | 5 | ||
6 | SINK_DEBUG_AREA("messagequeue") | 6 | SINK_DEBUG_AREA("messagequeue") |
7 | 7 | ||
8 | static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void>> &futures) | ||
9 | { | ||
10 | auto context = new QObject; | ||
11 | return KAsync::start<void>([futures, context](KAsync::Future<void> &future) { | ||
12 | const auto total = futures.size(); | ||
13 | auto count = QSharedPointer<int>::create(); | ||
14 | int i = 0; | ||
15 | for (KAsync::Future<void> subFuture : futures) { | ||
16 | i++; | ||
17 | if (subFuture.isFinished()) { | ||
18 | *count += 1; | ||
19 | continue; | ||
20 | } | ||
21 | // FIXME bind lifetime all watcher to future (repectively the main job | ||
22 | auto watcher = QSharedPointer<KAsync::FutureWatcher<void>>::create(); | ||
23 | QObject::connect(watcher.data(), &KAsync::FutureWatcher<void>::futureReady, [count, total, &future]() { | ||
24 | *count += 1; | ||
25 | if (*count == total) { | ||
26 | future.setFinished(); | ||
27 | } | ||
28 | }); | ||
29 | watcher->setFuture(subFuture); | ||
30 | context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher)); | ||
31 | } | ||
32 | if (*count == total) { | ||
33 | future.setFinished(); | ||
34 | } | ||
35 | }) | ||
36 | .then<void>([context]() { delete context; }); | ||
37 | } | ||
38 | |||
39 | MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::ReadWrite) | 8 | MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::ReadWrite) |
40 | { | 9 | { |
41 | } | 10 | } |
@@ -101,7 +70,7 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu | |||
101 | return KAsync::start<void>([&value, resultHandler](KAsync::Future<void> &future) { | 70 | return KAsync::start<void>([&value, resultHandler](KAsync::Future<void> &future) { |
102 | resultHandler(const_cast<void *>(static_cast<const void *>(value.data())), value.size(), [&future](bool success) { future.setFinished(); }); | 71 | resultHandler(const_cast<void *>(static_cast<const void *>(value.data())), value.size(), [&future](bool success) { future.setFinished(); }); |
103 | }); | 72 | }); |
104 | }).then<void>([]() {}, [errorHandler](int error, const QString &errorString) { errorHandler(Error("messagequeue", error, errorString.toLatin1())); }).exec(); | 73 | }).onError([errorHandler](const KAsync::Error &error) { errorHandler(Error("messagequeue", error.errorCode, error.errorMessage.toLatin1())); }).exec(); |
105 | } | 74 | } |
106 | 75 | ||
107 | KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler) | 76 | KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler) |
@@ -135,8 +104,8 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi | |||
135 | }); | 104 | }); |
136 | 105 | ||
137 | // Trace() << "Waiting on " << waitCondition.size() << " results"; | 106 | // Trace() << "Waiting on " << waitCondition.size() << " results"; |
138 | ::waitForCompletion(waitCondition) | 107 | KAsync::waitForCompletion(waitCondition) |
139 | .then<void>([this, resultCount, &future]() { | 108 | .syncThen<void>([this, resultCount, &future]() { |
140 | processRemovals(); | 109 | processRemovals(); |
141 | if (*resultCount == 0) { | 110 | if (*resultCount == 0) { |
142 | future.setError(static_cast<int>(ErrorCodes::NoMessageFound), "No message found"); | 111 | future.setError(static_cast<int>(ErrorCodes::NoMessageFound), "No message found"); |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 1d45340..ce864f7 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -233,7 +233,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
233 | 233 | ||
234 | d->storeNewRevision(newRevision, fbb, bufferType, key); | 234 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
235 | 235 | ||
236 | return KAsync::start<qint64>([newRevision]() { return newRevision; }); | 236 | return KAsync::value(newRevision); |
237 | } | 237 | } |
238 | 238 | ||
239 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | 239 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) |
@@ -346,7 +346,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
346 | adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | 346 | adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); |
347 | 347 | ||
348 | d->storeNewRevision(newRevision, fbb, bufferType, key); | 348 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
349 | return KAsync::start<qint64>([newRevision]() { return newRevision; }); | 349 | return KAsync::value(newRevision); |
350 | } | 350 | } |
351 | 351 | ||
352 | KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | 352 | KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) |
@@ -433,7 +433,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
433 | processor->deletedEntity(key, newRevision, *current, d->transaction); | 433 | processor->deletedEntity(key, newRevision, *current, d->transaction); |
434 | } | 434 | } |
435 | 435 | ||
436 | return KAsync::start<qint64>([newRevision]() { return newRevision; }); | 436 | return KAsync::value(newRevision); |
437 | } | 437 | } |
438 | 438 | ||
439 | void Pipeline::cleanupRevision(qint64 revision) | 439 | void Pipeline::cleanupRevision(qint64 revision) |
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 2e2e96d..052db39 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -86,7 +86,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
86 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); | 86 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); |
87 | return newRevisionAndReplayedEntities; | 87 | return newRevisionAndReplayedEntities; |
88 | }) | 88 | }) |
89 | .template then<void, QPair<qint64, qint64>>([=](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { | 89 | .template syncThen<void, QPair<qint64, qint64>>([=](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { |
90 | mOffset[parentId] += newRevisionAndReplayedEntities.second; | 90 | mOffset[parentId] += newRevisionAndReplayedEntities.second; |
91 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 91 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
92 | if (query.liveQuery) { | 92 | if (query.liveQuery) { |
@@ -110,7 +110,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
110 | const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); | 110 | const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); |
111 | return newRevisionAndReplayedEntities; | 111 | return newRevisionAndReplayedEntities; |
112 | }) | 112 | }) |
113 | .template then<void, QPair<qint64, qint64> >([query, this, resultProvider](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { | 113 | .template syncThen<void, QPair<qint64, qint64> >([query, this, resultProvider](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { |
114 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 114 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
115 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); | 115 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); |
116 | resultProvider->setRevision(newRevisionAndReplayedEntities.first); | 116 | resultProvider->setRevision(newRevisionAndReplayedEntities.first); |
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 7b4d839..364616c 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -159,67 +159,65 @@ KAsync::Job<void> ResourceAccess::Private::tryToConnect() | |||
159 | // We may have a socket from the last connection leftover | 159 | // We may have a socket from the last connection leftover |
160 | socket.reset(); | 160 | socket.reset(); |
161 | auto counter = QSharedPointer<int>::create(0); | 161 | auto counter = QSharedPointer<int>::create(0); |
162 | return KAsync::dowhile([this]() -> bool { return !socket; }, | 162 | return KAsync::dowhile( |
163 | [this, counter](KAsync::Future<void> &future) { | 163 | [this, counter]() { |
164 | SinkTrace() << "Loop"; | 164 | SinkTrace() << "Loop"; |
165 | connectToServer(resourceInstanceIdentifier) | 165 | return connectToServer(resourceInstanceIdentifier) |
166 | .then<void, QSharedPointer<QLocalSocket>>( | 166 | .then<KAsync::ControlFlowFlag, QSharedPointer<QLocalSocket>>( |
167 | [this, &future](const QSharedPointer<QLocalSocket> &s) { | 167 | [this, counter](const KAsync::Error &error, const QSharedPointer<QLocalSocket> &s) { |
168 | Q_ASSERT(s); | 168 | if (error) { |
169 | socket = s; | 169 | static int waitTime = 10; |
170 | future.setFinished(); | 170 | static int timeout = 500; |
171 | }, | 171 | static int maxRetries = timeout / waitTime; |
172 | [&future, counter, this](int errorCode, const QString &errorString) { | 172 | if (*counter > maxRetries) { |
173 | static int waitTime = 10; | 173 | SinkTrace() << "Giving up"; |
174 | static int timeout = 500; | 174 | return KAsync::error<KAsync::ControlFlowFlag>("Failed to connect to socket"); |
175 | static int maxRetries = timeout / waitTime; | 175 | } else { |
176 | if (*counter > maxRetries) { | 176 | *counter = *counter + 1; |
177 | SinkTrace() << "Giving up"; | 177 | return KAsync::wait(waitTime).then(KAsync::value(KAsync::Continue)); |
178 | future.setError(-1, "Failed to connect to socket"); | 178 | } |
179 | } else { | 179 | } else { |
180 | KAsync::wait(waitTime).then<void>([&future]() { future.setFinished(); }).exec(); | 180 | Q_ASSERT(s); |
181 | socket = s; | ||
182 | return KAsync::value(KAsync::Break); | ||
181 | } | 183 | } |
182 | *counter = *counter + 1; | 184 | }); |
183 | }) | ||
184 | .exec(); | ||
185 | }); | 185 | }); |
186 | } | 186 | } |
187 | 187 | ||
188 | KAsync::Job<void> ResourceAccess::Private::initializeSocket() | 188 | KAsync::Job<void> ResourceAccess::Private::initializeSocket() |
189 | { | 189 | { |
190 | return KAsync::start<void>([this](KAsync::Future<void> &future) { | 190 | return KAsync::start<void>([this] { |
191 | SinkTrace() << "Trying to connect"; | 191 | SinkTrace() << "Trying to connect"; |
192 | connectToServer(resourceInstanceIdentifier) | 192 | return connectToServer(resourceInstanceIdentifier) |
193 | .then<void, QSharedPointer<QLocalSocket>>( | 193 | .then<void, QSharedPointer<QLocalSocket>>( |
194 | [this, &future](const QSharedPointer<QLocalSocket> &s) { | 194 | [this](const KAsync::Error &error, const QSharedPointer<QLocalSocket> &s) { |
195 | SinkTrace() << "Connected to resource, without having to start it."; | 195 | if (error) { |
196 | Q_ASSERT(s); | 196 | SinkTrace() << "Failed to connect, starting resource"; |
197 | socket = s; | 197 | // We failed to connect, so let's start the resource |
198 | future.setFinished(); | 198 | QStringList args; |
199 | }, | 199 | if (Sink::Test::testModeEnabled()) { |
200 | [this, &future](int errorCode, const QString &errorString) { | 200 | args << "--test"; |
201 | SinkTrace() << "Failed to connect, starting resource"; | 201 | } |
202 | // We failed to connect, so let's start the resource | 202 | args << resourceInstanceIdentifier << resourceName; |
203 | QStringList args; | 203 | qint64 pid = 0; |
204 | if (Sink::Test::testModeEnabled()) { | 204 | if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { |
205 | args << "--test"; | 205 | SinkTrace() << "Started resource " << pid; |
206 | } | 206 | return tryToConnect() |
207 | args << resourceInstanceIdentifier << resourceName; | 207 | .onError([this](const KAsync::Error &error) { |
208 | qint64 pid = 0; | 208 | SinkWarning() << "Failed to connect to started resource"; |
209 | if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { | 209 | }); |
210 | SinkTrace() << "Started resource " << pid; | 210 | } else { |
211 | tryToConnect() | 211 | SinkWarning() << "Failed to start resource"; |
212 | .then<void>([&future]() { future.setFinished(); }, | 212 | } |
213 | [this, &future](int errorCode, const QString &errorString) { | 213 | return KAsync::null(); |
214 | SinkWarning() << "Failed to connect to started resource"; | ||
215 | future.setError(errorCode, errorString); | ||
216 | }) | ||
217 | .exec(); | ||
218 | } else { | 214 | } else { |
219 | SinkWarning() << "Failed to start resource"; | 215 | SinkTrace() << "Connected to resource, without having to start it."; |
216 | Q_ASSERT(s); | ||
217 | socket = s; | ||
218 | return KAsync::null(); | ||
220 | } | 219 | } |
221 | }) | 220 | }); |
222 | .exec(); | ||
223 | }); | 221 | }); |
224 | } | 222 | } |
225 | 223 | ||
@@ -383,17 +381,18 @@ void ResourceAccess::open() | |||
383 | d->openingSocket = true; | 381 | d->openingSocket = true; |
384 | d->initializeSocket() | 382 | d->initializeSocket() |
385 | .then<void>( | 383 | .then<void>( |
386 | [this, time]() { | 384 | [this, time](const KAsync::Error &error) { |
387 | SinkTrace() << "Socket is initialized." << Log::TraceTime(time->elapsed()); | ||
388 | d->openingSocket = false; | 385 | d->openingSocket = false; |
389 | QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected); | 386 | if (error) { |
390 | QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError))); | 387 | SinkWarning() << "Failed to initialize socket " << error.errorMessage; |
391 | QObject::connect(d->socket.data(), &QIODevice::readyRead, this, &ResourceAccess::readResourceMessage); | 388 | } else { |
392 | connected(); | 389 | SinkTrace() << "Socket is initialized." << Log::TraceTime(time->elapsed()); |
393 | }, | 390 | QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected); |
394 | [this](int error, const QString &errorString) { | 391 | QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError))); |
395 | d->openingSocket = false; | 392 | QObject::connect(d->socket.data(), &QIODevice::readyRead, this, &ResourceAccess::readResourceMessage); |
396 | SinkWarning() << "Failed to initialize socket " << errorString; | 393 | connected(); |
394 | } | ||
395 | return KAsync::null(); | ||
397 | }) | 396 | }) |
398 | .exec(); | 397 | .exec(); |
399 | } | 398 | } |
diff --git a/common/resourcecontrol.cpp b/common/resourcecontrol.cpp index 7d092a4..f509318 100644 --- a/common/resourcecontrol.cpp +++ b/common/resourcecontrol.cpp | |||
@@ -41,22 +41,22 @@ KAsync::Job<void> ResourceControl::shutdown(const QByteArray &identifier) | |||
41 | time->start(); | 41 | time->start(); |
42 | return ResourceAccess::connectToServer(identifier) | 42 | return ResourceAccess::connectToServer(identifier) |
43 | .then<void, QSharedPointer<QLocalSocket>>( | 43 | .then<void, QSharedPointer<QLocalSocket>>( |
44 | [identifier, time](QSharedPointer<QLocalSocket> socket, KAsync::Future<void> &future) { | 44 | [identifier, time](const KAsync::Error &error, QSharedPointer<QLocalSocket> socket) { |
45 | if (error) { | ||
46 | SinkTrace() << "Resource is already closed."; | ||
47 | // Resource isn't started, nothing to shutdown | ||
48 | return KAsync::null(); | ||
49 | } | ||
45 | // We can't currently reuse the socket | 50 | // We can't currently reuse the socket |
46 | socket->close(); | 51 | socket->close(); |
47 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); | 52 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); |
48 | resourceAccess->open(); | 53 | resourceAccess->open(); |
49 | resourceAccess->sendCommand(Sink::Commands::ShutdownCommand) | 54 | return resourceAccess->sendCommand(Sink::Commands::ShutdownCommand) |
50 | .then<void>([&future, resourceAccess, time]() { | 55 | .addToContext(resourceAccess) |
56 | .syncThen<void>([resourceAccess, time]() { | ||
51 | resourceAccess->close(); | 57 | resourceAccess->close(); |
52 | SinkTrace() << "Shutdown complete." << Log::TraceTime(time->elapsed()); | 58 | SinkTrace() << "Shutdown complete." << Log::TraceTime(time->elapsed()); |
53 | future.setFinished(); | 59 | }); |
54 | }) | ||
55 | .exec(); | ||
56 | }, | ||
57 | [](int, const QString &) { | ||
58 | SinkTrace() << "Resource is already closed."; | ||
59 | // Resource isn't started, nothing to shutdown | ||
60 | }); | 60 | }); |
61 | } | 61 | } |
62 | 62 | ||
@@ -67,18 +67,19 @@ KAsync::Job<void> ResourceControl::start(const QByteArray &identifier) | |||
67 | time->start(); | 67 | time->start(); |
68 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); | 68 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); |
69 | resourceAccess->open(); | 69 | resourceAccess->open(); |
70 | return resourceAccess->sendCommand(Sink::Commands::PingCommand).then<void>([resourceAccess, time]() { SinkTrace() << "Start complete." << Log::TraceTime(time->elapsed()); }); | 70 | return resourceAccess->sendCommand(Sink::Commands::PingCommand).addToContext(resourceAccess).syncThen<void>([time]() { SinkTrace() << "Start complete." << Log::TraceTime(time->elapsed()); }); |
71 | } | 71 | } |
72 | 72 | ||
73 | KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArrayList &resourceIdentifier) | 73 | KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArrayList &resourceIdentifier) |
74 | { | 74 | { |
75 | SinkTrace() << "flushMessageQueue" << resourceIdentifier; | 75 | SinkTrace() << "flushMessageQueue" << resourceIdentifier; |
76 | return KAsync::iterate(resourceIdentifier) | 76 | return KAsync::value(resourceIdentifier) |
77 | .template each<void, QByteArray>([](const QByteArray &resource, KAsync::Future<void> &future) { | 77 | .template each([](const QByteArray &resource) { |
78 | SinkTrace() << "Flushing message queue " << resource; | 78 | SinkTrace() << "Flushing message queue " << resource; |
79 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); | 79 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); |
80 | resourceAccess->open(); | 80 | resourceAccess->open(); |
81 | resourceAccess->synchronizeResource(false, true).then<void>([&future, resourceAccess]() { future.setFinished(); }).exec(); | 81 | return resourceAccess->synchronizeResource(false, true) |
82 | .addToContext(resourceAccess); | ||
82 | }); | 83 | }); |
83 | } | 84 | } |
84 | 85 | ||
diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp index bf4239d..1c56fe5 100644 --- a/common/resourcefacade.cpp +++ b/common/resourcefacade.cpp | |||
@@ -167,7 +167,7 @@ template <typename DomainType> | |||
167 | KAsync::Job<void> LocalStorageFacade<DomainType>::create(const DomainType &domainObject) | 167 | KAsync::Job<void> LocalStorageFacade<DomainType>::create(const DomainType &domainObject) |
168 | { | 168 | { |
169 | auto configStoreIdentifier = mIdentifier; | 169 | auto configStoreIdentifier = mIdentifier; |
170 | return KAsync::start<void>([domainObject, configStoreIdentifier]() { | 170 | return KAsync::syncStart<void>([domainObject, configStoreIdentifier]() { |
171 | const QByteArray type = domainObject.getProperty("type").toByteArray(); | 171 | const QByteArray type = domainObject.getProperty("type").toByteArray(); |
172 | const QByteArray providedIdentifier = domainObject.identifier().isEmpty() ? domainObject.getProperty("identifier").toByteArray() : domainObject.identifier(); | 172 | const QByteArray providedIdentifier = domainObject.identifier().isEmpty() ? domainObject.getProperty("identifier").toByteArray() : domainObject.identifier(); |
173 | const QByteArray identifier = providedIdentifier.isEmpty() ? ResourceConfig::newIdentifier(type) : providedIdentifier; | 173 | const QByteArray identifier = providedIdentifier.isEmpty() ? ResourceConfig::newIdentifier(type) : providedIdentifier; |
@@ -192,7 +192,7 @@ template <typename DomainType> | |||
192 | KAsync::Job<void> LocalStorageFacade<DomainType>::modify(const DomainType &domainObject) | 192 | KAsync::Job<void> LocalStorageFacade<DomainType>::modify(const DomainType &domainObject) |
193 | { | 193 | { |
194 | auto configStoreIdentifier = mIdentifier; | 194 | auto configStoreIdentifier = mIdentifier; |
195 | return KAsync::start<void>([domainObject, configStoreIdentifier]() { | 195 | return KAsync::syncStart<void>([domainObject, configStoreIdentifier]() { |
196 | const QByteArray identifier = domainObject.identifier(); | 196 | const QByteArray identifier = domainObject.identifier(); |
197 | if (identifier.isEmpty()) { | 197 | if (identifier.isEmpty()) { |
198 | SinkWarning() << "We need an \"identifier\" property to identify the entity to configure."; | 198 | SinkWarning() << "We need an \"identifier\" property to identify the entity to configure."; |
@@ -220,7 +220,7 @@ template <typename DomainType> | |||
220 | KAsync::Job<void> LocalStorageFacade<DomainType>::remove(const DomainType &domainObject) | 220 | KAsync::Job<void> LocalStorageFacade<DomainType>::remove(const DomainType &domainObject) |
221 | { | 221 | { |
222 | auto configStoreIdentifier = mIdentifier; | 222 | auto configStoreIdentifier = mIdentifier; |
223 | return KAsync::start<void>([domainObject, configStoreIdentifier]() { | 223 | return KAsync::syncStart<void>([domainObject, configStoreIdentifier]() { |
224 | const QByteArray identifier = domainObject.identifier(); | 224 | const QByteArray identifier = domainObject.identifier(); |
225 | if (identifier.isEmpty()) { | 225 | if (identifier.isEmpty()) { |
226 | SinkWarning() << "We need an \"identifier\" property to identify the entity to configure"; | 226 | SinkWarning() << "We need an \"identifier\" property to identify the entity to configure"; |
diff --git a/common/sourcewriteback.cpp b/common/sourcewriteback.cpp index fe996cb..702d8e3 100644 --- a/common/sourcewriteback.cpp +++ b/common/sourcewriteback.cpp | |||
@@ -106,7 +106,7 @@ KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArr | |||
106 | job = replay(mail, operation, oldRemoteId, modifiedProperties); | 106 | job = replay(mail, operation, oldRemoteId, modifiedProperties); |
107 | } | 107 | } |
108 | 108 | ||
109 | return job.then<void, QByteArray>([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { | 109 | return job.syncThen<void, QByteArray>([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { |
110 | if (operation == Sink::Operation_Creation) { | 110 | if (operation == Sink::Operation_Creation) { |
111 | SinkTrace() << "Replayed creation with remote id: " << remoteId; | 111 | SinkTrace() << "Replayed creation with remote id: " << remoteId; |
112 | if (remoteId.isEmpty()) { | 112 | if (remoteId.isEmpty()) { |
@@ -127,13 +127,11 @@ KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArr | |||
127 | } else { | 127 | } else { |
128 | SinkError() << "Unkown operation" << operation; | 128 | SinkError() << "Unkown operation" << operation; |
129 | } | 129 | } |
130 | 130 | }) | |
131 | mSyncStore.clear(); | 131 | .syncThen<void>([this](const KAsync::Error &error) { |
132 | mEntityStore.clear(); | 132 | if (error) { |
133 | mTransaction.abort(); | 133 | SinkWarning() << "Failed to replay change: " << error.errorMessage; |
134 | mSyncTransaction.commit(); | 134 | } |
135 | }, [this](int errorCode, const QString &errorMessage) { | ||
136 | SinkWarning() << "Failed to replay change: " << errorMessage; | ||
137 | mSyncStore.clear(); | 135 | mSyncStore.clear(); |
138 | mEntityStore.clear(); | 136 | mEntityStore.clear(); |
139 | mTransaction.abort(); | 137 | mTransaction.abort(); |
diff --git a/common/store.cpp b/common/store.cpp index 07f41f8..c01d220 100644 --- a/common/store.cpp +++ b/common/store.cpp | |||
@@ -39,6 +39,8 @@ | |||
39 | SINK_DEBUG_AREA("store") | 39 | SINK_DEBUG_AREA("store") |
40 | 40 | ||
41 | Q_DECLARE_METATYPE(QSharedPointer<Sink::ResultEmitter<Sink::ApplicationDomain::SinkResource::Ptr>>) | 41 | Q_DECLARE_METATYPE(QSharedPointer<Sink::ResultEmitter<Sink::ApplicationDomain::SinkResource::Ptr>>) |
42 | Q_DECLARE_METATYPE(QSharedPointer<Sink::ResourceAccessInterface>); | ||
43 | Q_DECLARE_METATYPE(std::shared_ptr<void>); | ||
42 | 44 | ||
43 | namespace Sink { | 45 | namespace Sink { |
44 | 46 | ||
@@ -169,12 +171,10 @@ QSharedPointer<QAbstractItemModel> Store::loadModel(Query query) | |||
169 | result.first.exec(); | 171 | result.first.exec(); |
170 | } | 172 | } |
171 | 173 | ||
172 | KAsync::iterate(resources.keys()) | 174 | KAsync::value(resources.keys()) |
173 | .template each<void, QByteArray>([query, aggregatingEmitter, resources](const QByteArray &resourceInstanceIdentifier, KAsync::Future<void> &future) { | 175 | .template each([query, aggregatingEmitter, resources](const QByteArray &resourceInstanceIdentifier) { |
174 | const auto resourceType = resources.value(resourceInstanceIdentifier); | 176 | const auto resourceType = resources.value(resourceInstanceIdentifier); |
175 | queryResource<DomainType>(resourceType, resourceInstanceIdentifier, query, aggregatingEmitter).template then<void>([&future]() { | 177 | return queryResource<DomainType>(resourceType, resourceInstanceIdentifier, query, aggregatingEmitter); |
176 | future.setFinished(); | ||
177 | }).exec(); | ||
178 | }) | 178 | }) |
179 | .exec(); | 179 | .exec(); |
180 | model->fetchMore(QModelIndex()); | 180 | model->fetchMore(QModelIndex()); |
@@ -201,7 +201,7 @@ KAsync::Job<void> Store::create(const DomainType &domainObject) | |||
201 | { | 201 | { |
202 | // Potentially move to separate thread as well | 202 | // Potentially move to separate thread as well |
203 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); | 203 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); |
204 | return facade->create(domainObject).template then<void>([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to create"; }); | 204 | return facade->create(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to create"; }); |
205 | } | 205 | } |
206 | 206 | ||
207 | template <class DomainType> | 207 | template <class DomainType> |
@@ -209,7 +209,7 @@ KAsync::Job<void> Store::modify(const DomainType &domainObject) | |||
209 | { | 209 | { |
210 | // Potentially move to separate thread as well | 210 | // Potentially move to separate thread as well |
211 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); | 211 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); |
212 | return facade->modify(domainObject).template then<void>([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to modify"; }); | 212 | return facade->modify(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to modify"; }); |
213 | } | 213 | } |
214 | 214 | ||
215 | template <class DomainType> | 215 | template <class DomainType> |
@@ -217,7 +217,7 @@ KAsync::Job<void> Store::remove(const DomainType &domainObject) | |||
217 | { | 217 | { |
218 | // Potentially move to separate thread as well | 218 | // Potentially move to separate thread as well |
219 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); | 219 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); |
220 | return facade->remove(domainObject).template then<void>([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to remove"; }); | 220 | return facade->remove(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to remove"; }); |
221 | } | 221 | } |
222 | 222 | ||
223 | KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier) | 223 | KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier) |
@@ -231,6 +231,7 @@ KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier) | |||
231 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); | 231 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); |
232 | resourceAccess->open(); | 232 | resourceAccess->open(); |
233 | return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand) | 233 | return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand) |
234 | .addToContext(resourceAccess) | ||
234 | .then<void>([resourceAccess](KAsync::Future<void> &future) { | 235 | .then<void>([resourceAccess](KAsync::Future<void> &future) { |
235 | if (resourceAccess->isReady()) { | 236 | if (resourceAccess->isReady()) { |
236 | //Wait for the resource shutdown | 237 | //Wait for the resource shutdown |
@@ -243,8 +244,8 @@ KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier) | |||
243 | future.setFinished(); | 244 | future.setFinished(); |
244 | } | 245 | } |
245 | }) | 246 | }) |
246 | .then<void>([resourceAccess, time]() { | 247 | .syncThen<void>([time]() { |
247 | SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed()); | 248 | SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed()); |
248 | }); | 249 | }); |
249 | } | 250 | } |
250 | 251 | ||
@@ -253,41 +254,38 @@ KAsync::Job<void> Store::synchronize(const Sink::Query &query) | |||
253 | SinkTrace() << "synchronize" << query.resources; | 254 | SinkTrace() << "synchronize" << query.resources; |
254 | auto resources = getResources(query.resources, query.accounts).keys(); | 255 | auto resources = getResources(query.resources, query.accounts).keys(); |
255 | //FIXME only necessary because each doesn't propagate errors | 256 | //FIXME only necessary because each doesn't propagate errors |
256 | auto error = new bool; | 257 | auto errorFlag = new bool; |
257 | return KAsync::iterate(resources) | 258 | return KAsync::value(resources) |
258 | .template each<void, QByteArray>([query, error](const QByteArray &resource, KAsync::Future<void> &future) { | 259 | .template each([query, errorFlag](const QByteArray &resource) { |
259 | SinkTrace() << "Synchronizing " << resource; | 260 | SinkTrace() << "Synchronizing " << resource; |
260 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); | 261 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); |
261 | resourceAccess->open(); | 262 | resourceAccess->open(); |
262 | resourceAccess->synchronizeResource(true, false).then<void>([resourceAccess, &future]() {SinkTrace() << "synced."; future.setFinished(); }, | 263 | return resourceAccess->synchronizeResource(true, false) |
263 | [&future, error](int errorCode, QString msg) { *error = true; SinkWarning() << "Error during sync."; future.setError(errorCode, msg); }).exec(); | 264 | .addToContext(resourceAccess) |
264 | }).then<void>([error](KAsync::Future<void> &future) { | 265 | .then<void>([errorFlag](const KAsync::Error &error) { |
265 | if (*error) { | 266 | if (error) { |
266 | future.setError(1, "Error during sync."); | 267 | *errorFlag = true; |
267 | } else { | 268 | SinkWarning() << "Error during sync."; |
268 | future.setFinished(); | 269 | return KAsync::error<void>(error); |
270 | } | ||
271 | SinkTrace() << "synced."; | ||
272 | return KAsync::null<void>(); | ||
273 | }); | ||
274 | }) | ||
275 | .then<void>([errorFlag]() { | ||
276 | if (*errorFlag) { | ||
277 | return KAsync::error<void>("Error during sync."); | ||
269 | } | 278 | } |
270 | delete error; | 279 | delete errorFlag; |
280 | return KAsync::null<void>(); | ||
271 | }); | 281 | }); |
272 | } | 282 | } |
273 | 283 | ||
274 | template <class DomainType> | 284 | template <class DomainType> |
275 | KAsync::Job<DomainType> Store::fetchOne(const Sink::Query &query) | 285 | KAsync::Job<DomainType> Store::fetchOne(const Sink::Query &query) |
276 | { | 286 | { |
277 | return KAsync::start<DomainType>([query](KAsync::Future<DomainType> &future) { | 287 | return fetch<DomainType>(query, 1).template then<DomainType, QList<typename DomainType::Ptr>>([](const QList<typename DomainType::Ptr> &list) { |
278 | // FIXME We could do this more elegantly if composed jobs would have the correct type (In that case we'd simply return the value from then continuation, and could avoid the | 288 | return KAsync::value(*list.first()); |
279 | // outer job entirely) | ||
280 | fetch<DomainType>(query, 1) | ||
281 | .template then<void, QList<typename DomainType::Ptr>>( | ||
282 | [&future](const QList<typename DomainType::Ptr> &list) { | ||
283 | future.setValue(*list.first()); | ||
284 | future.setFinished(); | ||
285 | }, | ||
286 | [&future](int errorCode, const QString &errorMessage) { | ||
287 | future.setError(errorCode, errorMessage); | ||
288 | future.setFinished(); | ||
289 | }) | ||
290 | .exec(); | ||
291 | }); | 289 | }); |
292 | } | 290 | } |
293 | 291 | ||
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 2d4fb8d..15a06e7 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -244,7 +244,7 @@ KAsync::Job<void> Synchronizer::synchronize() | |||
244 | SinkTrace() << "Synchronizing"; | 244 | SinkTrace() << "Synchronizing"; |
245 | mSyncInProgress = true; | 245 | mSyncInProgress = true; |
246 | mMessageQueue->startTransaction(); | 246 | mMessageQueue->startTransaction(); |
247 | return synchronizeWithSource().then<void>([this]() { | 247 | return synchronizeWithSource().syncThen<void>([this]() { |
248 | mSyncStore.clear(); | 248 | mSyncStore.clear(); |
249 | mEntityStore.clear(); | 249 | mEntityStore.clear(); |
250 | mMessageQueue->commit(); | 250 | mMessageQueue->commit(); |
diff --git a/common/test.cpp b/common/test.cpp index 5b4c899..1a8e11d 100644 --- a/common/test.cpp +++ b/common/test.cpp | |||
@@ -156,7 +156,7 @@ public: | |||
156 | } | 156 | } |
157 | resultProvider->initialResultSetComplete(parent); | 157 | resultProvider->initialResultSetComplete(parent); |
158 | }); | 158 | }); |
159 | auto job = KAsync::start<void>([query, resultProvider]() {}); | 159 | auto job = KAsync::syncStart<void>([query, resultProvider]() {}); |
160 | return qMakePair(job, emitter); | 160 | return qMakePair(job, emitter); |
161 | } | 161 | } |
162 | 162 | ||
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp index 0f7463f..221e20d 100644 --- a/examples/dummyresource/resourcefactory.cpp +++ b/examples/dummyresource/resourcefactory.cpp | |||
@@ -113,7 +113,7 @@ class DummySynchronizer : public Sink::Synchronizer { | |||
113 | KAsync::Job<void> synchronizeWithSource() Q_DECL_OVERRIDE | 113 | KAsync::Job<void> synchronizeWithSource() Q_DECL_OVERRIDE |
114 | { | 114 | { |
115 | SinkLog() << " Synchronizing with the source"; | 115 | SinkLog() << " Synchronizing with the source"; |
116 | return KAsync::start<void>([this]() { | 116 | return KAsync::syncStart<void>([this]() { |
117 | synchronize(ENTITY_TYPE_EVENT, DummyStore::instance().events(), [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data) { | 117 | synchronize(ENTITY_TYPE_EVENT, DummyStore::instance().events(), [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data) { |
118 | return createEvent(ridBuffer, data); | 118 | return createEvent(ridBuffer, data); |
119 | }); | 119 | }); |
diff --git a/examples/imapresource/imapresource.cpp b/examples/imapresource/imapresource.cpp index e199ea1..f78376a 100644 --- a/examples/imapresource/imapresource.cpp +++ b/examples/imapresource/imapresource.cpp | |||
@@ -56,6 +56,8 @@ | |||
56 | 56 | ||
57 | SINK_DEBUG_AREA("imapresource") | 57 | SINK_DEBUG_AREA("imapresource") |
58 | 58 | ||
59 | Q_DECLARE_METATYPE(QSharedPointer<Imap::ImapServerProxy>) | ||
60 | |||
59 | using namespace Imap; | 61 | using namespace Imap; |
60 | using namespace Sink; | 62 | using namespace Sink; |
61 | 63 | ||
@@ -217,22 +219,22 @@ public: | |||
217 | SinkLog() << "Synchronizing mails" << folder.normalizedPath(); | 219 | SinkLog() << "Synchronizing mails" << folder.normalizedPath(); |
218 | auto capabilities = imap->getCapabilities(); | 220 | auto capabilities = imap->getCapabilities(); |
219 | bool canDoIncrementalRemovals = false; | 221 | bool canDoIncrementalRemovals = false; |
220 | return KAsync::start<void>([=]() { | 222 | return KAsync::syncStart<void>([=]() { |
221 | //TODO update flags | 223 | //TODO update flags |
222 | }) | 224 | }) |
223 | .then<void, KAsync::Job<void>>([=]() { | 225 | .then<void>([=]() { |
224 | //TODO Remove what's no longer existing | 226 | //TODO Remove what's no longer existing |
225 | if (canDoIncrementalRemovals) { | 227 | if (canDoIncrementalRemovals) { |
226 | } else { | 228 | } else { |
227 | return imap->fetchUids(folder).then<void, QVector<qint64>>([this, folder](const QVector<qint64> &uids) { | 229 | return imap->fetchUids(folder).syncThen<void, QVector<qint64>>([this, folder](const QVector<qint64> &uids) { |
228 | SinkTrace() << "Syncing removals"; | 230 | SinkTrace() << "Syncing removals"; |
229 | synchronizeRemovals(folder.normalizedPath(), uids.toList().toSet()); | 231 | synchronizeRemovals(folder.normalizedPath(), uids.toList().toSet()); |
230 | commit(); | 232 | commit(); |
231 | }).then<void>([](){}); | 233 | }); |
232 | } | 234 | } |
233 | return KAsync::null<void>(); | 235 | return KAsync::null<void>(); |
234 | }) | 236 | }) |
235 | .then<void, KAsync::Job<void>>([this, folder, imap]() { | 237 | .then<void>([this, folder, imap]() { |
236 | SinkTrace() << "About to fetch mail"; | 238 | SinkTrace() << "About to fetch mail"; |
237 | const auto uidNext = syncStore().readValue(folder.normalizedPath().toUtf8() + "uidnext").toLongLong(); | 239 | const auto uidNext = syncStore().readValue(folder.normalizedPath().toUtf8() + "uidnext").toLongLong(); |
238 | auto maxUid = QSharedPointer<qint64>::create(0); | 240 | auto maxUid = QSharedPointer<qint64>::create(0); |
@@ -248,7 +250,7 @@ public: | |||
248 | [](int progress, int total) { | 250 | [](int progress, int total) { |
249 | SinkTrace() << "Progress: " << progress << " out of " << total; | 251 | SinkTrace() << "Progress: " << progress << " out of " << total; |
250 | }) | 252 | }) |
251 | .then<void>([this, maxUid, folder]() { | 253 | .syncThen<void>([this, maxUid, folder]() { |
252 | syncStore().writeValue(folder.normalizedPath().toUtf8() + "uidnext", QByteArray::number(*maxUid)); | 254 | syncStore().writeValue(folder.normalizedPath().toUtf8() + "uidnext", QByteArray::number(*maxUid)); |
253 | }); | 255 | }); |
254 | }); | 256 | }); |
@@ -330,15 +332,12 @@ public: | |||
330 | flags << Imap::Flags::Flagged; | 332 | flags << Imap::Flags::Flagged; |
331 | } | 333 | } |
332 | QDateTime internalDate = mail.getDate(); | 334 | QDateTime internalDate = mail.getDate(); |
333 | auto rid = QSharedPointer<QByteArray>::create(); | ||
334 | return login.then(imap->append(mailbox, content, flags, internalDate)) | 335 | return login.then(imap->append(mailbox, content, flags, internalDate)) |
335 | .then<void, qint64>([imap, mailbox, rid, mail](qint64 uid) { | 336 | .addToContext(imap) |
337 | .syncThen<QByteArray, qint64>([mail](qint64 uid) { | ||
336 | const auto remoteId = assembleMailRid(mail, uid); | 338 | const auto remoteId = assembleMailRid(mail, uid); |
337 | //FIXME this get's called after the final error handler? WTF? | ||
338 | SinkTrace() << "Finished creating a new mail: " << remoteId; | 339 | SinkTrace() << "Finished creating a new mail: " << remoteId; |
339 | *rid = remoteId; | 340 | return remoteId; |
340 | }).then<QByteArray>([rid, imap]() { //FIXME fix KJob so we don't need this extra clause | ||
341 | return *rid; | ||
342 | }); | 341 | }); |
343 | } else if (operation == Sink::Operation_Removal) { | 342 | } else if (operation == Sink::Operation_Removal) { |
344 | const auto folderId = folderIdFromMailRid(oldRemoteId); | 343 | const auto folderId = folderIdFromMailRid(oldRemoteId); |
@@ -348,7 +347,7 @@ public: | |||
348 | KIMAP::ImapSet set; | 347 | KIMAP::ImapSet set; |
349 | set.add(uid); | 348 | set.add(uid); |
350 | return login.then(imap->remove(mailbox, set)) | 349 | return login.then(imap->remove(mailbox, set)) |
351 | .then<QByteArray>([imap, oldRemoteId]() { | 350 | .syncThen<QByteArray>([imap, oldRemoteId] { |
352 | SinkTrace() << "Finished removing a mail: " << oldRemoteId; | 351 | SinkTrace() << "Finished removing a mail: " << oldRemoteId; |
353 | return QByteArray(); | 352 | return QByteArray(); |
354 | }); | 353 | }); |
@@ -374,29 +373,24 @@ public: | |||
374 | const QString oldMailbox = syncStore().resolveLocalId(ENTITY_TYPE_FOLDER, folderId); | 373 | const QString oldMailbox = syncStore().resolveLocalId(ENTITY_TYPE_FOLDER, folderId); |
375 | QByteArray content = KMime::LFtoCRLF(mail.getMimeMessage()); | 374 | QByteArray content = KMime::LFtoCRLF(mail.getMimeMessage()); |
376 | QDateTime internalDate = mail.getDate(); | 375 | QDateTime internalDate = mail.getDate(); |
377 | auto rid = QSharedPointer<QByteArray>::create(); | ||
378 | KIMAP::ImapSet set; | 376 | KIMAP::ImapSet set; |
379 | set.add(uid); | 377 | set.add(uid); |
380 | return login.then(imap->append(mailbox, content, flags, internalDate)) | 378 | return login.then(imap->append(mailbox, content, flags, internalDate)) |
381 | .then<void, qint64>([imap, mailbox, rid, mail](qint64 uid) { | 379 | .addToContext(imap) |
380 | .then<QByteArray, qint64>([=](qint64 uid) { | ||
382 | const auto remoteId = assembleMailRid(mail, uid); | 381 | const auto remoteId = assembleMailRid(mail, uid); |
383 | SinkTrace() << "Finished creating a modified mail: " << remoteId; | 382 | SinkTrace() << "Finished creating a modified mail: " << remoteId; |
384 | *rid = remoteId; | 383 | return imap->remove(oldMailbox, set).then(KAsync::value(remoteId)); |
385 | }) | ||
386 | .then(imap->remove(oldMailbox, set)) | ||
387 | .then<QByteArray>([rid, imap]() { | ||
388 | return *rid; | ||
389 | }); | 384 | }); |
390 | } else { | 385 | } else { |
391 | SinkTrace() << "Updating flags only."; | 386 | SinkTrace() << "Updating flags only."; |
392 | KIMAP::ImapSet set; | 387 | KIMAP::ImapSet set; |
393 | set.add(uid); | 388 | set.add(uid); |
394 | return login.then(imap->select(mailbox)) | 389 | return login.then(imap->select(mailbox)) |
390 | .addToContext(imap) | ||
395 | .then(imap->storeFlags(set, flags)) | 391 | .then(imap->storeFlags(set, flags)) |
396 | .then<void>([imap, mailbox]() { | 392 | .syncThen<QByteArray>([=] { |
397 | SinkTrace() << "Finished modifying mail"; | 393 | SinkTrace() << "Finished modifying mail"; |
398 | }) | ||
399 | .then<QByteArray>([oldRemoteId, imap]() { | ||
400 | return oldRemoteId; | 394 | return oldRemoteId; |
401 | }); | 395 | }); |
402 | } | 396 | } |
@@ -416,13 +410,13 @@ public: | |||
416 | SinkTrace() << "Creating a new folder: " << parentFolder << folder.getName(); | 410 | SinkTrace() << "Creating a new folder: " << parentFolder << folder.getName(); |
417 | auto rid = QSharedPointer<QByteArray>::create(); | 411 | auto rid = QSharedPointer<QByteArray>::create(); |
418 | auto createFolder = login.then<QString>(imap->createSubfolder(parentFolder, folder.getName())) | 412 | auto createFolder = login.then<QString>(imap->createSubfolder(parentFolder, folder.getName())) |
419 | .then<void, QString>([imap, rid](const QString &createdFolder) { | 413 | .syncThen<void, QString>([imap, rid](const QString &createdFolder) { |
420 | SinkTrace() << "Finished creating a new folder: " << createdFolder; | 414 | SinkTrace() << "Finished creating a new folder: " << createdFolder; |
421 | *rid = createdFolder.toUtf8(); | 415 | *rid = createdFolder.toUtf8(); |
422 | }); | 416 | }); |
423 | if (folder.getSpecialPurpose().isEmpty()) { | 417 | if (folder.getSpecialPurpose().isEmpty()) { |
424 | return createFolder | 418 | return createFolder |
425 | .then<QByteArray>([rid](){ | 419 | .syncThen<QByteArray>([rid](){ |
426 | return *rid; | 420 | return *rid; |
427 | }); | 421 | }); |
428 | } else { //We try to merge special purpose folders first | 422 | } else { //We try to merge special purpose folders first |
@@ -435,7 +429,7 @@ public: | |||
435 | }; | 429 | }; |
436 | } | 430 | } |
437 | })) | 431 | })) |
438 | .then<void, KAsync::Job<void>>([specialPurposeFolders, folder, imap, parentFolder, rid]() -> KAsync::Job<void> { | 432 | .then<void>([specialPurposeFolders, folder, imap, parentFolder, rid]() -> KAsync::Job<void> { |
439 | for (const auto &purpose : folder.getSpecialPurpose()) { | 433 | for (const auto &purpose : folder.getSpecialPurpose()) { |
440 | if (specialPurposeFolders->contains(purpose)) { | 434 | if (specialPurposeFolders->contains(purpose)) { |
441 | auto f = specialPurposeFolders->value(purpose); | 435 | auto f = specialPurposeFolders->value(purpose); |
@@ -446,13 +440,13 @@ public: | |||
446 | } | 440 | } |
447 | SinkTrace() << "No match found for merging, creating a new folder"; | 441 | SinkTrace() << "No match found for merging, creating a new folder"; |
448 | return imap->createSubfolder(parentFolder, folder.getName()) | 442 | return imap->createSubfolder(parentFolder, folder.getName()) |
449 | .then<void, QString>([imap, rid](const QString &createdFolder) { | 443 | .syncThen<void, QString>([imap, rid](const QString &createdFolder) { |
450 | SinkTrace() << "Finished creating a new folder: " << createdFolder; | 444 | SinkTrace() << "Finished creating a new folder: " << createdFolder; |
451 | *rid = createdFolder.toUtf8(); | 445 | *rid = createdFolder.toUtf8(); |
452 | }); | 446 | }); |
453 | 447 | ||
454 | }) | 448 | }) |
455 | .then<QByteArray>([rid](){ | 449 | .syncThen<QByteArray>([rid](){ |
456 | return *rid; | 450 | return *rid; |
457 | }); | 451 | }); |
458 | return mergeJob; | 452 | return mergeJob; |
@@ -460,7 +454,7 @@ public: | |||
460 | } else if (operation == Sink::Operation_Removal) { | 454 | } else if (operation == Sink::Operation_Removal) { |
461 | SinkTrace() << "Removing a folder: " << oldRemoteId; | 455 | SinkTrace() << "Removing a folder: " << oldRemoteId; |
462 | return login.then<void>(imap->remove(oldRemoteId)) | 456 | return login.then<void>(imap->remove(oldRemoteId)) |
463 | .then<QByteArray>([oldRemoteId, imap]() { | 457 | .syncThen<QByteArray>([oldRemoteId, imap]() { |
464 | SinkTrace() << "Finished removing a folder: " << oldRemoteId; | 458 | SinkTrace() << "Finished removing a folder: " << oldRemoteId; |
465 | return QByteArray(); | 459 | return QByteArray(); |
466 | }); | 460 | }); |
@@ -468,11 +462,11 @@ public: | |||
468 | SinkTrace() << "Renaming a folder: " << oldRemoteId << folder.getName(); | 462 | SinkTrace() << "Renaming a folder: " << oldRemoteId << folder.getName(); |
469 | auto rid = QSharedPointer<QByteArray>::create(); | 463 | auto rid = QSharedPointer<QByteArray>::create(); |
470 | return login.then<QString>(imap->renameSubfolder(oldRemoteId, folder.getName())) | 464 | return login.then<QString>(imap->renameSubfolder(oldRemoteId, folder.getName())) |
471 | .then<void, QString>([imap, rid](const QString &createdFolder) { | 465 | .syncThen<void, QString>([imap, rid](const QString &createdFolder) { |
472 | SinkTrace() << "Finished renaming a folder: " << createdFolder; | 466 | SinkTrace() << "Finished renaming a folder: " << createdFolder; |
473 | *rid = createdFolder.toUtf8(); | 467 | *rid = createdFolder.toUtf8(); |
474 | }) | 468 | }) |
475 | .then<QByteArray>([rid](){ | 469 | .syncThen<QByteArray>([rid](){ |
476 | return *rid; | 470 | return *rid; |
477 | }); | 471 | }); |
478 | } | 472 | } |
@@ -566,7 +560,8 @@ KAsync::Job<void> ImapResource::inspect(int inspectionType, const QByteArray &in | |||
566 | SinkTrace() << "Connecting to:" << mServer << mPort; | 560 | SinkTrace() << "Connecting to:" << mServer << mPort; |
567 | SinkTrace() << "as:" << mUser; | 561 | SinkTrace() << "as:" << mUser; |
568 | auto inspectionJob = imap->login(mUser, mPassword) | 562 | auto inspectionJob = imap->login(mUser, mPassword) |
569 | .then<void>(imap->select(folderRemoteId).then<void>([](){})) | 563 | .then<Imap::SelectResult>(imap->select(folderRemoteId)) |
564 | .syncThen<void, Imap::SelectResult>([](Imap::SelectResult){}) | ||
570 | .then<void>(imap->fetch(set, scope, [imap, messageByUid](const QVector<Imap::Message> &messages) { | 565 | .then<void>(imap->fetch(set, scope, [imap, messageByUid](const QVector<Imap::Message> &messages) { |
571 | for (const auto &m : messages) { | 566 | for (const auto &m : messages) { |
572 | messageByUid->insert(m.uid, m); | 567 | messageByUid->insert(m.uid, m); |
@@ -575,7 +570,7 @@ KAsync::Job<void> ImapResource::inspect(int inspectionType, const QByteArray &in | |||
575 | 570 | ||
576 | if (inspectionType == Sink::ResourceControl::Inspection::PropertyInspectionType) { | 571 | if (inspectionType == Sink::ResourceControl::Inspection::PropertyInspectionType) { |
577 | if (property == "unread") { | 572 | if (property == "unread") { |
578 | return inspectionJob.then<void, KAsync::Job<void>>([=]() { | 573 | return inspectionJob.then<void>([=]() { |
579 | auto msg = messageByUid->value(uid); | 574 | auto msg = messageByUid->value(uid); |
580 | if (expectedValue.toBool() && msg.flags.contains(Imap::Flags::Seen)) { | 575 | if (expectedValue.toBool() && msg.flags.contains(Imap::Flags::Seen)) { |
581 | return KAsync::error<void>(1, "Expected unread but couldn't find it."); | 576 | return KAsync::error<void>(1, "Expected unread but couldn't find it."); |
@@ -587,7 +582,7 @@ KAsync::Job<void> ImapResource::inspect(int inspectionType, const QByteArray &in | |||
587 | }); | 582 | }); |
588 | } | 583 | } |
589 | if (property == "subject") { | 584 | if (property == "subject") { |
590 | return inspectionJob.then<void, KAsync::Job<void>>([=]() { | 585 | return inspectionJob.then<void>([=]() { |
591 | auto msg = messageByUid->value(uid); | 586 | auto msg = messageByUid->value(uid); |
592 | if (msg.msg->subject(true)->asUnicodeString() != expectedValue.toString()) { | 587 | if (msg.msg->subject(true)->asUnicodeString() != expectedValue.toString()) { |
593 | return KAsync::error<void>(1, "Subject not as expected: " + msg.msg->subject(true)->asUnicodeString()); | 588 | return KAsync::error<void>(1, "Subject not as expected: " + msg.msg->subject(true)->asUnicodeString()); |
@@ -597,7 +592,7 @@ KAsync::Job<void> ImapResource::inspect(int inspectionType, const QByteArray &in | |||
597 | } | 592 | } |
598 | } | 593 | } |
599 | if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { | 594 | if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { |
600 | return inspectionJob.then<void, KAsync::Job<void>>([=]() { | 595 | return inspectionJob.then<void>([=]() { |
601 | if (!messageByUid->contains(uid)) { | 596 | if (!messageByUid->contains(uid)) { |
602 | SinkWarning() << "Existing messages are: " << messageByUid->keys(); | 597 | SinkWarning() << "Existing messages are: " << messageByUid->keys(); |
603 | SinkWarning() << "We're looking for: " << uid; | 598 | SinkWarning() << "We're looking for: " << uid; |
@@ -628,20 +623,19 @@ KAsync::Job<void> ImapResource::inspect(int inspectionType, const QByteArray &in | |||
628 | scope.mode = KIMAP::FetchJob::FetchScope::Headers; | 623 | scope.mode = KIMAP::FetchJob::FetchScope::Headers; |
629 | auto imap = QSharedPointer<ImapServerProxy>::create(mServer, mPort); | 624 | auto imap = QSharedPointer<ImapServerProxy>::create(mServer, mPort); |
630 | auto messageByUid = QSharedPointer<QHash<qint64, Imap::Message>>::create(); | 625 | auto messageByUid = QSharedPointer<QHash<qint64, Imap::Message>>::create(); |
631 | auto inspectionJob = imap->login(mUser, mPassword) | 626 | return imap->login(mUser, mPassword) |
632 | .then<void>(imap->select(remoteId).then<void>([](){})) | 627 | .then<void>(imap->select(remoteId).syncThen<void>([](){})) |
633 | .then<void>(imap->fetch(set, scope, [=](const QVector<Imap::Message> &messages) { | 628 | .then<void>(imap->fetch(set, scope, [=](const QVector<Imap::Message> &messages) { |
634 | for (const auto &m : messages) { | 629 | for (const auto &m : messages) { |
635 | messageByUid->insert(m.uid, m); | 630 | messageByUid->insert(m.uid, m); |
636 | } | 631 | } |
637 | })) | 632 | })) |
638 | .then<void, KAsync::Job<void>>([imap, messageByUid, expectedCount]() { | 633 | .then<void>([imap, messageByUid, expectedCount]() { |
639 | if (messageByUid->size() != expectedCount) { | 634 | if (messageByUid->size() != expectedCount) { |
640 | return KAsync::error<void>(1, QString("Wrong number of messages on the server; found %1 instead of %2.").arg(messageByUid->size()).arg(expectedCount)); | 635 | return KAsync::error<void>(1, QString("Wrong number of messages on the server; found %1 instead of %2.").arg(messageByUid->size()).arg(expectedCount)); |
641 | } | 636 | } |
642 | return KAsync::null<void>(); | 637 | return KAsync::null<void>(); |
643 | }); | 638 | }); |
644 | return inspectionJob; | ||
645 | } | 639 | } |
646 | if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { | 640 | if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { |
647 | auto folderByPath = QSharedPointer<QSet<QString>>::create(); | 641 | auto folderByPath = QSharedPointer<QSet<QString>>::create(); |
@@ -655,7 +649,7 @@ KAsync::Job<void> ImapResource::inspect(int inspectionType, const QByteArray &in | |||
655 | *folderByName << f.pathParts.last(); | 649 | *folderByName << f.pathParts.last(); |
656 | } | 650 | } |
657 | })) | 651 | })) |
658 | .then<void, KAsync::Job<void>>([this, folderByName, folderByPath, folder, remoteId, imap]() { | 652 | .then<void>([this, folderByName, folderByPath, folder, remoteId, imap]() { |
659 | if (!folderByName->contains(folder.getName())) { | 653 | if (!folderByName->contains(folder.getName())) { |
660 | SinkWarning() << "Existing folders are: " << *folderByPath; | 654 | SinkWarning() << "Existing folders are: " << *folderByPath; |
661 | SinkWarning() << "We're looking for: " << folder.getName(); | 655 | SinkWarning() << "We're looking for: " << folder.getName(); |
diff --git a/examples/imapresource/imapserverproxy.cpp b/examples/imapresource/imapserverproxy.cpp index a3d8d16..94367d8 100644 --- a/examples/imapresource/imapserverproxy.cpp +++ b/examples/imapresource/imapserverproxy.cpp | |||
@@ -161,7 +161,7 @@ KAsync::Job<void> ImapServerProxy::login(const QString &username, const QString | |||
161 | auto namespaceJob = new KIMAP::NamespaceJob(mSession); | 161 | auto namespaceJob = new KIMAP::NamespaceJob(mSession); |
162 | 162 | ||
163 | //FIXME The ping is only required because the login job doesn't fail after the configured timeout | 163 | //FIXME The ping is only required because the login job doesn't fail after the configured timeout |
164 | return ping().then(runJob(loginJob)).then(runJob(capabilitiesJob)).then<void>([this](){ | 164 | return ping().then(runJob(loginJob)).then(runJob(capabilitiesJob)).syncThen<void>([this](){ |
165 | SinkTrace() << "Supported capabilities: " << mCapabilities; | 165 | SinkTrace() << "Supported capabilities: " << mCapabilities; |
166 | QStringList requiredExtensions = QStringList() << "UIDPLUS" << "NAMESPACE"; | 166 | QStringList requiredExtensions = QStringList() << "UIDPLUS" << "NAMESPACE"; |
167 | for (const auto &requiredExtension : requiredExtensions) { | 167 | for (const auto &requiredExtension : requiredExtensions) { |
@@ -170,7 +170,7 @@ KAsync::Job<void> ImapServerProxy::login(const QString &username, const QString | |||
170 | //TODO fail the job | 170 | //TODO fail the job |
171 | } | 171 | } |
172 | } | 172 | } |
173 | }).then(runJob(namespaceJob)).then<void>([this, namespaceJob](){ | 173 | }).then(runJob(namespaceJob)).syncThen<void>([this, namespaceJob] { |
174 | for (const auto &ns :namespaceJob->personalNamespaces()) { | 174 | for (const auto &ns :namespaceJob->personalNamespaces()) { |
175 | mPersonalNamespaces << ns.name; | 175 | mPersonalNamespaces << ns.name; |
176 | mPersonalNamespaceSeparator = ns.separator; | 176 | mPersonalNamespaceSeparator = ns.separator; |
@@ -363,7 +363,7 @@ KAsync::Job<QList<qint64>> ImapServerProxy::fetchHeaders(const QString &mailbox, | |||
363 | list->append(uids.value(id)); | 363 | list->append(uids.value(id)); |
364 | } | 364 | } |
365 | }) | 365 | }) |
366 | .then<QList<qint64>>([list](){ | 366 | .syncThen<QList<qint64>>([list](){ |
367 | return *list; | 367 | return *list; |
368 | }); | 368 | }); |
369 | } | 369 | } |
@@ -402,35 +402,34 @@ KAsync::Job<void> ImapServerProxy::move(const QString &mailbox, const KIMAP::Ima | |||
402 | 402 | ||
403 | KAsync::Job<QString> ImapServerProxy::createSubfolder(const QString &parentMailbox, const QString &folderName) | 403 | KAsync::Job<QString> ImapServerProxy::createSubfolder(const QString &parentMailbox, const QString &folderName) |
404 | { | 404 | { |
405 | auto folder = QSharedPointer<QString>::create(); | 405 | return KAsync::start<QString>([this, parentMailbox, folderName]() { |
406 | return KAsync::start<void, KAsync::Job<void>>([this, parentMailbox, folderName, folder]() { | ||
407 | Q_ASSERT(!mPersonalNamespaceSeparator.isNull()); | 406 | Q_ASSERT(!mPersonalNamespaceSeparator.isNull()); |
407 | auto folder = QSharedPointer<QString>::create(); | ||
408 | if (parentMailbox.isEmpty()) { | 408 | if (parentMailbox.isEmpty()) { |
409 | *folder = mPersonalNamespaces.toList().first() + folderName; | 409 | *folder = mPersonalNamespaces.toList().first() + folderName; |
410 | } else { | 410 | } else { |
411 | *folder = parentMailbox + mPersonalNamespaceSeparator + folderName; | 411 | *folder = parentMailbox + mPersonalNamespaceSeparator + folderName; |
412 | } | 412 | } |
413 | SinkTrace() << "Creating subfolder: " << *folder; | 413 | SinkTrace() << "Creating subfolder: " << *folder; |
414 | return create(*folder); | 414 | return create(*folder) |
415 | }) | 415 | .syncThen<QString>([=]() { |
416 | .then<QString>([=]() { | 416 | return *folder; |
417 | return *folder; | 417 | }); |
418 | }); | 418 | }); |
419 | } | 419 | } |
420 | 420 | ||
421 | KAsync::Job<QString> ImapServerProxy::renameSubfolder(const QString &oldMailbox, const QString &newName) | 421 | KAsync::Job<QString> ImapServerProxy::renameSubfolder(const QString &oldMailbox, const QString &newName) |
422 | { | 422 | { |
423 | auto folder = QSharedPointer<QString>::create(); | 423 | return KAsync::start<QString>([this, oldMailbox, newName] { |
424 | return KAsync::start<void, KAsync::Job<void>>([this, oldMailbox, newName, folder]() { | ||
425 | Q_ASSERT(!mPersonalNamespaceSeparator.isNull()); | 424 | Q_ASSERT(!mPersonalNamespaceSeparator.isNull()); |
426 | auto parts = oldMailbox.split(mPersonalNamespaceSeparator); | 425 | auto parts = oldMailbox.split(mPersonalNamespaceSeparator); |
427 | parts.removeLast(); | 426 | parts.removeLast(); |
428 | *folder = parts.join(mPersonalNamespaceSeparator) + mPersonalNamespaceSeparator + newName; | 427 | auto folder = QSharedPointer<QString>::create(parts.join(mPersonalNamespaceSeparator) + mPersonalNamespaceSeparator + newName); |
429 | SinkTrace() << "Renaming subfolder: " << oldMailbox << *folder; | 428 | SinkTrace() << "Renaming subfolder: " << oldMailbox << *folder; |
430 | return rename(oldMailbox, *folder); | 429 | return rename(oldMailbox, *folder) |
431 | }) | 430 | .syncThen<QString>([=]() { |
432 | .then<QString>([=]() { | 431 | return *folder; |
433 | return *folder; | 432 | }); |
434 | }); | 433 | }); |
435 | } | 434 | } |
436 | 435 | ||
@@ -461,7 +460,7 @@ KAsync::Job<void> ImapServerProxy::fetchMessages(const Folder &folder, qint64 ui | |||
461 | auto time = QSharedPointer<QTime>::create(); | 460 | auto time = QSharedPointer<QTime>::create(); |
462 | time->start(); | 461 | time->start(); |
463 | Q_ASSERT(!mPersonalNamespaceSeparator.isNull()); | 462 | Q_ASSERT(!mPersonalNamespaceSeparator.isNull()); |
464 | return select(mailboxFromFolder(folder)).then<void, KAsync::Job<void>, SelectResult>([this, callback, folder, time, progress, uidNext](const SelectResult &selectResult) -> KAsync::Job<void> { | 463 | return select(mailboxFromFolder(folder)).then<void, SelectResult>([this, callback, folder, time, progress, uidNext](const SelectResult &selectResult) -> KAsync::Job<void> { |
465 | 464 | ||
466 | SinkLog() << "UIDNEXT " << selectResult.uidNext << uidNext; | 465 | SinkLog() << "UIDNEXT " << selectResult.uidNext << uidNext; |
467 | if (selectResult.uidNext == (uidNext + 1)) { | 466 | if (selectResult.uidNext == (uidNext + 1)) { |
@@ -469,7 +468,7 @@ KAsync::Job<void> ImapServerProxy::fetchMessages(const Folder &folder, qint64 ui | |||
469 | return KAsync::null<void>(); | 468 | return KAsync::null<void>(); |
470 | } | 469 | } |
471 | 470 | ||
472 | return fetchHeaders(mailboxFromFolder(folder), (uidNext + 1)).then<void, KAsync::Job<void>, QList<qint64>>([this, callback, time, progress](const QList<qint64> &uidsToFetch){ | 471 | return fetchHeaders(mailboxFromFolder(folder), (uidNext + 1)).then<void, QList<qint64>>([this, callback, time, progress](const QList<qint64> &uidsToFetch){ |
473 | SinkTrace() << "Fetched headers"; | 472 | SinkTrace() << "Fetched headers"; |
474 | SinkTrace() << " Total: " << uidsToFetch.size(); | 473 | SinkTrace() << " Total: " << uidsToFetch.size(); |
475 | SinkTrace() << " Uids to fetch: " << uidsToFetch; | 474 | SinkTrace() << " Uids to fetch: " << uidsToFetch; |
diff --git a/examples/maildirresource/maildirresource.cpp b/examples/maildirresource/maildirresource.cpp index 392b422..e69d822 100644 --- a/examples/maildirresource/maildirresource.cpp +++ b/examples/maildirresource/maildirresource.cpp | |||
@@ -367,7 +367,7 @@ public: | |||
367 | KAsync::Job<void> synchronizeWithSource() Q_DECL_OVERRIDE | 367 | KAsync::Job<void> synchronizeWithSource() Q_DECL_OVERRIDE |
368 | { | 368 | { |
369 | SinkLog() << " Synchronizing"; | 369 | SinkLog() << " Synchronizing"; |
370 | return KAsync::start<void, KAsync::Job<void> >([this]() { | 370 | return KAsync::start<void>([this]() { |
371 | KPIM::Maildir maildir(mMaildirPath, true); | 371 | KPIM::Maildir maildir(mMaildirPath, true); |
372 | if (!maildir.isValid(false)) { | 372 | if (!maildir.isValid(false)) { |
373 | return KAsync::error<void>(1, "Maildir path doesn't point to a valid maildir: " + mMaildirPath); | 373 | return KAsync::error<void>(1, "Maildir path doesn't point to a valid maildir: " + mMaildirPath); |
@@ -402,18 +402,14 @@ public: | |||
402 | if (operation == Sink::Operation_Creation) { | 402 | if (operation == Sink::Operation_Creation) { |
403 | const auto remoteId = getFilePathFromMimeMessagePath(mail.getMimeMessagePath()); | 403 | const auto remoteId = getFilePathFromMimeMessagePath(mail.getMimeMessagePath()); |
404 | SinkTrace() << "Mail created: " << remoteId; | 404 | SinkTrace() << "Mail created: " << remoteId; |
405 | return KAsync::start<QByteArray>([=]() -> QByteArray { | 405 | return KAsync::value(remoteId.toUtf8()); |
406 | return remoteId.toUtf8(); | ||
407 | }); | ||
408 | } else if (operation == Sink::Operation_Removal) { | 406 | } else if (operation == Sink::Operation_Removal) { |
409 | SinkTrace() << "Removing a mail: " << oldRemoteId; | 407 | SinkTrace() << "Removing a mail: " << oldRemoteId; |
410 | return KAsync::null<QByteArray>(); | 408 | return KAsync::null<QByteArray>(); |
411 | } else if (operation == Sink::Operation_Modification) { | 409 | } else if (operation == Sink::Operation_Modification) { |
412 | SinkTrace() << "Modifying a mail: " << oldRemoteId; | 410 | SinkTrace() << "Modifying a mail: " << oldRemoteId; |
413 | const auto remoteId = getFilePathFromMimeMessagePath(mail.getMimeMessagePath()); | 411 | const auto remoteId = getFilePathFromMimeMessagePath(mail.getMimeMessagePath()); |
414 | return KAsync::start<QByteArray>([=]() -> QByteArray { | 412 | return KAsync::value(remoteId.toUtf8()); |
415 | return remoteId.toUtf8(); | ||
416 | }); | ||
417 | } | 413 | } |
418 | return KAsync::null<QByteArray>(); | 414 | return KAsync::null<QByteArray>(); |
419 | } | 415 | } |
@@ -427,9 +423,7 @@ public: | |||
427 | SinkTrace() << "Creating a new folder: " << path; | 423 | SinkTrace() << "Creating a new folder: " << path; |
428 | KPIM::Maildir maildir(path, false); | 424 | KPIM::Maildir maildir(path, false); |
429 | maildir.create(); | 425 | maildir.create(); |
430 | return KAsync::start<QByteArray>([=]() -> QByteArray { | 426 | return KAsync::value(path.toUtf8()); |
431 | return path.toUtf8(); | ||
432 | }); | ||
433 | } else if (operation == Sink::Operation_Removal) { | 427 | } else if (operation == Sink::Operation_Removal) { |
434 | const auto path = oldRemoteId; | 428 | const auto path = oldRemoteId; |
435 | SinkTrace() << "Removing a folder: " << path; | 429 | SinkTrace() << "Removing a folder: " << path; |
@@ -438,9 +432,7 @@ public: | |||
438 | return KAsync::null<QByteArray>(); | 432 | return KAsync::null<QByteArray>(); |
439 | } else if (operation == Sink::Operation_Modification) { | 433 | } else if (operation == Sink::Operation_Modification) { |
440 | SinkWarning() << "Folder modifications are not implemented"; | 434 | SinkWarning() << "Folder modifications are not implemented"; |
441 | return KAsync::start<QByteArray>([=]() -> QByteArray { | 435 | return KAsync::value(oldRemoteId); |
442 | return oldRemoteId; | ||
443 | }); | ||
444 | } | 436 | } |
445 | return KAsync::null<QByteArray>(); | 437 | return KAsync::null<QByteArray>(); |
446 | } | 438 | } |
diff --git a/examples/mailtransportresource/mailtransportresource.cpp b/examples/mailtransportresource/mailtransportresource.cpp index a729d4d..be4e4e0 100644 --- a/examples/mailtransportresource/mailtransportresource.cpp +++ b/examples/mailtransportresource/mailtransportresource.cpp | |||
@@ -124,17 +124,14 @@ public: | |||
124 | }); | 124 | }); |
125 | auto job = KAsync::null<void>(); | 125 | auto job = KAsync::null<void>(); |
126 | for (const auto &m : toSend) { | 126 | for (const auto &m : toSend) { |
127 | job = job.then(send(m, mSettings)).then<void>([this, m]() { | 127 | job = job.then(send(m, mSettings)).syncThen<void>([this, m] { |
128 | auto modifiedMail = ApplicationDomain::Mail(mResourceInstanceIdentifier, m.identifier(), m.revision(), QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create()); | 128 | auto modifiedMail = ApplicationDomain::Mail(mResourceInstanceIdentifier, m.identifier(), m.revision(), QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create()); |
129 | modifiedMail.setSent(true); | 129 | modifiedMail.setSent(true); |
130 | modify(modifiedMail); | 130 | modify(modifiedMail); |
131 | //TODO copy to a sent mail folder as well | 131 | //TODO copy to a sent mail folder as well |
132 | }); | 132 | }); |
133 | } | 133 | } |
134 | job = job.then<void>([&future]() { | 134 | job = job.syncThen<void>([&future](const KAsync::Error &) { |
135 | future.setFinished(); | ||
136 | }, | ||
137 | [&future](int errorCode, const QString &errorString) { | ||
138 | future.setFinished(); | 135 | future.setFinished(); |
139 | }); | 136 | }); |
140 | job.exec(); | 137 | job.exec(); |
diff --git a/sinksh/syntax_modules/sink_sync.cpp b/sinksh/syntax_modules/sink_sync.cpp index 208b869..2ed4cf7 100644 --- a/sinksh/syntax_modules/sink_sync.cpp +++ b/sinksh/syntax_modules/sink_sync.cpp | |||
@@ -45,10 +45,10 @@ bool sync(const QStringList &args, State &state) | |||
45 | } | 45 | } |
46 | 46 | ||
47 | QTimer::singleShot(0, [query, state]() { | 47 | QTimer::singleShot(0, [query, state]() { |
48 | Sink::Store::synchronize(query).then<void>([state]() { | 48 | Sink::Store::synchronize(query).syncThen<void>([state]() { |
49 | state.printLine("Synchronization complete!"); | 49 | state.printLine("Synchronization complete!"); |
50 | state.commandFinished(); | 50 | state.commandFinished(); |
51 | }).exec(); | 51 | }).exec(); |
52 | }); | 52 | }); |
53 | 53 | ||
54 | return true; | 54 | return true; |
diff --git a/tests/accountstest.cpp b/tests/accountstest.cpp index 4be8bd6..e0a99c2 100644 --- a/tests/accountstest.cpp +++ b/tests/accountstest.cpp | |||
@@ -39,7 +39,7 @@ private slots: | |||
39 | account.setProperty("icon", accountIcon); | 39 | account.setProperty("icon", accountIcon); |
40 | Store::create(account).exec().waitForFinished(); | 40 | Store::create(account).exec().waitForFinished(); |
41 | 41 | ||
42 | Store::fetchAll<SinkAccount>(Query()).then<void, QList<SinkAccount::Ptr>>([&](const QList<SinkAccount::Ptr> &accounts) { | 42 | Store::fetchAll<SinkAccount>(Query()).syncThen<void, QList<SinkAccount::Ptr>>([&](const QList<SinkAccount::Ptr> &accounts) { |
43 | QCOMPARE(accounts.size(), 1); | 43 | QCOMPARE(accounts.size(), 1); |
44 | auto account = accounts.first(); | 44 | auto account = accounts.first(); |
45 | QCOMPARE(account->getProperty("type").toString(), QString("maildir")); | 45 | QCOMPARE(account->getProperty("type").toString(), QString("maildir")); |
@@ -60,7 +60,7 @@ private slots: | |||
60 | Store::create(resource).exec().waitForFinished(); | 60 | Store::create(resource).exec().waitForFinished(); |
61 | 61 | ||
62 | 62 | ||
63 | Store::fetchAll<SinkResource>(Query()).then<void, QList<SinkResource::Ptr>>([&](const QList<SinkResource::Ptr> &resources) { | 63 | Store::fetchAll<SinkResource>(Query()).syncThen<void, QList<SinkResource::Ptr>>([&](const QList<SinkResource::Ptr> &resources) { |
64 | QCOMPARE(resources.size(), 1); | 64 | QCOMPARE(resources.size(), 1); |
65 | auto resource = resources.first(); | 65 | auto resource = resources.first(); |
66 | QCOMPARE(resource->getProperty("type").toString(), QString("sink.mailtransport")); | 66 | QCOMPARE(resource->getProperty("type").toString(), QString("sink.mailtransport")); |
@@ -74,7 +74,7 @@ private slots: | |||
74 | identity.setProperty("account", account.identifier()); | 74 | identity.setProperty("account", account.identifier()); |
75 | Store::create(identity).exec().waitForFinished(); | 75 | Store::create(identity).exec().waitForFinished(); |
76 | 76 | ||
77 | Store::fetchAll<Identity>(Query()).then<void, QList<Identity::Ptr>>([&](const QList<Identity::Ptr> &identities) { | 77 | Store::fetchAll<Identity>(Query()).syncThen<void, QList<Identity::Ptr>>([&](const QList<Identity::Ptr> &identities) { |
78 | QCOMPARE(identities.size(), 1); | 78 | QCOMPARE(identities.size(), 1); |
79 | }) | 79 | }) |
80 | .exec().waitForFinished(); | 80 | .exec().waitForFinished(); |
@@ -82,7 +82,7 @@ private slots: | |||
82 | 82 | ||
83 | Store::remove(resource).exec().waitForFinished(); | 83 | Store::remove(resource).exec().waitForFinished(); |
84 | 84 | ||
85 | Store::fetchAll<SinkResource>(Query()).then<void, QList<SinkResource::Ptr>>([](const QList<SinkResource::Ptr> &resources) { | 85 | Store::fetchAll<SinkResource>(Query()).syncThen<void, QList<SinkResource::Ptr>>([](const QList<SinkResource::Ptr> &resources) { |
86 | QCOMPARE(resources.size(), 0); | 86 | QCOMPARE(resources.size(), 0); |
87 | }) | 87 | }) |
88 | .exec().waitForFinished(); | 88 | .exec().waitForFinished(); |
diff --git a/tests/clientapitest.cpp b/tests/clientapitest.cpp index b1e49c4..b5405cf 100644 --- a/tests/clientapitest.cpp +++ b/tests/clientapitest.cpp | |||
@@ -70,7 +70,7 @@ public: | |||
70 | } | 70 | } |
71 | resultProvider->initialResultSetComplete(parent); | 71 | resultProvider->initialResultSetComplete(parent); |
72 | }); | 72 | }); |
73 | auto job = KAsync::start<void>([query, resultProvider]() {}); | 73 | auto job = KAsync::syncStart<void>([query, resultProvider]() {}); |
74 | mResultProvider = resultProvider; | 74 | mResultProvider = resultProvider; |
75 | return qMakePair(job, emitter); | 75 | return qMakePair(job, emitter); |
76 | } | 76 | } |
@@ -273,7 +273,7 @@ private slots: | |||
273 | 273 | ||
274 | bool gotValue = false; | 274 | bool gotValue = false; |
275 | auto result = Sink::Store::fetchOne<Sink::ApplicationDomain::Event>(query) | 275 | auto result = Sink::Store::fetchOne<Sink::ApplicationDomain::Event>(query) |
276 | .then<void, Sink::ApplicationDomain::Event>([&gotValue](const Sink::ApplicationDomain::Event &event) { gotValue = true; }) | 276 | .syncThen<void, Sink::ApplicationDomain::Event>([&gotValue](const Sink::ApplicationDomain::Event &event) { gotValue = true; }) |
277 | .exec(); | 277 | .exec(); |
278 | result.waitForFinished(); | 278 | result.waitForFinished(); |
279 | QVERIFY(!result.errorCode()); | 279 | QVERIFY(!result.errorCode()); |
diff --git a/tests/dummyresourcebenchmark.cpp b/tests/dummyresourcebenchmark.cpp index 72562c3..8636bf6 100644 --- a/tests/dummyresourcebenchmark.cpp +++ b/tests/dummyresourcebenchmark.cpp | |||
@@ -44,34 +44,6 @@ private slots: | |||
44 | { | 44 | { |
45 | } | 45 | } |
46 | 46 | ||
47 | static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void>> &futures) | ||
48 | { | ||
49 | auto context = new QObject; | ||
50 | return KAsync::start<void>([futures, context](KAsync::Future<void> &future) { | ||
51 | const auto total = futures.size(); | ||
52 | auto count = QSharedPointer<int>::create(); | ||
53 | int i = 0; | ||
54 | for (KAsync::Future<void> subFuture : futures) { | ||
55 | i++; | ||
56 | if (subFuture.isFinished()) { | ||
57 | *count += 1; | ||
58 | continue; | ||
59 | } | ||
60 | // FIXME bind lifetime all watcher to future (repectively the main job | ||
61 | auto watcher = QSharedPointer<KAsync::FutureWatcher<void>>::create(); | ||
62 | QObject::connect(watcher.data(), &KAsync::FutureWatcher<void>::futureReady, [count, total, &future]() { | ||
63 | *count += 1; | ||
64 | if (*count == total) { | ||
65 | future.setFinished(); | ||
66 | } | ||
67 | }); | ||
68 | watcher->setFuture(subFuture); | ||
69 | context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher)); | ||
70 | } | ||
71 | }) | ||
72 | .then<void>([context]() { delete context; }); | ||
73 | } | ||
74 | |||
75 | // Ensure we can process a command in less than 0.1s | 47 | // Ensure we can process a command in less than 0.1s |
76 | void testCommandResponsiveness() | 48 | void testCommandResponsiveness() |
77 | { | 49 | { |
@@ -120,7 +92,7 @@ private slots: | |||
120 | event.setProperty("summary", "summaryValue"); | 92 | event.setProperty("summary", "summaryValue"); |
121 | waitCondition << Sink::Store::create<Sink::ApplicationDomain::Event>(event).exec(); | 93 | waitCondition << Sink::Store::create<Sink::ApplicationDomain::Event>(event).exec(); |
122 | } | 94 | } |
123 | waitForCompletion(waitCondition).exec().waitForFinished(); | 95 | KAsync::waitForCompletion(waitCondition).exec().waitForFinished(); |
124 | auto appendTime = time.elapsed(); | 96 | auto appendTime = time.elapsed(); |
125 | 97 | ||
126 | // Ensure everything is processed | 98 | // Ensure everything is processed |
diff --git a/tests/mailsynctest.cpp b/tests/mailsynctest.cpp index 4b797c8..9c57f0a 100644 --- a/tests/mailsynctest.cpp +++ b/tests/mailsynctest.cpp | |||
@@ -69,7 +69,7 @@ void MailSyncTest::testListFolders() | |||
69 | //First figure out how many folders we have by default | 69 | //First figure out how many folders we have by default |
70 | { | 70 | { |
71 | auto job = Store::fetchAll<Folder>(Query()) | 71 | auto job = Store::fetchAll<Folder>(Query()) |
72 | .then<void, QList<Folder::Ptr>>([&](const QList<Folder::Ptr> &folders) { | 72 | .syncThen<void, QList<Folder::Ptr>>([&](const QList<Folder::Ptr> &folders) { |
73 | QStringList names; | 73 | QStringList names; |
74 | for (const auto &folder : folders) { | 74 | for (const auto &folder : folders) { |
75 | names << folder->getName(); | 75 | names << folder->getName(); |
@@ -88,7 +88,7 @@ void MailSyncTest::testListFolders() | |||
88 | VERIFYEXEC(Store::synchronize(query)); | 88 | VERIFYEXEC(Store::synchronize(query)); |
89 | ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); | 89 | ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); |
90 | 90 | ||
91 | auto job = Store::fetchAll<Folder>(query).then<void, QList<Folder::Ptr>>([=](const QList<Folder::Ptr> &folders) { | 91 | auto job = Store::fetchAll<Folder>(query).syncThen<void, QList<Folder::Ptr>>([=](const QList<Folder::Ptr> &folders) { |
92 | QStringList names; | 92 | QStringList names; |
93 | QHash<QByteArray, QByteArray> specialPurposeFolders; | 93 | QHash<QByteArray, QByteArray> specialPurposeFolders; |
94 | for (const auto &folder : folders) { | 94 | for (const auto &folder : folders) { |
@@ -130,7 +130,7 @@ void MailSyncTest::testListNewFolder() | |||
130 | VERIFYEXEC(Store::synchronize(query)); | 130 | VERIFYEXEC(Store::synchronize(query)); |
131 | ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); | 131 | ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); |
132 | 132 | ||
133 | auto job = Store::fetchAll<Folder>(query).then<void, QList<Folder::Ptr>>([](const QList<Folder::Ptr> &folders) { | 133 | auto job = Store::fetchAll<Folder>(query).syncThen<void, QList<Folder::Ptr>>([](const QList<Folder::Ptr> &folders) { |
134 | QStringList names; | 134 | QStringList names; |
135 | for (const auto &folder : folders) { | 135 | for (const auto &folder : folders) { |
136 | names << folder->getName(); | 136 | names << folder->getName(); |
@@ -155,7 +155,7 @@ void MailSyncTest::testListRemovedFolder() | |||
155 | VERIFYEXEC(Store::synchronize(query)); | 155 | VERIFYEXEC(Store::synchronize(query)); |
156 | ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); | 156 | ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); |
157 | 157 | ||
158 | auto job = Store::fetchAll<Folder>(query).then<void, QList<Folder::Ptr>>([](const QList<Folder::Ptr> &folders) { | 158 | auto job = Store::fetchAll<Folder>(query).syncThen<void, QList<Folder::Ptr>>([](const QList<Folder::Ptr> &folders) { |
159 | QStringList names; | 159 | QStringList names; |
160 | for (const auto &folder : folders) { | 160 | for (const auto &folder : folders) { |
161 | names << folder->getName(); | 161 | names << folder->getName(); |
@@ -180,7 +180,7 @@ void MailSyncTest::testListFolderHierarchy() | |||
180 | VERIFYEXEC(Store::synchronize(query)); | 180 | VERIFYEXEC(Store::synchronize(query)); |
181 | ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); | 181 | ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); |
182 | 182 | ||
183 | auto job = Store::fetchAll<Folder>(query).then<void, QList<Folder::Ptr>>([=](const QList<Folder::Ptr> &folders) { | 183 | auto job = Store::fetchAll<Folder>(query).syncThen<void, QList<Folder::Ptr>>([=](const QList<Folder::Ptr> &folders) { |
184 | QHash<QString, Folder::Ptr> map; | 184 | QHash<QString, Folder::Ptr> map; |
185 | for (const auto &folder : folders) { | 185 | for (const auto &folder : folders) { |
186 | map.insert(folder->getName(), folder); | 186 | map.insert(folder->getName(), folder); |
@@ -223,7 +223,7 @@ void MailSyncTest::testListNewSubFolder() | |||
223 | VERIFYEXEC(Store::synchronize(query)); | 223 | VERIFYEXEC(Store::synchronize(query)); |
224 | ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); | 224 | ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); |
225 | 225 | ||
226 | auto job = Store::fetchAll<Folder>(query).then<void, QList<Folder::Ptr>>([](const QList<Folder::Ptr> &folders) { | 226 | auto job = Store::fetchAll<Folder>(query).syncThen<void, QList<Folder::Ptr>>([](const QList<Folder::Ptr> &folders) { |
227 | QStringList names; | 227 | QStringList names; |
228 | for (const auto &folder : folders) { | 228 | for (const auto &folder : folders) { |
229 | names << folder->getName(); | 229 | names << folder->getName(); |
@@ -251,7 +251,7 @@ void MailSyncTest::testListRemovedSubFolder() | |||
251 | VERIFYEXEC(Store::synchronize(query)); | 251 | VERIFYEXEC(Store::synchronize(query)); |
252 | ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); | 252 | ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); |
253 | 253 | ||
254 | auto job = Store::fetchAll<Folder>(query).then<void, QList<Folder::Ptr>>([](const QList<Folder::Ptr> &folders) { | 254 | auto job = Store::fetchAll<Folder>(query).syncThen<void, QList<Folder::Ptr>>([](const QList<Folder::Ptr> &folders) { |
255 | QStringList names; | 255 | QStringList names; |
256 | for (const auto &folder : folders) { | 256 | for (const auto &folder : folders) { |
257 | names << folder->getName(); | 257 | names << folder->getName(); |
@@ -271,7 +271,7 @@ void MailSyncTest::testListMails() | |||
271 | VERIFYEXEC(Store::synchronize(query)); | 271 | VERIFYEXEC(Store::synchronize(query)); |
272 | ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); | 272 | ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); |
273 | 273 | ||
274 | auto job = Store::fetchAll<Mail>(query).then<void, QList<Mail::Ptr>>([](const QList<Mail::Ptr> &mails) { | 274 | auto job = Store::fetchAll<Mail>(query).syncThen<void, QList<Mail::Ptr>>([](const QList<Mail::Ptr> &mails) { |
275 | QCOMPARE(mails.size(), 1); | 275 | QCOMPARE(mails.size(), 1); |
276 | QVERIFY(mails.first()->getSubject().startsWith(QString("[Nepomuk] Jenkins build is still unstable"))); | 276 | QVERIFY(mails.first()->getSubject().startsWith(QString("[Nepomuk] Jenkins build is still unstable"))); |
277 | const auto data = mails.first()->getMimeMessage(); | 277 | const auto data = mails.first()->getMimeMessage(); |
@@ -300,7 +300,7 @@ void MailSyncTest::testResyncMails() | |||
300 | VERIFYEXEC(Store::synchronize(query)); | 300 | VERIFYEXEC(Store::synchronize(query)); |
301 | ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); | 301 | ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); |
302 | 302 | ||
303 | auto job = Store::fetchAll<Mail>(query).then<void, QList<Mail::Ptr>>([](const QList<Mail::Ptr> &mails) { | 303 | auto job = Store::fetchAll<Mail>(query).syncThen<void, QList<Mail::Ptr>>([](const QList<Mail::Ptr> &mails) { |
304 | QCOMPARE(mails.size(), 1); | 304 | QCOMPARE(mails.size(), 1); |
305 | }); | 305 | }); |
306 | VERIFYEXEC(job); | 306 | VERIFYEXEC(job); |
@@ -325,7 +325,7 @@ void MailSyncTest::testFetchNewRemovedMessages() | |||
325 | ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); | 325 | ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); |
326 | 326 | ||
327 | { | 327 | { |
328 | auto job = Store::fetchAll<Mail>(query).then<void, QList<Mail::Ptr>>([](const QList<Mail::Ptr> &mails) { | 328 | auto job = Store::fetchAll<Mail>(query).syncThen<void, QList<Mail::Ptr>>([](const QList<Mail::Ptr> &mails) { |
329 | QCOMPARE(mails.size(), 2); | 329 | QCOMPARE(mails.size(), 2); |
330 | }); | 330 | }); |
331 | VERIFYEXEC(job); | 331 | VERIFYEXEC(job); |
@@ -337,7 +337,7 @@ void MailSyncTest::testFetchNewRemovedMessages() | |||
337 | ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); | 337 | ResourceControl::flushMessageQueue(query.resources).exec().waitForFinished(); |
338 | 338 | ||
339 | { | 339 | { |
340 | auto job = Store::fetchAll<Mail>(query).then<void, QList<Mail::Ptr>>([](const QList<Mail::Ptr> &mails) { | 340 | auto job = Store::fetchAll<Mail>(query).syncThen<void, QList<Mail::Ptr>>([](const QList<Mail::Ptr> &mails) { |
341 | QCOMPARE(mails.size(), 1); | 341 | QCOMPARE(mails.size(), 1); |
342 | }); | 342 | }); |
343 | VERIFYEXEC(job); | 343 | VERIFYEXEC(job); |
diff --git a/tests/mailtest.cpp b/tests/mailtest.cpp index 908fb84..925fb70 100644 --- a/tests/mailtest.cpp +++ b/tests/mailtest.cpp | |||
@@ -66,7 +66,7 @@ void MailTest::testCreateModifyDeleteFolder() | |||
66 | //First figure out how many folders we have by default | 66 | //First figure out how many folders we have by default |
67 | { | 67 | { |
68 | auto job = Store::fetchAll<Folder>(Query()) | 68 | auto job = Store::fetchAll<Folder>(Query()) |
69 | .then<void, QList<Folder::Ptr>>([&](const QList<Folder::Ptr> &folders) { | 69 | .syncThen<void, QList<Folder::Ptr>>([&](const QList<Folder::Ptr> &folders) { |
70 | baseCount = folders.size(); | 70 | baseCount = folders.size(); |
71 | }); | 71 | }); |
72 | VERIFYEXEC(job); | 72 | VERIFYEXEC(job); |
@@ -83,7 +83,7 @@ void MailTest::testCreateModifyDeleteFolder() | |||
83 | VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); | 83 | VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); |
84 | { | 84 | { |
85 | auto job = Store::fetchAll<Folder>(Query::RequestedProperties(QByteArrayList() << Folder::Name::name << Folder::Icon::name)) | 85 | auto job = Store::fetchAll<Folder>(Query::RequestedProperties(QByteArrayList() << Folder::Name::name << Folder::Icon::name)) |
86 | .then<void, QList<Folder::Ptr>>([=](const QList<Folder::Ptr> &folders) { | 86 | .syncThen<void, QList<Folder::Ptr>>([=](const QList<Folder::Ptr> &folders) { |
87 | QCOMPARE(folders.size(), baseCount + 1); | 87 | QCOMPARE(folders.size(), baseCount + 1); |
88 | QHash<QString, Folder::Ptr> foldersByName; | 88 | QHash<QString, Folder::Ptr> foldersByName; |
89 | for (const auto &folder : folders) { | 89 | for (const auto &folder : folders) { |
@@ -109,7 +109,7 @@ void MailTest::testCreateModifyDeleteFolder() | |||
109 | VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); | 109 | VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); |
110 | { | 110 | { |
111 | auto job = Store::fetchAll<Folder>(Query::RequestedProperties(QByteArrayList() << Folder::Name::name << Folder::Icon::name)) | 111 | auto job = Store::fetchAll<Folder>(Query::RequestedProperties(QByteArrayList() << Folder::Name::name << Folder::Icon::name)) |
112 | .then<void, QList<Folder::Ptr>>([=](const QList<Folder::Ptr> &folders) { | 112 | .syncThen<void, QList<Folder::Ptr>>([=](const QList<Folder::Ptr> &folders) { |
113 | QCOMPARE(folders.size(), baseCount + 1); | 113 | QCOMPARE(folders.size(), baseCount + 1); |
114 | QHash<QString, Folder::Ptr> foldersByName; | 114 | QHash<QString, Folder::Ptr> foldersByName; |
115 | for (const auto &folder : folders) { | 115 | for (const auto &folder : folders) { |
@@ -130,7 +130,7 @@ void MailTest::testCreateModifyDeleteFolder() | |||
130 | VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); | 130 | VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); |
131 | { | 131 | { |
132 | auto job = Store::fetchAll<Folder>(Query::RequestedProperties(QByteArrayList() << Folder::Name::name << Folder::Icon::name)) | 132 | auto job = Store::fetchAll<Folder>(Query::RequestedProperties(QByteArrayList() << Folder::Name::name << Folder::Icon::name)) |
133 | .then<void, QList<Folder::Ptr>>([=](const QList<Folder::Ptr> &folders) { | 133 | .syncThen<void, QList<Folder::Ptr>>([=](const QList<Folder::Ptr> &folders) { |
134 | QCOMPARE(folders.size(), baseCount); | 134 | QCOMPARE(folders.size(), baseCount); |
135 | }); | 135 | }); |
136 | VERIFYEXEC(job); | 136 | VERIFYEXEC(job); |
@@ -160,7 +160,7 @@ void MailTest::testCreateModifyDeleteMail() | |||
160 | VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); | 160 | VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); |
161 | { | 161 | { |
162 | auto job = Store::fetchAll<Mail>(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name << Mail::MimeMessage::name)) | 162 | auto job = Store::fetchAll<Mail>(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name << Mail::MimeMessage::name)) |
163 | .then<void, QList<Mail::Ptr>>([=](const QList<Mail::Ptr> &mails) { | 163 | .syncThen<void, QList<Mail::Ptr>>([=](const QList<Mail::Ptr> &mails) { |
164 | QCOMPARE(mails.size(), 1); | 164 | QCOMPARE(mails.size(), 1); |
165 | auto mail = *mails.first(); | 165 | auto mail = *mails.first(); |
166 | QCOMPARE(mail.getSubject(), subject); | 166 | QCOMPARE(mail.getSubject(), subject); |
@@ -189,7 +189,7 @@ void MailTest::testCreateModifyDeleteMail() | |||
189 | VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); | 189 | VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); |
190 | { | 190 | { |
191 | auto job = Store::fetchAll<Mail>(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name << Mail::MimeMessage::name)) | 191 | auto job = Store::fetchAll<Mail>(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name << Mail::MimeMessage::name)) |
192 | .then<void, QList<Mail::Ptr>>([=](const QList<Mail::Ptr> &mails) { | 192 | .syncThen<void, QList<Mail::Ptr>>([=](const QList<Mail::Ptr> &mails) { |
193 | QCOMPARE(mails.size(), 1); | 193 | QCOMPARE(mails.size(), 1); |
194 | auto mail = *mails.first(); | 194 | auto mail = *mails.first(); |
195 | QCOMPARE(mail.getSubject(), subject2); | 195 | QCOMPARE(mail.getSubject(), subject2); |
@@ -211,7 +211,7 @@ void MailTest::testCreateModifyDeleteMail() | |||
211 | VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); | 211 | VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); |
212 | { | 212 | { |
213 | auto job = Store::fetchAll<Mail>(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name)) | 213 | auto job = Store::fetchAll<Mail>(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name)) |
214 | .then<void, QList<Mail::Ptr>>([=](const QList<Mail::Ptr> &mails) { | 214 | .syncThen<void, QList<Mail::Ptr>>([=](const QList<Mail::Ptr> &mails) { |
215 | QCOMPARE(mails.size(), 0); | 215 | QCOMPARE(mails.size(), 0); |
216 | }); | 216 | }); |
217 | VERIFYEXEC(job); | 217 | VERIFYEXEC(job); |
@@ -247,7 +247,7 @@ void MailTest::testMoveMail() | |||
247 | Mail modifiedMail; | 247 | Mail modifiedMail; |
248 | { | 248 | { |
249 | auto job = Store::fetchAll<Mail>(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name << Mail::MimeMessage::name)) | 249 | auto job = Store::fetchAll<Mail>(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name << Mail::MimeMessage::name)) |
250 | .then<void, QList<Mail::Ptr>>([=, &modifiedMail](const QList<Mail::Ptr> &mails) { | 250 | .syncThen<void, QList<Mail::Ptr>>([=, &modifiedMail](const QList<Mail::Ptr> &mails) { |
251 | QCOMPARE(mails.size(), 1); | 251 | QCOMPARE(mails.size(), 1); |
252 | auto mail = *mails.first(); | 252 | auto mail = *mails.first(); |
253 | modifiedMail = mail; | 253 | modifiedMail = mail; |
@@ -266,7 +266,7 @@ void MailTest::testMoveMail() | |||
266 | VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); | 266 | VERIFYEXEC(ResourceControl::flushMessageQueue(QByteArrayList() << mResourceInstanceIdentifier)); |
267 | { | 267 | { |
268 | auto job = Store::fetchAll<Mail>(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name << Mail::MimeMessage::name)) | 268 | auto job = Store::fetchAll<Mail>(Query::RequestedProperties(QByteArrayList() << Mail::Folder::name << Mail::Subject::name << Mail::MimeMessage::name)) |
269 | .then<void, QList<Mail::Ptr>>([=](const QList<Mail::Ptr> &mails) { | 269 | .syncThen<void, QList<Mail::Ptr>>([=](const QList<Mail::Ptr> &mails) { |
270 | QCOMPARE(mails.size(), 1); | 270 | QCOMPARE(mails.size(), 1); |
271 | auto mail = *mails.first(); | 271 | auto mail = *mails.first(); |
272 | QCOMPARE(mail.getFolder(), folder1.identifier()); | 272 | QCOMPARE(mail.getFolder(), folder1.identifier()); |
@@ -299,7 +299,7 @@ void MailTest::testMarkMailAsRead() | |||
299 | auto job = Store::fetchAll<Mail>(Query::ResourceFilter(mResourceInstanceIdentifier) + | 299 | auto job = Store::fetchAll<Mail>(Query::ResourceFilter(mResourceInstanceIdentifier) + |
300 | Query::RequestedProperties(QByteArrayList() << Mail::Folder::name | 300 | Query::RequestedProperties(QByteArrayList() << Mail::Folder::name |
301 | << Mail::Subject::name)) | 301 | << Mail::Subject::name)) |
302 | .then<void, KAsync::Job<void>, QList<Mail::Ptr>>([this](const QList<Mail::Ptr> &mails) { | 302 | .then<void, QList<Mail::Ptr>>([this](const QList<Mail::Ptr> &mails) { |
303 | ASYNCCOMPARE(mails.size(), 1); | 303 | ASYNCCOMPARE(mails.size(), 1); |
304 | auto mail = mails.first(); | 304 | auto mail = mails.first(); |
305 | mail->setUnread(false); | 305 | mail->setUnread(false); |
@@ -316,7 +316,7 @@ void MailTest::testMarkMailAsRead() | |||
316 | << Mail::Subject::name | 316 | << Mail::Subject::name |
317 | << Mail::MimeMessage::name | 317 | << Mail::MimeMessage::name |
318 | << Mail::Unread::name)) | 318 | << Mail::Unread::name)) |
319 | .then<void, KAsync::Job<void>, QList<Mail::Ptr>>([](const QList<Mail::Ptr> &mails) { | 319 | .then<void, QList<Mail::Ptr>>([](const QList<Mail::Ptr> &mails) { |
320 | ASYNCCOMPARE(mails.size(), 1); | 320 | ASYNCCOMPARE(mails.size(), 1); |
321 | auto mail = mails.first(); | 321 | auto mail = mails.first(); |
322 | ASYNCVERIFY(!mail->getSubject().isEmpty()); | 322 | ASYNCVERIFY(!mail->getSubject().isEmpty()); |
diff --git a/tests/resourcecommunicationtest.cpp b/tests/resourcecommunicationtest.cpp index 1530f63..201db53 100644 --- a/tests/resourcecommunicationtest.cpp +++ b/tests/resourcecommunicationtest.cpp | |||
@@ -51,13 +51,14 @@ private slots: | |||
51 | int errors = 0; | 51 | int errors = 0; |
52 | for (int i = 0; i < count; i++) { | 52 | for (int i = 0; i < count; i++) { |
53 | auto result = resourceAccess.sendCommand(Sink::Commands::PingCommand) | 53 | auto result = resourceAccess.sendCommand(Sink::Commands::PingCommand) |
54 | .then<void>([&complete]() { complete++; }, | 54 | .syncThen<void>([&resourceAccess, &errors, &complete](const KAsync::Error &error) { |
55 | [&errors, &complete](int error, const QString &msg) { | 55 | complete++; |
56 | qWarning() << msg; | 56 | if (error) { |
57 | errors++; | 57 | qWarning() << error.errorMessage; |
58 | complete++; | 58 | errors++; |
59 | }) | 59 | } |
60 | .exec(); | 60 | }) |
61 | .exec(); | ||
61 | } | 62 | } |
62 | QTRY_COMPARE(complete, count); | 63 | QTRY_COMPARE(complete, count); |
63 | QVERIFY(!errors); | 64 | QVERIFY(!errors); |
@@ -76,13 +77,12 @@ private slots: | |||
76 | int errors = 0; | 77 | int errors = 0; |
77 | for (int i = 0; i < count; i++) { | 78 | for (int i = 0; i < count; i++) { |
78 | resourceAccess.sendCommand(Sink::Commands::PingCommand) | 79 | resourceAccess.sendCommand(Sink::Commands::PingCommand) |
79 | .then<void>([&complete]() { complete++; }, | 80 | .syncThen<void>([&resourceAccess, &errors, &complete](const KAsync::Error &error) { |
80 | [&errors, &complete](int error, const QString &msg) { | 81 | complete++; |
81 | qWarning() << msg; | 82 | if (error) { |
83 | qWarning() << error.errorMessage; | ||
82 | errors++; | 84 | errors++; |
83 | complete++; | 85 | } |
84 | }) | ||
85 | .then<void>([&resourceAccess]() { | ||
86 | resourceAccess.close(); | 86 | resourceAccess.close(); |
87 | resourceAccess.open(); | 87 | resourceAccess.open(); |
88 | }) | 88 | }) |
@@ -104,7 +104,7 @@ private slots: | |||
104 | auto resourceAccess = Sink::ResourceAccessFactory::instance().getAccess(resourceIdentifier, ""); | 104 | auto resourceAccess = Sink::ResourceAccessFactory::instance().getAccess(resourceIdentifier, ""); |
105 | weakRef = resourceAccess.toWeakRef(); | 105 | weakRef = resourceAccess.toWeakRef(); |
106 | resourceAccess->open(); | 106 | resourceAccess->open(); |
107 | resourceAccess->sendCommand(Sink::Commands::PingCommand).then<void>([resourceAccess]() { qDebug() << "Pind complete"; }).exec(); | 107 | resourceAccess->sendCommand(Sink::Commands::PingCommand).syncThen<void>([resourceAccess]() { qDebug() << "Ping complete"; }).exec(); |
108 | } | 108 | } |
109 | QVERIFY(weakRef.toStrongRef()); | 109 | QVERIFY(weakRef.toStrongRef()); |
110 | QTRY_VERIFY(!weakRef.toStrongRef()); | 110 | QTRY_VERIFY(!weakRef.toStrongRef()); |