summaryrefslogtreecommitdiffstats
path: root/async/src/async.h
diff options
context:
space:
mode:
authorDan Vrátil <dvratil@redhat.com>2014-12-11 15:55:18 +0100
committerDan Vrátil <dvratil@redhat.com>2014-12-11 15:55:18 +0100
commitc30e9145049c52feb2de719307ebbfee0650f01b (patch)
tree6896823401fb174c0f396ec30eae6257d32f8a41 /async/src/async.h
parent1aee1bda9fc81c888ad18fea107c271133dd5442 (diff)
downloadsink-c30e9145049c52feb2de719307ebbfee0650f01b.tar.gz
sink-c30e9145049c52feb2de719307ebbfee0650f01b.zip
Async: move the actual task exection into Executor implementation
As of now, Job is only front interface to a chain of Executor subclasses. Each Executor subclass specializes for given type of execution (then, each, reduce, ...), and the chain is then executed recursively, as we did with the original Job implementation.
Diffstat (limited to 'async/src/async.h')
-rw-r--r--async/src/async.h200
1 files changed, 116 insertions, 84 deletions
diff --git a/async/src/async.h b/async/src/async.h
index 182e57c..0f027f5 100644
--- a/async/src/async.h
+++ b/async/src/async.h
@@ -27,6 +27,7 @@
27#include <type_traits> 27#include <type_traits>
28#include <iostream> 28#include <iostream>
29#include <cassert> 29#include <cassert>
30#include <iterator>
30 31
31#include "future.h" 32#include "future.h"
32#include "async_impl.h" 33#include "async_impl.h"
@@ -49,97 +50,159 @@ using ReduceTask = typename detail::identity<std::function<void(In, Async::Futur
49template<typename Out, typename ... In> 50template<typename Out, typename ... In>
50Job<Out, In ...> start(ThenTask<Out, In ...> func); 51Job<Out, In ...> start(ThenTask<Out, In ...> func);
51 52
52namespace Private 53class Executor
53{ 54{
54 template<typename Out, typename ... In> 55
55 void doExec(Job<Out, In ...> *job, Async::Future<Out> &out, const In & ... args); 56public:
56} 57 Executor(Executor *parent)
58 : mPrev(parent)
59 , mResult(0)
60 {
61 }
62
63 virtual ~Executor()
64 {
65 delete mResult;
66 }
67
68 virtual void exec() = 0;
69
70 FutureBase* result() const
71 {
72 return mResult;
73 }
74
75 Executor *mPrev;
76 FutureBase *mResult;
77};
78
79template<typename Out, typename ... In>
80class ThenExecutor: public Executor
81{
82
83 typedef Out OutType;
84 typedef typename std::tuple_element<0, std::tuple<In ..., void>>::type InType;
85
86
87public:
88 ThenExecutor(ThenTask<Out, In ...> then, Executor *parent = nullptr)
89 : Executor(parent)
90 , mFunc(then)
91 {
92 }
93
94 void exec()
95 {
96 Async::Future<InType> *in = 0;
97 if (mPrev) {
98 mPrev->exec();
99 in = static_cast<Async::Future<InType>*>(mPrev->result());
100 assert(in->isFinished());
101 }
102
103 auto out = new Async::Future<Out>();
104 mFunc(in ? in->value() : In() ..., *out);
105 out->waitForFinished();
106 mResult = out;
107 }
108
109private:
110 std::function<void(const In& ..., Async::Future<Out>&)> mFunc;
111};
112
113template<typename PrevOut, typename Out, typename In>
114class EachExecutor : public Executor
115{
116public:
117 EachExecutor(EachTask<Out, In> each, Executor *parent = nullptr)
118 : Executor(parent)
119 , mFunc(each)
120 {
121 }
122
123 void exec()
124 {
125 assert(mPrev);
126 mPrev->exec();
127 Async::Future<PrevOut> *in = static_cast<Async::Future<PrevOut>*>(mPrev->result());
128
129 auto *out = new Async::Future<Out>();
130 for (auto arg : in->value()) {
131 Async::Future<Out> future;
132 mFunc(arg, future);
133 future.waitForFinished();
134 out->setValue(out->value() + future.value());
135 }
136
137 mResult = out;
138 }
139
140private:
141 std::function<void(const In&, Async::Future<Out>&)> mFunc;
142};
57 143
58class JobBase 144class JobBase
59{ 145{
60 template<typename Out, typename ... In> 146 template<typename Out, typename ... In>
61 friend class Job; 147 friend class Job;
62 148
63protected:
64 enum JobType {
65 Then,
66 Each,
67 Reduce
68 };
69
70public: 149public:
71 JobBase(JobType jobType, JobBase *prev = nullptr); 150 JobBase(Executor *executor);
72 virtual void exec() = 0; 151 ~JobBase();
152
153 void exec();
73 154
74protected: 155protected:
75 JobBase *mPrev; 156 Executor *mExecutor;
76 void *mResult;
77 JobType mJobType;
78}; 157};
79 158
80template<typename Out, typename ... In> 159template<typename Out, typename ... In>
81class Job : public JobBase 160class Job : public JobBase
82{ 161{
83 template<typename Out_, typename ... In_> 162 template<typename OutOther, typename ... InOther>
84 friend class Job; 163 friend class Job;
85 164
86 template<typename Out_, typename ... In_> 165 template<typename OutOther, typename ... InOther>
87 friend Job<Out_, In_ ...> start(Async::ThenTask<Out_, In_ ...> func); 166 friend Job<OutOther, InOther ...> start(Async::ThenTask<OutOther, InOther ...> func);
88
89 typedef Out OutType;
90 typedef typename std::tuple_element<0, std::tuple<In ..., void>>::type InType;
91 167
92public: 168public:
93 ~Job() 169 template<typename OutOther, typename ... InOther>
170 Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func)
94 { 171 {
95 delete reinterpret_cast<Async::Future<Out>*>(mResult); 172 Executor *exec = new ThenExecutor<OutOther, InOther ...>(func, mExecutor);
173 return Job<OutOther, InOther ...>(exec);
96 } 174 }
97 175
98 template<typename Out_, typename ... In_> 176 template<typename OutOther, typename InOther>
99 Job<Out_, In_ ...> then(ThenTask<Out_, In_ ...> func) 177 Job<OutOther, InOther> each(EachTask<OutOther, InOther> func)
100 { 178 {
101 return Job<Out_, In_ ...>::create(func, JobBase::Then, this); 179 static_assert(detail::isIterable<Out>::value,
102 }
103
104 template<typename Out_, typename In_>
105 Job<Out_, In_> each(EachTask<Out_, In_> func)
106 {
107 static_assert(detail::isIterable<OutType>::value,
108 "The 'Each' task can only be connected to a job that returns a list or array."); 180 "The 'Each' task can only be connected to a job that returns a list or array.");
109 static_assert(detail::isIterable<Out_>::value, 181 static_assert(detail::isIterable<OutOther>::value,
110 "The result type of 'Each' task must be a list or an array."); 182 "The result type of 'Each' task must be a list or an array.");
111 return Job<Out_, In_>::create(func, JobBase::Each, this); 183 return Job<OutOther, InOther>(new EachExecutor<Out, OutOther, InOther>(func, mExecutor));
112 } 184 }
113 185
114 template<typename Out_, typename In_> 186 template<typename OutOther, typename InOther>
115 Job<Out_, In_> reduce(ReduceTask<Out_, In_> func) 187 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func)
116 { 188 {
117 static_assert(Async::detail::isIterable<OutType>::value, 189 static_assert(Async::detail::isIterable<Out>::value,
118 "The result type of 'Reduce' task must be a list or an array."); 190 "The result type of 'Reduce' task must be a list or an array.");
119 return Job<Out_, In_>::create(func, JobBase::Reduce, this); 191 //return Job<Out_, In_>::create(func, new ReduceEx, this);
120 } 192 }
121 193
122 Async::Future<Out> result() const 194 Async::Future<Out> result() const
123 { 195 {
124 return *reinterpret_cast<Async::Future<Out>*>(mResult); 196 return *static_cast<Async::Future<Out>*>(mExecutor->result());
125 } 197 }
126 198
127 void exec();
128
129private: 199private:
130 Job(JobBase::JobType jobType, JobBase *parent = nullptr) 200 Job(Executor *executor)
131 : JobBase(jobType, parent) 201 : JobBase(executor)
132 { 202 {
133 } 203 }
134
135 template<typename F>
136 static Job<Out, In ... > create(F func, JobBase::JobType jobType, JobBase *parent = nullptr);
137
138public:
139 std::function<void(In ..., Async::Future<Out>&)> mFunc;
140}; 204};
141 205
142
143} // namespace Async 206} // namespace Async
144 207
145 208
@@ -149,41 +212,10 @@ public:
149template<typename Out, typename ... In> 212template<typename Out, typename ... In>
150Async::Job<Out, In ...> Async::start(ThenTask<Out, In ...> func) 213Async::Job<Out, In ...> Async::start(ThenTask<Out, In ...> func)
151{ 214{
152 return Job<Out, In ...>::create(func, JobBase::Then); 215 Executor *exec = new ThenExecutor<Out, In ...>(func);
153} 216 return Job<Out, In ...>(exec);
154
155template<typename Out, typename ... In>
156void Async::Private::doExec(Job<Out, In ...> *job, Async::Future<Out> &out, const In & ... args)
157{
158 job->mFunc(args ..., out);
159};
160
161template<typename Out, typename ... In>
162void Async::Job<Out, In ...>::exec()
163{
164 Async::Future<InType> *in = nullptr;
165 if (mPrev) {
166 mPrev->exec();
167 in = reinterpret_cast<Async::Future<InType>*>(mPrev->mResult);
168 assert(in->isFinished());
169 }
170
171 auto out = new Async::Future<Out>();
172 Private::doExec<Out, In ...>(this, *out, in ? in->value() : In() ...);
173 out->waitForFinished();
174 mResult = reinterpret_cast<void*>(out);
175}
176
177template<typename Out, typename ... In>
178template<typename F>
179Async::Job<Out, In ...> Async::Job<Out, In ...>::create(F func, Async::JobBase::JobType jobType, Async::JobBase* parent)
180{
181 Job<Out, In ...> job(jobType, parent);
182 job.mFunc = func;
183 return job;
184} 217}
185 218
186
187#endif // ASYNC_H 219#endif // ASYNC_H
188 220
189 221