JayBeams  0.1
Another project to have fun coding.
moldreplay.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  *
4  * A program to replay raw ITCH-5.x files as MoldUDP packets.
5  *
6  * This program replays a ITCH-5.x file via multicast, simulating the
7  * behavior of a market data feed.
8  */
9 #include <jb/ehs/acceptor.hpp>
12 #include <jb/as_hhmmss.hpp>
13 #include <jb/config_object.hpp>
14 #include <jb/fileio.hpp>
15 #include <jb/launch_thread.hpp>
16 #include <jb/log.hpp>
17 
18 #include <beast/http.hpp>
19 #include <boost/asio/io_service.hpp>
20 #include <boost/asio/ip/address.hpp>
21 #include <boost/asio/ip/multicast.hpp>
22 #include <boost/asio/ip/tcp.hpp>
23 #include <boost/asio/ip/udp.hpp>
24 
25 #include <chrono>
26 #include <iostream>
27 #include <stdexcept>
28 #include <thread>
29 
30 /// Types and functions used in this program
31 namespace {
32 /**
33  * Program configuration.
34  */
35 class config : public jb::config_object {
36 public:
37  config();
39 
40  void validate() const override;
41 
44  jb::config_attribute<config, std::string> secondary_destination;
45  jb::config_attribute<config, int> secondary_port;
52 };
53 
54 class session : public std::enable_shared_from_this<session> {
55 public:
56  //@{
57  /**
58  * @name Type traits
59  */
61  //@}
62 
63  /// Create a new session to replay a ITCH-5.x file.
64  session(config const& cfg);
65 
66  /// Start running a new session
67  void start();
68 
69  /// Stop a running session
70  void stop();
71 
72  /// The last message count processed
73  std::uint32_t last_message_count() const {
74  return last_message_count_.load(std::memory_order_relaxed);
75  }
76 
77  /// The offset of the last message processed
78  std::uint64_t last_message_offset() const {
79  return last_message_offset_.load(std::memory_order_relaxed);
80  }
81 
82  /// Implement the callback for jb::itch5::process_iostream_mlist<>
83  void handle_unknown(
84  time_point const& recv_ts, jb::itch5::unknown_message const& msg);
85 
86  /// Return the current timestamp for delay measurements
87  time_point now() const {
88  return std::chrono::steady_clock::now();
89  }
90 
91 private:
92  config cfg_;
93  std::atomic_bool stop_;
95  std::atomic<std::uint32_t> last_message_count_;
96  std::atomic<std::uint64_t> last_message_offset_;
97  boost::asio::io_service io_;
98  boost::asio::ip::udp::socket s0_;
99  boost::asio::ip::udp::endpoint ep0_;
100  boost::asio::ip::udp::socket s1_;
101  boost::asio::ip::udp::endpoint ep1_;
102  bool ep1_enabled_;
103 };
104 
105 class replayer_control {
106 public:
107  explicit replayer_control(config const& cfg);
108 
109  enum class state { idle, starting, replaying, stopping };
110 
111  void status(jb::ehs::response_type& res) const;
112  void start(jb::ehs::request_type const& req, jb::ehs::response_type& res);
113  void stop(jb::ehs::request_type const& req, jb::ehs::response_type& res);
114 
115 private:
116  bool start_check();
117  void replay_done();
118 
119 private:
120  config cfg_;
121  mutable std::mutex mu_;
122  state current_state_;
123  std::thread session_thread_;
124  std::shared_ptr<session> session_;
125 };
126 
127 } // anonymous namespace
128 
129 int main(int argc, char* argv[]) try {
130  // Load the configuration ...
131  config cfg;
132  cfg.load_overrides(argc, argv, std::string("moldreplay.yaml"), "JB_ROOT");
133  jb::log::init(cfg.log());
134 
135  boost::asio::io_service io_service;
136 
137  using endpoint = boost::asio::ip::tcp::endpoint;
138  using address = boost::asio::ip::address;
139  endpoint ep{address::from_string(cfg.control_host()), cfg.control_port()};
140 
141  // ... create the replayer control, this is where the main work
142  // happens ...
143  auto replayer = std::make_shared<replayer_control>(cfg);
144 
145  // ... create a dispatcher to process the HTTP requests, register
146  // some basic handlers ...
147  using jb::ehs::request_type;
149  auto dispatcher = std::make_shared<jb::ehs::request_dispatcher>("moldreplay");
150  dispatcher->add_handler("/", [](request_type const&, response_type& res) {
151  res.insert("Content-type", "text/plain");
152  res.body = "Server running...\r\n";
153  });
154  dispatcher->add_handler(
155  "/config", [cfg](request_type const&, response_type& res) {
156  res.insert("Content-type", "text/plain");
157  std::ostringstream os;
158  os << cfg << "\r\n";
159  res.body = os.str();
160  });
161  // ... we need to use a weak_ptr to avoid a cycle of shared_ptr ...
162  std::weak_ptr<jb::ehs::request_dispatcher> disp = dispatcher;
163  dispatcher->add_handler(
164  "/metrics", [disp](request_type const&, response_type& res) {
165  std::shared_ptr<jb::ehs::request_dispatcher> d(disp);
166  if (not d) {
167  res.result(beast::http::status::internal_server_error);
168  res.body = std::string("An internal error occurred\r\n"
169  "Null request handler in /metrics\r\n");
170  return;
171  }
172  res.set("content-type", "text/plain; version=0.0.4");
173  d->append_metrics(res);
174  });
175  dispatcher->add_handler(
176  "/replay-status", [replayer](request_type const&, response_type& res) {
177  replayer->status(res);
178  });
179  dispatcher->add_handler(
180  "/replay-start", [replayer](request_type const& req, response_type& res) {
181  replayer->start(req, res);
182  });
183  dispatcher->add_handler(
184  "/replay-stop", [replayer](request_type const& req, response_type& res) {
185  replayer->stop(req, res);
186  });
187 
188  // ... create an acceptor to handle incoming connections, if we wanted
189  // to, we could create multiple acceptors on different addresses
190  // pointing to the same dispatcher ...
191  jb::ehs::acceptor acceptor(io_service, ep, dispatcher);
192 
193  // ... run the program forever ...
194  io_service.run();
195 
196  return 0;
197 } catch (jb::usage const& u) {
198  std::cout << u.what() << std::endl;
199  return u.exit_status();
200 } catch (std::exception const& ex) {
201  std::cerr << "Standard exception raised: " << ex.what() << std::endl;
202  return 1;
203 } catch (...) {
204  std::cerr << "Unknown exception raised" << std::endl;
205  return 1;
206 }
207 
208 namespace {
209 
210 /// Default values for the program configuration
211 namespace defaults {
212 std::string const primary_destination = "127.0.0.1";
213 std::string const secondary_destination = "127.0.0.1";
214 int const primary_port = 12300;
215 int const secondary_port = 12301;
216 
217 std::string control_host = "0.0.0.0";
218 int const control_port = 23000;
219 } // namespace defaults
220 
221 config::config()
222  : primary_destination(
223  desc("primary-destination")
224  .help("The destination for the UDP messages. "
225  "The destination can be a unicast or multicast address."),
226  this, defaults::primary_destination)
227  , primary_port(
228  desc("primary-port")
229  .help("The destination port for the UDP messages."),
230  this, defaults::primary_port)
231  , secondary_destination(
232  desc("secondary-destination")
233  .help("The destination for the UDP messages. "
234  "The destination can be empty, a unicast, or a multicast "
235  "address."),
236  this, defaults::secondary_destination)
237  , secondary_port(
238  desc("secondary-port")
239  .help("The destination port for the UDP messages."),
240  this, defaults::secondary_port)
241  , control_host(
242  desc("control-host")
243  .help("Where does the server listen for control connections."
244  "Typically this is an address for the current host,"
245  "for example: 'localhost', '0.0.0.0', or '::1'."),
246  this, defaults::control_host)
247  , control_port(
248  desc("control-port").help("The port to receive control connections."),
249  this, defaults::control_port)
250  , input_file(
251  desc("input-file").help("The file to replay when requested."), this)
252  , replay_session(
253  desc("replay-session", "thread-config")
254  .help("Configure the replay session threads."),
255  this, jb::thread_config().name("replay"))
256  , pacer(
257  desc("pacer", "mold-udp-pacer").help("Configure the ITCH-5.x pacer"),
258  this)
259  , log(desc("log", "logging"), this) {
260 }
261 
262 void config::validate() const {
263  if (primary_destination() == "") {
264  throw jb::usage("Missing primary-destination argument or setting.", 1);
265  }
266  if (input_file() == "") {
267  throw jb::usage("Missing input-file argument or setting.", 1);
268  }
269  log().validate();
270 }
271 
272 session::session(config const& cfg)
273  : cfg_(cfg)
274  , stop_(false)
275  , pacer_(cfg.pacer())
276  , last_message_count_(0)
277  , last_message_offset_(0)
278  , io_()
279  , s0_(io_)
280  , ep0_()
281  , s1_(io_)
282  , ep1_()
283  , ep1_enabled_(false) {
284 #ifndef ATOMIC_BOOL_LOCK_FREE
285 #error "Missing ATOMIC_BOOL_LOCK_FREE required by C++11 standard"
286 #endif // ATOMIC_BOOL_LOCK_FREE
287  static_assert(
288  ATOMIC_BOOL_LOCK_FREE == 2, "Class requires lock-free std::atomic<bool>");
289 
290  auto address0 =
291  boost::asio::ip::address::from_string(cfg_.primary_destination());
292  boost::asio::ip::udp::endpoint ep0(address0, cfg_.primary_port());
293  boost::asio::ip::udp::socket s0(io_, ep0.protocol());
294  if (ep0.address().is_multicast()) {
295  s0.set_option(boost::asio::ip::multicast::enable_loopback(true));
296  }
297  s0_ = std::move(s0);
298  ep0_ = std::move(ep0);
299 
300  if (cfg_.secondary_destination() != "") {
301  auto address1 =
302  boost::asio::ip::address::from_string(cfg_.secondary_destination());
303  boost::asio::ip::udp::endpoint ep1(address1, cfg_.secondary_port());
304  boost::asio::ip::udp::socket s1(io_, ep1.protocol());
305  if (ep1.address().is_multicast()) {
306  s1.set_option(boost::asio::ip::multicast::enable_loopback(true));
307  }
308  s1_ = std::move(s1);
309  ep1_ = std::move(ep1);
310  ep1_enabled_ = true;
311  }
312 }
313 
314 void session::start() {
315  auto self = shared_from_this();
316 
317  boost::iostreams::filtering_istream in;
318  jb::open_input_file(in, cfg_.input_file());
319  jb::itch5::process_iostream_mlist<session>(in, *self);
320 }
321 
322 void session::stop() {
323  stop_.store(true, std::memory_order_release);
324 }
325 
326 void session::handle_unknown(
327  time_point const& recv_ts, jb::itch5::unknown_message const& msg) {
328  if (stop_.load(std::memory_order_consume)) {
329  throw std::runtime_error("stopping replay thread");
330  }
331  last_message_count_.store(msg.count(), std::memory_order_relaxed);
332  last_message_offset_.store(msg.offset(), std::memory_order_relaxed);
333  auto sink = [this](auto buffers) {
334  s0_.send_to(buffers, ep0_);
335  if (ep1_enabled_) {
336  s1_.send_to(buffers, ep1_);
337  }
338  };
339  auto sleeper = [](jb::itch5::mold_udp_pacer<>::duration d) {
340  // ... never sleep for more than 10 seconds, the feeds typically
341  // have large idle times early and waiting for hours to start
342  // doing anything interesting is kind of boring ...
343  if (d > std::chrono::seconds(10)) {
344  d = std::chrono::seconds(10);
345  }
346  std::this_thread::sleep_for(d);
347  };
348  pacer_.handle_message(recv_ts, msg, sink, sleeper);
349 }
350 
351 replayer_control::replayer_control(config const& cfg)
352  : cfg_(cfg)
353  , mu_()
354  , current_state_(state::idle) {
355 }
356 
357 void replayer_control::status(jb::ehs::response_type& res) const {
358  res.set("content-type", "text/plain");
359 
360  std::lock_guard<std::mutex> guard(mu_);
361  std::ostringstream os;
362  switch (current_state_) {
363  case state::idle:
364  res.body = "idle\nNothing to see here folks\n";
365  return;
366  case state::starting:
367  os << "starting\nMessages arriving shortly\n";
368  break;
369  case state::stopping:
370  os << "stopping\nMessages will stop flowing\n";
371  break;
372  case state::replaying:
373  os << "replaying\n";
374  break;
375  default:
376  res.result(beast::http::status::internal_server_error);
377  res.body = "Unkown state\n";
378  return;
379  }
380  JB_ASSERT_THROW(session_.get() != 0);
381  os << " last-count: " << session_->last_message_count() << "\n"
382  << " last-offset: " << session_->last_message_offset() << "\n"
383  << "\n";
384  res.body = os.str();
385 }
386 
387 void replayer_control::start(
389  std::lock_guard<std::mutex> guard(mu_);
390  if (current_state_ != state::idle) {
391  res.result(beast::http::status::precondition_required);
392  res.body = "request rejected, current status is not idle\n";
393  return;
394  }
395  // ... set the result before any computation, if there is a failure
396  // it will raise an exception and the caller sends back the error
397  // ...
398  res.result(beast::http::status::ok);
399  res.body = "request succeeded, started new session\n";
400  auto s = std::make_shared<session>(cfg_);
401  // ... wait until this point to set the state to starting, if there
402  // are failures before we have not changed the state and can
403  // continue ...
404  current_state_ = state::starting;
405  jb::launch_thread(session_thread_, cfg_.replay_session(), [s, this]() {
406  // ... check if the session can start, maybe it was stopped
407  // before the thread started ...
408  if (not start_check()) {
409  return;
410  }
411  // ... run the session, without holding the mu_ lock ...
412  try {
413  s->start();
414  } catch (...) {
415  }
416  // ... reset the state to idle, even if an exception is raised
417  // ...
418  replay_done();
419  });
420  session_thread_.detach();
421  session_ = s;
422 }
423 
424 void replayer_control::stop(
426  std::lock_guard<std::mutex> guard(mu_);
427  if (current_state_ != state::replaying and
428  current_state_ != state::starting) {
429  res.result(beast::http::status::precondition_required);
430  res.body = "request rejected, current status is not valid\n";
431  return;
432  }
433  current_state_ = state::stopping;
434  res.result(beast::http::status::ok);
435  res.body = "request succeeded, stopping current session\n";
436  JB_ASSERT_THROW(session_.get() != 0);
437  session_->stop();
438 }
439 
440 bool replayer_control::start_check() {
441  std::lock_guard<std::mutex> guard(mu_);
442  if (current_state_ != state::starting) {
443  return false;
444  }
445  current_state_ = state::replaying;
446  return true;
447 }
448 
449 void replayer_control::replay_done() {
450  std::lock_guard<std::mutex> guard(mu_);
451  current_state_ = state::idle;
452  session_.reset();
453 }
454 
455 } // anonymous namespace
Define defaults for program parameters.
clock_type::time_point time_point
A convenience alias for clock_type::time_point.
Base class for all configuration objects.
beast::http::request< beast::http::string_body > request_type
The request type used for JayBeams Embedded HTTP Servers.
Definition: base_types.hpp:17
virtual void validate() const
Validate the settings.
Send a sequence of raw ITCH-5.x messages as MoldUDP64 packets, trying to match the original time inte...
int exit_status() const
Definition: usage.hpp:21
clock_type::duration duration
The duration (the difference between two time points) type.
beast::http::response< beast::http::string_body > response_type
The response type used for JayBeams Embedded HTTP Servers.
Definition: base_types.hpp:20
void init(config const &cfg)
Initialize the logging functions using the configuration provided.
Definition: log.cpp:190
Hold the configuration to initialize threads.
#define config_object_constructors(NAME)
Helper class to easily define configuration attributes.
#define JB_ASSERT_THROW(PRED)
A simple class to communicate the result of parsing the options.
Definition: usage.hpp:11
void open_input_file(boost::iostreams::filtering_istream &in, std::string const &filename)
Open a file for reading.
Definition: fileio.cpp:27
Create a control server for the program.
Definition: acceptor.hpp:17
int main(int argc, char *argv[])
Definition: moldreplay.cpp:129
void launch_thread(std::thread &t, thread_config const &config, Function &&f, A &&... a)
Create a new thread, configure it as desired, and then call a user-defined function.
std::uint32_t count() const
std::uint64_t offset() const