diff options
author | Dan Vrátil <dvratil@redhat.com> | 2014-12-11 18:24:24 +0100 |
---|---|---|
committer | Dan Vrátil <dvratil@redhat.com> | 2014-12-12 13:24:11 +0100 |
commit | 6b6e2a3ecc1e8f54b0862d66929ed6ace21892e7 (patch) | |
tree | 28e5aa75bebc5dce9f80697c8a9eaf8a8c0c4241 /async/src | |
parent | e0ba51543037a026e2a8d483dfdc0e196e9dc59e (diff) | |
download | sink-6b6e2a3ecc1e8f54b0862d66929ed6ace21892e7.tar.gz sink-6b6e2a3ecc1e8f54b0862d66929ed6ace21892e7.zip |
Async: move some common code from executors to shared base class
Diffstat (limited to 'async/src')
-rw-r--r-- | async/src/async.cpp | 2 | ||||
-rw-r--r-- | async/src/async.h | 136 |
2 files changed, 78 insertions, 60 deletions
diff --git a/async/src/async.cpp b/async/src/async.cpp index c4a88fd..16da384 100644 --- a/async/src/async.cpp +++ b/async/src/async.cpp | |||
@@ -28,7 +28,7 @@ | |||
28 | 28 | ||
29 | using namespace Async; | 29 | using namespace Async; |
30 | 30 | ||
31 | JobBase::JobBase(Executor *executor) | 31 | JobBase::JobBase(ExecutorBase *executor) |
32 | : mExecutor(executor) | 32 | : mExecutor(executor) |
33 | { | 33 | { |
34 | } | 34 | } |
diff --git a/async/src/async.h b/async/src/async.h index a976fa2..52d0570 100644 --- a/async/src/async.h +++ b/async/src/async.h | |||
@@ -28,6 +28,7 @@ | |||
28 | #include <iostream> | 28 | #include <iostream> |
29 | #include <cassert> | 29 | #include <cassert> |
30 | #include <iterator> | 30 | #include <iterator> |
31 | #include <boost/graph/graph_concepts.hpp> | ||
31 | 32 | ||
32 | #include "future.h" | 33 | #include "future.h" |
33 | #include "async_impl.h" | 34 | #include "async_impl.h" |
@@ -35,6 +36,9 @@ | |||
35 | 36 | ||
36 | namespace Async { | 37 | namespace Async { |
37 | 38 | ||
39 | template<typename PrevOut, typename Out, typename ... In> | ||
40 | class Executor; | ||
41 | |||
38 | class JobBase; | 42 | class JobBase; |
39 | 43 | ||
40 | template<typename Out, typename ... In> | 44 | template<typename Out, typename ... In> |
@@ -50,17 +54,19 @@ using ReduceTask = typename detail::identity<std::function<void(In, Async::Futur | |||
50 | template<typename Out, typename ... In> | 54 | template<typename Out, typename ... In> |
51 | Job<Out, In ...> start(ThenTask<Out, In ...> func); | 55 | Job<Out, In ...> start(ThenTask<Out, In ...> func); |
52 | 56 | ||
53 | class Executor | 57 | template<typename ... T> |
58 | struct PreviousOut { | ||
59 | using type = typename std::tuple_element<0, std::tuple<T ..., void>>::type; | ||
60 | }; | ||
61 | |||
62 | |||
63 | class ExecutorBase | ||
54 | { | 64 | { |
65 | template<typename PrevOut, typename Out, typename ... In> | ||
66 | friend class Executor; | ||
55 | 67 | ||
56 | public: | 68 | public: |
57 | Executor(Executor *parent) | 69 | virtual ~ExecutorBase() |
58 | : mPrev(parent) | ||
59 | , mResult(0) | ||
60 | { | ||
61 | } | ||
62 | |||
63 | virtual ~Executor() | ||
64 | { | 70 | { |
65 | delete mResult; | 71 | delete mResult; |
66 | } | 72 | } |
@@ -72,113 +78,125 @@ public: | |||
72 | return mResult; | 78 | return mResult; |
73 | } | 79 | } |
74 | 80 | ||
75 | Executor *mPrev; | 81 | protected: |
82 | ExecutorBase(ExecutorBase *parent) | ||
83 | : mPrev(parent) | ||
84 | , mResult(0) | ||
85 | { | ||
86 | } | ||
87 | |||
88 | ExecutorBase *mPrev; | ||
76 | FutureBase *mResult; | 89 | FutureBase *mResult; |
77 | }; | 90 | }; |
78 | 91 | ||
79 | template<typename Out, typename ... In> | 92 | template<typename PrevOut, typename Out, typename ... In> |
80 | class ThenExecutor: public Executor | 93 | class Executor : public ExecutorBase |
81 | { | 94 | { |
82 | 95 | protected: | |
83 | public: | 96 | Executor(ExecutorBase *parent) |
84 | ThenExecutor(ThenTask<Out, In ...> then, Executor *parent = nullptr) | 97 | : ExecutorBase(parent) |
85 | : Executor(parent) | ||
86 | , mFunc(then) | ||
87 | { | 98 | { |
88 | } | 99 | } |
89 | 100 | ||
90 | void exec() | 101 | Async::Future<PrevOut>* chainup() |
91 | { | 102 | { |
92 | typedef typename std::tuple_element<0, std::tuple<In ..., void>>::type PrevOut; | ||
93 | |||
94 | Async::Future<PrevOut> *in = 0; | ||
95 | if (mPrev) { | 103 | if (mPrev) { |
96 | mPrev->exec(); | 104 | mPrev->exec(); |
97 | in = static_cast<Async::Future<PrevOut>*>(mPrev->result()); | 105 | auto future = static_cast<Async::Future<PrevOut>*>(mPrev->result()); |
98 | assert(in->isFinished()); | 106 | assert(future->isFinished()); |
107 | return future; | ||
108 | } else { | ||
109 | return 0; | ||
99 | } | 110 | } |
111 | } | ||
112 | |||
113 | std::function<void(const In& ..., Async::Future<Out> &)> mFunc; | ||
114 | }; | ||
115 | |||
116 | template<typename Out, typename ... In> | ||
117 | class ThenExecutor: public Executor<typename PreviousOut<In ...>::type, Out, In ...> | ||
118 | { | ||
119 | public: | ||
120 | ThenExecutor(ThenTask<Out, In ...> then, ExecutorBase *parent = nullptr) | ||
121 | : Executor<typename PreviousOut<In ...>::type, Out, In ...>(parent) | ||
122 | { | ||
123 | this->mFunc = then; | ||
124 | } | ||
125 | |||
126 | void exec() | ||
127 | { | ||
128 | auto in = this->chainup(); | ||
129 | (void)in; // supress 'unused variable' warning when In is void | ||
100 | 130 | ||
101 | auto out = new Async::Future<Out>(); | 131 | auto out = new Async::Future<Out>(); |
102 | mFunc(in ? in->value() : In() ..., *out); | 132 | this->mFunc(in ? in->value() : In() ..., *out); |
103 | out->waitForFinished(); | 133 | out->waitForFinished(); |
104 | mResult = out; | 134 | this->mResult = out; |
105 | } | 135 | } |
106 | |||
107 | private: | ||
108 | std::function<void(const In& ..., Async::Future<Out>&)> mFunc; | ||
109 | }; | 136 | }; |
110 | 137 | ||
111 | template<typename PrevOut, typename Out, typename In> | 138 | template<typename PrevOut, typename Out, typename In> |
112 | class EachExecutor : public Executor | 139 | class EachExecutor : public Executor<PrevOut, Out, In> |
113 | { | 140 | { |
114 | public: | 141 | public: |
115 | EachExecutor(EachTask<Out, In> each, Executor *parent = nullptr) | 142 | EachExecutor(EachTask<Out, In> each, ExecutorBase *parent) |
116 | : Executor(parent) | 143 | : Executor<PrevOut, Out, In>(parent) |
117 | , mFunc(each) | ||
118 | { | 144 | { |
145 | this->mFunc = each; | ||
119 | } | 146 | } |
120 | 147 | ||
121 | void exec() | 148 | void exec() |
122 | { | 149 | { |
123 | assert(mPrev); | 150 | auto in = this->chainup(); |
124 | mPrev->exec(); | ||
125 | Async::Future<PrevOut> *in = static_cast<Async::Future<PrevOut>*>(mPrev->result()); | ||
126 | 151 | ||
127 | auto *out = new Async::Future<Out>(); | 152 | auto *out = new Async::Future<Out>(); |
128 | for (auto arg : in->value()) { | 153 | for (auto arg : in->value()) { |
129 | Async::Future<Out> future; | 154 | Async::Future<Out> future; |
130 | mFunc(arg, future); | 155 | this->mFunc(arg, future); |
131 | future.waitForFinished(); | 156 | future.waitForFinished(); |
132 | out->setValue(out->value() + future.value()); | 157 | out->setValue(out->value() + future.value()); |
133 | } | 158 | } |
134 | out->setFinished(); | 159 | out->setFinished(); |
135 | 160 | ||
136 | mResult = out; | 161 | this->mResult = out; |
137 | } | 162 | } |
138 | |||
139 | private: | ||
140 | std::function<void(const In&, Async::Future<Out>&)> mFunc; | ||
141 | }; | 163 | }; |
142 | 164 | ||
143 | template<typename Out, typename In> | 165 | template<typename Out, typename In> |
144 | class ReduceExecutor : public Executor | 166 | class ReduceExecutor : public Executor<In, Out, In> |
145 | { | 167 | { |
146 | public: | 168 | public: |
147 | ReduceExecutor(ReduceTask<Out, In> reduce, Executor *parent = nullptr) | 169 | ReduceExecutor(ReduceTask<Out, In> reduce, ExecutorBase *parent) |
148 | : Executor(parent) | 170 | : Executor<In, Out, In>(parent) |
149 | , mFunc(reduce) | ||
150 | { | 171 | { |
172 | this->mFunc = reduce; | ||
151 | } | 173 | } |
152 | 174 | ||
153 | void exec() | 175 | void exec() |
154 | { | 176 | { |
155 | assert(mPrev); | 177 | auto in = this->chainup(); |
156 | mPrev->exec(); | ||
157 | Async::Future<In> *in = static_cast<Async::Future<In>*>(mPrev->result()); | ||
158 | 178 | ||
159 | auto out = new Async::Future<Out>(); | 179 | auto out = new Async::Future<Out>(); |
160 | mFunc(in->value(), *out); | 180 | this->mFunc(in->value(), *out); |
161 | out->waitForFinished(); | 181 | out->waitForFinished(); |
162 | mResult = out; | 182 | this->mResult = out; |
163 | } | 183 | } |
164 | |||
165 | private: | ||
166 | std::function<void(const In &, Async::Future<Out> &)> mFunc; | ||
167 | }; | 184 | }; |
168 | 185 | ||
186 | |||
169 | class JobBase | 187 | class JobBase |
170 | { | 188 | { |
171 | template<typename Out, typename ... In> | 189 | template<typename Out, typename ... In> |
172 | friend class Job; | 190 | friend class Job; |
173 | 191 | ||
174 | public: | 192 | public: |
175 | JobBase(Executor *executor); | 193 | JobBase(ExecutorBase *executor); |
176 | ~JobBase(); | 194 | ~JobBase(); |
177 | 195 | ||
178 | void exec(); | 196 | void exec(); |
179 | 197 | ||
180 | protected: | 198 | protected: |
181 | Executor *mExecutor; | 199 | ExecutorBase *mExecutor; |
182 | }; | 200 | }; |
183 | 201 | ||
184 | template<typename Out, typename ... In> | 202 | template<typename Out, typename ... In> |
@@ -194,15 +212,14 @@ public: | |||
194 | template<typename OutOther, typename ... InOther> | 212 | template<typename OutOther, typename ... InOther> |
195 | Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func) | 213 | Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func) |
196 | { | 214 | { |
197 | Executor *exec = new ThenExecutor<OutOther, InOther ...>(func, mExecutor); | 215 | return Job<OutOther, InOther ...>(new ThenExecutor<OutOther, InOther ...>(func, mExecutor)); |
198 | return Job<OutOther, InOther ...>(exec); | ||
199 | } | 216 | } |
200 | 217 | ||
201 | template<typename OutOther, typename InOther> | 218 | template<typename OutOther, typename InOther> |
202 | Job<OutOther, InOther> each(EachTask<OutOther, InOther> func) | 219 | Job<OutOther, InOther> each(EachTask<OutOther, InOther> func) |
203 | { | 220 | { |
204 | static_assert(detail::isIterable<Out>::value, | 221 | static_assert(detail::isIterable<Out>::value, |
205 | "The 'Each' task can only be connected to a job that returns a list or array."); | 222 | "The 'Each' task can only be connected to a job that returns a list or an array."); |
206 | static_assert(detail::isIterable<OutOther>::value, | 223 | static_assert(detail::isIterable<OutOther>::value, |
207 | "The result type of 'Each' task must be a list or an array."); | 224 | "The result type of 'Each' task must be a list or an array."); |
208 | return Job<OutOther, InOther>(new EachExecutor<Out, OutOther, InOther>(func, mExecutor)); | 225 | return Job<OutOther, InOther>(new EachExecutor<Out, OutOther, InOther>(func, mExecutor)); |
@@ -212,7 +229,9 @@ public: | |||
212 | Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) | 229 | Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) |
213 | { | 230 | { |
214 | static_assert(Async::detail::isIterable<Out>::value, | 231 | static_assert(Async::detail::isIterable<Out>::value, |
215 | "The 'Result' task can only be connected to a job that returns a list or array"); | 232 | "The 'Result' task can only be connected to a job that returns a list or an array"); |
233 | static_assert(std::is_same<typename Out::value_type, typename InOther::value_type>::value, | ||
234 | "The return type of previous task must be compatible with input type of this task"); | ||
216 | return Job<OutOther, InOther>(new ReduceExecutor<OutOther, InOther>(func, mExecutor)); | 235 | return Job<OutOther, InOther>(new ReduceExecutor<OutOther, InOther>(func, mExecutor)); |
217 | } | 236 | } |
218 | 237 | ||
@@ -222,7 +241,7 @@ public: | |||
222 | } | 241 | } |
223 | 242 | ||
224 | private: | 243 | private: |
225 | Job(Executor *executor) | 244 | Job(ExecutorBase *executor) |
226 | : JobBase(executor) | 245 | : JobBase(executor) |
227 | { | 246 | { |
228 | } | 247 | } |
@@ -231,7 +250,6 @@ private: | |||
231 | } // namespace Async | 250 | } // namespace Async |
232 | 251 | ||
233 | 252 | ||
234 | |||
235 | // ********** Out of line definitions **************** | 253 | // ********** Out of line definitions **************** |
236 | 254 | ||
237 | template<typename Out, typename ... In> | 255 | template<typename Out, typename ... In> |