summaryrefslogtreecommitdiffstats
path: root/async/src/async.h
diff options
context:
space:
mode:
Diffstat (limited to 'async/src/async.h')
-rw-r--r--async/src/async.h196
1 files changed, 108 insertions, 88 deletions
diff --git a/async/src/async.h b/async/src/async.h
index 52d0570..66e455c 100644
--- a/async/src/async.h
+++ b/async/src/async.h
@@ -51,39 +51,30 @@ using EachTask = typename detail::identity<std::function<void(In, Async::Future<
51template<typename Out, typename In> 51template<typename Out, typename In>
52using ReduceTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type; 52using ReduceTask = typename detail::identity<std::function<void(In, Async::Future<Out>&)>>::type;
53 53
54template<typename Out, typename ... In> 54namespace Private
55Job<Out, In ...> start(ThenTask<Out, In ...> func); 55{
56 56
57template<typename ... T> 57template<typename ... T>
58struct PreviousOut { 58struct PreviousOut {
59 using type = typename std::tuple_element<0, std::tuple<T ..., void>>::type; 59 using type = typename std::tuple_element<0, std::tuple<T ..., void>>::type;
60}; 60};
61 61
62
63class ExecutorBase 62class ExecutorBase
64{ 63{
65 template<typename PrevOut, typename Out, typename ... In> 64 template<typename PrevOut, typename Out, typename ... In>
66 friend class Executor; 65 friend class Executor;
67 66
68public: 67public:
69 virtual ~ExecutorBase() 68 virtual ~ExecutorBase();
70 {
71 delete mResult;
72 }
73
74 virtual void exec() = 0; 69 virtual void exec() = 0;
75 70
76 FutureBase* result() const 71 inline FutureBase* result() const
77 { 72 {
78 return mResult; 73 return mResult;
79 } 74 }
80 75
81protected: 76protected:
82 ExecutorBase(ExecutorBase *parent) 77 ExecutorBase(ExecutorBase *parent);
83 : mPrev(parent)
84 , mResult(0)
85 {
86 }
87 78
88 ExecutorBase *mPrev; 79 ExecutorBase *mPrev;
89 FutureBase *mResult; 80 FutureBase *mResult;
@@ -95,20 +86,9 @@ class Executor : public ExecutorBase
95protected: 86protected:
96 Executor(ExecutorBase *parent) 87 Executor(ExecutorBase *parent)
97 : ExecutorBase(parent) 88 : ExecutorBase(parent)
98 { 89 {}
99 } 90 virtual ~Executor() {}
100 91 inline Async::Future<PrevOut>* chainup();
101 Async::Future<PrevOut>* chainup()
102 {
103 if (mPrev) {
104 mPrev->exec();
105 auto future = static_cast<Async::Future<PrevOut>*>(mPrev->result());
106 assert(future->isFinished());
107 return future;
108 } else {
109 return 0;
110 }
111 }
112 92
113 std::function<void(const In& ..., Async::Future<Out> &)> mFunc; 93 std::function<void(const In& ..., Async::Future<Out> &)> mFunc;
114}; 94};
@@ -117,72 +97,28 @@ template<typename Out, typename ... In>
117class ThenExecutor: public Executor<typename PreviousOut<In ...>::type, Out, In ...> 97class ThenExecutor: public Executor<typename PreviousOut<In ...>::type, Out, In ...>
118{ 98{
119public: 99public:
120 ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase *parent = nullptr) 100 ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase *parent = nullptr);
121 : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent) 101 void exec();
122 {
123 this->mFunc = then;
124 }
125
126 void exec()
127 {
128 auto in = this->chainup();
129 (void)in; // supress 'unused variable' warning when In is void
130
131 auto out = new Async::Future<Out>();
132 this->mFunc(in ? in->value() : In() ..., *out);
133 out->waitForFinished();
134 this->mResult = out;
135 }
136}; 102};
137 103
138template<typename PrevOut, typename Out, typename In> 104template<typename PrevOut, typename Out, typename In>
139class EachExecutor : public Executor<PrevOut, Out, In> 105class EachExecutor : public Executor<PrevOut, Out, In>
140{ 106{
141public: 107public:
142 EachExecutor(EachTask<Out, In> each, ExecutorBase *parent) 108 EachExecutor(EachTask<Out, In> each, ExecutorBase *parent);
143 : Executor<PrevOut, Out, In>(parent) 109 void exec();
144 {
145 this->mFunc = each;
146 }
147
148 void exec()
149 {
150 auto in = this->chainup();
151
152 auto *out = new Async::Future<Out>();
153 for (auto arg : in->value()) {
154 Async::Future<Out> future;
155 this->mFunc(arg, future);
156 future.waitForFinished();
157 out->setValue(out->value() + future.value());
158 }
159 out->setFinished();
160
161 this->mResult = out;
162 }
163}; 110};
164 111
165template<typename Out, typename In> 112template<typename Out, typename In>
166class ReduceExecutor : public Executor<In, Out, In> 113class ReduceExecutor : public Executor<In, Out, In>
167{ 114{
168public: 115public:
169 ReduceExecutor(ReduceTask<Out, In> reduce, ExecutorBase *parent) 116 ReduceExecutor(ReduceTask<Out, In> reduce, ExecutorBase *parent);
170 : Executor<In, Out, In>(parent) 117 void exec();
171 {
172 this->mFunc = reduce;
173 }
174
175 void exec()
176 {
177 auto in = this->chainup();
178
179 auto out = new Async::Future<Out>();
180 this->mFunc(in->value(), *out);
181 out->waitForFinished();
182 this->mResult = out;
183 }
184}; 118};
185 119
120} // namespace Private
121
186 122
187class JobBase 123class JobBase
188{ 124{
@@ -190,13 +126,13 @@ class JobBase
190 friend class Job; 126 friend class Job;
191 127
192public: 128public:
193 JobBase(ExecutorBase *executor); 129 JobBase(Private::ExecutorBase *executor);
194 ~JobBase(); 130 ~JobBase();
195 131
196 void exec(); 132 void exec();
197 133
198protected: 134protected:
199 ExecutorBase *mExecutor; 135 Private::ExecutorBase *mExecutor;
200}; 136};
201 137
202template<typename Out, typename ... In> 138template<typename Out, typename ... In>
@@ -212,7 +148,7 @@ public:
212 template<typename OutOther, typename ... InOther> 148 template<typename OutOther, typename ... InOther>
213 Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func) 149 Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func)
214 { 150 {
215 return Job<OutOther, InOther ...>(new ThenExecutor<OutOther, InOther ...>(func, mExecutor)); 151 return Job<OutOther, InOther ...>(new Private::ThenExecutor<OutOther, InOther ...>(func, mExecutor));
216 } 152 }
217 153
218 template<typename OutOther, typename InOther> 154 template<typename OutOther, typename InOther>
@@ -222,7 +158,7 @@ public:
222 "The 'Each' task can only be connected to a job that returns a list or an array."); 158 "The 'Each' task can only be connected to a job that returns a list or an array.");
223 static_assert(detail::isIterable<OutOther>::value, 159 static_assert(detail::isIterable<OutOther>::value,
224 "The result type of 'Each' task must be a list or an array."); 160 "The result type of 'Each' task must be a list or an array.");
225 return Job<OutOther, InOther>(new EachExecutor<Out, OutOther, InOther>(func, mExecutor)); 161 return Job<OutOther, InOther>(new Private::EachExecutor<Out, OutOther, InOther>(func, mExecutor));
226 } 162 }
227 163
228 template<typename OutOther, typename InOther> 164 template<typename OutOther, typename InOther>
@@ -232,7 +168,7 @@ public:
232 "The 'Result' task can only be connected to a job that returns a list or an array"); 168 "The 'Result' task can only be connected to a job that returns a list or an array");
233 static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value, 169 static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value,
234 "The return type of previous task must be compatible with input type of this task"); 170 "The return type of previous task must be compatible with input type of this task");
235 return Job<OutOther, InOther>(new ReduceExecutor<OutOther, InOther>(func, mExecutor)); 171 return Job<OutOther, InOther>(new Private::ReduceExecutor<OutOther, InOther>(func, mExecutor));
236 } 172 }
237 173
238 Async::Future<Out> result() const 174 Async::Future<Out> result() const
@@ -241,7 +177,7 @@ public:
241 } 177 }
242 178
243private: 179private:
244 Job(ExecutorBase *executor) 180 Job(Private::ExecutorBase *executor)
245 : JobBase(executor) 181 : JobBase(executor)
246 { 182 {
247 } 183 }
@@ -252,12 +188,96 @@ private:
252 188
253// ********** Out of line definitions **************** 189// ********** Out of line definitions ****************
254 190
191namespace Async {
192
255template<typename Out, typename ... In> 193template<typename Out, typename ... In>
256Async::Job<Out, In ...> Async::start(ThenTask<Out, In ...> func) 194Job<Out, In ...> start(ThenTask<Out, In ...> func)
257{ 195{
258 return Job<Out, In ...>(new ThenExecutor<Out, In ...>(func)); 196 return Job<Out, In ...>(new Private::ThenExecutor<Out, In ...>(func));
259} 197}
260 198
199namespace Private {
200
201template<typename PrevOut, typename Out, typename ... In>
202Future<PrevOut>* Executor<PrevOut, Out, In ...>::chainup()
203{
204 if (mPrev) {
205 mPrev->exec();
206 auto future = static_cast<Async::Future<PrevOut>*>(mPrev->result());
207 assert(future->isFinished());
208 return future;
209 } else {
210 return 0;
211 }
212}
213
214template<typename Out, typename ... In>
215ThenExecutor<Out, In ...>::ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase* parent)
216 : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent)
217{
218 this->mFunc = then;
219}
220
221template<typename Out, typename ... In>
222void ThenExecutor<Out, In ...>::exec()
223{
224 auto in = this->chainup();
225 (void)in; // supress 'unused variable' warning when In is void
226
227 auto out = new Async::Future<Out>();
228 this->mFunc(in ? in->value() : In() ..., *out);
229 out->waitForFinished();
230 this->mResult = out;
231}
232
233template<typename PrevOut, typename Out, typename In>
234EachExecutor<PrevOut, Out, In>::EachExecutor(EachTask<Out, In> each, ExecutorBase* parent)
235 : Executor<PrevOut, Out, In>(parent)
236{
237 this->mFunc = each;
238}
239
240template<typename PrevOut, typename Out, typename In>
241void EachExecutor<PrevOut, Out, In>::exec()
242{
243 auto in = this->chainup();
244
245 auto *out = new Async::Future<Out>();
246 for (auto arg : in->value()) {
247 Async::Future<Out> future;
248 this->mFunc(arg, future);
249 future.waitForFinished();
250 out->setValue(out->value() + future.value());
251 }
252 out->setFinished();
253
254 this->mResult = out;
255}
256
257template<typename Out, typename In>
258ReduceExecutor<Out, In>::ReduceExecutor(ReduceTask<Out, In> reduce, ExecutorBase* parent)
259 : Executor<In, Out, In>(parent)
260{
261 this->mFunc = reduce;
262}
263
264template<typename Out, typename In>
265void ReduceExecutor<Out, In>::exec()
266{
267 auto in = this->chainup();
268
269 auto out = new Async::Future<Out>();
270 this->mFunc(in->value(), *out);
271 out->waitForFinished();
272 this->mResult = out;
273}
274
275} // namespace Private
276
277} // namespace Async
278
279
280
261#endif // ASYNC_H 281#endif // ASYNC_H
262 282
263 283