JayBeams  0.1
Another project to have fun coding.
moldfeedhandler.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  *
4  * A Feed Handler for the ITCH-5.x protocol over MoldUDP.
5  *
6  * This program receives an ITCH-5.x feed over MoldUDP and generates
7  * normalized inside messages for the feed.
8  */
9 #include <jb/ehs/acceptor.hpp>
18 #include <jb/fileio.hpp>
19 #include <jb/log.hpp>
20 
21 #include <ctime>
22 #include <sstream>
23 #include <stdexcept>
24 #include <type_traits>
25 #include <unordered_map>
26 
27 /**
28  * Define types and functions used in this program.
29  */
30 namespace {
31 /// Configuration parameters for moldfeedhandler
/// NOTE(review): the embedded listing numbers skip inside this class
/// (35, 39-43, 45-46, 48-49), so most member declarations are not visible
/// here.  The constructor defined later in this file initializes: levels,
/// primary, secondary, output_file, output, control_host, control_port,
/// book and log.
32 class config : public jb::config_object {
33 public:
34  config();
36 
 /// Throws jb::usage when the configured values cannot be used.
37  void validate() const override;
38 
 // NOTE(review): the line below is the tail of a declaration whose
 // beginning is missing from this listing.
44  output;
47  using book_config = typename jb::itch5::array_based_order_book::config;
50 };
51 
52 template <typename callback_t>
53 std::unique_ptr<jb::itch5::mold_udp_channel> create_udp_channel(
54  boost::asio::io_service& io, callback_t cb,
55  jb::itch5::udp_receiver_config const& cfg) {
56  if (cfg.port() == 0 or cfg.address() == "") {
57  return std::unique_ptr<jb::itch5::mold_udp_channel>();
58  }
59  return std::make_unique<jb::itch5::mold_udp_channel>(io, std::move(cb), cfg);
60 }
61 
62 /// Define the type of order book used in the program.
64 
65 /// The output layer is composed of multiple instances of this
66 /// function type.
67 using output_function = std::function<void(
68  jb::itch5::message_header const& header, order_book const& updated_book,
69  jb::itch5::book_update const& update)>;
70 
71 /// Create the output function for the file option.
72 output_function create_output_file(config const& cfg) {
73  // ... otherwise create an output iostream and use it ...
74  auto out = std::make_shared<boost::iostreams::filtering_ostream>();
75  jb::open_output_file(*out, cfg.output_file());
76  return [out](
77  jb::itch5::message_header const& header, order_book const& updated_book,
78  jb::itch5::book_update const& update) {
79  auto bid = updated_book.best_bid();
80  auto offer = updated_book.best_offer();
81  *out << header.timestamp.ts.count() << " " << header.stock_locate << " "
82  << update.stock << " " << bid.first.as_integer() << " " << bid.second
83  << " " << offer.first.as_integer() << " " << offer.second << "\n";
84  };
85 }
86 
// Compute the time point for 00:00:00 local time of the current day.
// NOTE(review): the function's signature line (listing line 89) is not
// visible here; the function is called as midnight() elsewhere in this file.
87 // TODO() - this value is cached, we need to think about what happens for
88 // programs that run 24x7 ...
90  using std::chrono::system_clock;
91  // TODO - this should be ::localtime_r(), even though it is not part
92  // of the C++ standard it is part of POSIX and better for threaded
93  // environments ...
94  auto now = std::time(nullptr);
95  std::tm date = *std::localtime(&now);
 // ... clear the time-of-day fields so mktime() yields the start of
 // the current local day ...
96  date.tm_sec = 0;
97  date.tm_min = 0;
98  date.tm_hour = 0;
99  return system_clock::from_time_t(std::mktime(&date));
100 }
101 
/// Send the inside (best bid and best offer) of the updated book to a UDP
/// destination.
///
/// Updates whose price is not the best price on their side of the book
/// are ignored.  The message is sent with a blocking send_to() call.
///
/// NOTE(review): the embedded listing numbers skip 105, 121 and 124, so
/// one parameter (presumably the `midnight` time point used below), the
/// declaration of `msg`, and one more statement are not visible here.
102 void send_inside_levels_update(
103  boost::asio::ip::udp::socket& socket,
104  boost::asio::ip::udp::endpoint const& destination,
106  jb::itch5::message_header const& header, order_book const& updated_book,
107  jb::itch5::book_update const& update) {
108  // ... filter out messages that do not update the inside ...
109  if (update.buy_sell_indicator == u'B') {
110  if (updated_book.best_bid().first != update.px) {
111  return;
112  }
113  } else {
114  if (updated_book.best_offer().first != update.px) {
115  return;
116  }
117  }
118  // ... prepare the message to send ...
119  // TODO() - the number of levels should be based on the "levels()"
120  // configuration parameter
122  static_assert(
123  std::is_pod<decltype(msg)>::value, "Message type should be a POD type");
125  // TODO() - add configuration to send sizeof(msg) - sizeof(msg.annotations)
126  msg.message_size = sizeof(msg);
127  // TODO() - actually create sequence numbers ...
128  msg.sequence_number = 0;
129  // TODO() - this should be configured, the configuration parameters
130  // should be the short strings (e.g. NASD-PITCH-5), and the feed
131  // identifier should be looked up.
132  msg.market.id = 0;
133  msg.feed.id = 0;
 // ... the feed handler timestamp is the elapsed time since `midnight`,
 // in nanoseconds ...
134  msg.feedhandler_ts.nanos =
135  std::chrono::duration_cast<std::chrono::nanoseconds>(
136  std::chrono::system_clock::now() - midnight)
137  .count();
138  // TODO() - another configuration parameter
139  msg.source.id = 0;
 // ... both the exchange and feed timestamps are taken from the ITCH
 // message header ...
140  msg.exchange_ts.nanos = header.timestamp.ts.count();
141  msg.feed_ts.nanos = header.timestamp.ts.count();
142  // TODO() - this should be based on the JayBeams security id.
143  msg.security.id = header.stock_locate;
 // ... only the first level (index 0) of each side is populated ...
144  msg.bid_qty[0] = updated_book.best_bid().second;
145  msg.bid_px[0] = updated_book.best_bid().first.as_integer();
146  msg.offer_qty[0] = updated_book.best_offer().second;
147  msg.offer_px[0] = updated_book.best_offer().first.as_integer();
148  // TODO() - this should be based on configuration parameters, and
149  // range checked ...
 // NOTE(review): these copies exclude the string terminator (4 and 13
 // bytes); confirm the annotation fields are sized and initialized so
 // that this is correct.
150  std::memcpy(msg.annotations.mic, "NASD", 4);
151  std::memcpy(msg.annotations.feed_name, "NASD-PITCH-5x", 13);
152  std::memcpy(msg.annotations.source_name, "NASD-PITCH-5x", 13);
153  // TODO() - NASDAQ data is mostly normalized, some NYSE securities
154  // have a different ticker in NASDAQ data vs. CQS and NYSE data.
155  std::memcpy(
156  msg.annotations.security_normalized, update.stock.c_str(),
157  update.stock.wire_size);
158  std::memcpy(
159  msg.annotations.security_feed, update.stock.c_str(),
160  update.stock.wire_size);
161  // TODO() - consider a non-blocking write for the socket
162  socket.send_to(
163  boost::asio::buffer(&msg, msg.message_size.value()), destination);
164  // TODO() - increment a counter to show that the socket was sent,
165  // different counters for success and failure ...
166 }
167 
168 /// Create an output function for a single socket
169 output_function create_output_socket(
170  boost::asio::io_service& io, jb::itch5::udp_sender_config const& cfg) {
171  auto s = jb::itch5::make_socket_udp_send<>(io, cfg);
172  auto socket = std::make_shared<decltype(s)>(std::move(s));
173  auto const address = boost::asio::ip::address::from_string(cfg.address());
174  auto const destination = boost::asio::ip::udp::endpoint(address, cfg.port());
175  auto const mid = midnight();
176  return [socket, cfg, destination, mid](
177  jb::itch5::message_header const& h, order_book const& ub,
178  jb::itch5::book_update const& u) {
179  send_inside_levels_update(*socket, destination, mid, h, ub, u);
180  };
181 }
182 
183 /// Create a composite output function aggregating all the different
184 /// configured outputs
185 output_function
186 create_output_layer(boost::asio::io_service& io, config const& cfg) {
187  std::vector<output_function> outs;
188  if (cfg.output_file() != "") {
189  outs.push_back(create_output_file(cfg));
190  }
191  for (auto const& outcfg : cfg.output()) {
192  if (outcfg.port() == 0 and outcfg.address() == "") {
193  continue;
194  }
195  outs.push_back(create_output_socket(io, outcfg));
196  }
197  return [outputs = std::move(outs)](
198  jb::itch5::message_header const& header, order_book const& updated_book,
199  jb::itch5::book_update const& update) {
200  for (auto f : outputs) {
201  f(header, updated_book, update);
202  }
203  };
204 }
205 
206 } // anonymous namespace
207 
/// The comma-separated list of ITCH-5.x message types named by this program.
#define KNOWN_ITCH5_MESSAGES                                                   \
  jb::itch5::add_order_message,                                                \
      jb::itch5::add_order_mpid_message,                                       \
      jb::itch5::broken_trade_message,                                         \
      jb::itch5::cross_trade_message,                                          \
      jb::itch5::ipo_quoting_period_update_message,                            \
      jb::itch5::market_participant_position_message,                          \
      jb::itch5::mwcb_breach_message,                                          \
      jb::itch5::mwcb_decline_level_message,                                   \
      jb::itch5::net_order_imbalance_indicator_message,                        \
      jb::itch5::order_cancel_message,                                         \
      jb::itch5::order_delete_message,                                         \
      jb::itch5::order_executed_message,                                       \
      jb::itch5::order_executed_price_message,                                 \
      jb::itch5::order_replace_message,                                        \
      jb::itch5::reg_sho_restriction_message,                                  \
      jb::itch5::stock_directory_message,                                      \
      jb::itch5::stock_trading_action_message,                                 \
      jb::itch5::system_event_message,                                         \
      jb::itch5::trade_message
