24 #include <type_traits> 25 #include <unordered_map> 52 template <
typename callback_t>
53 std::unique_ptr<jb::itch5::mold_udp_channel> create_udp_channel(
54 boost::asio::io_service& io, callback_t cb,
57 return std::unique_ptr<jb::itch5::mold_udp_channel>();
59 return std::make_unique<jb::itch5::mold_udp_channel>(io, std::move(cb), cfg);
67 using output_function = std::function<void(
72 output_function create_output_file(config
const& cfg) {
74 auto out = std::make_shared<boost::iostreams::filtering_ostream>();
79 auto bid = updated_book.best_bid();
80 auto offer = updated_book.best_offer();
81 *out << header.timestamp.ts.count() <<
" " << header.stock_locate <<
" " 82 << update.stock <<
" " << bid.first.as_integer() <<
" " << bid.second
83 <<
" " << offer.first.as_integer() <<
" " << offer.second <<
"\n";
90 using std::chrono::system_clock;
94 auto now = std::time(
nullptr);
95 std::tm date = *std::localtime(&now);
99 return system_clock::from_time_t(std::mktime(&date));
102 void send_inside_levels_update(
103 boost::asio::ip::udp::socket& socket,
104 boost::asio::ip::udp::endpoint
const& destination,
110 if (updated_book.best_bid().first != update.
px) {
114 if (updated_book.best_offer().first != update.
px) {
123 std::is_pod<decltype(msg)>::value,
"Message type should be a POD type");
135 std::chrono::duration_cast<std::chrono::nanoseconds>(
136 std::chrono::system_clock::now() - midnight)
144 msg.
bid_qty[0] = updated_book.best_bid().second;
145 msg.
bid_px[0] = updated_book.best_bid().first.as_integer();
146 msg.
offer_qty[0] = updated_book.best_offer().second;
147 msg.
offer_px[0] = updated_book.best_offer().first.as_integer();
151 std::memcpy(msg.
annotations.feed_name,
"NASD-PITCH-5x", 13);
152 std::memcpy(msg.
annotations.source_name,
"NASD-PITCH-5x", 13);
163 boost::asio::buffer(&msg, msg.
message_size.value()), destination);
169 output_function create_output_socket(
171 auto s = jb::itch5::make_socket_udp_send<>(io, cfg);
172 auto socket = std::make_shared<decltype(s)>(std::move(s));
173 auto const address = boost::asio::ip::address::from_string(cfg.
address());
174 auto const destination = boost::asio::ip::udp::endpoint(address, cfg.
port());
175 auto const mid = midnight();
176 return [socket, cfg, destination, mid](
179 send_inside_levels_update(*socket, destination, mid, h, ub, u);
186 create_output_layer(boost::asio::io_service& io, config
const& cfg) {
187 std::vector<output_function> outs;
188 if (cfg.output_file() !=
"") {
189 outs.push_back(create_output_file(cfg));
191 for (
auto const& outcfg : cfg.output()) {
192 if (outcfg.port() == 0 and outcfg.address() ==
"") {
195 outs.push_back(create_output_socket(io, outcfg));
197 return [outputs = std::move(outs)](
200 for (
auto f : outputs) {
201 f(header, updated_book, update);
208 #define KNOWN_ITCH5_MESSAGES \ 209 jb::itch5::add_order_message, jb::itch5::add_order_mpid_message, \ 210 jb::itch5::broken_trade_message, jb::itch5::cross_trade_message, \ 211 jb::itch5::ipo_quoting_period_update_message, \ 212 jb::itch5::market_participant_position_message, \ 213 jb::itch5::mwcb_breach_message, jb::itch5::mwcb_decline_level_message, \ 214 jb::itch5::net_order_imbalance_indicator_message, \ 215 jb::itch5::order_cancel_message, jb::itch5::order_delete_message, \ 216 jb::itch5::order_executed_message, \ 217 jb::itch5::order_executed_price_message, \ 218 jb::itch5::order_replace_message, \ 219 jb::itch5::reg_sho_restriction_message, \ 220 jb::itch5::stock_directory_message, \ 221 jb::itch5::stock_trading_action_message, \ 222 jb::itch5::system_event_message, jb::itch5::trade_message 224 int main(
int argc,
char* argv[])
try {
234 argc, argv, std::string(
"moldfeedhandler.yaml"),
"JB_ROOT");
240 boost::asio::io_service io;
253 auto output_layer = create_output_layer(io, cfg);
273 compute_book book_build_layer(std::move(output_layer), cfg.book());
282 auto itch_decoding_layer = [&book_build_layer](
284 std::size_t msgoffset,
char const* msgbuf, std::size_t msglen) {
286 process(book_build_layer, recv_ts, msgcnt, msgoffset, msgbuf, msglen);
295 auto data_source_layer =
296 create_udp_channel(io, itch_decoding_layer, cfg.primary());
307 using endpoint = boost::asio::ip::tcp::endpoint;
308 using address = boost::asio::ip::address;
309 endpoint ep{address::from_string(cfg.control_host()), cfg.control_port()};
319 auto dispatcher = std::make_shared<jb::ehs::request_dispatcher>(
"moldreplay");
323 res.insert(
"Content-type",
"text/plain");
324 res.body =
"Server running...\r\n";
328 dispatcher->add_handler(
330 res.insert(
"Content-type",
"text/plain");
331 std::ostringstream os;
336 std::weak_ptr<jb::ehs::request_dispatcher> disp = dispatcher;
344 dispatcher->add_handler(
346 std::shared_ptr<jb::ehs::request_dispatcher> d(disp);
348 res.result(beast::http::status::internal_server_error);
349 res.body = std::string(
"An internal error occurred\r\n" 350 "Null request handler in /metrics\r\n");
353 res.set(
"content-type",
"text/plain; version=0.0.4");
354 d->append_metrics(res);
369 std::cerr << u.what() << std::endl;
371 }
catch (std::exception
const& ex) {
372 std::cerr <<
"Standard exception raised: " << ex.what() << std::endl;
375 std::cerr <<
"Unknown exception raised" << std::endl;
381 int const levels = 4;
382 std::string
const control_host =
"0.0.0.0";
383 int const control_port = 23100;
384 std::string
const mold_address =
"127.0.0.1";
385 unsigned short const mold_port = 12300;
386 std::string
const output_address =
"127.0.0.1";
387 unsigned short const output_port = 13000;
392 desc(
"levels").help(
"Configure the number of levels generated by " 393 "this feed handler. The only allowed values are " 395 this, defaults::levels)
398 .address(defaults::mold_address)
399 .port(defaults::mold_port))
401 desc(
"secondary"),
this,
405 .help(
"Configure the feed handler to log to a " 406 "ASCII (possibly compressed) file." 407 " The user should consider the performance impact of this " 408 "option when using this as the primary feedhandler."),
412 "Configure the output UDP addresses for the feed handler " 414 " Typically one output UDP address is enough, the application " 415 "can be configured with multiple output sockets for network " 416 "redundancy, or to send copies to another process in " 417 "the localhost for logging."),
421 .help(
"Where does the server listen for control connections." 422 "Typically this is an address for the current host," 423 "for example: 'localhost', '0.0.0.0', or '::1'."),
424 this, defaults::control_host)
426 desc(
"control-port").help(
"The port to receive control connections."),
427 this, defaults::control_port)
428 , book(desc(
"book",
"order-book-config"),
this)
429 , log(desc(
"log",
"logging"),
this) {
431 .
address(defaults::output_address)
432 .port(defaults::output_port)});
435 void config::validate()
const {
436 if (levels() != 1 and levels() != 4 and levels() != 8) {
437 std::ostringstream os;
438 os <<
"Invalid value (" << levels() <<
") for --levels option.";
441 if (primary().port() == 0 and secondary().port() == 0) {
443 "Either the primary or secondary port must be configured.", 1);
445 if (primary().address() ==
"" and secondary().address() ==
"") {
447 "Either the primary or secondary receiving address must be configured.",
452 for (
auto const& outcfg : output()) {
453 if ((outcfg.port() != 0 and outcfg.address() ==
"") or
454 (outcfg.port() == 0 and outcfg.address() !=
"")) {
455 std::ostringstream os;
456 os <<
"Partially configured output socket #" << cnt <<
" (" 457 << outcfg.address() <<
" / " << outcfg.port() <<
")";
460 if (outcfg.port() != 0 and outcfg.address() !=
"") {
464 if (outputs == 0 and output_file() ==
"") {
465 throw jb::usage(
"No --output nor --output-file configured", 1);
jb::config_attribute< udp_sender_config, int > port
Compute the book and call a user-defined callback on each change.
Define defaults for program parameters.
clock_type::time_point time_point
A convenience alias for clock_type::time_point.
char const * c_str() const
Return the C-string representation.
timestamp feed_ts
Typically each feed provides a timestamp (with feed-specific semantics) for the message, this may be different from the exchange timestamp.
security_id security
The id of the security.
Base class for all configuration objects.
beast::http::request< beast::http::string_body > request_type
The request type used for JayBeams Embedded HTTP Servers.
virtual void validate() const
Validate the settings.
market_id market
The market this data refers to.
jb::config_attribute< udp_receiver_config, std::string > address
boost::endian::little_uint16_buf_t message_size
The message size.
void open_output_file(boost::iostreams::filtering_ostream &out, std::string const &filename)
Open a file for writing.
jb::config_attribute< udp_receiver_config, int > port
static constexpr std::size_t wire_size
The size of the field on the wire.
A message representing the top N levels of a market.
beast::http::response< beast::http::string_body > response_type
The response type used for JayBeams Embedded HTTP Servers.
timestamp feedhandler_ts
The feedhandler (the software system that processes the feed and generated this message), timestamps the message just before sending it out.
boost::endian::little_uint32_buf_t bid_px[N]
Bid prices, in descending order.
feed_id feed
The name of the feed handler used to parse and generate this data.
jb::config_attribute< udp_sender_config, std::string > address
void init(config const &cfg)
Initialize the logging functions using the configuration provided.
#define config_object_constructors(NAME)
timestamp exchange_ts
Typically exchange feeds provide a timestamp (with feed-specific semantics) for the event in the exch...
Helper class to easily define configuration attributes.
int stock_locate
The stock locate number.
feed_id source
The source of the data within that feed, some feeds arbitrage between multiple sources for the same d...
Configure an array_based_order_book config object.
boost::endian::little_uint32_buf_t bid_qty[N]
The bid quantities, in shares, can be 0 if the level does not exist or is not provided by the exchang...
std::chrono::nanoseconds ts
int main(int argc, char *argv[])
boost::endian::little_uint32_buf_t sequence_number
The sequence number created by the feed handler.
buy_sell_indicator_t buy_sell_indicator
What side of the book is being updated.
stock_t stock
The security updated by this order.
A simple class to communicate the result of parsing the options.
A flat struct to represent updates to an order book.
boost::endian::little_uint32_buf_t offer_px[N]
Offer prices, in ascending order.
annotations_type annotations
The annotations field.
A configuration object for UDP senders.
boost::endian::little_uint16_buf_t message_type
The message type, each message in JayBeams receives a unique identifier.
Process a buffer with a single message: parse it and call the handler.
Create a control server for the program.
boost::endian::little_uint32_buf_t offer_qty[N]
Offer quantities, in ascending order of prices.
Maintain the ITCH-5.0 order book for a single security.
jb::itch5::timestamp timestamp
The message timestamp, in nanoseconds since midnight.
A configuration object for UDP receivers.
price4_t px
What price level is being updated.