JayBeams  0.1
Another project to have fun coding.
itch5moldreplay.cpp
Go to the documentation of this file.
3 #include <jb/as_hhmmss.hpp>
4 #include <jb/fileio.hpp>
5 #include <jb/log.hpp>
6 
7 #include <boost/asio/io_service.hpp>
8 #include <boost/asio/ip/multicast.hpp>
9 #include <boost/asio/ip/udp.hpp>
10 
11 #include <chrono>
12 #include <iostream>
13 #include <stdexcept>
14 #include <thread>
15 
16 namespace {
17 
/// Program configuration for itch5moldreplay.
///
/// NOTE(review): the constructor defined later in this file initializes
/// members named input_file, destination, port, log and pacer, but their
/// declarations are not visible in this listing -- confirm against the
/// original source before relying on this class definition.
18 class config : public jb::config_object {
19 public:
/// Construct the configuration with its default values.
20  config();
22 
/// Validate the settings; the definition later in this file throws
/// jb::usage when no input file is configured, then validates the
/// log and pacer settings.
23  void validate() const override;
24 
30 };
31 
/// Message handler that forwards every message it receives to a pacer,
/// which in turn emits data through a UDP socket to a fixed endpoint.
///
/// NOTE(review): this listing uses `time_point`, the constructor argument
/// `cfg`, and the member `pacer_` without showing their declarations --
/// presumably a type alias, a third constructor parameter and a
/// jb::itch5::mold_udp_pacer<> member; confirm against the original source.
32 class replayer {
33 public:
34  //@{
35  /**
36  * @name Type traits
37  */
39  //@}
40 
41  /// Initialize an empty handler
/// @param s the socket used to send data; ownership is moved into this object
/// @param ep the destination endpoint, copied into this object
42  replayer(
43  boost::asio::ip::udp::socket&& s,
44  boost::asio::ip::udp::endpoint const& ep,
46  : socket_(std::move(s))
47  , endpoint_(ep)
48  , pacer_(cfg, jb::itch5::mold_udp_pacer<>::session_id_type("ITCH/RPLY")) {
49  }
50 
51  /// Handle all messages as blobs
/// @param recv_ts timestamp forwarded unchanged to the pacer
/// @param msg the message forwarded unchanged to the pacer
52  void handle_unknown(
53  time_point const& recv_ts, jb::itch5::unknown_message const& msg) {
// The sink sends whatever buffers it is given to the configured endpoint.
54  auto sink = [this](auto buffers) { socket_.send_to(buffers, endpoint_); };
// The sleeper blocks the calling thread for the requested duration, but
// never for more than 10 seconds; longer requests are logged and capped.
55  auto sleeper = [](jb::itch5::mold_udp_pacer<>::duration d) {
56  if (d > std::chrono::seconds(10)) {
57  JB_LOG(info) << "Sleep request for " << jb::as_hh_mm_ss_u(d);
58  d = std::chrono::seconds(10);
59  }
60  std::this_thread::sleep_for(d);
61  };
62  pacer_.handle_message(recv_ts, msg, sink, sleeper);
63  }
64 
65  /// Return the current timestamp for delay measurements
/// The value is read from std::chrono::steady_clock.
66  time_point now() const {
67  return std::chrono::steady_clock::now();
68  }
69 
70 private:
/// Socket used by the sink to send data.
71  boost::asio::ip::udp::socket socket_;
/// Destination for every send_to() call.
72  boost::asio::ip::udp::endpoint endpoint_;
74 };
75 
76 } // anonymous namespace
77 
78 int main(int argc, char* argv[]) try {
79  config cfg;
80  cfg.load_overrides(
81  argc, argv, std::string("itch5moldreplay.yaml"), "JB_ROOT");
82  jb::log::init(cfg.log());
83 
84  boost::asio::io_service io_service;
85  auto address = boost::asio::ip::address::from_string(cfg.destination());
86  boost::asio::ip::udp::endpoint endpoint(address, cfg.port());
87  JB_LOG(info) << "Sending to endpoint=" << endpoint;
88  boost::asio::ip::udp::socket s(io_service, endpoint.protocol());
89  s.set_option(boost::asio::ip::multicast::enable_loopback(true));
90 
91  boost::iostreams::filtering_istream in;
92  jb::open_input_file(in, cfg.input_file());
93 
94  replayer rep(std::move(s), endpoint, cfg.pacer());
95  jb::itch5::process_iostream_mlist<replayer>(in, rep);
96 
97  return 0;
98 } catch (jb::usage const& u) {
99  std::cout << u.what() << std::endl;
100  return u.exit_status();
101 } catch (std::exception const& ex) {
102  std::cerr << "Standard exception raised: " << ex.what() << std::endl;
103  return 1;
104 } catch (...) {
105  std::cerr << "Unknown exception raised" << std::endl;
106  return 1;
107 }
108 
109 namespace {
110 
/// Return the default value for the "port" configuration attribute.
int default_multicast_port() {
  constexpr int port = 50000;
  return port;
}
114 
/// Return the default value for the "destination" configuration attribute.
///
/// NOTE(review): despite the function name, "::1" is the IPv6 loopback
/// address rather than a multicast group -- confirm this is intended.
std::string default_multicast_group() {
  return std::string("::1");
}
118 
119 config::config()
120  : input_file(
121  desc("input-file").help("An input file with ITCH-5.0 messages."),
122  this)
123  , destination(
124  desc("destination")
125  .help("The destination for the UDP messages. "
126  "The destination can be a unicast or multicast address."),
127  this, default_multicast_group())
128  , port(
129  desc("port").help("The destination port for the UDP messages. "),
130  this, default_multicast_port())
131  , log(desc("log", "logging"), this)
132  , pacer(desc("pacer", "mold-udp-pacer"), this) {
133 }
134 
135 void config::validate() const {
136  if (input_file() == "") {
137  throw jb::usage(
138  "Missing input-file setting."
139  " You must specify an input file.",
140  1);
141  }
142  log().validate();
143  pacer().validate();
144 }
145 
146 } // anonymous namespace
clock_type::time_point time_point
A convenience alias for clock_type::time_point.
Base class for all configuration objects.
virtual void validate() const
Validate the settings.
Send a sequence of raw ITCH-5.x messages as MoldUDP64 packets, trying to match the original time intervals between messages.
int exit_status() const
Definition: usage.hpp:21
STL namespace.
int main(int argc, char *argv[])
clock_type::duration duration
The duration (the difference between two time points) type.
void init(config const &cfg)
Initialize the logging functions using the configuration provided.
Definition: log.cpp:190
#define config_object_constructors(NAME)
Helper class to easily define configuration attributes.
A simple class to communicate the result of parsing the options.
Definition: usage.hpp:11
Helper class to print time durations in HH:MM:SS.UUUUUU format.
Definition: as_hhmmss.hpp:58
void open_input_file(boost::iostreams::filtering_istream &in, std::string const &filename)
Open a file for reading.
Definition: fileio.cpp:27
Configuration object for the jb::itch5::mold_udp_pacer class.
#define JB_LOG(lvl)
Definition: log.hpp:70
The top-level namespace for the JayBeams library.
Definition: as_hhmmss.hpp:7