223 
// Program entry point: load the configuration, build the data path
// (output layer, book building, ITCH decoding, UDP source), start the
// embedded HTTP control server, and run the io_service loop.
// NOTE(review): the embedded listing numbers skip 244, 285 and 315, so the
// right-hand side of the `compute_book` alias, the first line of the
// `process(...)` call, and one statement after `using jb::ehs::request_type;`
// are not visible here.
224 int main(int argc, char* argv[]) try {
225  // All JayBeam programs read their configuration from a YAML file,
226  // the values can be overridden by the command-line arguments, but it
227  // is not recommended to set all the values via command-line flags
228  // ...
229  // TODO() - make it possible to read the YAML file from an etcd
230  // path. That way we can keep all the configurations in a single
231  // place ...
232  config cfg;
233  cfg.load_overrides(
234  argc, argv, std::string("moldfeedhandler.yaml"), "JB_ROOT");
235  jb::log::init(cfg.log());
236 
237  // ... this program basically has a single control loop. A future
238  // version should separate performance critical code to its own
239  // threads with their own io_service ...
240  boost::asio::io_service io;
241 
242  // ... define the classes used to build the book ...
243  using compute_book =
245 
246  // ... the data path is implemented as a series of stages, each one
247  // calls the next using lambdas. The last lambda to be called --
248  // where the data is sent to a file or a socket -- is the first to
249  // be constructed ...
250  // TODO() - actually output the messages to UDP sockets and files
251  // TODO() - run a master election via etcd and only output to
252  // sockets if this is the master
253  auto output_layer = create_output_layer(io, cfg);
254 
255  // ... here we should have a layer to arbitrage between the ITCH-5.x
256  // feed and the UQDF/CQS feeds. Normally ITCH-5.x is a better feed,
257  // richer data, more accurate, and lower latency. But the ITCH-5.x
258  // feed depends on never losing a message. When you do, there are
259  // multiple alternatives (e.g. requesting a retransmission from the
260  // exchange, using a sync+tell feed from the exchange). We propose
261  // to fallback to the UQDF/CQS feeds, which are stateless. The
262  // recovery using those feeds is almost immediate.
263  // The ITCH-5.x book can be cleared and rebuilt using only new
264  // messages, for most tickers the freshly constructed book is
265  // accurate enough within seconds. Switching back to ITCH-5.x after
266  // falling back to UQDF/CQS will require detecting when the two
267  // feeds are synchronized again ...
268  // TODO() - implement all the fallback / recovery complexity ...
269 
270  // ... in this layer we compute the book, i.e., assemble the list of
271  // orders received from the feed into a quantity at each price level
272  // ...
273  compute_book book_build_layer(std::move(output_layer), cfg.book());
274 
275  // ... in this layer we decode the raw ITCH messages into objects
276  // that can be more easily manipulated ...
277  // TODO() - we need to break out the non-book-building messages and
278  // bypass the book_build_layer for them, send them directly to the
279  // output layer. Or maybe have a separate output layer for
280  // non-book-build messages, which can be running at lower priority
281  // ...
 // ... the lambda holds a reference to book_build_layer, which is a
 // local of main() ...
282  auto itch_decoding_layer = [&book_build_layer](
283  std::chrono::steady_clock::time_point recv_ts, std::uint64_t msgcnt,
284  std::size_t msgoffset, char const* msgbuf, std::size_t msglen) {
286  process(book_build_layer, recv_ts, msgcnt, msgoffset, msgbuf, msglen);
287  };
288 
289  /// ... here we are missing a layer to arbitrage between the two UDP
290  // message sources, something like ...
291  // auto sequencing_layer = [&itch_decoding_layer](...) {};
292  // TODO() - we need to refactor the mold_udp_channel class to
293  // support multiple input sockets and to handle out-of-order,
294  // duplicate, and gaps in the message stream.
 // ... only the primary receiver configuration is used to create a
 // channel here ...
295  auto data_source_layer =
296  create_udp_channel(io, itch_decoding_layer, cfg.primary());
297 
298  // ... that was it for the critical data path. There are several
299  // TODO() entries there ...
300 
301  // In this section we create the control and monitoring path for the
302  // application. The control and monitoring path is implemented by a
303  // HTTP server that responds to simple GET requests. Adding new
304  // control methods is easy, as we will see ...
305  // TODO() - this should be refactored to an "application" class, they
306  // are very repetitive. We need to solve the counter problem first.
307  using endpoint = boost::asio::ip::tcp::endpoint;
308  using address = boost::asio::ip::address;
309  endpoint ep{address::from_string(cfg.control_host()), cfg.control_port()};
310 
311  // ... the request and response types are simple in-memory strings,
312  // this is suitable for our purposes as the payloads will generally
313  // be small ...
314  using jb::ehs::request_type;
316  // ... this object keeps track of all the handlers for each "path"
317  // in the HTTP request. The application registers a number of
318  // handlers for monitoring and control ...
 // NOTE(review): the dispatcher is created with the name "moldreplay"
 // although this program is moldfeedhandler; looks like a copy-paste
 // leftover -- confirm the intended name.
319  auto dispatcher = std::make_shared<jb::ehs::request_dispatcher>("moldreplay");
320  // ... the root serves simply as an indication that the server is
321  // running ...
322  dispatcher->add_handler("/", [](request_type const&, response_type& res) {
323  res.insert("Content-type", "text/plain");
324  res.body = "Server running...\r\n";
325  });
326  // ... this prints out the system configuration (command-line
327  // parameters and the YAML configuration file), in YAML format ...
328  dispatcher->add_handler(
329  "/config", [cfg](request_type const&, response_type& res) {
330  res.insert("Content-type", "text/plain");
331  std::ostringstream os;
332  os << cfg << "\r\n";
333  res.body = os.str();
334  });
335  // ... we need to use a weak_ptr to avoid a cycle of shared_ptr ...
336  std::weak_ptr<jb::ehs::request_dispatcher> disp = dispatcher;
337  // ... this handler collects the metrics and reports them in human
338  // readable form ...
339  // TODO() - we need a separate handler to serve the metrics in
340  // protobuf form for efficiency
341  // TODO() - once we solve the counter problem we should show the
342  // counter values here, not just whatever the dispatcher collects
343  // about itself
344  dispatcher->add_handler(
345  "/metrics", [disp](request_type const&, response_type& res) {
 // NOTE(review): constructing a std::shared_ptr from an expired
 // std::weak_ptr throws std::bad_weak_ptr, it does not produce a null
 // pointer, so the `not d` branch below cannot be reached; disp.lock()
 // would return an empty pointer instead.
346  std::shared_ptr<jb::ehs::request_dispatcher> d(disp);
347  if (not d) {
348  res.result(beast::http::status::internal_server_error);
349  res.body = std::string("An internal error occurred\r\n"
350  "Null request handler in /metrics\r\n");
351  return;
352  }
353  res.set("content-type", "text/plain; version=0.0.4");
354  d->append_metrics(res);
355  });
356 
357  // ... create an acceptor to handle incoming connections, if we wanted
358  // to, we could create multiple acceptors on different addresses
359  // pointing to the same dispatcher ...
360  jb::ehs::acceptor acceptor(io, ep, dispatcher);
361 
362  // ... run the program forever ...
363  // TODO() - we should be able to gracefully terminate the program
364  // with a handler in the embedded http server, and/or with a signal
365  io.run();
366 
367  return 0;
 // ... usage errors carry their own exit status, any other exception
 // is reported to stderr and the program exits with status 1 ...
368 } catch (jb::usage const& u) {
369  std::cerr << u.what() << std::endl;
370  return u.exit_status();
371 } catch (std::exception const& ex) {
372  std::cerr << "Standard exception raised: " << ex.what() << std::endl;
373  return 1;
374 } catch (...) {
375  std::cerr << "Unknown exception raised" << std::endl;
376  return 1;
377 }
378 
379 namespace {
/// Default values for the program's configuration parameters.
namespace defaults {
int const levels{4};
std::string const control_host{"0.0.0.0"};
int const control_port{23100};
std::string const mold_address{"127.0.0.1"};
unsigned short const mold_port{12300};
std::string const output_address{"127.0.0.1"};
unsigned short const output_port{13000};
} // namespace defaults
389 
// Initialize every configuration attribute with its description, help
// text, and (where one is given) its default value.
// NOTE(review): the embedded listing numbers skip 430, so the first line
// of the statement in the constructor body is not visible here.
// NOTE(review): the "control-host" help text is built from adjacent string
// literals without separating spaces, so it reads "connections.Typically"
// and "host,for example" at runtime.
// NOTE(review): "secondary" receives a default address but no default
// port, unlike "primary".
390 config::config()
391  : levels(
392  desc("levels").help("Configure the number of levels generated by "
393  "this feed handler. The only allowed values are "
394  "1, 4, or 8."),
395  this, defaults::levels)
396  , primary(
397  desc("primary"), this, jb::itch5::udp_receiver_config()
398  .address(defaults::mold_address)
399  .port(defaults::mold_port))
400  , secondary(
401  desc("secondary"), this,
402  jb::itch5::udp_receiver_config().address(defaults::mold_address))
403  , output_file(
404  desc("output-file")
405  .help("Configure the feed handler to log to a "
406  "ASCII (possibly compressed) file."
407  " The user should consider the performance impact of this "
408  "option when using this as the primary feedhandler."),
409  this)
410  , output(
411  desc("output").help(
412  "Configure the output UDP addresses for the feed handler "
413  "messages."
414  " Typically one output UDP address is enough, the application "
415  "can be configured with multiple output sockets for network "
416  "redundancy, or to send copies to another process in "
417  "the localhost for logging."),
418  this)
419  , control_host(
420  desc("control-host")
421  .help("Where does the server listen for control connections."
422  "Typically this is an address for the current host,"
423  "for example: 'localhost', '0.0.0.0', or '::1'."),
424  this, defaults::control_host)
425  , control_port(
426  desc("control-port").help("The port to receive control connections."),
427  this, defaults::control_port)
428  , book(desc("book", "order-book-config"), this)
429  , log(desc("log", "logging"), this) {
431  .address(defaults::output_address)
432  .port(defaults::output_port)});
433 }
434 
435 void config::validate() const {
436  if (levels() != 1 and levels() != 4 and levels() != 8) {
437  std::ostringstream os;
438  os << "Invalid value (" << levels() << ") for --levels option.";
439  throw jb::usage(os.str(), 1);
440  }
441  if (primary().port() == 0 and secondary().port() == 0) {
442  throw jb::usage(
443  "Either the primary or secondary port must be configured.", 1);
444  }
445  if (primary().address() == "" and secondary().address() == "") {
446  throw jb::usage(
447  "Either the primary or secondary receiving address must be configured.",
448  1);
449  }
450  int cnt = 0;
451  int outputs = 0;
452  for (auto const& outcfg : output()) {
453  if ((outcfg.port() != 0 and outcfg.address() == "") or
454  (outcfg.port() == 0 and outcfg.address() != "")) {
455  std::ostringstream os;
456  os << "Partially configured output socket #" << cnt << " ("
457  << outcfg.address() << " / " << outcfg.port() << ")";
458  throw jb::usage(os.str(), 1);
459  }
460  if (outcfg.port() != 0 and outcfg.address() != "") {
461  ++outputs;
462  }
463  }
464  if (outputs == 0 and output_file() == "") {
465  throw jb::usage("No --output nor --output-file configured", 1);
466  }
467  log().validate();
468 }
469 
470 } // anonymous namespace
jb::config_attribute< udp_sender_config, int > port
Compute the book and call a user-defined callback on each change.
Define the header common to all ITCH 5.0 messages.
Define defaults for program parameters.
clock_type::time_point time_point
A convenience alias for clock_type::time_point.
char const * c_str() const
Return the C-string representation.
timestamp feed_ts
Typically each feed provides a timestamp (with feed-specific semantics) for the message, this may be different from the exchange timestamp.
security_id security
The id of the security.
Base class for all configuration objects.
beast::http::request< beast::http::string_body > request_type
The request type used for JayBeams Embedded HTTP Servers.
Definition: base_types.hpp:17
virtual void validate() const
Validate the settings.
market_id market
The market this data refers to.
jb::config_attribute< udp_receiver_config, std::string > address
int exit_status() const
Definition: usage.hpp:21
boost::endian::little_uint16_buf_t message_size
The message size.
void open_output_file(boost::iostreams::filtering_ostream &out, std::string const &filename)
Open a file for writing.
Definition: fileio.cpp:12
jb::config_attribute< udp_receiver_config, int > port
static constexpr std::size_t wire_size
The size of the field on the wire.
A message representing the top N levels of a market.
beast::http::response< beast::http::string_body > response_type
The response type used for JayBeams Embedded HTTP Servers.
Definition: base_types.hpp:20
timestamp feedhandler_ts
The feedhandler (the software system that processes the feed and generated this message), timestamps the message just before sending it out.
boost::endian::little_uint32_buf_t bid_px[N]
Bid prices, in descending order.
feed_id feed
The name of the feed handler used to parse and generate this data.
jb::config_attribute< udp_sender_config, std::string > address
void init(config const &cfg)
Initialize the logging functions using the configuration provided.
Definition: log.cpp:190
#define config_object_constructors(NAME)
timestamp exchange_ts
Typically exchange feeds provide a timestamp (with feed-specific semantics) for the event in the exch...
Helper class to easily define configuration attributes.
int stock_locate
The stock locate number.
feed_id source
The source of the data within that feed, some feeds arbitrage between multiple sources for the same d...
Configure an array_based_order_book config object.
boost::endian::little_uint32_buf_t bid_qty[N]
The bid quantities, in shares, can be 0 if the level does not exist or is not provided by the exchang...
std::chrono::nanoseconds ts
Definition: timestamp.hpp:18
int main(int argc, char *argv[])
boost::endian::little_uint32_buf_t sequence_number
The sequence number created by the feed handler.
buy_sell_indicator_t buy_sell_indicator
What side of the book is being updated.
stock_t stock
The security updated by this order.
A simple class to communicate the result of parsing the options.
Definition: usage.hpp:11
A flat struct to represent updates to an order book.
boost::endian::little_uint32_buf_t offer_px[N]
Offer prices, in ascending order.
annotations_type annotations
The annotations field.
A configuration object for UDP senders.
boost::endian::little_uint16_buf_t message_type
The message type, each message in JayBeams receives a unique identifier.
Process a buffer with a single message: parse it and call the handler.
Create a control server for the program.
Definition: acceptor.hpp:17
boost::endian::little_uint32_buf_t offer_qty[N]
Offer quantities, in ascending order of prices.
Maintain the ITCH-5.0 order book for a single security.
Definition: order_book.hpp:57
jb::itch5::timestamp timestamp
The message timestamp, in nanoseconds since midnight.
A configuration object for UDP receivers.
price4_t px
What price level is being updated.