19 #include <type_traits> 20 #include <unordered_map> 51 struct abort_process_iostream {};
64 template <
typename book_type_t,
typename cfg_book_t>
65 void run_inside(config
const& cfg, cfg_book_t
const& cfg_book) {
68 boost::iostreams::filtering_istream in;
71 boost::iostreams::filtering_ostream out;
74 std::map<jb::itch5::stock_t, jb::offline_feed_statistics> per_symbol;
77 std::chrono::seconds stop_after(cfg.stop_after_seconds());
81 callback_type cb = std::move([&stats, &out, stop_after](
85 if (stop_after != std::chrono::seconds(0) and
87 throw abort_process_iostream{};
89 auto pl = std::chrono::steady_clock::now() - update.
recvts;
91 stats, out, header, updated_book, update, pl);
94 if (cfg.enable_symbol_stats()) {
98 cb = std::move([&stats, &out, &per_symbol, symcfg, stop_after](
102 if (stop_after != std::chrono::seconds(0) and
104 throw abort_process_iostream{};
106 auto pl = std::chrono::steady_clock::now() - update.
recvts;
108 stats, out, header, updated_book, update, pl)) {
111 auto location = per_symbol.find(update.
stock);
112 if (location == per_symbol.end()) {
113 auto p = per_symbol.emplace(
124 }
catch (abort_process_iostream
const&) {
127 JB_LOG(info) <<
"process_iostream aborted, stop_after_seconds=" 128 << cfg.stop_after_seconds();
130 stats.log_final_progress();
133 for (
auto const& i : per_symbol) {
134 i.second.print_csv(i.first.c_str(), std::cout);
136 stats.print_csv(
"__aggregate__", std::cout);
139 int main(
int argc,
char* argv[])
try {
141 cfg.load_overrides(argc, argv, std::string(
"itch5inside.yaml"),
"JB_ROOT");
145 if (cfg.enable_array_based()) {
146 (void)run_inside<jb::itch5::array_based_order_book>(cfg, cfg.book_cfg());
150 (void)run_inside<jb::itch5::map_based_order_book>(cfg, cfg_bk);
155 std::cerr << u.what() << std::endl;
157 }
catch (std::exception
const& ex) {
158 std::cerr <<
"Standard exception raised: " << ex.what() << std::endl;
161 std::cerr <<
"Unknown exception raised" << std::endl;
171 .max_processing_latency_nanoseconds(10000)
172 .max_interarrival_time_nanoseconds(10000)
173 .max_messages_per_microsecond(1000)
174 .max_messages_per_millisecond(10000)
175 .max_messages_per_second(10000)
181 desc(
"input-file").help(
"An input file with ITCH-5.0 messages."),
185 .help(
"The name of the file where to store the inside data." 186 " Files ending in .gz are automatically compressed."),
188 , log(desc(
"log",
"logging"),
this)
189 , stats(desc(
"stats",
"offline-feed-statistics"),
this)
191 desc(
"symbol-stats",
"offline-feed-statistics"),
this,
192 default_per_symbol_stats())
193 , enable_symbol_stats(
194 desc(
"enable-symbol-stats")
196 "If set, enable per-symbol statistics." 197 " Collecting per-symbol statistics is expensive in both" 198 " memory and execution time, so it is disabled by default."),
200 , enable_array_based(
201 desc(
"enable-array-based")
202 .help(
"If set, enable array_based_order_book usage." 203 " It is disabled by default."),
205 , book_cfg(desc(
"book-config",
"order-book-config"),
this)
206 , stop_after_seconds(
207 desc(
"stop-after-seconds")
209 "If non-zero, stop processing the input after this many " 210 "seconds in the input. For example, if set to 34500 (= 9 * " 211 "3600 + 35 * 60) the processing will stop when the first " 212 "event timestamped after 09:35:00 is received."),
216 void config::validate()
const {
217 if (input_file() ==
"") {
219 "Missing input-file setting." 220 " You must specify an input file.",
223 if (output_file() ==
"") {
225 "Missing output-file setting." 226 " You must specify an output file.",
229 if (stop_after_seconds() < 0) {
230 throw jb::usage(
"The stop-after-seconds must be >= 0", 1);
235 symbol_stats().validate();
236 book_cfg().validate();
jb::config_attribute< config, int > reporting_interval_seconds
Compute the book and call a user-defined callback on each change.
int main(int argc, char *argv[])
Base class for all configuration objects.
virtual void validate() const
Validate the settings.
Keep statistics about a feed and its offline processor.
void open_output_file(boost::iostreams::filtering_ostream &out, std::string const &filename)
Open a file for writing.
void process_iostream(std::istream &in, message_handler &handler)
Process an iostream of ITCH-5.0 messages.
void init(config const &cfg)
Initialize the logging functions using the configuration provided.
#define config_object_constructors(NAME)
Helper class to easily define configuration attributes.
std::function< void(message_header const &header, order_book< book_type > const &updated_book, book_update const &update)> callback_type
Define the callback type.
Configure an array_based_order_book config object.
std::chrono::nanoseconds ts
stock_t stock
The security updated by this order.
A simple class to communicate the result of parsing the options.
A flat struct to represent updates to an order book.
bool generate_inside(jb::offline_feed_statistics &stats, std::ostream &out, jb::itch5::message_header const &header, jb::itch5::order_book< book_type > const &book, jb::itch5::book_update const &update, duration_t processing_latency)
Determine if this event changes the inside; if so, record the statistics and output the result...
void open_input_file(boost::iostreams::filtering_istream &in, std::string const &filename)
Open a file for reading.
Configure a map_based_order_book config object.
static void print_csv_header(std::ostream &os)
Print a CSV header.
time_point recvts
When was the message that triggered this update received.
Maintain the ITCH-5.0 order book for a single security.
Configure an offline_feed_statistics object.
void run_inside(config const &cfg, cfg_book_t const &cfg_book)
Template function to refactor the usage of a book side type.
jb::itch5::timestamp timestamp
The message timestamp, in nanoseconds since midnight.