summaryrefslogtreecommitdiffstats
path: root/async/src
diff options
context:
space:
mode:
Diffstat (limited to 'async/src')
-rw-r--r--async/src/async.h44
1 files changed, 34 insertions, 10 deletions
diff --git a/async/src/async.h b/async/src/async.h
index 0f027f5..a976fa2 100644
--- a/async/src/async.h
+++ b/async/src/async.h
@@ -80,10 +80,6 @@ template<typename Out, typename ... In>
80class ThenExecutor: public Executor 80class ThenExecutor: public Executor
81{ 81{
82 82
83 typedef Out OutType;
84 typedef typename std::tuple_element<0, std::tuple<In ..., void>>::type InType;
85
86
87public: 83public:
88 ThenExecutor(ThenTask<Out, In ...> then, Executor *parent = nullptr) 84 ThenExecutor(ThenTask<Out, In ...> then, Executor *parent = nullptr)
89 : Executor(parent) 85 : Executor(parent)
@@ -93,10 +89,12 @@ public:
93 89
94 void exec() 90 void exec()
95 { 91 {
96 Async::Future<InType> *in = 0; 92 typedef typename std::tuple_element<0, std::tuple<In ..., void>>::type PrevOut;
93
94 Async::Future<PrevOut> *in = 0;
97 if (mPrev) { 95 if (mPrev) {
98 mPrev->exec(); 96 mPrev->exec();
99 in = static_cast<Async::Future<InType>*>(mPrev->result()); 97 in = static_cast<Async::Future<PrevOut>*>(mPrev->result());
100 assert(in->isFinished()); 98 assert(in->isFinished());
101 } 99 }
102 100
@@ -133,6 +131,7 @@ public:
133 future.waitForFinished(); 131 future.waitForFinished();
134 out->setValue(out->value() + future.value()); 132 out->setValue(out->value() + future.value());
135 } 133 }
134 out->setFinished();
136 135
137 mResult = out; 136 mResult = out;
138 } 137 }
@@ -141,6 +140,32 @@ private:
141 std::function<void(const In&, Async::Future<Out>&)> mFunc; 140 std::function<void(const In&, Async::Future<Out>&)> mFunc;
142}; 141};
143 142
143template<typename Out, typename In>
144class ReduceExecutor : public Executor
145{
146public:
147 ReduceExecutor(ReduceTask<Out, In> reduce, Executor *parent = nullptr)
148 : Executor(parent)
149 , mFunc(reduce)
150 {
151 }
152
153 void exec()
154 {
155 assert(mPrev);
156 mPrev->exec();
157 Async::Future<In> *in = static_cast<Async::Future<In>*>(mPrev->result());
158
159 auto out = new Async::Future<Out>();
160 mFunc(in->value(), *out);
161 out->waitForFinished();
162 mResult = out;
163 }
164
165private:
166 std::function<void(const In &, Async::Future<Out> &)> mFunc;
167};
168
144class JobBase 169class JobBase
145{ 170{
146 template<typename Out, typename ... In> 171 template<typename Out, typename ... In>
@@ -187,8 +212,8 @@ public:
187 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) 212 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func)
188 { 213 {
189 static_assert(Async::detail::isIterable<Out>::value, 214 static_assert(Async::detail::isIterable<Out>::value,
190 "The result type of 'Reduce' task must be a list or an array."); 215 "The 'Result' task can only be connected to a job that returns a list or array");
191 //return Job<Out_, In_>::create(func, new ReduceEx, this); 216 return Job<OutOther, InOther>(new ReduceExecutor<OutOther, InOther>(func, mExecutor));
192 } 217 }
193 218
194 Async::Future<Out> result() const 219 Async::Future<Out> result() const
@@ -212,8 +237,7 @@ private:
212template<typename Out, typename ... In> 237template<typename Out, typename ... In>
213Async::Job<Out, In ...> Async::start(ThenTask<Out, In ...> func) 238Async::Job<Out, In ...> Async::start(ThenTask<Out, In ...> func)
214{ 239{
215 Executor *exec = new ThenExecutor<Out, In ...>(func); 240 return Job<Out, In ...>(new ThenExecutor<Out, In ...>(func));
216 return Job<Out, In ...>(exec);
217} 241}
218 242
219#endif // ASYNC_H 243#endif // ASYNC_H