generic_reduce.hpp
#ifndef jb_opencl_generic_reduce_hpp
#define jb_opencl_generic_reduce_hpp

#include <jb/assert_throw.hpp>
#include <jb/log.hpp>
#include <jb/p2ceil.hpp>

#include <boost/compute/command_queue.hpp>
#include <boost/compute/container/vector.hpp>
#include <boost/compute/memory/local_buffer.hpp>

#include <algorithm>
#include <cstdlib>
#include <iterator>
#include <sstream>
#include <vector>

namespace jb {
namespace opencl {
/**
 * Implement a generic reducer for OpenCL.
 *
 * Aggregating all the values in a vector into a single value, also
 * known as a reduction, is a common building block in parallel
 * algorithms. All reductions follow a common form; this template
 * class implements a generic reduction given the aggregation function
 * and its input / output types.
 *
 * This implementation uses a parallel reduction; for a general
 * motivation and description please see:
 * http://developer.amd.com/resources/articles-whitepapers/opencl-optimization-case-study-simple-reductions/
 *
 * TODO(coryan) this class is work in progress, it is not fully implemented
 *
 * @tparam reducer a class derived from generic_reduce<reducer,...>.
 * Please see jb::opencl::reducer_concept for details.
 * @tparam input_type_t the host type that represents the input
 * @tparam output_type_t the host type that represents the output
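 *
 * A minimal sketch of a concrete reducer (the sum_reducer name and
 * its members are illustrative assumptions, not part of this file;
 * the signatures are inferred from how create_program() uses the
 * reducer class):
 *
 * @code
 * struct sum_reducer
 *     : public jb::opencl::generic_reduce<sum_reducer, float, float> {
 *   using base = jb::opencl::generic_reduce<sum_reducer, float, float>;
 *   using base::base;
 *   // OpenCL code to initialize one accumulator ...
 *   static std::string initialize_body(char const* lhs) {
 *     return std::string("*") + lhs + " = 0;";
 *   }
 *   // ... to load one input value into an accumulator ...
 *   static std::string transform_body(
 *       char const* lhs, char const* value, char const* offset) {
 *     return std::string("*") + lhs + " = *" + value + ";";
 *   }
 *   // ... and to combine two partial results ...
 *   static std::string combine_body(
 *       char const* accumulated, char const* value) {
 *     return std::string("*") + accumulated + " += *" + value + ";";
 *   }
 * };
 * @endcode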
 */
template <typename reducer, typename input_type_t, typename output_type_t>
class generic_reduce {
public:
  //@{
  /**
   * @name Type traits
   */
  /// The host type used to represent the input into the reduction
  using input_type = input_type_t;

  /// The host type representing the output of the reduction
  using output_type = output_type_t;

  /**
   * The iterator type over the vector used to store the results.
   *
   * The final output is a single element, but OpenCL makes it easier
   * to treat that as a vector with a single element.
   */
  using vector_iterator =
      typename boost::compute::vector<output_type>::iterator;
  //@}
  /**
   * Constructor. Initialize a generic reduce for a given size and
   * device queue.
   *
   * @param size the size of the input array
   * @param queue a command queue to communicate with a single OpenCL device
   */
  generic_reduce(std::size_t size, boost::compute::command_queue const& queue)
      : queue_(queue)
      , program_(create_program(queue))
      , initial_(program_, "generic_transform_reduce_initial")
      , intermediate_(program_, "generic_transform_reduce_intermediate")
      , ping_(queue_.get_context())
      , pong_(queue_.get_context()) {
    // ... size the first pass of the reduction. We need to balance two
    // constraints:
    //   (a) We cannot use more memory than whatever the device
    //       supports, since the algorithm uses one entry in scratch per
    //       local thread that could limit the number of local threads.
    //       This is extremely unlikely, but who wants crashes?
    //   (b) The maximum size of a workgroup limits by how much we can
    //       reduce in a single pass, so smaller workgroups require more
    //       passes and more intermediate memory.
    // first we query the device ...
    boost::compute::device device = queue_.get_device();
    std::size_t local_mem_size = device.local_memory_size();
    max_workgroup_size_ = device.max_work_group_size();

    // ... then we convert the bytes into array sizes ...
    boost::compute::kernel sizer(program_, "scratch_element_size");
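    // ... (presumably the scratch_element_size kernel simply stores
    // sizeof(reduce_output_t) into its output argument; the host
    // cannot compute that value portably because device and host
    // type sizes may differ) ...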
    boost::compute::vector<boost::compute::ulong_> dev(1, queue_.get_context());
    sizer.set_arg(0, dev.get_buffer());
    queue_.enqueue_1d_range_kernel(sizer, 0, 1, 1).wait();
    std::vector<boost::compute::ulong_> host(1);
    boost::compute::copy(dev.begin(), dev.end(), host.begin(), queue_);
    sizeof_output_type_ = host[0];
    scratch_size_ = local_mem_size / sizeof_output_type_;
    // ... this is the largest amount that we might need from local
    // scratch ...
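    // ... for example (illustrative numbers only): 32KiB of local
    // memory and an 8-byte reduce_output_t yield a scratch_size_ of
    // 4096 elements ...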

    // ... now on to compute the reduction factor, first how many
    // threads can we effectively use, unless local scratch is tiny,
    // almost all of the time this would be max_workgroup_size ...
    effective_workgroup_size_ = std::min(scratch_size_, max_workgroup_size_);
  }

  /**
   * Schedule the execution of a reduction.
   *
   * The algorithm works in phases; each phase runs in the OpenCL
   * device, reducing the input to a (typically much smaller) vector,
   * which is stored in either the ping_ or pong_ variable.
   *
   * If necessary the algorithm schedules multiple phases, each one
   * asynchronous but waiting for the previous one, until the output
   * has been reduced to a vector with a single element.
   *
   * @param begin the beginning of the range to be reduced.
   * @param end the end of the range to be reduced.
   * @param wait a list of events to wait for before any work starts
   * on the device.
   *
   * @returns a boost::compute::future<>; when said future is ready,
   * it contains an iterator pointing to the result. Calls to
   * execute() invalidate this iterator.
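   *
   * For example (a sketch, assuming the hypothetical sum_reducer
   * shown in the class documentation and an existing @a queue):
   *
   * @code
   * boost::compute::vector<float> src(1000, queue.get_context());
   * // ... fill src with data ...
   * sum_reducer reduce(src.size(), queue);
   * auto done = reduce.execute(src.begin(), src.end());
   * auto it = done.get();  // waits for completion
   * float result;
   * boost::compute::copy(it, it + 1, &result, queue);
   * @endcode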
   */
  template <typename InputIterator>
  boost::compute::future<vector_iterator> execute(
      InputIterator begin, InputIterator end,
      boost::compute::wait_list const& wait = boost::compute::wait_list()) {
    auto size = std::distance(begin, end);

    // ... initialize how much work we can do in each workgroup ...
    auto workgroup_size = effective_workgroup_size_;
    // ... that determines how many workgroups we can work on ...
    auto workgroups = size / workgroup_size;
    if (workgroups == 0) {
      workgroups = 1;
    }
    // ... resize the temporary buffers ...
    ping_.resize(workgroups, queue_);
    pong_.resize(workgroups, queue_);

    // ... with those values we can compute how many values each
    // thread needs to aggregate ...
    auto div = std::div(
        static_cast<long long>(size),
        static_cast<long long>(workgroups * workgroup_size));
    // ... that is "values-per-thread" ...
    auto VPT = div.quot + (div.rem != 0);
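    // ... for example (illustrative numbers only): size == 1000 and
    // workgroup_size == 256 give workgroups == 3, and then
    // VPT == ceil(1000 / (3 * 256)) == 2 ...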

    // ... fill up the arguments to call the "initial_" program which
    // we prepared in the constructor ...
    int arg = 0;
    initial_.set_arg(arg++, ping_.get_buffer());
    initial_.set_arg(arg++, boost::compute::ulong_(VPT));
    initial_.set_arg(arg++, boost::compute::ulong_(workgroup_size));
    initial_.set_arg(arg++, boost::compute::ulong_(size));
    initial_.set_arg(arg++, begin.get_buffer());
    initial_.set_arg(
        arg++, boost::compute::local_buffer<output_type>(workgroup_size));
    // ... schedule that program to start ...
    auto event = queue_.enqueue_1d_range_kernel(
        initial_, 0, workgroups * workgroup_size, workgroup_size, wait);

    // ... now, if the output from the initial (or any subsequent)
    // phase contains more than one element we need to schedule
    // another phase ...
    for (auto pass_output_size = workgroups; pass_output_size > 1;
         pass_output_size = workgroups) {
      // ... it is possible (especially towards the end) that we do not
      // have enough work to fill a "workgroup_size" number of
      // local work items, in that case, just limit the local size even
      // further ...
      if (pass_output_size < workgroup_size) {
        // ... the workgroup size should always be a power of two;
        // notice that this number will always be smaller than
        // pass_output_size because p2ceil() returns the *smallest*
        // power of two greater than or equal to pass_output_size.
        // Therefore p2ceil()/2 must be strictly smaller. Also notice
        // that p2ceil() must be at least 2 because pass_output_size
        // is greater than 1 ...
        workgroup_size = jb::p2ceil(pass_output_size) / 2;
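        // ... for example, pass_output_size == 48 yields
        // p2ceil(48) == 64 and a workgroup_size of 32 ...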
      }
      workgroups = pass_output_size / workgroup_size;
      // ... so we have set workgroup_size to be smaller than
      // pass_output_size, therefore this is true:
      JB_ASSERT_THROW(workgroups > 0);

      // ... once more, compute the "values per thread" argument ...
      auto div = std::div(
          static_cast<long long>(pass_output_size),
          static_cast<long long>(workgroups * workgroup_size));
      auto VPT = div.quot + (div.rem != 0);

      // ... prepare the program to run ...
      int arg = 0;
      intermediate_.set_arg(arg++, pong_.get_buffer());
      intermediate_.set_arg(arg++, boost::compute::ulong_(VPT));
      intermediate_.set_arg(
          arg++, boost::compute::ulong_(workgroup_size) /* TPB */);
      intermediate_.set_arg(arg++, boost::compute::ulong_(pass_output_size));
      intermediate_.set_arg(arg++, ping_.get_buffer());
      intermediate_.set_arg(
          arg++, boost::compute::local_buffer<output_type>(workgroup_size));
      // ... schedule the "intermediate_" program to reduce the data
      // from ping_ into pong_ ...
      event = queue_.enqueue_1d_range_kernel(
          intermediate_, 0, workgroups * workgroup_size, workgroup_size,
          boost::compute::wait_list(event));

      // ... this does not swap anything in device memory, we just
      // swap our handles in the host, so the next iteration of the
      // loop (or the code after the loop, if we exit here) has the
      // output in ping_ ...
      std::swap(ping_, pong_);
    }
    // ... return the results ...
    return boost::compute::make_future(ping_.begin(), event);
  }

  /**
   * Schedule a reduction for a full vector.
   *
   * See the other overload of this member function for details.
   *
   * @param src the vector to be reduced
   * @param wait a wait list that must be completed before the
   * reduction starts
   * @returns a boost::compute::future<>; when said future is ready,
   * it contains an iterator pointing to the result. Calls to
   * execute() invalidate this iterator.
   */
  boost::compute::future<vector_iterator> execute(
      boost::compute::vector<input_type> const& src,
      boost::compute::wait_list const& wait = boost::compute::wait_list()) {
    return execute(src.begin(), src.end(), wait);
  }

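  /**
   * Create and build the OpenCL program implementing the reduction.
   *
   * The generated source combines a preamble, specialized with the
   * types and code snippets supplied by the @a reducer class, with
   * the shared kernel code in generic_reduce_program_source. For the
   * hypothetical sum_reducer shown in the class documentation the
   * preamble would look roughly like this (a sketch, not verbatim
   * output):
   *
   * @code
   * typedef float reduce_input_t;
   * typedef float reduce_output_t;
   * inline void reduce_initialize(reduce_output_t* lhs) {
   *   *lhs = 0;
   * }
   * inline void reduce_transform(
   *     reduce_output_t* lhs, reduce_input_t const* value,
   *     unsigned long offset) {
   *   *lhs = *value;
   * }
   * inline void reduce_combine(
   *     reduce_output_t* accumulated, reduce_output_t* value) {
   *   *accumulated += *value;
   * }
   * @endcode
   *
   * @param queue the command queue used to find the device and context
   * @returns the compiled and built program
   */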
  static boost::compute::program
  create_program(boost::compute::command_queue const& queue) {
    std::ostringstream os;
    auto device = queue.get_device();
    if (device.supports_extension("cl_khr_fp64")) {
      os << "#pragma OPENCL EXTENSION cl_khr_fp64 : enable\n\n";
    }
    os << "typedef " << boost::compute::type_name<input_type_t>()
       << " reduce_input_t;\n";
    os << "typedef " << boost::compute::type_name<output_type_t>()
       << " reduce_output_t;\n";
    os << "inline void reduce_initialize(reduce_output_t* lhs) {\n";
    os << "  " << reducer::initialize_body("lhs") << "\n";
    os << "}\n";
    os << "inline void reduce_transform(\n";
    os << "    reduce_output_t* lhs, reduce_input_t const* value,\n";
    os << "    unsigned long offset) {\n";
    os << "  " << reducer::transform_body("lhs", "value", "offset") << "\n";
    os << "}\n";
    os << "inline void reduce_combine(\n";
    os << "    reduce_output_t* accumulated, reduce_output_t* value) {\n";
    os << "  " << reducer::combine_body("accumulated", "value") << "\n";
    os << "}\n";
    os << "\n";
    os << generic_reduce_program_source;
    auto program = boost::compute::program::create_with_source(
        os.str(), queue.get_context());
    try {
      program.build();
    } catch (boost::compute::opencl_error const& ex) {
      JB_LOG(error) << "errors building program: " << ex.what() << "\n"
                    << program.build_log() << "\n";
      JB_LOG(error) << "Program body\n================\n"
                    << os.str() << "\n================\n";
      throw;
    }
    return program;
  }

private:
  boost::compute::command_queue queue_;
  boost::compute::program program_;
  boost::compute::kernel initial_;
  boost::compute::kernel intermediate_;
  std::size_t max_workgroup_size_;
  std::size_t sizeof_output_type_;
  std::size_t scratch_size_;
  std::size_t effective_workgroup_size_;
  boost::compute::vector<output_type> ping_;
  boost::compute::vector<output_type> pong_;
};

} // namespace opencl
} // namespace jb

#endif // jb_opencl_generic_reduce_hpp