diff options
Diffstat (limited to 'async/src/async.h')
-rw-r--r-- | async/src/async.h | 200 |
1 files changed, 116 insertions, 84 deletions
diff --git a/async/src/async.h b/async/src/async.h index 182e57c..0f027f5 100644 --- a/async/src/async.h +++ b/async/src/async.h | |||
@@ -27,6 +27,7 @@ | |||
27 | #include <type_traits> | 27 | #include <type_traits> |
28 | #include <iostream> | 28 | #include <iostream> |
29 | #include <cassert> | 29 | #include <cassert> |
30 | #include <iterator> | ||
30 | 31 | ||
31 | #include "future.h" | 32 | #include "future.h" |
32 | #include "async_impl.h" | 33 | #include "async_impl.h" |
@@ -49,97 +50,159 @@ using ReduceTask = typename detail::identity<std::function<void(In, Async::Futur | |||
49 | template<typename Out, typename ... In> | 50 | template<typename Out, typename ... In> |
50 | Job<Out, In ...> start(ThenTask<Out, In ...> func); | 51 | Job<Out, In ...> start(ThenTask<Out, In ...> func); |
51 | 52 | ||
52 | namespace Private | 53 | class Executor |
53 | { | 54 | { |
54 | template<typename Out, typename ... In> | 55 | |
55 | void doExec(Job<Out, In ...> *job, Async::Future<Out> &out, const In & ... args); | 56 | public: |
56 | } | 57 | Executor(Executor *parent) |
58 | : mPrev(parent) | ||
59 | , mResult(0) | ||
60 | { | ||
61 | } | ||
62 | |||
63 | virtual ~Executor() | ||
64 | { | ||
65 | delete mResult; | ||
66 | } | ||
67 | |||
68 | virtual void exec() = 0; | ||
69 | |||
70 | FutureBase* result() const | ||
71 | { | ||
72 | return mResult; | ||
73 | } | ||
74 | |||
75 | Executor *mPrev; | ||
76 | FutureBase *mResult; | ||
77 | }; | ||
78 | |||
79 | template<typename Out, typename ... In> | ||
80 | class ThenExecutor: public Executor | ||
81 | { | ||
82 | |||
83 | typedef Out OutType; | ||
84 | typedef typename std::tuple_element<0, std::tuple<In ..., void>>::type InType; | ||
85 | |||
86 | |||
87 | public: | ||
88 | ThenExecutor(ThenTask<Out, In ...> then, Executor *parent = nullptr) | ||
89 | : Executor(parent) | ||
90 | , mFunc(then) | ||
91 | { | ||
92 | } | ||
93 | |||
94 | void exec() | ||
95 | { | ||
96 | Async::Future<InType> *in = 0; | ||
97 | if (mPrev) { | ||
98 | mPrev->exec(); | ||
99 | in = static_cast<Async::Future<InType>*>(mPrev->result()); | ||
100 | assert(in->isFinished()); | ||
101 | } | ||
102 | |||
103 | auto out = new Async::Future<Out>(); | ||
104 | mFunc(in ? in->value() : In() ..., *out); | ||
105 | out->waitForFinished(); | ||
106 | mResult = out; | ||
107 | } | ||
108 | |||
109 | private: | ||
110 | std::function<void(const In& ..., Async::Future<Out>&)> mFunc; | ||
111 | }; | ||
112 | |||
113 | template<typename PrevOut, typename Out, typename In> | ||
114 | class EachExecutor : public Executor | ||
115 | { | ||
116 | public: | ||
117 | EachExecutor(EachTask<Out, In> each, Executor *parent = nullptr) | ||
118 | : Executor(parent) | ||
119 | , mFunc(each) | ||
120 | { | ||
121 | } | ||
122 | |||
123 | void exec() | ||
124 | { | ||
125 | assert(mPrev); | ||
126 | mPrev->exec(); | ||
127 | Async::Future<PrevOut> *in = static_cast<Async::Future<PrevOut>*>(mPrev->result()); | ||
128 | |||
129 | auto *out = new Async::Future<Out>(); | ||
130 | for (auto arg : in->value()) { | ||
131 | Async::Future<Out> future; | ||
132 | mFunc(arg, future); | ||
133 | future.waitForFinished(); | ||
134 | out->setValue(out->value() + future.value()); | ||
135 | } | ||
136 | |||
137 | mResult = out; | ||
138 | } | ||
139 | |||
140 | private: | ||
141 | std::function<void(const In&, Async::Future<Out>&)> mFunc; | ||
142 | }; | ||
57 | 143 | ||
58 | class JobBase | 144 | class JobBase |
59 | { | 145 | { |
60 | template<typename Out, typename ... In> | 146 | template<typename Out, typename ... In> |
61 | friend class Job; | 147 | friend class Job; |
62 | 148 | ||
63 | protected: | ||
64 | enum JobType { | ||
65 | Then, | ||
66 | Each, | ||
67 | Reduce | ||
68 | }; | ||
69 | |||
70 | public: | 149 | public: |
71 | JobBase(JobType jobType, JobBase *prev = nullptr); | 150 | JobBase(Executor *executor); |
72 | virtual void exec() = 0; | 151 | ~JobBase(); |
152 | |||
153 | void exec(); | ||
73 | 154 | ||
74 | protected: | 155 | protected: |
75 | JobBase *mPrev; | 156 | Executor *mExecutor; |
76 | void *mResult; | ||
77 | JobType mJobType; | ||
78 | }; | 157 | }; |
79 | 158 | ||
80 | template<typename Out, typename ... In> | 159 | template<typename Out, typename ... In> |
81 | class Job : public JobBase | 160 | class Job : public JobBase |
82 | { | 161 | { |
83 | template<typename Out_, typename ... In_> | 162 | template<typename OutOther, typename ... InOther> |
84 | friend class Job; | 163 | friend class Job; |
85 | 164 | ||
86 | template<typename Out_, typename ... In_> | 165 | template<typename OutOther, typename ... InOther> |
87 | friend Job<Out_, In_ ...> start(Async::ThenTask<Out_, In_ ...> func); | 166 | friend Job<OutOther, InOther ...> start(Async::ThenTask<OutOther, InOther ...> func); |
88 | |||
89 | typedef Out OutType; | ||
90 | typedef typename std::tuple_element<0, std::tuple<In ..., void>>::type InType; | ||
91 | 167 | ||
92 | public: | 168 | public: |
93 | ~Job() | 169 | template<typename OutOther, typename ... InOther> |
170 | Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func) | ||
94 | { | 171 | { |
95 | delete reinterpret_cast<Async::Future<Out>*>(mResult); | 172 | Executor *exec = new ThenExecutor<OutOther, InOther ...>(func, mExecutor); |
173 | return Job<OutOther, InOther ...>(exec); | ||
96 | } | 174 | } |
97 | 175 | ||
98 | template<typename Out_, typename ... In_> | 176 | template<typename OutOther, typename InOther> |
99 | Job<Out_, In_ ...> then(ThenTask<Out_, In_ ...> func) | 177 | Job<OutOther, InOther> each(EachTask<OutOther, InOther> func) |
100 | { | 178 | { |
101 | return Job<Out_, In_ ...>::create(func, JobBase::Then, this); | 179 | static_assert(detail::isIterable<Out>::value, |
102 | } | ||
103 | |||
104 | template<typename Out_, typename In_> | ||
105 | Job<Out_, In_> each(EachTask<Out_, In_> func) | ||
106 | { | ||
107 | static_assert(detail::isIterable<OutType>::value, | ||
108 | "The 'Each' task can only be connected to a job that returns a list or array."); | 180 | "The 'Each' task can only be connected to a job that returns a list or array."); |
109 | static_assert(detail::isIterable<Out_>::value, | 181 | static_assert(detail::isIterable<OutOther>::value, |
110 | "The result type of 'Each' task must be a list or an array."); | 182 | "The result type of 'Each' task must be a list or an array."); |
111 | return Job<Out_, In_>::create(func, JobBase::Each, this); | 183 | return Job<OutOther, InOther>(new EachExecutor<Out, OutOther, InOther>(func, mExecutor)); |
112 | } | 184 | } |
113 | 185 | ||
114 | template<typename Out_, typename In_> | 186 | template<typename OutOther, typename InOther> |
115 | Job<Out_, In_> reduce(ReduceTask<Out_, In_> func) | 187 | Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) |
116 | { | 188 | { |
117 | static_assert(Async::detail::isIterable<OutType>::value, | 189 | static_assert(Async::detail::isIterable<Out>::value, |
118 | "The result type of 'Reduce' task must be a list or an array."); | 190 | "The result type of 'Reduce' task must be a list or an array."); |
119 | return Job<Out_, In_>::create(func, JobBase::Reduce, this); | 191 | //return Job<Out_, In_>::create(func, new ReduceEx, this); |
120 | } | 192 | } |
121 | 193 | ||
122 | Async::Future<Out> result() const | 194 | Async::Future<Out> result() const |
123 | { | 195 | { |
124 | return *reinterpret_cast<Async::Future<Out>*>(mResult); | 196 | return *static_cast<Async::Future<Out>*>(mExecutor->result()); |
125 | } | 197 | } |
126 | 198 | ||
127 | void exec(); | ||
128 | |||
129 | private: | 199 | private: |
130 | Job(JobBase::JobType jobType, JobBase *parent = nullptr) | 200 | Job(Executor *executor) |
131 | : JobBase(jobType, parent) | 201 | : JobBase(executor) |
132 | { | 202 | { |
133 | } | 203 | } |
134 | |||
135 | template<typename F> | ||
136 | static Job<Out, In ... > create(F func, JobBase::JobType jobType, JobBase *parent = nullptr); | ||
137 | |||
138 | public: | ||
139 | std::function<void(In ..., Async::Future<Out>&)> mFunc; | ||
140 | }; | 204 | }; |
141 | 205 | ||
142 | |||
143 | } // namespace Async | 206 | } // namespace Async |
144 | 207 | ||
145 | 208 | ||
@@ -149,41 +212,10 @@ public: | |||
149 | template<typename Out, typename ... In> | 212 | template<typename Out, typename ... In> |
150 | Async::Job<Out, In ...> Async::start(ThenTask<Out, In ...> func) | 213 | Async::Job<Out, In ...> Async::start(ThenTask<Out, In ...> func) |
151 | { | 214 | { |
152 | return Job<Out, In ...>::create(func, JobBase::Then); | 215 | Executor *exec = new ThenExecutor<Out, In ...>(func); |
153 | } | 216 | return Job<Out, In ...>(exec); |
154 | |||
155 | template<typename Out, typename ... In> | ||
156 | void Async::Private::doExec(Job<Out, In ...> *job, Async::Future<Out> &out, const In & ... args) | ||
157 | { | ||
158 | job->mFunc(args ..., out); | ||
159 | }; | ||
160 | |||
161 | template<typename Out, typename ... In> | ||
162 | void Async::Job<Out, In ...>::exec() | ||
163 | { | ||
164 | Async::Future<InType> *in = nullptr; | ||
165 | if (mPrev) { | ||
166 | mPrev->exec(); | ||
167 | in = reinterpret_cast<Async::Future<InType>*>(mPrev->mResult); | ||
168 | assert(in->isFinished()); | ||
169 | } | ||
170 | |||
171 | auto out = new Async::Future<Out>(); | ||
172 | Private::doExec<Out, In ...>(this, *out, in ? in->value() : In() ...); | ||
173 | out->waitForFinished(); | ||
174 | mResult = reinterpret_cast<void*>(out); | ||
175 | } | ||
176 | |||
177 | template<typename Out, typename ... In> | ||
178 | template<typename F> | ||
179 | Async::Job<Out, In ...> Async::Job<Out, In ...>::create(F func, Async::JobBase::JobType jobType, Async::JobBase* parent) | ||
180 | { | ||
181 | Job<Out, In ...> job(jobType, parent); | ||
182 | job.mFunc = func; | ||
183 | return job; | ||
184 | } | 217 | } |
185 | 218 | ||
186 | |||
187 | #endif // ASYNC_H | 219 | #endif // ASYNC_H |
188 | 220 | ||
189 | 221 | ||