summaryrefslogtreecommitdiffstats
path: root/async/src
diff options
context:
space:
mode:
Diffstat (limited to 'async/src')
-rw-r--r--async/src/async.cpp19
-rw-r--r--async/src/async.h200
-rw-r--r--async/src/future.h2
3 files changed, 133 insertions, 88 deletions
diff --git a/async/src/async.cpp b/async/src/async.cpp
index 7e81f24..c4a88fd 100644
--- a/async/src/async.cpp
+++ b/async/src/async.cpp
@@ -28,13 +28,20 @@
28 28
29using namespace Async; 29using namespace Async;
30 30
31JobBase::JobBase(JobBase::JobType jobType, JobBase* prev) 31JobBase::JobBase(Executor *executor)
32 : mPrev(prev) 32 : mExecutor(executor)
33 , mResult(0)
34 , mJobType(jobType)
35{ 33{
36} 34}
37 35
36JobBase::~JobBase()
37{
38}
39
40void JobBase::exec()
41{
42 mExecutor->exec();
43}
44
38 45
39FutureBase::FutureBase() 46FutureBase::FutureBase()
40 : mFinished(false) 47 : mFinished(false)
@@ -48,6 +55,10 @@ FutureBase::FutureBase(const Async::FutureBase &other)
48{ 55{
49} 56}
50 57
58FutureBase::~FutureBase()
59{
60}
61
51void FutureBase::setFinished() 62void FutureBase::setFinished()
52{ 63{
53 mFinished = true; 64 mFinished = true;
diff --git a/async/src/async.h b/async/src/async.h
index 182e57c..0f027f5 100644
--- a/async/src/async.h
+++ b/async/src/async.h
@@ -27,6 +27,7 @@
27#include <type_traits> 27#include <type_traits>
28#include <iostream> 28#include <iostream>
29#include <cassert> 29#include <cassert>
30#include <iterator>
30 31
31#include "future.h" 32#include "future.h"
32#include "async_impl.h" 33#include "async_impl.h"
@@ -49,97 +50,159 @@ using ReduceTask = typename detail::identity<std::function<void(In, Async::Futur
49template<typename Out, typename ... In> 50template<typename Out, typename ... In>
50Job<Out, In ...> start(ThenTask<Out, In ...> func); 51Job<Out, In ...> start(ThenTask<Out, In ...> func);
51 52
52namespace Private 53class Executor
53{ 54{
54 template<typename Out, typename ... In> 55
55 void doExec(Job<Out, In ...> *job, Async::Future<Out> &out, const In & ... args); 56public:
56} 57 Executor(Executor *parent)
58 : mPrev(parent)
59 , mResult(0)
60 {
61 }
62
63 virtual ~Executor()
64 {
65 delete mResult;
66 }
67
68 virtual void exec() = 0;
69
70 FutureBase* result() const
71 {
72 return mResult;
73 }
74
75 Executor *mPrev;
76 FutureBase *mResult;
77};
78
79template<typename Out, typename ... In>
80class ThenExecutor: public Executor
81{
82
83 typedef Out OutType;
84 typedef typename std::tuple_element<0, std::tuple<In ..., void>>::type InType;
85
86
87public:
88 ThenExecutor(ThenTask<Out, In ...> then, Executor *parent = nullptr)
89 : Executor(parent)
90 , mFunc(then)
91 {
92 }
93
94 void exec()
95 {
96 Async::Future<InType> *in = 0;
97 if (mPrev) {
98 mPrev->exec();
99 in = static_cast<Async::Future<InType>*>(mPrev->result());
100 assert(in->isFinished());
101 }
102
103 auto out = new Async::Future<Out>();
104 mFunc(in ? in->value() : In() ..., *out);
105 out->waitForFinished();
106 mResult = out;
107 }
108
109private:
110 std::function<void(const In& ..., Async::Future<Out>&)> mFunc;
111};
112
113template<typename PrevOut, typename Out, typename In>
114class EachExecutor : public Executor
115{
116public:
117 EachExecutor(EachTask<Out, In> each, Executor *parent = nullptr)
118 : Executor(parent)
119 , mFunc(each)
120 {
121 }
122
123 void exec()
124 {
125 assert(mPrev);
126 mPrev->exec();
127 Async::Future<PrevOut> *in = static_cast<Async::Future<PrevOut>*>(mPrev->result());
128
129 auto *out = new Async::Future<Out>();
130 for (auto arg : in->value()) {
131 Async::Future<Out> future;
132 mFunc(arg, future);
133 future.waitForFinished();
134 out->setValue(out->value() + future.value());
135 }
136
137 mResult = out;
138 }
139
140private:
141 std::function<void(const In&, Async::Future<Out>&)> mFunc;
142};
57 143
58class JobBase 144class JobBase
59{ 145{
60 template<typename Out, typename ... In> 146 template<typename Out, typename ... In>
61 friend class Job; 147 friend class Job;
62 148
63protected:
64 enum JobType {
65 Then,
66 Each,
67 Reduce
68 };
69
70public: 149public:
71 JobBase(JobType jobType, JobBase *prev = nullptr); 150 JobBase(Executor *executor);
72 virtual void exec() = 0; 151 ~JobBase();
152
153 void exec();
73 154
74protected: 155protected:
75 JobBase *mPrev; 156 Executor *mExecutor;
76 void *mResult;
77 JobType mJobType;
78}; 157};
79 158
80template<typename Out, typename ... In> 159template<typename Out, typename ... In>
81class Job : public JobBase 160class Job : public JobBase
82{ 161{
83 template<typename Out_, typename ... In_> 162 template<typename OutOther, typename ... InOther>
84 friend class Job; 163 friend class Job;
85 164
86 template<typename Out_, typename ... In_> 165 template<typename OutOther, typename ... InOther>
87 friend Job<Out_, In_ ...> start(Async::ThenTask<Out_, In_ ...> func); 166 friend Job<OutOther, InOther ...> start(Async::ThenTask<OutOther, InOther ...> func);
88
89 typedef Out OutType;
90 typedef typename std::tuple_element<0, std::tuple<In ..., void>>::type InType;
91 167
92public: 168public:
93 ~Job() 169 template<typename OutOther, typename ... InOther>
170 Job<OutOther, InOther ...> then(ThenTask<OutOther, InOther ...> func)
94 { 171 {
95 delete reinterpret_cast<Async::Future<Out>*>(mResult); 172 Executor *exec = new ThenExecutor<OutOther, InOther ...>(func, mExecutor);
173 return Job<OutOther, InOther ...>(exec);
96 } 174 }
97 175
98 template<typename Out_, typename ... In_> 176 template<typename OutOther, typename InOther>
99 Job<Out_, In_ ...> then(ThenTask<Out_, In_ ...> func) 177 Job<OutOther, InOther> each(EachTask<OutOther, InOther> func)
100 { 178 {
101 return Job<Out_, In_ ...>::create(func, JobBase::Then, this); 179 static_assert(detail::isIterable<Out>::value,
102 }
103
104 template<typename Out_, typename In_>
105 Job<Out_, In_> each(EachTask<Out_, In_> func)
106 {
107 static_assert(detail::isIterable<OutType>::value,
108 "The 'Each' task can only be connected to a job that returns a list or array."); 180 "The 'Each' task can only be connected to a job that returns a list or array.");
109 static_assert(detail::isIterable<Out_>::value, 181 static_assert(detail::isIterable<OutOther>::value,
110 "The result type of 'Each' task must be a list or an array."); 182 "The result type of 'Each' task must be a list or an array.");
111 return Job<Out_, In_>::create(func, JobBase::Each, this); 183 return Job<OutOther, InOther>(new EachExecutor<Out, OutOther, InOther>(func, mExecutor));
112 } 184 }
113 185
114 template<typename Out_, typename In_> 186 template<typename OutOther, typename InOther>
115 Job<Out_, In_> reduce(ReduceTask<Out_, In_> func) 187 Job<OutOther, InOther> reduce(ReduceTask<OutOther, InOther> func)
116 { 188 {
117 static_assert(Async::detail::isIterable<OutType>::value, 189 static_assert(Async::detail::isIterable<Out>::value,
118 "The result type of 'Reduce' task must be a list or an array."); 190 "The result type of 'Reduce' task must be a list or an array.");
119 return Job<Out_, In_>::create(func, JobBase::Reduce, this); 191 //return Job<Out_, In_>::create(func, new ReduceEx, this);
120 } 192 }
121 193
122 Async::Future<Out> result() const 194 Async::Future<Out> result() const
123 { 195 {
124 return *reinterpret_cast<Async::Future<Out>*>(mResult); 196 return *static_cast<Async::Future<Out>*>(mExecutor->result());
125 } 197 }
126 198
127 void exec();
128
129private: 199private:
130 Job(JobBase::JobType jobType, JobBase *parent = nullptr) 200 Job(Executor *executor)
131 : JobBase(jobType, parent) 201 : JobBase(executor)
132 { 202 {
133 } 203 }
134
135 template<typename F>
136 static Job<Out, In ... > create(F func, JobBase::JobType jobType, JobBase *parent = nullptr);
137
138public:
139 std::function<void(In ..., Async::Future<Out>&)> mFunc;
140}; 204};
141 205
142
143} // namespace Async 206} // namespace Async
144 207
145 208
@@ -149,41 +212,10 @@ public:
149template<typename Out, typename ... In> 212template<typename Out, typename ... In>
150Async::Job<Out, In ...> Async::start(ThenTask<Out, In ...> func) 213Async::Job<Out, In ...> Async::start(ThenTask<Out, In ...> func)
151{ 214{
152 return Job<Out, In ...>::create(func, JobBase::Then); 215 Executor *exec = new ThenExecutor<Out, In ...>(func);
153} 216 return Job<Out, In ...>(exec);
154
155template<typename Out, typename ... In>
156void Async::Private::doExec(Job<Out, In ...> *job, Async::Future<Out> &out, const In & ... args)
157{
158 job->mFunc(args ..., out);
159};
160
161template<typename Out, typename ... In>
162void Async::Job<Out, In ...>::exec()
163{
164 Async::Future<InType> *in = nullptr;
165 if (mPrev) {
166 mPrev->exec();
167 in = reinterpret_cast<Async::Future<InType>*>(mPrev->mResult);
168 assert(in->isFinished());
169 }
170
171 auto out = new Async::Future<Out>();
172 Private::doExec<Out, In ...>(this, *out, in ? in->value() : In() ...);
173 out->waitForFinished();
174 mResult = reinterpret_cast<void*>(out);
175}
176
177template<typename Out, typename ... In>
178template<typename F>
179Async::Job<Out, In ...> Async::Job<Out, In ...>::create(F func, Async::JobBase::JobType jobType, Async::JobBase* parent)
180{
181 Job<Out, In ...> job(jobType, parent);
182 job.mFunc = func;
183 return job;
184} 217}
185 218
186
187#endif // ASYNC_H 219#endif // ASYNC_H
188 220
189 221
diff --git a/async/src/future.h b/async/src/future.h
index c53ef56..eb3de1e 100644
--- a/async/src/future.h
+++ b/async/src/future.h
@@ -31,6 +31,8 @@ class FutureBase
31public: 31public:
32 FutureBase(); 32 FutureBase();
33 FutureBase(const FutureBase &other); 33 FutureBase(const FutureBase &other);
34 virtual ~FutureBase();
35
34 void setFinished(); 36 void setFinished();
35 bool isFinished() const; 37 bool isFinished() const;
36 void waitForFinished(); 38 void waitForFinished();