summaryrefslogtreecommitdiffstats
path: root/async/src/async.h
diff options
context:
space:
mode:
Diffstat (limited to 'async/src/async.h')
-rw-r--r--async/src/async.h136
1 files changed, 77 insertions, 59 deletions
diff --git a/async/src/async.h b/async/src/async.h
index a976fa2..52d0570 100644
--- a/async/src/async.h
+++ b/async/src/async.h
@@ -28,6 +28,7 @@
28#include <iostream> 28#include <iostream>
29#include <cassert> 29#include <cassert>
30#include <iterator> 30#include <iterator>
31#include <boost/graph/graph_concepts.hpp>
31 32
32#include "future.h" 33#include "future.h"
33#include "async_impl.h" 34#include "async_impl.h"
@@ -35,6 +36,9 @@
35 36
36namespace Async { 37namespace Async {
37 38
39template<typename PrevOut, typename Out, typename ... In>
40class Executor;
41
38class JobBase; 42class JobBase;
39 43
40template<typename Out, typename ... In> 44template<typename Out, typename ... In>
@@ -50,17 +54,19 @@ using ReduceTask = typename detail::identity<std::function<void(In, Async::Futur
50template<typename Out, typename ... In> 54template<typename Out, typename ... In>
51Job<Out, In ...> start(ThenTask<Out, In ...> func); 55Job<Out, In ...> start(ThenTask<Out, In ...> func);
52 56
53class Executor 57template<typename ... T>
58struct PreviousOut {
59 using type = typename std::tuple_element<0, std::tuple<T ..., void>>::type;
60};
61
62
63class ExecutorBase
54{ 64{
65 template<typename PrevOut, typename Out, typename ... In>
66 friend class Executor;
55 67
56public: 68public:
57 Executor(Executor *parent) 69 virtual ~ExecutorBase()
58 : mPrev(parent)
59 , mResult(0)
60 {
61 }
62
63 virtual ~Executor()
64 { 70 {
65 delete mResult; 71 delete mResult;
66 } 72 }
@@ -72,113 +78,125 @@ public:
72 return mResult; 78 return mResult;
73 } 79 }
74 80
75 Executor *mPrev; 81protected:
82 ExecutorBase(ExecutorBase *parent)
83 : mPrev(parent)
84 , mResult(0)
85 {
86 }
87
88 ExecutorBase *mPrev;
76 FutureBase *mResult; 89 FutureBase *mResult;
77}; 90};
78 91
79template<typename Out, typename ... In> 92template<typename PrevOut, typename Out, typename ... In>
80class ThenExecutor: public Executor 93class Executor : public ExecutorBase
81{ 94{
82 95protected:
83public: 96 Executor(ExecutorBase *parent)
84 ThenExecutor(ThenTask<Out, In ...> then, Executor *parent = nullptr) 97 : ExecutorBase(parent)
85 : Executor(parent)
86 , mFunc(then)
87 { 98 {
88 } 99 }
89 100
90 void exec() 101 Async::Future<PrevOut>* chainup()
91 { 102 {
92 typedef typename std::tuple_element<0, std::tuple<In ..., void>>::type PrevOut;
93
94 Async::Future<PrevOut> *in = 0;
95 if (mPrev) { 103 if (mPrev) {
96 mPrev->exec(); 104 mPrev->exec();
97 in = static_cast<Async::Future<PrevOut>*>(mPrev->result()); 105 auto future = static_cast<Async::Future<PrevOut>*>(mPrev->result());
98 assert(in->isFinished()); 106 assert(future->isFinished());
107 return future;
108 } else {
109 return 0;
99 } 110 }
111 }
112
113 std::function<void(const In& ..., Async::Future<Out> &)> mFunc;
114};
115
116template<typename Out, typename ... In>
117class ThenExecutor: public Executor<typename PreviousOut<In ...>::type, Out, In ...>
118{
119public:
120 ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase *parent = nullptr)
121 : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent)
122 {
123 this->mFunc = then;
124 }
125
126 void exec()
127 {
128 auto in = this->chainup();
129 (void)in; // supress 'unused variable' warning when In is void
100 130
101 auto out = new Async::Future<Out>(); 131 auto out = new Async::Future<Out>();
102 mFunc(in ? in->value() : In() ..., *out); 132 this->mFunc(in ? in->value() : In() ..., *out);
103 out->waitForFinished(); 133 out->waitForFinished();
104 mResult = out; 134 this->mResult = out;
105 } 135 }
106
107private:
108 std::function<void(const In& ..., Async::Future<Out>&)> mFunc;
109}; 136};
110 137
111template<typename PrevOut, typename Out, typename In> 138template<typename PrevOut, typename Out, typename In>
112class EachExecutor : public Executor 139class EachExecutor : public Executor<PrevOut, Out, In>
113{ 140{
114public: 141public:
115 EachExecutor(EachTask<Out, In> each, Executor *parent = nullptr) 142 EachExecutor(EachTask<Out, In> each, ExecutorBase *parent)
116 : Executor(parent) 143 : Executor<PrevOut, Out, In>(parent)
117 , mFunc(each)
118 { 144 {
145 this->mFunc = each;
119 } 146 }
120 147
121 void exec() 148 void exec()
122 { 149 {
123 assert(mPrev); 150 auto in = this->chainup();
124 mPrev->exec();
125 Async::Future<PrevOut> *in = static_cast<Async::Future<PrevOut>*>(mPrev->result());
126 151
127 auto *out = new Async::Future<Out>(); 152 auto *out = new Async::Future<Out>();
128 for (auto arg : in->value()) { 153 for (auto arg : in->value()) {
129 Async::Future<Out> future; 154 Async::Future<Out> future;
130 mFunc(arg, future); 155 this->mFunc(arg, future);
131 future.waitForFinished(); 156 future.waitForFinished();
132 out->setValue(out->value() + future.value()); 157 out->setValue(out->value() + future.value());
133 } 158 }
134 out->setFinished(); 159 out->setFinished();
135 160
136 mResult = out; 161 this->mResult = out;
137 } 162 }
138
139private:
140 std::function<void(const In&, Async::Future<Out>&)> mFunc;
141}; 163};
142 164
143template<typename Out, typename In> 165template<typename Out, typename In>
144class ReduceExecutor : public Executor 166class ReduceExecutor : public Executor<In, Out, In>
145{ 167{
146public: 168public:
147 ReduceExecutor(ReduceTask<Out, In> reduce, Executor *parent = nullptr) 169 ReduceExecutor(ReduceTask<Out, In> reduce, ExecutorBase *parent)
148 : Executor(parent) 170 : Executor<In, Out, In>(parent)
149 , mFunc(reduce)
150 { 171 {
172 this->mFunc = reduce;
151 } 173 }
152 174
153 void exec() 175 void exec()
154 { 176 {
155 assert(mPrev); 177 auto in = this->chainup();
156 mPrev->exec();
157 Async::Future<In> *in = static_cast<Async::Future<In>*>(mPrev->result());
158 178
159 auto out = new Async::Future<Out>(); 179 auto out = new Async::Future<Out>();
160 mFunc(in->value(), *out); 180 this->mFunc(in->value(), *out);
161 out->waitForFinished(); 181 out->waitForFinished();
162 mResult = out; 182 this->mResult = out;
163 } 183 }
164
165private:
166 std::function<void(const In &, Async::Future<Out> &)> mFunc;
167}; 184};
168 185
186
169class JobBase 187class JobBase
170{ 188{
171 template<typename Out, typename ... In> 189 template<typename Out, typename ... In>
172 friend class Job; 190 friend class Job;
173 191
174public: 192public:
175 JobBase(Executor *executor); 193 JobBase(ExecutorBase *executor);
176 ~JobBase(); 194 ~JobBase();
177 195
178 void exec(); 196 void exec();
179 197
180protected: 198protected:
181 Executor *mExecutor; 199 ExecutorBase *mExecutor;
182}; 200};
183 201
184template<typename Out, typename ... In> 202template<typename Out, typename ... In>
@@ -194,15 +212,14 @@ public:
194 template<typename OutOther, typename ... InOther> 212 template<typename OutOther, typename ... InOther>
195 Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func) 213 Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func)
196 { 214 {
197 Executor *exec = new ThenExecutor<OutOther, InOther ...>(func, mExecutor); 215 return Job<OutOther, InOther ...>(new ThenExecutor<OutOther, InOther ...>(func, mExecutor));
198 return Job<OutOther, InOther ...>(exec);
199 } 216 }
200 217
201 template<typename OutOther, typename InOther> 218 template<typename OutOther, typename InOther>
202 Job<OutOther, InOther> each(EachTask<OutOther, InOther> func) 219 Job<OutOther, InOther> each(EachTask<OutOther, InOther> func)
203 { 220 {
204 static_assert(detail::isIterable<Out>::value, 221 static_assert(detail::isIterable<Out>::value,
205 "The 'Each' task can only be connected to a job that returns a list or array."); 222 "The 'Each' task can only be connected to a job that returns a list or an array.");
206 static_assert(detail::isIterable<OutOther>::value, 223 static_assert(detail::isIterable<OutOther>::value,
207 "The result type of 'Each' task must be a list or an array."); 224 "The result type of 'Each' task must be a list or an array.");
208 return Job<OutOther, InOther>(new EachExecutor<Out, OutOther, InOther>(func, mExecutor)); 225 return Job<OutOther, InOther>(new EachExecutor<Out, OutOther, InOther>(func, mExecutor));
@@ -212,7 +229,9 @@ public:
212 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) 229 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func)
213 { 230 {
214 static_assert(Async::detail::isIterable<Out>::value, 231 static_assert(Async::detail::isIterable<Out>::value,
215 "The 'Result' task can only be connected to a job that returns a list or array"); 232 "The 'Result' task can only be connected to a job that returns a list or an array");
233 static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value,
234 "The return type of previous task must be compatible with input type of this task");
216 return Job<OutOther, InOther>(new ReduceExecutor<OutOther, InOther>(func, mExecutor)); 235 return Job<OutOther, InOther>(new ReduceExecutor<OutOther, InOther>(func, mExecutor));
217 } 236 }
218 237
@@ -222,7 +241,7 @@ public:
222 } 241 }
223 242
224private: 243private:
225 Job(Executor *executor) 244 Job(ExecutorBase *executor)
226 : JobBase(executor) 245 : JobBase(executor)
227 { 246 {
228 } 247 }
@@ -231,7 +250,6 @@ private:
231} // namespace Async 250} // namespace Async
232 251
233 252
234
235// ********** Out of line definitions **************** 253// ********** Out of line definitions ****************
236 254
237template<typename Out, typename ... In> 255template<typename Out, typename ... In>