diff options
author | Dan Vrátil <dvratil@redhat.com> | 2014-12-11 16:09:28 +0100 |
---|---|---|
committer | Dan Vrátil <dvratil@redhat.com> | 2014-12-12 13:24:11 +0100 |
commit | e0ba51543037a026e2a8d483dfdc0e196e9dc59e (patch) | |
tree | 17bad6e8b23249ed4df7731258f2de9700e2fbf8 /async/src | |
parent | b30fe2fa42a717d6e89710cde82ecf7f419b2fe9 (diff) | |
download | sink-e0ba51543037a026e2a8d483dfdc0e196e9dc59e.tar.gz sink-e0ba51543037a026e2a8d483dfdc0e196e9dc59e.zip |
Async: add Reduce task
Diffstat (limited to 'async/src')
-rw-r--r-- | async/src/async.h | 44 |
1 files changed, 34 insertions, 10 deletions
diff --git a/async/src/async.h b/async/src/async.h index 0f027f5..a976fa2 100644 --- a/async/src/async.h +++ b/async/src/async.h | |||
@@ -80,10 +80,6 @@ template<typename Out, typename ... In> | |||
80 | class ThenExecutor: public Executor | 80 | class ThenExecutor: public Executor |
81 | { | 81 | { |
82 | 82 | ||
83 | typedef Out OutType; | ||
84 | typedef typename std::tuple_element<0, std::tuple<In ..., void>>::type InType; | ||
85 | |||
86 | |||
87 | public: | 83 | public: |
88 | ThenExecutor(ThenTask<Out, In ...> then, Executor *parent = nullptr) | 84 | ThenExecutor(ThenTask<Out, In ...> then, Executor *parent = nullptr) |
89 | : Executor(parent) | 85 | : Executor(parent) |
@@ -93,10 +89,12 @@ public: | |||
93 | 89 | ||
94 | void exec() | 90 | void exec() |
95 | { | 91 | { |
96 | Async::Future<InType> *in = 0; | 92 | typedef typename std::tuple_element<0, std::tuple<In ..., void>>::type PrevOut; |
93 | |||
94 | Async::Future<PrevOut> *in = 0; | ||
97 | if (mPrev) { | 95 | if (mPrev) { |
98 | mPrev->exec(); | 96 | mPrev->exec(); |
99 | in = static_cast<Async::Future<InType>*>(mPrev->result()); | 97 | in = static_cast<Async::Future<PrevOut>*>(mPrev->result()); |
100 | assert(in->isFinished()); | 98 | assert(in->isFinished()); |
101 | } | 99 | } |
102 | 100 | ||
@@ -133,6 +131,7 @@ public: | |||
133 | future.waitForFinished(); | 131 | future.waitForFinished(); |
134 | out->setValue(out->value() + future.value()); | 132 | out->setValue(out->value() + future.value()); |
135 | } | 133 | } |
134 | out->setFinished(); | ||
136 | 135 | ||
137 | mResult = out; | 136 | mResult = out; |
138 | } | 137 | } |
@@ -141,6 +140,32 @@ private: | |||
141 | std::function<void(const In&, Async::Future<Out>&)> mFunc; | 140 | std::function<void(const In&, Async::Future<Out>&)> mFunc; |
142 | }; | 141 | }; |
143 | 142 | ||
143 | template<typename Out, typename In> | ||
144 | class ReduceExecutor : public Executor | ||
145 | { | ||
146 | public: | ||
147 | ReduceExecutor(ReduceTask<Out, In> reduce, Executor *parent = nullptr) | ||
148 | : Executor(parent) | ||
149 | , mFunc(reduce) | ||
150 | { | ||
151 | } | ||
152 | |||
153 | void exec() | ||
154 | { | ||
155 | assert(mPrev); | ||
156 | mPrev->exec(); | ||
157 | Async::Future<In> *in = static_cast<Async::Future<In>*>(mPrev->result()); | ||
158 | |||
159 | auto out = new Async::Future<Out>(); | ||
160 | mFunc(in->value(), *out); | ||
161 | out->waitForFinished(); | ||
162 | mResult = out; | ||
163 | } | ||
164 | |||
165 | private: | ||
166 | std::function<void(const In &, Async::Future<Out> &)> mFunc; | ||
167 | }; | ||
168 | |||
144 | class JobBase | 169 | class JobBase |
145 | { | 170 | { |
146 | template<typename Out, typename ... In> | 171 | template<typename Out, typename ... In> |
@@ -187,8 +212,8 @@ public: | |||
187 | Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) | 212 | Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) |
188 | { | 213 | { |
189 | static_assert(Async::detail::isIterable<Out>::value, | 214 | static_assert(Async::detail::isIterable<Out>::value, |
190 | "The result type of 'Reduce' task must be a list or an array."); | 215 | "The 'Result' task can only be connected to a job that returns a list or array"); |
191 | //return Job<Out_, In_>::create(func, new ReduceEx, this); | 216 | return Job<OutOther, InOther>(new ReduceExecutor<OutOther, InOther>(func, mExecutor)); |
192 | } | 217 | } |
193 | 218 | ||
194 | Async::Future<Out> result() const | 219 | Async::Future<Out> result() const |
@@ -212,8 +237,7 @@ private: | |||
212 | template<typename Out, typename ... In> | 237 | template<typename Out, typename ... In> |
213 | Async::Job<Out, In ...> Async::start(ThenTask<Out, In ...> func) | 238 | Async::Job<Out, In ...> Async::start(ThenTask<Out, In ...> func) |
214 | { | 239 | { |
215 | Executor *exec = new ThenExecutor<Out, In ...>(func); | 240 | return Job<Out, In ...>(new ThenExecutor<Out, In ...>(func)); |
216 | return Job<Out, In ...>(exec); | ||
217 | } | 241 | } |
218 | 242 | ||
219 | #endif // ASYNC_H | 243 | #endif // ASYNC_H |