diff options
author | Dan Vrátil <dvratil@redhat.com> | 2014-12-11 15:55:18 +0100 |
---|---|---|
committer | Dan Vrátil <dvratil@redhat.com> | 2014-12-11 15:55:18 +0100 |
commit | c30e9145049c52feb2de719307ebbfee0650f01b (patch) | |
tree | 6896823401fb174c0f396ec30eae6257d32f8a41 /async/src/async.h | |
parent | 1aee1bda9fc81c888ad18fea107c271133dd5442 (diff) | |
download | sink-c30e9145049c52feb2de719307ebbfee0650f01b.tar.gz sink-c30e9145049c52feb2de719307ebbfee0650f01b.zip |
Async: move the actual task exection into Executor implementation
As of now, Job is only front interface to a chain of Executor subclasses. Each
Executor subclass specializes for given type of execution (then, each, reduce, ...),
and the chain is then executed recursively, as we did with the original Job
implementation.
Diffstat (limited to 'async/src/async.h')
-rw-r--r-- | async/src/async.h | 200 |
1 files changed, 116 insertions, 84 deletions
diff --git a/async/src/async.h b/async/src/async.h index 182e57c..0f027f5 100644 --- a/async/src/async.h +++ b/async/src/async.h | |||
@@ -27,6 +27,7 @@ | |||
27 | #include <type_traits> | 27 | #include <type_traits> |
28 | #include <iostream> | 28 | #include <iostream> |
29 | #include <cassert> | 29 | #include <cassert> |
30 | #include <iterator> | ||
30 | 31 | ||
31 | #include "future.h" | 32 | #include "future.h" |
32 | #include "async_impl.h" | 33 | #include "async_impl.h" |
@@ -49,97 +50,159 @@ using ReduceTask = typename detail::identity<std::function<void(In, Async::Futur | |||
49 | template<typename Out, typename ... In> | 50 | template<typename Out, typename ... In> |
50 | Job<Out, In ...> start(ThenTask<Out, In ...> func); | 51 | Job<Out, In ...> start(ThenTask<Out, In ...> func); |
51 | 52 | ||
52 | namespace Private | 53 | class Executor |
53 | { | 54 | { |
54 | template<typename Out, typename ... In> | 55 | |
55 | void doExec(Job<Out, In ...> *job, Async::Future<Out> &out, const In & ... args); | 56 | public: |
56 | } | 57 | Executor(Executor *parent) |
58 | : mPrev(parent) | ||
59 | , mResult(0) | ||
60 | { | ||
61 | } | ||
62 | |||
63 | virtual ~Executor() | ||
64 | { | ||
65 | delete mResult; | ||
66 | } | ||
67 | |||
68 | virtual void exec() = 0; | ||
69 | |||
70 | FutureBase* result() const | ||
71 | { | ||
72 | return mResult; | ||
73 | } | ||
74 | |||
75 | Executor *mPrev; | ||
76 | FutureBase *mResult; | ||
77 | }; | ||
78 | |||
79 | template<typename Out, typename ... In> | ||
80 | class ThenExecutor: public Executor | ||
81 | { | ||
82 | |||
83 | typedef Out OutType; | ||
84 | typedef typename std::tuple_element<0, std::tuple<In ..., void>>::type InType; | ||
85 | |||
86 | |||
87 | public: | ||
88 | ThenExecutor(ThenTask<Out, In ...> then, Executor *parent = nullptr) | ||
89 | : Executor(parent) | ||
90 | , mFunc(then) | ||
91 | { | ||
92 | } | ||
93 | |||
94 | void exec() | ||
95 | { | ||
96 | Async::Future<InType> *in = 0; | ||
97 | if (mPrev) { | ||
98 | mPrev->exec(); | ||
99 | in = static_cast<Async::Future<InType>*>(mPrev->result()); | ||
100 | assert(in->isFinished()); | ||
101 | } | ||
102 | |||
103 | auto out = new Async::Future<Out>(); | ||
104 | mFunc(in ? in->value() : In() ..., *out); | ||
105 | out->waitForFinished(); | ||
106 | mResult = out; | ||
107 | } | ||
108 | |||
109 | private: | ||
110 | std::function<void(const In& ..., Async::Future<Out>&)> mFunc; | ||
111 | }; | ||
112 | |||
113 | template<typename PrevOut, typename Out, typename In> | ||
114 | class EachExecutor : public Executor | ||
115 | { | ||
116 | public: | ||
117 | EachExecutor(EachTask<Out, In> each, Executor *parent = nullptr) | ||
118 | : Executor(parent) | ||
119 | , mFunc(each) | ||
120 | { | ||
121 | } | ||
122 | |||
123 | void exec() | ||
124 | { | ||
125 | assert(mPrev); | ||
126 | mPrev->exec(); | ||
127 | Async::Future<PrevOut> *in = static_cast<Async::Future<PrevOut>*>(mPrev->result()); | ||
128 | |||
129 | auto *out = new Async::Future<Out>(); | ||
130 | for (auto arg : in->value()) { | ||
131 | Async::Future<Out> future; | ||
132 | mFunc(arg, future); | ||
133 | future.waitForFinished(); | ||
134 | out->setValue(out->value() + future.value()); | ||
135 | } | ||
136 | |||
137 | mResult = out; | ||
138 | } | ||
139 | |||
140 | private: | ||
141 | std::function<void(const In&, Async::Future<Out>&)> mFunc; | ||
142 | }; | ||
57 | 143 | ||
58 | class JobBase | 144 | class JobBase |
59 | { | 145 | { |
60 | template<typename Out, typename ... In> | 146 | template<typename Out, typename ... In> |
61 | friend class Job; | 147 | friend class Job; |
62 | 148 | ||
63 | protected: | ||
64 | enum JobType { | ||
65 | Then, | ||
66 | Each, | ||
67 | Reduce | ||
68 | }; | ||
69 | |||
70 | public: | 149 | public: |
71 | JobBase(JobType jobType, JobBase *prev = nullptr); | 150 | JobBase(Executor *executor); |
72 | virtual void exec() = 0; | 151 | ~JobBase(); |
152 | |||
153 | void exec(); | ||
73 | 154 | ||
74 | protected: | 155 | protected: |
75 | JobBase *mPrev; | 156 | Executor *mExecutor; |
76 | void *mResult; | ||
77 | JobType mJobType; | ||
78 | }; | 157 | }; |
79 | 158 | ||
80 | template<typename Out, typename ... In> | 159 | template<typename Out, typename ... In> |
81 | class Job : public JobBase | 160 | class Job : public JobBase |
82 | { | 161 | { |
83 | template<typename Out_, typename ... In_> | 162 | template<typename OutOther, typename ... InOther> |
84 | friend class Job; | 163 | friend class Job; |
85 | 164 | ||
86 | template<typename Out_, typename ... In_> | 165 | template<typename OutOther, typename ... InOther> |
87 | friend Job<Out_, In_ ...> start(Async::ThenTask<Out_, In_ ...> func); | 166 | friend Job<OutOther, InOther ...> start(Async::ThenTask<OutOther, InOther ...> func); |
88 | |||
89 | typedef Out OutType; | ||
90 | typedef typename std::tuple_element<0, std::tuple<In ..., void>>::type InType; | ||
91 | 167 | ||
92 | public: | 168 | public: |
93 | ~Job() | 169 | template<typename OutOther, typename ... InOther> |
170 | Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func) | ||
94 | { | 171 | { |
95 | delete reinterpret_cast<Async::Future<Out>*>(mResult); | 172 | Executor *exec = new ThenExecutor<OutOther, InOther ...>(func, mExecutor); |
173 | return Job<OutOther, InOther ...>(exec); | ||
96 | } | 174 | } |
97 | 175 | ||
98 | template<typename Out_, typename ... In_> | 176 | template<typename OutOther, typename InOther> |
99 | Job<Out_, In_ ...> then(ThenTask<Out_, In_ ...> func) | 177 | Job<OutOther, InOther> each(EachTask<OutOther, InOther> func) |
100 | { | 178 | { |
101 | return Job<Out_, In_ ...>::create(func, JobBase::Then, this); | 179 | static_assert(detail::isIterable<Out>::value, |
102 | } | ||
103 | |||
104 | template<typename Out_, typename In_> | ||
105 | Job<Out_, In_> each(EachTask<Out_, In_> func) | ||
106 | { | ||
107 | static_assert(detail::isIterable<OutType>::value, | ||
108 | "The 'Each' task can only be connected to a job that returns a list or array."); | 180 | "The 'Each' task can only be connected to a job that returns a list or array."); |
109 | static_assert(detail::isIterable<Out_>::value, | 181 | static_assert(detail::isIterable<OutOther>::value, |
110 | "The result type of 'Each' task must be a list or an array."); | 182 | "The result type of 'Each' task must be a list or an array."); |
111 | return Job<Out_, In_>::create(func, JobBase::Each, this); | 183 | return Job<OutOther, InOther>(new EachExecutor<Out, OutOther, InOther>(func, mExecutor)); |
112 | } | 184 | } |
113 | 185 | ||
114 | template<typename Out_, typename In_> | 186 | template<typename OutOther, typename InOther> |
115 | Job<Out_, In_> reduce(ReduceTask<Out_, In_> func) | 187 | Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func) |
116 | { | 188 | { |
117 | static_assert(Async::detail::isIterable<OutType>::value, | 189 | static_assert(Async::detail::isIterable<Out>::value, |
118 | "The result type of 'Reduce' task must be a list or an array."); | 190 | "The result type of 'Reduce' task must be a list or an array."); |
119 | return Job<Out_, In_>::create(func, JobBase::Reduce, this); | 191 | //return Job<Out_, In_>::create(func, new ReduceEx, this); |
120 | } | 192 | } |
121 | 193 | ||
122 | Async::Future<Out> result() const | 194 | Async::Future<Out> result() const |
123 | { | 195 | { |
124 | return *reinterpret_cast<Async::Future<Out>*>(mResult); | 196 | return *static_cast<Async::Future<Out>*>(mExecutor->result()); |
125 | } | 197 | } |
126 | 198 | ||
127 | void exec(); | ||
128 | |||
129 | private: | 199 | private: |
130 | Job(JobBase::JobType jobType, JobBase *parent = nullptr) | 200 | Job(Executor *executor) |
131 | : JobBase(jobType, parent) | 201 | : JobBase(executor) |
132 | { | 202 | { |
133 | } | 203 | } |
134 | |||
135 | template<typename F> | ||
136 | static Job<Out, In ... > create(F func, JobBase::JobType jobType, JobBase *parent = nullptr); | ||
137 | |||
138 | public: | ||
139 | std::function<void(In ..., Async::Future<Out>&)> mFunc; | ||
140 | }; | 204 | }; |
141 | 205 | ||
142 | |||
143 | } // namespace Async | 206 | } // namespace Async |
144 | 207 | ||
145 | 208 | ||
@@ -149,41 +212,10 @@ public: | |||
149 | template<typename Out, typename ... In> | 212 | template<typename Out, typename ... In> |
150 | Async::Job<Out, In ...> Async::start(ThenTask<Out, In ...> func) | 213 | Async::Job<Out, In ...> Async::start(ThenTask<Out, In ...> func) |
151 | { | 214 | { |
152 | return Job<Out, In ...>::create(func, JobBase::Then); | 215 | Executor *exec = new ThenExecutor<Out, In ...>(func); |
153 | } | 216 | return Job<Out, In ...>(exec); |
154 | |||
155 | template<typename Out, typename ... In> | ||
156 | void Async::Private::doExec(Job<Out, In ...> *job, Async::Future<Out> &out, const In & ... args) | ||
157 | { | ||
158 | job->mFunc(args ..., out); | ||
159 | }; | ||
160 | |||
161 | template<typename Out, typename ... In> | ||
162 | void Async::Job<Out, In ...>::exec() | ||
163 | { | ||
164 | Async::Future<InType> *in = nullptr; | ||
165 | if (mPrev) { | ||
166 | mPrev->exec(); | ||
167 | in = reinterpret_cast<Async::Future<InType>*>(mPrev->mResult); | ||
168 | assert(in->isFinished()); | ||
169 | } | ||
170 | |||
171 | auto out = new Async::Future<Out>(); | ||
172 | Private::doExec<Out, In ...>(this, *out, in ? in->value() : In() ...); | ||
173 | out->waitForFinished(); | ||
174 | mResult = reinterpret_cast<void*>(out); | ||
175 | } | ||
176 | |||
177 | template<typename Out, typename ... In> | ||
178 | template<typename F> | ||
179 | Async::Job<Out, In ...> Async::Job<Out, In ...>::create(F func, Async::JobBase::JobType jobType, Async::JobBase* parent) | ||
180 | { | ||
181 | Job<Out, In ...> job(jobType, parent); | ||
182 | job.mFunc = func; | ||
183 | return job; | ||
184 | } | 217 | } |
185 | 218 | ||
186 | |||
187 | #endif // ASYNC_H | 219 | #endif // ASYNC_H |
188 | 220 | ||
189 | 221 | ||