diff options
Diffstat (limited to 'common')
-rw-r--r-- | common/changereplay.cpp | 58 | ||||
-rw-r--r-- | common/genericresource.cpp | 134 | ||||
-rw-r--r-- | common/listener.cpp | 18 | ||||
-rw-r--r-- | common/messagequeue.cpp | 37 | ||||
-rw-r--r-- | common/pipeline.cpp | 6 | ||||
-rw-r--r-- | common/queryrunner.cpp | 4 | ||||
-rw-r--r-- | common/resourceaccess.cpp | 117 | ||||
-rw-r--r-- | common/resourcecontrol.cpp | 29 | ||||
-rw-r--r-- | common/resourcefacade.cpp | 6 | ||||
-rw-r--r-- | common/sourcewriteback.cpp | 14 | ||||
-rw-r--r-- | common/store.cpp | 68 | ||||
-rw-r--r-- | common/synchronizer.cpp | 2 | ||||
-rw-r--r-- | common/test.cpp | 2 |
13 files changed, 233 insertions, 262 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp index fbd556f..e3b7158 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp | |||
@@ -72,7 +72,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
72 | { | 72 | { |
73 | auto lastReplayedRevision = QSharedPointer<qint64>::create(0); | 73 | auto lastReplayedRevision = QSharedPointer<qint64>::create(0); |
74 | auto topRevision = QSharedPointer<qint64>::create(0); | 74 | auto topRevision = QSharedPointer<qint64>::create(0); |
75 | return KAsync::start<void>([this, lastReplayedRevision, topRevision]() { | 75 | return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() { |
76 | mReplayInProgress = true; | 76 | mReplayInProgress = true; |
77 | mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | 77 | mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
78 | SinkWarning() << error.message; | 78 | SinkWarning() << error.message; |
@@ -90,11 +90,9 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
90 | SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; | 90 | SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; |
91 | }) | 91 | }) |
92 | .then(KAsync::dowhile( | 92 | .then(KAsync::dowhile( |
93 | [this, lastReplayedRevision, topRevision](KAsync::Future<bool> &future) { | 93 | [this, lastReplayedRevision, topRevision]() -> KAsync::Job<KAsync::ControlFlowFlag> { |
94 | if (*lastReplayedRevision >= *topRevision) { | 94 | if (*lastReplayedRevision >= *topRevision) { |
95 | future.setValue(false); | 95 | return KAsync::value(KAsync::Break); |
96 | future.setFinished(); | ||
97 | return; | ||
98 | } | 96 | } |
99 | 97 | ||
100 | qint64 revision = *lastReplayedRevision + 1; | 98 | qint64 revision = *lastReplayedRevision + 1; |
@@ -109,12 +107,15 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
109 | [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { | 107 | [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { |
110 | SinkTrace() << "Replaying " << key; | 108 | SinkTrace() << "Replaying " << key; |
111 | if (canReplay(type, key, value)) { | 109 | if (canReplay(type, key, value)) { |
112 | replayJob = replay(type, key, value).then<void>([this, revision, lastReplayedRevision]() { | 110 | replayJob = replay(type, key, value).then<void>([this, revision, lastReplayedRevision](const KAsync::Error &error) { |
113 | recordReplayedRevision(revision); | 111 | if (error) { |
114 | *lastReplayedRevision = revision; | 112 | SinkTrace() << "Change replay failed" << revision; |
115 | }, | 113 | return KAsync::error(error); |
116 | [revision](int, QString) { | 114 | } else { |
117 | SinkTrace() << "Change replay failed" << revision; | 115 | recordReplayedRevision(revision); |
116 | *lastReplayedRevision = revision; | ||
117 | } | ||
118 | return KAsync::null(); | ||
118 | }); | 119 | }); |
119 | exitLoop = true; | 120 | exitLoop = true; |
120 | } else { | 121 | } else { |
@@ -128,23 +129,26 @@ KAsync::Job<void> ChangeReplay::replayNextRevision() | |||
128 | } | 129 | } |
129 | revision++; | 130 | revision++; |
130 | } | 131 | } |
131 | replayJob.then<void>([this, revision, lastReplayedRevision, topRevision, &future]() { | 132 | return replayJob.then<KAsync::ControlFlowFlag>([this, revision, lastReplayedRevision, topRevision](const KAsync::Error &error) ->KAsync::Job<KAsync::ControlFlowFlag> { |
132 | SinkTrace() << "Replayed until " << revision; | 133 | if (error) { |
133 | recordReplayedRevision(*lastReplayedRevision); | 134 | SinkTrace() << "Change replay failed" << revision; |
134 | QTimer::singleShot(0, [&future, lastReplayedRevision, topRevision]() { | 135 | //We're probably not online or so, so postpone retrying |
135 | future.setValue((*lastReplayedRevision < *topRevision)); | 136 | return KAsync::value(KAsync::Break); |
136 | future.setFinished(); | 137 | } else { |
137 | }); | 138 | SinkTrace() << "Replayed until " << revision; |
138 | }, | 139 | recordReplayedRevision(*lastReplayedRevision); |
139 | [this, revision, &future](int, QString) { | 140 | if (*lastReplayedRevision < *topRevision) { |
140 | SinkTrace() << "Change replay failed" << revision; | 141 | return KAsync::wait(0).then(KAsync::value(KAsync::Continue)); |
141 | //We're probably not online or so, so postpone retrying | 142 | } else { |
142 | future.setValue(false); | 143 | return KAsync::value(KAsync::Break); |
143 | future.setFinished(); | 144 | } |
144 | }).exec(); | 145 | } |
145 | 146 | //We shouldn't ever get here | |
147 | Q_ASSERT(false); | ||
148 | return KAsync::value(KAsync::Break); | ||
149 | }); | ||
146 | })) | 150 | })) |
147 | .then<void>([this, lastReplayedRevision]() { | 151 | .syncThen<void>([this, lastReplayedRevision]() { |
148 | recordReplayedRevision(*lastReplayedRevision); | 152 | recordReplayedRevision(*lastReplayedRevision); |
149 | mMainStoreTransaction.abort(); | 153 | mMainStoreTransaction.abort(); |
150 | if (allChangesReplayed()) { | 154 | if (allChangesReplayed()) { |
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 7136882..f5b1775 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -100,7 +100,7 @@ private slots: | |||
100 | } | 100 | } |
101 | mProcessingLock = true; | 101 | mProcessingLock = true; |
102 | auto job = processPipeline() | 102 | auto job = processPipeline() |
103 | .then<void>([this]() { | 103 | .syncThen<void>([this]() { |
104 | mProcessingLock = false; | 104 | mProcessingLock = false; |
105 | if (messagesToProcessAvailable()) { | 105 | if (messagesToProcessAvailable()) { |
106 | process(); | 106 | process(); |
@@ -122,7 +122,8 @@ private slots: | |||
122 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); | 122 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); |
123 | case Sink::Commands::InspectionCommand: | 123 | case Sink::Commands::InspectionCommand: |
124 | if (mInspect) { | 124 | if (mInspect) { |
125 | return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<qint64>([]() { return -1; }); | 125 | return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()) |
126 | .syncThen<qint64>([]() { return -1; }); | ||
126 | } else { | 127 | } else { |
127 | return KAsync::error<qint64>(-1, "Missing inspection command."); | 128 | return KAsync::error<qint64>(-1, "Missing inspection command."); |
128 | } | 129 | } |
@@ -131,7 +132,7 @@ private slots: | |||
131 | } | 132 | } |
132 | } | 133 | } |
133 | 134 | ||
134 | KAsync::Job<qint64, qint64> processQueuedCommand(const QByteArray &data) | 135 | KAsync::Job<qint64> processQueuedCommand(const QByteArray &data) |
135 | { | 136 | { |
136 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); | 137 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); |
137 | if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { | 138 | if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { |
@@ -143,13 +144,13 @@ private slots: | |||
143 | SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId); | 144 | SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId); |
144 | return processQueuedCommand(queuedCommand) | 145 | return processQueuedCommand(queuedCommand) |
145 | .then<qint64, qint64>( | 146 | .then<qint64, qint64>( |
146 | [this, commandId](qint64 createdRevision) -> qint64 { | 147 | [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job<qint64> { |
148 | if (error) { | ||
149 | SinkWarning() << "Error while processing queue command: " << error.errorMessage; | ||
150 | return KAsync::error<qint64>(error); | ||
151 | } | ||
147 | SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId); | 152 | SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId); |
148 | return createdRevision; | 153 | return KAsync::value<qint64>(createdRevision); |
149 | }, | ||
150 | [](int errorCode, QString errorMessage) { | ||
151 | // FIXME propagate error, we didn't handle it | ||
152 | SinkWarning() << "Error while processing queue command: " << errorMessage; | ||
153 | }); | 154 | }); |
154 | } | 155 | } |
155 | 156 | ||
@@ -157,31 +158,31 @@ private slots: | |||
157 | KAsync::Job<void> processQueue(MessageQueue *queue) | 158 | KAsync::Job<void> processQueue(MessageQueue *queue) |
158 | { | 159 | { |
159 | auto time = QSharedPointer<QTime>::create(); | 160 | auto time = QSharedPointer<QTime>::create(); |
160 | return KAsync::start<void>([this]() { mPipeline->startTransaction(); }) | 161 | return KAsync::syncStart<void>([this]() { mPipeline->startTransaction(); }) |
161 | .then(KAsync::dowhile([queue]() { return !queue->isEmpty(); }, | 162 | .then(KAsync::dowhile( |
162 | [this, queue, time](KAsync::Future<void> &future) { | 163 | [this, queue, time]() -> KAsync::Job<KAsync::ControlFlowFlag> { |
163 | queue->dequeueBatch(sBatchSize, | 164 | return queue->dequeueBatch(sBatchSize, |
164 | [this, time](const QByteArray &data) { | 165 | [this, time](const QByteArray &data) -> KAsync::Job<void> { |
165 | time->start(); | 166 | time->start(); |
166 | return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) { | 167 | return processQueuedCommand(data) |
167 | processQueuedCommand(data) | 168 | .syncThen<void, qint64>([this, time](qint64 createdRevision) { |
168 | .then<void, qint64>([&future, this, time](qint64 createdRevision) { | 169 | SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); |
169 | SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); | 170 | }); |
170 | future.setFinished(); | 171 | }) |
171 | }) | 172 | .then<KAsync::ControlFlowFlag>([queue](const KAsync::Error &error) { |
172 | .exec(); | 173 | if (error) { |
173 | }); | 174 | if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) { |
174 | }) | 175 | SinkWarning() << "Error while getting message from messagequeue: " << error.errorMessage; |
175 | .then<void>([&future, queue]() { future.setFinished(); }, | 176 | } |
176 | [&future](int i, QString error) { | ||
177 | if (i != MessageQueue::ErrorCodes::NoMessageFound) { | ||
178 | SinkWarning() << "Error while getting message from messagequeue: " << error; | ||
179 | } | 177 | } |
180 | future.setFinished(); | 178 | if (queue->isEmpty()) { |
181 | }) | 179 | return KAsync::value<KAsync::ControlFlowFlag>(KAsync::Break); |
182 | .exec(); | 180 | } else { |
181 | return KAsync::value<KAsync::ControlFlowFlag>(KAsync::Continue); | ||
182 | } | ||
183 | }); | ||
183 | })) | 184 | })) |
184 | .then<void>([this]() { mPipeline->commit(); }); | 185 | .syncThen<void>([this](const KAsync::Error &) { mPipeline->commit(); }); |
185 | } | 186 | } |
186 | 187 | ||
187 | KAsync::Job<void> processPipeline() | 188 | KAsync::Job<void> processPipeline() |
@@ -198,18 +199,20 @@ private slots: | |||
198 | 199 | ||
199 | // Go through all message queues | 200 | // Go through all message queues |
200 | auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues); | 201 | auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues); |
201 | return KAsync::dowhile([it]() { return it->hasNext(); }, | 202 | return KAsync::dowhile( |
202 | [it, this](KAsync::Future<void> &future) { | 203 | [it, this]() { |
203 | auto time = QSharedPointer<QTime>::create(); | 204 | auto time = QSharedPointer<QTime>::create(); |
204 | time->start(); | 205 | time->start(); |
205 | 206 | ||
206 | auto queue = it->next(); | 207 | auto queue = it->next(); |
207 | processQueue(queue) | 208 | return processQueue(queue) |
208 | .then<void>([this, &future, time]() { | 209 | .syncThen<KAsync::ControlFlowFlag>([this, time, it]() { |
209 | SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed()); | 210 | SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed()); |
210 | future.setFinished(); | 211 | if (it->hasNext()) { |
211 | }) | 212 | return KAsync::Continue; |
212 | .exec(); | 213 | } |
214 | return KAsync::Break; | ||
215 | }); | ||
213 | }); | 216 | }); |
214 | } | 217 | } |
215 | 218 | ||
@@ -251,22 +254,19 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra | |||
251 | s >> expectedValue; | 254 | s >> expectedValue; |
252 | inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue) | 255 | inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue) |
253 | .then<void>( | 256 | .then<void>( |
254 | [=]() { | 257 | [=](const KAsync::Error &error) { |
255 | Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; | ||
256 | Sink::Notification n; | ||
257 | n.type = Sink::Notification::Inspection; | ||
258 | n.id = inspectionId; | ||
259 | n.code = Sink::Notification::Success; | ||
260 | emit notify(n); | ||
261 | }, | ||
262 | [=](int code, const QString &message) { | ||
263 | Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << message; | ||
264 | Sink::Notification n; | 258 | Sink::Notification n; |
265 | n.type = Sink::Notification::Inspection; | 259 | n.type = Sink::Notification::Inspection; |
266 | n.message = message; | ||
267 | n.id = inspectionId; | 260 | n.id = inspectionId; |
268 | n.code = Sink::Notification::Failure; | 261 | if (error) { |
262 | Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << error.errorMessage; | ||
263 | n.code = Sink::Notification::Failure; | ||
264 | } else { | ||
265 | Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; | ||
266 | n.code = Sink::Notification::Success; | ||
267 | } | ||
269 | emit notify(n); | 268 | emit notify(n); |
269 | return KAsync::null(); | ||
270 | }) | 270 | }) |
271 | .exec(); | 271 | .exec(); |
272 | return KAsync::null<void>(); | 272 | return KAsync::null<void>(); |
@@ -420,7 +420,7 @@ void GenericResource::processCommand(int commandId, const QByteArray &data) | |||
420 | 420 | ||
421 | KAsync::Job<void> GenericResource::synchronizeWithSource() | 421 | KAsync::Job<void> GenericResource::synchronizeWithSource() |
422 | { | 422 | { |
423 | return KAsync::start<void>([this](KAsync::Future<void> &future) { | 423 | return KAsync::start<void>([this]() { |
424 | 424 | ||
425 | Sink::Notification n; | 425 | Sink::Notification n; |
426 | n.id = "sync"; | 426 | n.id = "sync"; |
@@ -432,23 +432,21 @@ KAsync::Job<void> GenericResource::synchronizeWithSource() | |||
432 | SinkLog() << " Synchronizing"; | 432 | SinkLog() << " Synchronizing"; |
433 | // Changereplay would deadlock otherwise when trying to open the synchronization store | 433 | // Changereplay would deadlock otherwise when trying to open the synchronization store |
434 | enableChangeReplay(false); | 434 | enableChangeReplay(false); |
435 | mSynchronizer->synchronize() | 435 | return mSynchronizer->synchronize() |
436 | .then<void>([this, &future]() { | 436 | .then<void>([this](const KAsync::Error &error) { |
437 | SinkLog() << "Done Synchronizing"; | ||
438 | Sink::Notification n; | ||
439 | n.id = "sync"; | ||
440 | n.type = Sink::Notification::Status; | ||
441 | n.message = "Synchronization has ended."; | ||
442 | n.code = Sink::ApplicationDomain::ConnectedStatus; | ||
443 | emit notify(n); | ||
444 | |||
445 | enableChangeReplay(true); | ||
446 | future.setFinished(); | ||
447 | }, [this, &future](int errorCode, const QString &error) { | ||
448 | enableChangeReplay(true); | 437 | enableChangeReplay(true); |
449 | future.setError(errorCode, error); | 438 | if (!error) { |
450 | }) | 439 | SinkLog() << "Done Synchronizing"; |
451 | .exec(); | 440 | Sink::Notification n; |
441 | n.id = "sync"; | ||
442 | n.type = Sink::Notification::Status; | ||
443 | n.message = "Synchronization has ended."; | ||
444 | n.code = Sink::ApplicationDomain::ConnectedStatus; | ||
445 | emit notify(n); | ||
446 | return KAsync::null(); | ||
447 | } | ||
448 | return KAsync::error(error); | ||
449 | }); | ||
452 | }); | 450 | }); |
453 | } | 451 | } |
454 | 452 | ||
diff --git a/common/listener.cpp b/common/listener.cpp index a051293..027d9ae 100644 --- a/common/listener.cpp +++ b/common/listener.cpp | |||
@@ -248,13 +248,17 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
248 | if (buffer->localSync()) { | 248 | if (buffer->localSync()) { |
249 | job = job.then<void>(loadResource().processAllMessages()); | 249 | job = job.then<void>(loadResource().processAllMessages()); |
250 | } | 250 | } |
251 | job.then<void>([callback, timer]() { | 251 | job.then<void>([callback, timer](const KAsync::Error &error) { |
252 | SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); | 252 | if (error) { |
253 | callback(true); | 253 | SinkWarning() << "Sync failed: " << error.errorMessage; |
254 | }, [callback](int errorCode, const QString &msg) { | 254 | callback(false); |
255 | SinkWarning() << "Sync failed: " << msg; | 255 | return KAsync::error(error); |
256 | callback(false); | 256 | } else { |
257 | }) | 257 | SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); |
258 | callback(true); | ||
259 | return KAsync::null(); | ||
260 | } | ||
261 | }) | ||
258 | .exec(); | 262 | .exec(); |
259 | return; | 263 | return; |
260 | } else { | 264 | } else { |
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 3567a10..28eacb7 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp | |||
@@ -5,37 +5,6 @@ | |||
5 | 5 | ||
6 | SINK_DEBUG_AREA("messagequeue") | 6 | SINK_DEBUG_AREA("messagequeue") |
7 | 7 | ||
8 | static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void>> &futures) | ||
9 | { | ||
10 | auto context = new QObject; | ||
11 | return KAsync::start<void>([futures, context](KAsync::Future<void> &future) { | ||
12 | const auto total = futures.size(); | ||
13 | auto count = QSharedPointer<int>::create(); | ||
14 | int i = 0; | ||
15 | for (KAsync::Future<void> subFuture : futures) { | ||
16 | i++; | ||
17 | if (subFuture.isFinished()) { | ||
18 | *count += 1; | ||
19 | continue; | ||
20 | } | ||
21 | // FIXME bind lifetime all watcher to future (repectively the main job | ||
22 | auto watcher = QSharedPointer<KAsync::FutureWatcher<void>>::create(); | ||
23 | QObject::connect(watcher.data(), &KAsync::FutureWatcher<void>::futureReady, [count, total, &future]() { | ||
24 | *count += 1; | ||
25 | if (*count == total) { | ||
26 | future.setFinished(); | ||
27 | } | ||
28 | }); | ||
29 | watcher->setFuture(subFuture); | ||
30 | context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher)); | ||
31 | } | ||
32 | if (*count == total) { | ||
33 | future.setFinished(); | ||
34 | } | ||
35 | }) | ||
36 | .then<void>([context]() { delete context; }); | ||
37 | } | ||
38 | |||
39 | MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::ReadWrite) | 8 | MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::ReadWrite) |
40 | { | 9 | { |
41 | } | 10 | } |
@@ -101,7 +70,7 @@ void MessageQueue::dequeue(const std::function<void(void *ptr, int size, std::fu | |||
101 | return KAsync::start<void>([&value, resultHandler](KAsync::Future<void> &future) { | 70 | return KAsync::start<void>([&value, resultHandler](KAsync::Future<void> &future) { |
102 | resultHandler(const_cast<void *>(static_cast<const void *>(value.data())), value.size(), [&future](bool success) { future.setFinished(); }); | 71 | resultHandler(const_cast<void *>(static_cast<const void *>(value.data())), value.size(), [&future](bool success) { future.setFinished(); }); |
103 | }); | 72 | }); |
104 | }).then<void>([]() {}, [errorHandler](int error, const QString &errorString) { errorHandler(Error("messagequeue", error, errorString.toLatin1())); }).exec(); | 73 | }).onError([errorHandler](const KAsync::Error &error) { errorHandler(Error("messagequeue", error.errorCode, error.errorMessage.toLatin1())); }).exec(); |
105 | } | 74 | } |
106 | 75 | ||
107 | KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler) | 76 | KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::function<KAsync::Job<void>(const QByteArray &)> &resultHandler) |
@@ -135,8 +104,8 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi | |||
135 | }); | 104 | }); |
136 | 105 | ||
137 | // Trace() << "Waiting on " << waitCondition.size() << " results"; | 106 | // Trace() << "Waiting on " << waitCondition.size() << " results"; |
138 | ::waitForCompletion(waitCondition) | 107 | KAsync::waitForCompletion(waitCondition) |
139 | .then<void>([this, resultCount, &future]() { | 108 | .syncThen<void>([this, resultCount, &future]() { |
140 | processRemovals(); | 109 | processRemovals(); |
141 | if (*resultCount == 0) { | 110 | if (*resultCount == 0) { |
142 | future.setError(static_cast<int>(ErrorCodes::NoMessageFound), "No message found"); | 111 | future.setError(static_cast<int>(ErrorCodes::NoMessageFound), "No message found"); |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 1d45340..ce864f7 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -233,7 +233,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
233 | 233 | ||
234 | d->storeNewRevision(newRevision, fbb, bufferType, key); | 234 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
235 | 235 | ||
236 | return KAsync::start<qint64>([newRevision]() { return newRevision; }); | 236 | return KAsync::value(newRevision); |
237 | } | 237 | } |
238 | 238 | ||
239 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | 239 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) |
@@ -346,7 +346,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
346 | adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); | 346 | adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); |
347 | 347 | ||
348 | d->storeNewRevision(newRevision, fbb, bufferType, key); | 348 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
349 | return KAsync::start<qint64>([newRevision]() { return newRevision; }); | 349 | return KAsync::value(newRevision); |
350 | } | 350 | } |
351 | 351 | ||
352 | KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | 352 | KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) |
@@ -433,7 +433,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
433 | processor->deletedEntity(key, newRevision, *current, d->transaction); | 433 | processor->deletedEntity(key, newRevision, *current, d->transaction); |
434 | } | 434 | } |
435 | 435 | ||
436 | return KAsync::start<qint64>([newRevision]() { return newRevision; }); | 436 | return KAsync::value(newRevision); |
437 | } | 437 | } |
438 | 438 | ||
439 | void Pipeline::cleanupRevision(qint64 revision) | 439 | void Pipeline::cleanupRevision(qint64 revision) |
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 2e2e96d..052db39 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -86,7 +86,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
86 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); | 86 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); |
87 | return newRevisionAndReplayedEntities; | 87 | return newRevisionAndReplayedEntities; |
88 | }) | 88 | }) |
89 | .template then<void, QPair<qint64, qint64>>([=](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { | 89 | .template syncThen<void, QPair<qint64, qint64>>([=](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { |
90 | mOffset[parentId] += newRevisionAndReplayedEntities.second; | 90 | mOffset[parentId] += newRevisionAndReplayedEntities.second; |
91 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 91 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
92 | if (query.liveQuery) { | 92 | if (query.liveQuery) { |
@@ -110,7 +110,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
110 | const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); | 110 | const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); |
111 | return newRevisionAndReplayedEntities; | 111 | return newRevisionAndReplayedEntities; |
112 | }) | 112 | }) |
113 | .template then<void, QPair<qint64, qint64> >([query, this, resultProvider](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { | 113 | .template syncThen<void, QPair<qint64, qint64> >([query, this, resultProvider](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { |
114 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 114 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
115 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); | 115 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); |
116 | resultProvider->setRevision(newRevisionAndReplayedEntities.first); | 116 | resultProvider->setRevision(newRevisionAndReplayedEntities.first); |
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 7b4d839..364616c 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -159,67 +159,65 @@ KAsync::Job<void> ResourceAccess::Private::tryToConnect() | |||
159 | // We may have a socket from the last connection leftover | 159 | // We may have a socket from the last connection leftover |
160 | socket.reset(); | 160 | socket.reset(); |
161 | auto counter = QSharedPointer<int>::create(0); | 161 | auto counter = QSharedPointer<int>::create(0); |
162 | return KAsync::dowhile([this]() -> bool { return !socket; }, | 162 | return KAsync::dowhile( |
163 | [this, counter](KAsync::Future<void> &future) { | 163 | [this, counter]() { |
164 | SinkTrace() << "Loop"; | 164 | SinkTrace() << "Loop"; |
165 | connectToServer(resourceInstanceIdentifier) | 165 | return connectToServer(resourceInstanceIdentifier) |
166 | .then<void, QSharedPointer<QLocalSocket>>( | 166 | .then<KAsync::ControlFlowFlag, QSharedPointer<QLocalSocket>>( |
167 | [this, &future](const QSharedPointer<QLocalSocket> &s) { | 167 | [this, counter](const KAsync::Error &error, const QSharedPointer<QLocalSocket> &s) { |
168 | Q_ASSERT(s); | 168 | if (error) { |
169 | socket = s; | 169 | static int waitTime = 10; |
170 | future.setFinished(); | 170 | static int timeout = 500; |
171 | }, | 171 | static int maxRetries = timeout / waitTime; |
172 | [&future, counter, this](int errorCode, const QString &errorString) { | 172 | if (*counter > maxRetries) { |
173 | static int waitTime = 10; | 173 | SinkTrace() << "Giving up"; |
174 | static int timeout = 500; | 174 | return KAsync::error<KAsync::ControlFlowFlag>("Failed to connect to socket"); |
175 | static int maxRetries = timeout / waitTime; | 175 | } else { |
176 | if (*counter > maxRetries) { | 176 | *counter = *counter + 1; |
177 | SinkTrace() << "Giving up"; | 177 | return KAsync::wait(waitTime).then(KAsync::value(KAsync::Continue)); |
178 | future.setError(-1, "Failed to connect to socket"); | 178 | } |
179 | } else { | 179 | } else { |
180 | KAsync::wait(waitTime).then<void>([&future]() { future.setFinished(); }).exec(); | 180 | Q_ASSERT(s); |
181 | socket = s; | ||
182 | return KAsync::value(KAsync::Break); | ||
181 | } | 183 | } |
182 | *counter = *counter + 1; | 184 | }); |
183 | }) | ||
184 | .exec(); | ||
185 | }); | 185 | }); |
186 | } | 186 | } |
187 | 187 | ||
188 | KAsync::Job<void> ResourceAccess::Private::initializeSocket() | 188 | KAsync::Job<void> ResourceAccess::Private::initializeSocket() |
189 | { | 189 | { |
190 | return KAsync::start<void>([this](KAsync::Future<void> &future) { | 190 | return KAsync::start<void>([this] { |
191 | SinkTrace() << "Trying to connect"; | 191 | SinkTrace() << "Trying to connect"; |
192 | connectToServer(resourceInstanceIdentifier) | 192 | return connectToServer(resourceInstanceIdentifier) |
193 | .then<void, QSharedPointer<QLocalSocket>>( | 193 | .then<void, QSharedPointer<QLocalSocket>>( |
194 | [this, &future](const QSharedPointer<QLocalSocket> &s) { | 194 | [this](const KAsync::Error &error, const QSharedPointer<QLocalSocket> &s) { |
195 | SinkTrace() << "Connected to resource, without having to start it."; | 195 | if (error) { |
196 | Q_ASSERT(s); | 196 | SinkTrace() << "Failed to connect, starting resource"; |
197 | socket = s; | 197 | // We failed to connect, so let's start the resource |
198 | future.setFinished(); | 198 | QStringList args; |
199 | }, | 199 | if (Sink::Test::testModeEnabled()) { |
200 | [this, &future](int errorCode, const QString &errorString) { | 200 | args << "--test"; |
201 | SinkTrace() << "Failed to connect, starting resource"; | 201 | } |
202 | // We failed to connect, so let's start the resource | 202 | args << resourceInstanceIdentifier << resourceName; |
203 | QStringList args; | 203 | qint64 pid = 0; |
204 | if (Sink::Test::testModeEnabled()) { | 204 | if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { |
205 | args << "--test"; | 205 | SinkTrace() << "Started resource " << pid; |
206 | } | 206 | return tryToConnect() |
207 | args << resourceInstanceIdentifier << resourceName; | 207 | .onError([this](const KAsync::Error &error) { |
208 | qint64 pid = 0; | 208 | SinkWarning() << "Failed to connect to started resource"; |
209 | if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { | 209 | }); |
210 | SinkTrace() << "Started resource " << pid; | 210 | } else { |
211 | tryToConnect() | 211 | SinkWarning() << "Failed to start resource"; |
212 | .then<void>([&future]() { future.setFinished(); }, | 212 | } |
213 | [this, &future](int errorCode, const QString &errorString) { | 213 | return KAsync::null(); |
214 | SinkWarning() << "Failed to connect to started resource"; | ||
215 | future.setError(errorCode, errorString); | ||
216 | }) | ||
217 | .exec(); | ||
218 | } else { | 214 | } else { |
219 | SinkWarning() << "Failed to start resource"; | 215 | SinkTrace() << "Connected to resource, without having to start it."; |
216 | Q_ASSERT(s); | ||
217 | socket = s; | ||
218 | return KAsync::null(); | ||
220 | } | 219 | } |
221 | }) | 220 | }); |
222 | .exec(); | ||
223 | }); | 221 | }); |
224 | } | 222 | } |
225 | 223 | ||
@@ -383,17 +381,18 @@ void ResourceAccess::open() | |||
383 | d->openingSocket = true; | 381 | d->openingSocket = true; |
384 | d->initializeSocket() | 382 | d->initializeSocket() |
385 | .then<void>( | 383 | .then<void>( |
386 | [this, time]() { | 384 | [this, time](const KAsync::Error &error) { |
387 | SinkTrace() << "Socket is initialized." << Log::TraceTime(time->elapsed()); | ||
388 | d->openingSocket = false; | 385 | d->openingSocket = false; |
389 | QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected); | 386 | if (error) { |
390 | QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError))); | 387 | SinkWarning() << "Failed to initialize socket " << error.errorMessage; |
391 | QObject::connect(d->socket.data(), &QIODevice::readyRead, this, &ResourceAccess::readResourceMessage); | 388 | } else { |
392 | connected(); | 389 | SinkTrace() << "Socket is initialized." << Log::TraceTime(time->elapsed()); |
393 | }, | 390 | QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected); |
394 | [this](int error, const QString &errorString) { | 391 | QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError))); |
395 | d->openingSocket = false; | 392 | QObject::connect(d->socket.data(), &QIODevice::readyRead, this, &ResourceAccess::readResourceMessage); |
396 | SinkWarning() << "Failed to initialize socket " << errorString; | 393 | connected(); |
394 | } | ||
395 | return KAsync::null(); | ||
397 | }) | 396 | }) |
398 | .exec(); | 397 | .exec(); |
399 | } | 398 | } |
diff --git a/common/resourcecontrol.cpp b/common/resourcecontrol.cpp index 7d092a4..f509318 100644 --- a/common/resourcecontrol.cpp +++ b/common/resourcecontrol.cpp | |||
@@ -41,22 +41,22 @@ KAsync::Job<void> ResourceControl::shutdown(const QByteArray &identifier) | |||
41 | time->start(); | 41 | time->start(); |
42 | return ResourceAccess::connectToServer(identifier) | 42 | return ResourceAccess::connectToServer(identifier) |
43 | .then<void, QSharedPointer<QLocalSocket>>( | 43 | .then<void, QSharedPointer<QLocalSocket>>( |
44 | [identifier, time](QSharedPointer<QLocalSocket> socket, KAsync::Future<void> &future) { | 44 | [identifier, time](const KAsync::Error &error, QSharedPointer<QLocalSocket> socket) { |
45 | if (error) { | ||
46 | SinkTrace() << "Resource is already closed."; | ||
47 | // Resource isn't started, nothing to shutdown | ||
48 | return KAsync::null(); | ||
49 | } | ||
45 | // We can't currently reuse the socket | 50 | // We can't currently reuse the socket |
46 | socket->close(); | 51 | socket->close(); |
47 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); | 52 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); |
48 | resourceAccess->open(); | 53 | resourceAccess->open(); |
49 | resourceAccess->sendCommand(Sink::Commands::ShutdownCommand) | 54 | return resourceAccess->sendCommand(Sink::Commands::ShutdownCommand) |
50 | .then<void>([&future, resourceAccess, time]() { | 55 | .addToContext(resourceAccess) |
56 | .syncThen<void>([resourceAccess, time]() { | ||
51 | resourceAccess->close(); | 57 | resourceAccess->close(); |
52 | SinkTrace() << "Shutdown complete." << Log::TraceTime(time->elapsed()); | 58 | SinkTrace() << "Shutdown complete." << Log::TraceTime(time->elapsed()); |
53 | future.setFinished(); | 59 | }); |
54 | }) | ||
55 | .exec(); | ||
56 | }, | ||
57 | [](int, const QString &) { | ||
58 | SinkTrace() << "Resource is already closed."; | ||
59 | // Resource isn't started, nothing to shutdown | ||
60 | }); | 60 | }); |
61 | } | 61 | } |
62 | 62 | ||
@@ -67,18 +67,19 @@ KAsync::Job<void> ResourceControl::start(const QByteArray &identifier) | |||
67 | time->start(); | 67 | time->start(); |
68 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); | 68 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); |
69 | resourceAccess->open(); | 69 | resourceAccess->open(); |
70 | return resourceAccess->sendCommand(Sink::Commands::PingCommand).then<void>([resourceAccess, time]() { SinkTrace() << "Start complete." << Log::TraceTime(time->elapsed()); }); | 70 | return resourceAccess->sendCommand(Sink::Commands::PingCommand).addToContext(resourceAccess).syncThen<void>([time]() { SinkTrace() << "Start complete." << Log::TraceTime(time->elapsed()); }); |
71 | } | 71 | } |
72 | 72 | ||
73 | KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArrayList &resourceIdentifier) | 73 | KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArrayList &resourceIdentifier) |
74 | { | 74 | { |
75 | SinkTrace() << "flushMessageQueue" << resourceIdentifier; | 75 | SinkTrace() << "flushMessageQueue" << resourceIdentifier; |
76 | return KAsync::iterate(resourceIdentifier) | 76 | return KAsync::value(resourceIdentifier) |
77 | .template each<void, QByteArray>([](const QByteArray &resource, KAsync::Future<void> &future) { | 77 | .template each([](const QByteArray &resource) { |
78 | SinkTrace() << "Flushing message queue " << resource; | 78 | SinkTrace() << "Flushing message queue " << resource; |
79 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); | 79 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); |
80 | resourceAccess->open(); | 80 | resourceAccess->open(); |
81 | resourceAccess->synchronizeResource(false, true).then<void>([&future, resourceAccess]() { future.setFinished(); }).exec(); | 81 | return resourceAccess->synchronizeResource(false, true) |
82 | .addToContext(resourceAccess); | ||
82 | }); | 83 | }); |
83 | } | 84 | } |
84 | 85 | ||
diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp index bf4239d..1c56fe5 100644 --- a/common/resourcefacade.cpp +++ b/common/resourcefacade.cpp | |||
@@ -167,7 +167,7 @@ template <typename DomainType> | |||
167 | KAsync::Job<void> LocalStorageFacade<DomainType>::create(const DomainType &domainObject) | 167 | KAsync::Job<void> LocalStorageFacade<DomainType>::create(const DomainType &domainObject) |
168 | { | 168 | { |
169 | auto configStoreIdentifier = mIdentifier; | 169 | auto configStoreIdentifier = mIdentifier; |
170 | return KAsync::start<void>([domainObject, configStoreIdentifier]() { | 170 | return KAsync::syncStart<void>([domainObject, configStoreIdentifier]() { |
171 | const QByteArray type = domainObject.getProperty("type").toByteArray(); | 171 | const QByteArray type = domainObject.getProperty("type").toByteArray(); |
172 | const QByteArray providedIdentifier = domainObject.identifier().isEmpty() ? domainObject.getProperty("identifier").toByteArray() : domainObject.identifier(); | 172 | const QByteArray providedIdentifier = domainObject.identifier().isEmpty() ? domainObject.getProperty("identifier").toByteArray() : domainObject.identifier(); |
173 | const QByteArray identifier = providedIdentifier.isEmpty() ? ResourceConfig::newIdentifier(type) : providedIdentifier; | 173 | const QByteArray identifier = providedIdentifier.isEmpty() ? ResourceConfig::newIdentifier(type) : providedIdentifier; |
@@ -192,7 +192,7 @@ template <typename DomainType> | |||
192 | KAsync::Job<void> LocalStorageFacade<DomainType>::modify(const DomainType &domainObject) | 192 | KAsync::Job<void> LocalStorageFacade<DomainType>::modify(const DomainType &domainObject) |
193 | { | 193 | { |
194 | auto configStoreIdentifier = mIdentifier; | 194 | auto configStoreIdentifier = mIdentifier; |
195 | return KAsync::start<void>([domainObject, configStoreIdentifier]() { | 195 | return KAsync::syncStart<void>([domainObject, configStoreIdentifier]() { |
196 | const QByteArray identifier = domainObject.identifier(); | 196 | const QByteArray identifier = domainObject.identifier(); |
197 | if (identifier.isEmpty()) { | 197 | if (identifier.isEmpty()) { |
198 | SinkWarning() << "We need an \"identifier\" property to identify the entity to configure."; | 198 | SinkWarning() << "We need an \"identifier\" property to identify the entity to configure."; |
@@ -220,7 +220,7 @@ template <typename DomainType> | |||
220 | KAsync::Job<void> LocalStorageFacade<DomainType>::remove(const DomainType &domainObject) | 220 | KAsync::Job<void> LocalStorageFacade<DomainType>::remove(const DomainType &domainObject) |
221 | { | 221 | { |
222 | auto configStoreIdentifier = mIdentifier; | 222 | auto configStoreIdentifier = mIdentifier; |
223 | return KAsync::start<void>([domainObject, configStoreIdentifier]() { | 223 | return KAsync::syncStart<void>([domainObject, configStoreIdentifier]() { |
224 | const QByteArray identifier = domainObject.identifier(); | 224 | const QByteArray identifier = domainObject.identifier(); |
225 | if (identifier.isEmpty()) { | 225 | if (identifier.isEmpty()) { |
226 | SinkWarning() << "We need an \"identifier\" property to identify the entity to configure"; | 226 | SinkWarning() << "We need an \"identifier\" property to identify the entity to configure"; |
diff --git a/common/sourcewriteback.cpp b/common/sourcewriteback.cpp index fe996cb..702d8e3 100644 --- a/common/sourcewriteback.cpp +++ b/common/sourcewriteback.cpp | |||
@@ -106,7 +106,7 @@ KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArr | |||
106 | job = replay(mail, operation, oldRemoteId, modifiedProperties); | 106 | job = replay(mail, operation, oldRemoteId, modifiedProperties); |
107 | } | 107 | } |
108 | 108 | ||
109 | return job.then<void, QByteArray>([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { | 109 | return job.syncThen<void, QByteArray>([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { |
110 | if (operation == Sink::Operation_Creation) { | 110 | if (operation == Sink::Operation_Creation) { |
111 | SinkTrace() << "Replayed creation with remote id: " << remoteId; | 111 | SinkTrace() << "Replayed creation with remote id: " << remoteId; |
112 | if (remoteId.isEmpty()) { | 112 | if (remoteId.isEmpty()) { |
@@ -127,13 +127,11 @@ KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArr | |||
127 | } else { | 127 | } else { |
128 | SinkError() << "Unkown operation" << operation; | 128 | SinkError() << "Unkown operation" << operation; |
129 | } | 129 | } |
130 | 130 | }) | |
131 | mSyncStore.clear(); | 131 | .syncThen<void>([this](const KAsync::Error &error) { |
132 | mEntityStore.clear(); | 132 | if (error) { |
133 | mTransaction.abort(); | 133 | SinkWarning() << "Failed to replay change: " << error.errorMessage; |
134 | mSyncTransaction.commit(); | 134 | } |
135 | }, [this](int errorCode, const QString &errorMessage) { | ||
136 | SinkWarning() << "Failed to replay change: " << errorMessage; | ||
137 | mSyncStore.clear(); | 135 | mSyncStore.clear(); |
138 | mEntityStore.clear(); | 136 | mEntityStore.clear(); |
139 | mTransaction.abort(); | 137 | mTransaction.abort(); |
diff --git a/common/store.cpp b/common/store.cpp index 07f41f8..c01d220 100644 --- a/common/store.cpp +++ b/common/store.cpp | |||
@@ -39,6 +39,8 @@ | |||
39 | SINK_DEBUG_AREA("store") | 39 | SINK_DEBUG_AREA("store") |
40 | 40 | ||
41 | Q_DECLARE_METATYPE(QSharedPointer<Sink::ResultEmitter<Sink::ApplicationDomain::SinkResource::Ptr>>) | 41 | Q_DECLARE_METATYPE(QSharedPointer<Sink::ResultEmitter<Sink::ApplicationDomain::SinkResource::Ptr>>) |
42 | Q_DECLARE_METATYPE(QSharedPointer<Sink::ResourceAccessInterface>); | ||
43 | Q_DECLARE_METATYPE(std::shared_ptr<void>); | ||
42 | 44 | ||
43 | namespace Sink { | 45 | namespace Sink { |
44 | 46 | ||
@@ -169,12 +171,10 @@ QSharedPointer<QAbstractItemModel> Store::loadModel(Query query) | |||
169 | result.first.exec(); | 171 | result.first.exec(); |
170 | } | 172 | } |
171 | 173 | ||
172 | KAsync::iterate(resources.keys()) | 174 | KAsync::value(resources.keys()) |
173 | .template each<void, QByteArray>([query, aggregatingEmitter, resources](const QByteArray &resourceInstanceIdentifier, KAsync::Future<void> &future) { | 175 | .template each([query, aggregatingEmitter, resources](const QByteArray &resourceInstanceIdentifier) { |
174 | const auto resourceType = resources.value(resourceInstanceIdentifier); | 176 | const auto resourceType = resources.value(resourceInstanceIdentifier); |
175 | queryResource<DomainType>(resourceType, resourceInstanceIdentifier, query, aggregatingEmitter).template then<void>([&future]() { | 177 | return queryResource<DomainType>(resourceType, resourceInstanceIdentifier, query, aggregatingEmitter); |
176 | future.setFinished(); | ||
177 | }).exec(); | ||
178 | }) | 178 | }) |
179 | .exec(); | 179 | .exec(); |
180 | model->fetchMore(QModelIndex()); | 180 | model->fetchMore(QModelIndex()); |
@@ -201,7 +201,7 @@ KAsync::Job<void> Store::create(const DomainType &domainObject) | |||
201 | { | 201 | { |
202 | // Potentially move to separate thread as well | 202 | // Potentially move to separate thread as well |
203 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); | 203 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); |
204 | return facade->create(domainObject).template then<void>([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to create"; }); | 204 | return facade->create(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to create"; }); |
205 | } | 205 | } |
206 | 206 | ||
207 | template <class DomainType> | 207 | template <class DomainType> |
@@ -209,7 +209,7 @@ KAsync::Job<void> Store::modify(const DomainType &domainObject) | |||
209 | { | 209 | { |
210 | // Potentially move to separate thread as well | 210 | // Potentially move to separate thread as well |
211 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); | 211 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); |
212 | return facade->modify(domainObject).template then<void>([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to modify"; }); | 212 | return facade->modify(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to modify"; }); |
213 | } | 213 | } |
214 | 214 | ||
215 | template <class DomainType> | 215 | template <class DomainType> |
@@ -217,7 +217,7 @@ KAsync::Job<void> Store::remove(const DomainType &domainObject) | |||
217 | { | 217 | { |
218 | // Potentially move to separate thread as well | 218 | // Potentially move to separate thread as well |
219 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); | 219 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); |
220 | return facade->remove(domainObject).template then<void>([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to remove"; }); | 220 | return facade->remove(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to remove"; }); |
221 | } | 221 | } |
222 | 222 | ||
223 | KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier) | 223 | KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier) |
@@ -231,6 +231,7 @@ KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier) | |||
231 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); | 231 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); |
232 | resourceAccess->open(); | 232 | resourceAccess->open(); |
233 | return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand) | 233 | return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand) |
234 | .addToContext(resourceAccess) | ||
234 | .then<void>([resourceAccess](KAsync::Future<void> &future) { | 235 | .then<void>([resourceAccess](KAsync::Future<void> &future) { |
235 | if (resourceAccess->isReady()) { | 236 | if (resourceAccess->isReady()) { |
236 | //Wait for the resource shutdown | 237 | //Wait for the resource shutdown |
@@ -243,8 +244,8 @@ KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier) | |||
243 | future.setFinished(); | 244 | future.setFinished(); |
244 | } | 245 | } |
245 | }) | 246 | }) |
246 | .then<void>([resourceAccess, time]() { | 247 | .syncThen<void>([time]() { |
247 | SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed()); | 248 | SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed()); |
248 | }); | 249 | }); |
249 | } | 250 | } |
250 | 251 | ||
@@ -253,41 +254,38 @@ KAsync::Job<void> Store::synchronize(const Sink::Query &query) | |||
253 | SinkTrace() << "synchronize" << query.resources; | 254 | SinkTrace() << "synchronize" << query.resources; |
254 | auto resources = getResources(query.resources, query.accounts).keys(); | 255 | auto resources = getResources(query.resources, query.accounts).keys(); |
255 | //FIXME only necessary because each doesn't propagate errors | 256 | //FIXME only necessary because each doesn't propagate errors |
256 | auto error = new bool; | 257 | auto errorFlag = new bool; |
257 | return KAsync::iterate(resources) | 258 | return KAsync::value(resources) |
258 | .template each<void, QByteArray>([query, error](const QByteArray &resource, KAsync::Future<void> &future) { | 259 | .template each([query, errorFlag](const QByteArray &resource) { |
259 | SinkTrace() << "Synchronizing " << resource; | 260 | SinkTrace() << "Synchronizing " << resource; |
260 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); | 261 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); |
261 | resourceAccess->open(); | 262 | resourceAccess->open(); |
262 | resourceAccess->synchronizeResource(true, false).then<void>([resourceAccess, &future]() {SinkTrace() << "synced."; future.setFinished(); }, | 263 | return resourceAccess->synchronizeResource(true, false) |
263 | [&future, error](int errorCode, QString msg) { *error = true; SinkWarning() << "Error during sync."; future.setError(errorCode, msg); }).exec(); | 264 | .addToContext(resourceAccess) |
264 | }).then<void>([error](KAsync::Future<void> &future) { | 265 | .then<void>([errorFlag](const KAsync::Error &error) { |
265 | if (*error) { | 266 | if (error) { |
266 | future.setError(1, "Error during sync."); | 267 | *errorFlag = true; |
267 | } else { | 268 | SinkWarning() << "Error during sync."; |
268 | future.setFinished(); | 269 | return KAsync::error<void>(error); |
270 | } | ||
271 | SinkTrace() << "synced."; | ||
272 | return KAsync::null<void>(); | ||
273 | }); | ||
274 | }) | ||
275 | .then<void>([errorFlag]() { | ||
276 | if (*errorFlag) { | ||
277 | return KAsync::error<void>("Error during sync."); | ||
269 | } | 278 | } |
270 | delete error; | 279 | delete errorFlag; |
280 | return KAsync::null<void>(); | ||
271 | }); | 281 | }); |
272 | } | 282 | } |
273 | 283 | ||
274 | template <class DomainType> | 284 | template <class DomainType> |
275 | KAsync::Job<DomainType> Store::fetchOne(const Sink::Query &query) | 285 | KAsync::Job<DomainType> Store::fetchOne(const Sink::Query &query) |
276 | { | 286 | { |
277 | return KAsync::start<DomainType>([query](KAsync::Future<DomainType> &future) { | 287 | return fetch<DomainType>(query, 1).template then<DomainType, QList<typename DomainType::Ptr>>([](const QList<typename DomainType::Ptr> &list) { |
278 | // FIXME We could do this more elegantly if composed jobs would have the correct type (In that case we'd simply return the value from then continuation, and could avoid the | 288 | return KAsync::value(*list.first()); |
279 | // outer job entirely) | ||
280 | fetch<DomainType>(query, 1) | ||
281 | .template then<void, QList<typename DomainType::Ptr>>( | ||
282 | [&future](const QList<typename DomainType::Ptr> &list) { | ||
283 | future.setValue(*list.first()); | ||
284 | future.setFinished(); | ||
285 | }, | ||
286 | [&future](int errorCode, const QString &errorMessage) { | ||
287 | future.setError(errorCode, errorMessage); | ||
288 | future.setFinished(); | ||
289 | }) | ||
290 | .exec(); | ||
291 | }); | 289 | }); |
292 | } | 290 | } |
293 | 291 | ||
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 2d4fb8d..15a06e7 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -244,7 +244,7 @@ KAsync::Job<void> Synchronizer::synchronize() | |||
244 | SinkTrace() << "Synchronizing"; | 244 | SinkTrace() << "Synchronizing"; |
245 | mSyncInProgress = true; | 245 | mSyncInProgress = true; |
246 | mMessageQueue->startTransaction(); | 246 | mMessageQueue->startTransaction(); |
247 | return synchronizeWithSource().then<void>([this]() { | 247 | return synchronizeWithSource().syncThen<void>([this]() { |
248 | mSyncStore.clear(); | 248 | mSyncStore.clear(); |
249 | mEntityStore.clear(); | 249 | mEntityStore.clear(); |
250 | mMessageQueue->commit(); | 250 | mMessageQueue->commit(); |
diff --git a/common/test.cpp b/common/test.cpp index 5b4c899..1a8e11d 100644 --- a/common/test.cpp +++ b/common/test.cpp | |||
@@ -156,7 +156,7 @@ public: | |||
156 | } | 156 | } |
157 | resultProvider->initialResultSetComplete(parent); | 157 | resultProvider->initialResultSetComplete(parent); |
158 | }); | 158 | }); |
159 | auto job = KAsync::start<void>([query, resultProvider]() {}); | 159 | auto job = KAsync::syncStart<void>([query, resultProvider]() {}); |
160 | return qMakePair(job, emitter); | 160 | return qMakePair(job, emitter); |
161 | } | 161 | } |
162 | 162 | ||