JayBeams  0.1
Another project to have fun coding.
mold2inside.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  *
4  * This program receives MoldUDP64 packets containing ITCH-5.0 messges
5  * and generates the inside quotes in an ASCII (though potentially
6  * compressed) file. The program also generates statistics about the
7  * feed and the book build, using jb::offline_feed_statistics.
8  *
9  * It reports the percentiles of "for each change in the inside, how
10  * long did it take to process the event, and what was the elapsed
11  * time since the last change to the inside".
12  */
17 #include <jb/fileio.hpp>
18 #include <jb/log.hpp>
19 
20 #include <stdexcept>
21 #include <unordered_map>
22 
23 /**
24  * Define types and functions used in this program.
25  */
26 namespace {
27 
28 /// Configuration parameters for itch5inside
29 class config : public jb::config_object {
30 public:
31  config();
33 
34  void validate() const override;
35 
41  symbol_stats;
42  jb::config_attribute<config, bool> enable_symbol_stats;
43 };
44 
45 } // anonymous namespace
46 
47 #define KNOWN_ITCH5_MESSAGES \
48  jb::itch5::add_order_message, jb::itch5::add_order_mpid_message, \
49  jb::itch5::broken_trade_message, jb::itch5::cross_trade_message, \
50  jb::itch5::ipo_quoting_period_update_message, \
51  jb::itch5::market_participant_position_message, \
52  jb::itch5::mwcb_breach_message, jb::itch5::mwcb_decline_level_message, \
53  jb::itch5::net_order_imbalance_indicator_message, \
54  jb::itch5::order_cancel_message, jb::itch5::order_delete_message, \
55  jb::itch5::order_executed_message, \
56  jb::itch5::order_executed_price_message, \
57  jb::itch5::order_replace_message, \
58  jb::itch5::reg_sho_restriction_message, \
59  jb::itch5::stock_directory_message, \
60  jb::itch5::stock_trading_action_message, \
61  jb::itch5::system_event_message, jb::itch5::trade_message
62 
63 int main(int argc, char* argv[]) try {
64  config cfg;
65  cfg.load_overrides(argc, argv, std::string("mold2inside.yaml"), "JB_ROOT");
66  jb::log::init(cfg.log());
67 
68  boost::asio::io_service io_service;
69 
70  boost::iostreams::filtering_ostream out;
71  jb::open_output_file(out, cfg.output_file());
72 
73  std::map<jb::itch5::stock_t, jb::offline_feed_statistics> per_symbol;
74  jb::offline_feed_statistics stats(cfg.stats());
75 
77  [&stats, &out](
78  jb::itch5::message_header const& header,
80  updated_book,
81  jb::itch5::book_update const& update) {
82  auto pl = std::chrono::steady_clock::now() - update.recvts;
84  stats, out, header, updated_book, update, pl);
85  };
86  if (cfg.enable_symbol_stats()) {
87  // ... replace the calback with one that also records the stats
88  // for each symbol ...
89  jb::offline_feed_statistics::config symcfg(cfg.symbol_stats());
90  cb = [&stats, &out, &per_symbol, symcfg](
91  jb::itch5::message_header const& header,
93  updated_book,
94  jb::itch5::book_update const& update) {
95  auto pl = std::chrono::steady_clock::now() - update.recvts;
97  stats, out, header, updated_book, update, pl)) {
98  return;
99  }
100  auto location = per_symbol.find(update.stock);
101  if (location == per_symbol.end()) {
102  auto p = per_symbol.emplace(
103  update.stock, jb::offline_feed_statistics(symcfg));
104  location = p.first;
105  }
106  location->second.sample(header.timestamp.ts, pl);
107  };
108  }
109 
112  auto process_buffer = [&handler](
113  std::chrono::steady_clock::time_point recv_ts, std::uint64_t msgcnt,
114  std::size_t msgoffset, char const* msgbuf, std::size_t msglen) {
118  process(handler, recv_ts, msgcnt, msgoffset, msgbuf, msglen);
119  };
120 
122  io_service, std::move(process_buffer), cfg.receiver());
123 
124  io_service.run();
125 
127  for (auto const& i : per_symbol) {
128  i.second.print_csv(i.first.c_str(), std::cout);
129  }
130  stats.print_csv("__aggregate__", std::cout);
131 
132  return 0;
133 } catch (jb::usage const& u) {
134  std::cerr << u.what() << std::endl;
135  return u.exit_status();
136 } catch (std::exception const& ex) {
137  std::cerr << "Standard exception raised: " << ex.what() << std::endl;
138  return 1;
139 } catch (...) {
140  std::cerr << "Unknown exception raised" << std::endl;
141  return 1;
142 }
143 
144 namespace {
145 
146 namespace defaults {
147 // Define the default per-symbol stats
148 jb::offline_feed_statistics::config per_symbol_stats() {
150  .reporting_interval_seconds(24 * 3600) // effectively disable updates
151  .max_processing_latency_nanoseconds(10000) // limit memory usage
152  .max_interarrival_time_nanoseconds(10000) // limit memory usage
153  .max_messages_per_microsecond(1000) // limit memory usage
154  .max_messages_per_millisecond(10000) // limit memory usage
155  .max_messages_per_second(10000) // limit memory usage
156  ;
157 }
158 
159 std::string const local_address = "";
160 std::string const address = "::1";
161 int const port = 50000;
162 } // namespace defaults
163 
164 config::config()
165  : receiver(
166  desc("receiver"), this, jb::itch5::udp_receiver_config()
167  .port(defaults::port)
168  .local_address(defaults::local_address)
169  .address(defaults::address))
170  , output_file(
171  desc("output-file")
172  .help("The name of the file where to store the inside data."
173  " Files ending in .gz are automatically compressed."),
174  this)
175  , log(desc("log", "logging"), this)
176  , stats(desc("stats", "offline-feed-statistics"), this)
177  , symbol_stats(
178  desc("symbol-stats", "offline-feed-statistics"), this,
179  defaults::per_symbol_stats())
180  , enable_symbol_stats(
181  desc("enable-symbol-stats")
182  .help(
183  "If set, enable per-symbol statistics."
184  " Collecting per-symbol statistics is expensive in both"
185  " memory and execution time, so it is disabled by default."),
186  this, false) {
187 }
188 
189 void config::validate() const {
190 
191  if (output_file() == "") {
192  throw jb::usage(
193  "Missing output-file setting."
194  " You must specify an output file.",
195  1);
196  }
197  log().validate();
198  stats().validate();
199  symbol_stats().validate();
200 }
201 
202 } // anonymous namespace
jb::config_attribute< config, int > reporting_interval_seconds
Compute the book and call a user-defined callback on each change.
Define the header common to all ITCH 5.0 messages.
Define defaults for program parameters.
clock_type::time_point time_point
A convenience alias for clock_type::time_point.
Base class for all configuration objects.
virtual void validate() const
Validate the settings.
int exit_status() const
Definition: usage.hpp:21
Keep statistics about a feed and its offline processor.
void open_output_file(boost::iostreams::filtering_ostream &out, std::string const &filename)
Open a file for writing.
Definition: fileio.cpp:12
int main(int argc, char *argv[])
Definition: mold2inside.cpp:63
void init(config const &cfg)
Initialize the logging functions using the configuration provided.
Definition: log.cpp:190
#define config_object_constructors(NAME)
Helper class to easily define configuration attributes.
std::function< void(message_header const &header, order_book< book_type > const &updated_book, book_update const &update)> callback_type
Define the callback type.
A simple class to communicate the result of parsing the options.
Definition: usage.hpp:11
A flat struct to represent updates to an order book.
bool generate_inside(jb::offline_feed_statistics &stats, std::ostream &out, jb::itch5::message_header const &header, jb::itch5::order_book< book_type > const &book, jb::itch5::book_update const &update, duration_t processing_latency)
Determine if this event changes the inside, if so, record the statistics and output the result...
Process a buffer with a single message: parse it and call the handler.
Configure an map_based_order_book config object.
static void print_csv_header(std::ostream &os)
Print a CSV header.
#define KNOWN_ITCH5_MESSAGES
Definition: mold2inside.cpp:47
Maintain the ITCH-5.0 order book for a single security.
Definition: order_book.hpp:57
Configure an offline_feed_statistics object.
A configuration object for UDP receivers.
Create and manage a socket to receive MoldUDP64 packets.