JayBeams  0.1
Another project to have fun coding.
itch5inside.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  *
4  * This program reads a raw ITCH-5.0 file and generates the inside
5  * quotes in an ASCII (though potentially compressed) file. The
6  * program also generates statistics about the feed and the book
7  * build, using jb::offline_feed_statistics.
8  *
9  * It reports the percentiles of "for each change in the inside, how
10  * long did it take to process the event, and what was the elapsed
11  * time since the last change to the inside".
12  */
15 #include <jb/fileio.hpp>
16 #include <jb/log.hpp>
17 
18 #include <stdexcept>
19 #include <type_traits>
20 #include <unordered_map>
21 
22 /**
23  * Define types and functions used in this program.
24  */
25 namespace {
26 
27 /// Configuration parameters for itch5inside
28 class config : public jb::config_object {
29 public:
30  config();
32 
33  void validate() const override;
34 
40  symbol_stats;
41  jb::config_attribute<config, bool> enable_symbol_stats;
42  jb::config_attribute<config, bool> enable_array_based;
43  using book_config = typename jb::itch5::array_based_order_book::config;
45 
46  jb::config_attribute<config, int> stop_after_seconds;
47 };
48 
49 /// A simple struct to signal termination of the
50 /// jb::itch5::process_iostream() loop
51 struct abort_process_iostream {};
52 
53 } // anonymous namespace
54 
55 /**
56  * Template function to refactor the usage of a book side type.
57  *
58  * @tparam book_type_t based order book type
59  * @tparam cfg_book_t Defines the config type
60 
61  * @param cfg Application config object
62  * @param cfg_book Book side config object
63  */
64 template <typename book_type_t, typename cfg_book_t>
65 void run_inside(config const& cfg, cfg_book_t const& cfg_book) {
66  jb::log::init(cfg.log());
67 
68  boost::iostreams::filtering_istream in;
69  jb::open_input_file(in, cfg.input_file());
70 
71  boost::iostreams::filtering_ostream out;
72  jb::open_output_file(out, cfg.output_file());
73 
74  std::map<jb::itch5::stock_t, jb::offline_feed_statistics> per_symbol;
75  jb::offline_feed_statistics stats(cfg.stats());
76 
77  std::chrono::seconds stop_after(cfg.stop_after_seconds());
78 
79  using callback_type =
81  callback_type cb = std::move([&stats, &out, stop_after](
82  jb::itch5::message_header const& header,
83  jb::itch5::order_book<book_type_t> const& updated_book,
84  jb::itch5::book_update const& update) {
85  if (stop_after != std::chrono::seconds(0) and
86  stop_after <= header.timestamp.ts) {
87  throw abort_process_iostream{};
88  }
89  auto pl = std::chrono::steady_clock::now() - update.recvts;
91  stats, out, header, updated_book, update, pl);
92  });
93 
94  if (cfg.enable_symbol_stats()) {
95  // ... replace the calback with one that also records the stats
96  // for each symbol ...
97  jb::offline_feed_statistics::config symcfg(cfg.symbol_stats());
98  cb = std::move([&stats, &out, &per_symbol, symcfg, stop_after](
99  jb::itch5::message_header const& header,
100  jb::itch5::order_book<book_type_t> const& updated_book,
101  jb::itch5::book_update const& update) {
102  if (stop_after != std::chrono::seconds(0) and
103  stop_after <= header.timestamp.ts) {
104  throw abort_process_iostream{};
105  }
106  auto pl = std::chrono::steady_clock::now() - update.recvts;
108  stats, out, header, updated_book, update, pl)) {
109  return;
110  }
111  auto location = per_symbol.find(update.stock);
112  if (location == per_symbol.end()) {
113  auto p = per_symbol.emplace(
114  update.stock, jb::offline_feed_statistics(symcfg));
115  location = p.first;
116  }
117  location->second.sample(header.timestamp.ts, pl);
118  });
119  }
120 
121  jb::itch5::compute_book<book_type_t> handler(std::move(cb), cfg_book);
122  try {
123  jb::itch5::process_iostream(in, handler);
124  } catch (abort_process_iostream const&) {
125  // nothing to do, the loop is terminated by the exception and we
126  // continue the code ...
127  JB_LOG(info) << "process_iostream aborted, stop_after_seconds="
128  << cfg.stop_after_seconds();
129  }
130  stats.log_final_progress();
131 
133  for (auto const& i : per_symbol) {
134  i.second.print_csv(i.first.c_str(), std::cout);
135  }
136  stats.print_csv("__aggregate__", std::cout);
137 }
138 
139 int main(int argc, char* argv[]) try {
140  config cfg;
141  cfg.load_overrides(argc, argv, std::string("itch5inside.yaml"), "JB_ROOT");
142 
143  /// if enable_array_based uses array_based_order_book type
144  /// and the config built by the call's arguments
145  if (cfg.enable_array_based()) {
146  (void)run_inside<jb::itch5::array_based_order_book>(cfg, cfg.book_cfg());
147  } else {
148  /// ... uses map_based_order_book type and a default config
150  (void)run_inside<jb::itch5::map_based_order_book>(cfg, cfg_bk);
151  }
152 
153  return 0;
154 } catch (jb::usage const& u) {
155  std::cerr << u.what() << std::endl;
156  return u.exit_status();
157 } catch (std::exception const& ex) {
158  std::cerr << "Standard exception raised: " << ex.what() << std::endl;
159  return 1;
160 } catch (...) {
161  std::cerr << "Unknown exception raised" << std::endl;
162  return 1;
163 }
164 
165 namespace {
166 
167 // Define the default per-symbol stats
168 jb::offline_feed_statistics::config default_per_symbol_stats() {
170  .reporting_interval_seconds(24 * 3600) // disable reporting
171  .max_processing_latency_nanoseconds(10000) // limit memory usage
172  .max_interarrival_time_nanoseconds(10000) // limit memory usage
173  .max_messages_per_microsecond(1000) // limit memory usage
174  .max_messages_per_millisecond(10000) // limit memory usage
175  .max_messages_per_second(10000) // limit memory usage
176  ;
177 }
178 
179 config::config()
180  : input_file(
181  desc("input-file").help("An input file with ITCH-5.0 messages."),
182  this)
183  , output_file(
184  desc("output-file")
185  .help("The name of the file where to store the inside data."
186  " Files ending in .gz are automatically compressed."),
187  this)
188  , log(desc("log", "logging"), this)
189  , stats(desc("stats", "offline-feed-statistics"), this)
190  , symbol_stats(
191  desc("symbol-stats", "offline-feed-statistics"), this,
192  default_per_symbol_stats())
193  , enable_symbol_stats(
194  desc("enable-symbol-stats")
195  .help(
196  "If set, enable per-symbol statistics."
197  " Collecting per-symbol statistics is expensive in both"
198  " memory and execution time, so it is disabled by default."),
199  this, false)
200  , enable_array_based(
201  desc("enable-array-based")
202  .help("If set, enable array_based_order_book usage."
203  " It is disabled by default."),
204  this, false)
205  , book_cfg(desc("book-config", "order-book-config"), this)
206  , stop_after_seconds(
207  desc("stop-after-seconds")
208  .help(
209  "If non-zero, stop processing the input after this many "
210  "seconds in the input. For example, if set to 34500 (= 9 * "
211  "3600 + 35 * 60) the processing will stop when the first "
212  "event timestamped after 09:35:00 is received."),
213  this, 0) {
214 }
215 
216 void config::validate() const {
217  if (input_file() == "") {
218  throw jb::usage(
219  "Missing input-file setting."
220  " You must specify an input file.",
221  1);
222  }
223  if (output_file() == "") {
224  throw jb::usage(
225  "Missing output-file setting."
226  " You must specify an output file.",
227  1);
228  }
229  if (stop_after_seconds() < 0) {
230  throw jb::usage("The stop-after-seconds must be >= 0", 1);
231  }
232 
233  log().validate();
234  stats().validate();
235  symbol_stats().validate();
236  book_cfg().validate();
237 }
238 
239 } // anonymous namespace
jb::config_attribute< config, int > reporting_interval_seconds
Compute the book and call a user-defined callback on each change.
Define the header common to all ITCH 5.0 messages.
int main(int argc, char *argv[])
Base class for all configuration objects.
virtual void validate() const
Validate the settings.
int exit_status() const
Definition: usage.hpp:21
Keep statistics about a feed and its offline processor.
void open_output_file(boost::iostreams::filtering_ostream &out, std::string const &filename)
Open a file for writing.
Definition: fileio.cpp:12
void process_iostream(std::istream &in, message_handler &handler)
Process an iostream of ITCH-5.0 messages.
void init(config const &cfg)
Initialize the logging functions using the configuration provided.
Definition: log.cpp:190
#define config_object_constructors(NAME)
Helper class to easily define configuration attributes.
std::function< void(message_header const &header, order_book< book_type > const &updated_book, book_update const &update)> callback_type
Define the callback type.
Configure an array_based_order_book config object.
std::chrono::nanoseconds ts
Definition: timestamp.hpp:18
stock_t stock
The security updated by this order.
A simple class to communicate the result of parsing the options.
Definition: usage.hpp:11
A flat struct to represent updates to an order book.
bool generate_inside(jb::offline_feed_statistics &stats, std::ostream &out, jb::itch5::message_header const &header, jb::itch5::order_book< book_type > const &book, jb::itch5::book_update const &update, duration_t processing_latency)
Determine if this event changes the inside, if so, record the statistics and output the result...
void open_input_file(boost::iostreams::filtering_istream &in, std::string const &filename)
Open a file for reading.
Definition: fileio.cpp:27
Configure an map_based_order_book config object.
static void print_csv_header(std::ostream &os)
Print a CSV header.
time_point recvts
When was the message that triggered this update received.
Maintain the ITCH-5.0 order book for a single security.
Definition: order_book.hpp:57
Configure an offline_feed_statistics object.
#define JB_LOG(lvl)
Definition: log.hpp:70
void run_inside(config const &cfg, cfg_book_t const &cfg_book)
Template function to refactor the usage of a book side type.
Definition: itch5inside.cpp:65
jb::itch5::timestamp timestamp
The message timestamp, in nanoseconds since midnight.