JayBeams  0.1
Another project to have fun coding.
compute_book.hpp
Go to the documentation of this file.
1 #ifndef jb_itch5_compute_book_hpp
2 #define jb_itch5_compute_book_hpp
3 
14 #include <jb/assert_throw.hpp>
15 
16 #include <boost/functional/hash.hpp>
17 #include <chrono>
18 #include <functional>
19 #include <unordered_map>
20 
21 namespace jb {
22 namespace itch5 {
23 
24 /// A convenience alias for clock_type
25 using clock_type = std::chrono::steady_clock;
26 
27 /// A convenience alias for clock_type::time_point
29 
30 /**
31  * A flat struct to represent updates to an order book.
32  *
33  * Updates to an order book come in many forms, but they can all be
34  * represented with a simple structure that shows: what book is
35  * being updated, what side of the book is being updated, what price
36  * level is being updated, and how many shares are being added or
37  * removed from the book.
38  */
39 struct book_update {
40  /**
41  * When was the message that triggered this update received
42  */
44 
45  /**
46  * The security updated by this order. This is redundant for
47  * order updates and deletes, and ITCH-5.0 omits the field, but
48  * we find it easier
49  */
51 
52  /// What side of the book is being updated.
54 
55  /// What price level is being updated.
57 
58  /// How many shares are being added (if positive) or removed (if
59  /// negative) from the book.
60  int qty;
61 
62  /// If true, this was a cancel replace and an old order was
63  /// modified too...
64  bool cxlreplx;
65 
66  /// Old price for the order
68 
69  /// How many shares were removed in the old order
70  int oldqty;
71 };
72 
73 /**
74  * A convenient container for per-order data.
75  *
76  * Most market data feeds resend the security identifier and side
77  * with each order update, but ITCH-5.0 does not. One needs to
78  * look up the symbol, side, and original price based on the order
79  * id. This literal type
80  * is used to keep that information around.
81  */
82 struct order_data {
83  /// The symbol for this particular order
85 
86  /// Whether the order is a BUY or SELL
88 
89  /// The price of the order
91 
92  /// The remaining quantity in the order
93  int qty;
94 };
95 
96 /**
97  * Compute the book and call a user-defined callback on each change.
98  *
99  * Keep a collection of all the order books, indexed by symbol, and
100  * forward the updates to them. Only process the relevant messages
101  * types in ITCH-5.0 that are necessary to keep the book.
102  *
103  * @tparam book_type the type used to define order_book<book_type>,
104  * must be compatible with jb::itch5::map_price
105  */
106 template <typename book_type>
108 public:
109  //@{
110  /**
111  * @name Type traits
112  */
113  /// clock_type is used as a compute_book<book_type> type
114  /// in some modules
116 
117  /// time_point is used as a compute_book<book_type> type
118  /// in some modules
120 
121  /// config type is used to construct the order_book
122  using book_type_config = typename book_type::config;
123 
124  /**
125  * Define the callback type
126  *
127  * A callback of this type is required in the constructor of this
128  * class. After each book update the user-provided callback is
129  * invoked.
130  *
131  * @param header the header of the raw ITCH-5.0 message
132  * @param update a representation of the update just applied to the
133  * book
134  * @param updated_book the order_book after the update was applied
135  */
136  using callback_type = std::function<void(
137  message_header const& header, order_book<book_type> const& updated_book,
138  book_update const& update)>;
139  //@}
140 
141  /// Constructor
142  explicit compute_book(callback_type&& cb, book_type_config const& cfg)
143  : callback_(std::forward<callback_type>(cb))
144  , books_()
145  , orders_()
146  , cfg_(cfg) {
147  }
148 
149  explicit compute_book(callback_type const& cb, book_type_config const& cfg)
150  : compute_book(callback_type(cb), cfg) {
151  }
152 
153  /**
154  * Handle a new order message.
155  *
156  * New orders are added to the list of known orders and their qty is
157  * added to the right book at the order's price level.
158  *
159  * @param recvts the timestamp when the message was received
160  * @param msgcnt the number of messages received before this message
161  * @param msgoffset the number of bytes received before this message
162  * @param msg the message describing a new order
163  */
165  time_point recvts, long msgcnt, std::size_t msgoffset,
166  add_order_message const& msg) {
167  JB_LOG(trace) << " " << msgcnt << ":" << msgoffset << " " << msg;
168  auto insert = orders_.emplace(
169  msg.order_reference_number,
170  order_data{msg.stock, msg.buy_sell_indicator, msg.price, msg.shares});
171  if (insert.second == false) {
172  // ... ooops, this should not happen, we got a duplicate order
173  // id. There is a problem with the feed, because we are working
174  // with simple command-line utilities we are just going to log the
175  // error, in a more complex system we would want to raise an
176  // exception and let the caller decide what to do ...
177  order_data const& data = insert.first->second;
178  JB_LOG(warning) << "duplicate order in handle_message(add_order_message)"
179  << ", id=" << msg.order_reference_number
180  << ", location=" << msgcnt << ":" << msgoffset
181  << ", existing data=" << data << ", msg=" << msg;
182  return;
183  }
184  // ... find the right book for this order, create one if necessary ...
185  auto itbook = books_.find(msg.stock);
186  if (itbook == books_.end()) {
187  auto newbk = books_.emplace(msg.stock, order_book<book_type>(cfg_));
188  itbook = newbk.first;
189  }
190  (void)itbook->second.handle_add_order(
191  msg.buy_sell_indicator, msg.price, msg.shares);
192  callback_(
193  msg.header, itbook->second,
194  book_update{recvts, msg.stock, msg.buy_sell_indicator, msg.price,
195  msg.shares});
196  }
197 
198  /**
199  * Handle a new order with MPID.
200  *
201  * @param recvts the timestamp when the message was received
202  * @param msgcnt the number of messages received before this message
203  * @param msgoffset the number of bytes received before this message
204  * @param msg the message describing a new order
205  */
207  time_point recvts, long msgcnt, std::size_t msgoffset,
208  add_order_mpid_message const& msg) {
209  // ... delegate to the handler for add_order_message (without mpid) ...
210  handle_message(
211  recvts, msgcnt, msgoffset, static_cast<add_order_message const&>(msg));
212  }
213 
214  /**
215  * Handle an order execution.
216  *
217  * @param recvts the timestamp when the message was received
218  * @param msgcnt the number of messages received before this message
219  * @param msgoffset the number of bytes received before this message
220  * @param msg the message describing the execution
221  */
223  time_point recvts, long msgcnt, std::size_t msgoffset,
224  order_executed_message const& msg) {
225  JB_LOG(trace) << " " << msgcnt << ":" << msgoffset << " " << msg;
226  handle_order_reduction(
227  recvts, msgcnt, msgoffset, msg.header, msg.order_reference_number,
228  msg.executed_shares);
229  }
230 
231  /**
232  * Handle an order execution with a different price than the order's
233  *
234  * @param recvts the timestamp when the message was received
235  * @param msgcnt the number of messages received before this message
236  * @param msgoffset the number of bytes received before this message
237  * @param msg the message describing the execution
238  */
240  time_point recvts, long msgcnt, std::size_t msgoffset,
241  order_executed_price_message const& msg) {
242  // ... delegate to the handler for order_executed_message (without price) ...
243  handle_message(
244  recvts, msgcnt, msgoffset,
245  static_cast<order_executed_message const&>(msg));
246  }
247 
248  /**
249  * Handle a partial cancel.
250  *
251  * @param recvts the timestamp when the message was received
252  * @param msgcnt the number of messages received before this message
253  * @param msgoffset the number of bytes received before this message
254  * @param msg the message describing the cancelation
255  */
257  time_point recvts, long msgcnt, std::size_t msgoffset,
258  order_cancel_message const& msg) {
259  JB_LOG(trace) << " " << msgcnt << ":" << msgoffset << " " << msg;
260  handle_order_reduction(
261  recvts, msgcnt, msgoffset, msg.header, msg.order_reference_number,
262  msg.canceled_shares);
263  }
264 
265  /**
266  * Handle a full cancel.
267  *
268  * @param recvts the timestamp when the message was received
269  * @param msgcnt the number of messages received before this message
270  * @param msgoffset the number of bytes received before this message
271  * @param msg the message describing the cancelation
272  */
274  time_point recvts, long msgcnt, std::size_t msgoffset,
275  order_delete_message const& msg) {
276  JB_LOG(trace) << " " << msgcnt << ":" << msgoffset << " " << msg;
277  handle_order_reduction(
278  recvts, msgcnt, msgoffset, msg.header, msg.order_reference_number, 0);
279  }
280 
281  /**
282  * Handle an order replace.
283  *
284  * @param recvts the timestamp when the message was received
285  * @param msgcnt the number of messages received before this message
286  * @param msgoffset the number of bytes received before this message
287  * @param msg the message describing the cancel/replace
288  */
290  time_point recvts, long msgcnt, std::size_t msgoffset,
291  order_replace_message const& msg) {
292  JB_LOG(trace) << " " << msgcnt << ":" << msgoffset << " " << msg;
293  // First we need to find the original order ...
294  auto position = orders_.find(msg.original_order_reference_number);
295  if (position == orders_.end()) {
296  // ... ooops, this should not happen, there is a problem with the
297  // feed, log the problem and skip the message ...
298  JB_LOG(warning)
299  << "unknown order in handle_message(order_replace_message)"
300  << ", id=" << msg.original_order_reference_number
301  << ", location=" << msgcnt << ":" << msgoffset << ", msg=" << msg;
302  return;
303  }
304  // ... then we need to make sure the new order is not a duplicate
305  // ...
306  auto newpos = orders_.find(msg.new_order_reference_number);
307  if (newpos != orders_.end()) {
308  JB_LOG(warning) << "duplicate order in "
309  << "handle_message(order_replace_message)"
310  << ", id=" << msg.new_order_reference_number
311  << ", location=" << msgcnt << ":" << msgoffset
312  << ", msg=" << msg;
313  return;
314  }
315  // ... find the right book for this order
316  auto itbook = books_.find(position->second.stock);
317  // ... the book has to exist, since the original add_order created
318  // one if needed
319  JB_ASSERT_THROW(itbook != books_.end());
320  // ... update the order list and book, but do not make a callback ...
321  auto update = do_reduce(
322  position, itbook->second, recvts, msgcnt, msgoffset, msg.header,
323  msg.original_order_reference_number, 0);
324  // ... now we need to insert the new order ...
325  orders_.emplace(
326  msg.new_order_reference_number,
327  order_data{update.stock, update.buy_sell_indicator, msg.price,
328  msg.shares});
329  (void)itbook->second.handle_add_order(
330  update.buy_sell_indicator, msg.price, msg.shares);
331  // ... adjust the update data structure ...
332  update.cxlreplx = true;
333  update.oldpx = update.px;
334  update.oldqty = -update.qty;
335  update.px = msg.price;
336  update.qty = msg.shares;
337  // ... and invoke the callback ...
338  callback_(msg.header, itbook->second, update);
339  }
340 
341  /**
342  * Pre-populate the books based on the symbol directory.
343  *
344  * ITCH-5.0 sends the list of expected securities to be traded on a
345  * given day as a sequence of messages. We use these messages to
346  * pre-populate the map of books and avoid hash map updates during
347  * the critical path.
348  *
349  * @param recvts the timestamp when the message was received
350  * @param msgcnt the number of messages received before this message
351  * @param msgoffset the number of bytes received before this message
352  * @param msg the message describing a known symbol for the feed
353  */
355  time_point recvts, long msgcnt, std::size_t msgoffset,
356  stock_directory_message const& msg) {
357  JB_LOG(trace) << " " << msgcnt << ":" << msgoffset << " " << msg;
358  // ... create the book and update the map ...
359  books_.emplace(msg.stock, order_book<book_type>(cfg_));
360  }
361 
362  /**
363  * Ignore all other message types.
364  *
365  * We are only interested in a handful of message types, anything
366  * else is captured by this template function and ignored.
367  */
368  template <typename message_type>
369  void handle_message(time_point, long, std::size_t, message_type const&) {
370  }
371 
372  /**
373  * Log any unknown message types.
374  *
375  * @param recvts the timestamp when the message was received
376  * @param msg the unknown message location and contents
377  */
379  char msgtype = *static_cast<char const*>(msg.buf());
380  JB_LOG(error) << "Unknown message type '" << msgtype << "'(" << int(msgtype)
381  << ") in msgcnt=" << msg.count()
382  << ", msgoffset=" << msg.offset();
383  }
384 
385  /// Return the symbols known in the order book
386  std::vector<stock_t> symbols() const {
387  std::vector<stock_t> result(books_.size());
388  std::transform(
389  books_.begin(), books_.end(), result.begin(),
390  [](auto const& x) { return x.first; });
391  return result;
392  }
393 
394  /// Return the current timestamp for delay measurements
395  time_point now() const {
396  return std::chrono::steady_clock::now();
397  }
398 
399 private:
400  /// Represent the collection of order books
401  using books_by_security =
402  std::unordered_map<stock_t, order_book<book_type>, boost::hash<stock_t>>;
403 
404  /// Represent the collection of all orders
405  using orders_by_id = std::unordered_map<std::uint64_t, order_data>;
406  using orders_iterator = typename orders_by_id::iterator;
407 
408  /**
409  * Refactor code to handle order reductions, i.e., cancels and
410  * executions
411  *
412  * @param recvts the timestamp when the message was received
413  * @param msgcnt the number of messages received before this message
414  * @param msgoffset the number of bytes received before this message
415  * @param header the header of the message that triggered this event
416  * @param order_reference_number the id of the order being reduced
417  * @param shares the number of shares to reduce, if 0 reduce all shares
418  */
420  time_point recvts, long msgcnt, std::size_t msgoffset,
421  message_header const& header, std::uint64_t order_reference_number,
422  std::uint32_t shares) {
423  // First we need to find the order ...
424  auto position = orders_.find(order_reference_number);
425  if (position == orders_.end()) {
426  // ... ooops, this should not happen, there is a problem with the
427  // feed, log the problem and skip the message ...
428  JB_LOG(warning) << "unknown order in handle_order_reduction"
429  << ", id=" << order_reference_number
430  << ", location=" << msgcnt << ":" << msgoffset
431  << ", header=" << header
432  << ", order_reference_number=" << order_reference_number
433  << ", shares=" << shares;
434  return;
435  }
436  // find the book..
437  auto itbook = books_.find(position->second.stock);
438  // ... the book has to exist, since the original add_order created
439  // one if needed
440  JB_ASSERT_THROW(itbook != books_.end());
441  auto u = do_reduce(
442  position, itbook->second, recvts, msgcnt, msgoffset, header,
443  order_reference_number, shares);
444  callback_(header, itbook->second, u);
445  }
446 
447  /**
448  * Refactor code common to handle_order_reduction() and
449  * handle_message(order_replace_message).
450  *
451  * @param position the location of the order matching order_reference_number
452  * @param book the order_book matching the symbol for the given order
453  * @param recvts the timestamp when the message was received
454  * @param msgcnt the number of messages received before this message
455  * @param msgoffset the number of bytes received before this message
456  * @param header the header of the message that triggered this event
457  * @param order_reference_number the id of the order being reduced
458  * @param shares the number of shares to reduce, if 0 reduce all shares
459  */
462  long msgcnt, std::size_t msgoffset, message_header const& header,
463  std::uint64_t order_reference_number, std::uint32_t shares) {
464  auto& data = position->second;
465  int qty = shares == 0 ? data.qty : static_cast<int>(shares);
466  // ... now we need to update the data for the order ...
467  if (data.qty < qty) {
468  JB_LOG(warning) << "trying to execute more shares than are available"
469  << ", location=" << msgcnt << ":" << msgoffset
470  << ", data=" << data << ", header=" << header
471  << ", order_reference_number=" << order_reference_number
472  << ", shares=" << shares;
473  qty = data.qty;
474  }
475  data.qty -= qty;
476  // ... if the order is finished we need to remove it, otherwise the
477  // number of live orders grows without bound (almost), this might
478  // remove the data, so we make a copy ...
479  book_update u{recvts, data.stock, data.buy_sell_indicator, data.px,
480  -static_cast<int>(qty)};
481  // ... after the copy is safely stored, go and remove the order if
482  // needed ...
483  if (data.qty == 0) {
484  orders_.erase(position);
485  }
486  (void)book.handle_order_reduced(u.buy_sell_indicator, u.px, qty);
487  return u;
488  }
489 
490 private:
491  /// Store the callback function, this is invoked on each event that
492  /// changes a book.
494 
495  /// The order books indexed by security.
497 
498  /// The live orders indexed by the "order reference number"
500 
501  /// reference to the order book config
503 };
504 
505 inline bool operator==(book_update const& a, book_update const& b) {
506  return a.recvts == b.recvts and a.stock == b.stock and
507  a.buy_sell_indicator == b.buy_sell_indicator and a.px == b.px and
508  a.qty == b.qty;
509 }
510 
511 inline bool operator!=(book_update const& a, book_update const& b) {
512  return not(a == b);
513 }
514 
515 std::ostream& operator<<(std::ostream& os, book_update const& x) {
516  return os << "{" << x.stock << "," << x.buy_sell_indicator << "," << x.px
517  << "," << x.qty << "}";
518 }
519 
520 std::ostream& operator<<(std::ostream& os, order_data const& x) {
521  return os << "{" << x.stock << "," << x.buy_sell_indicator << "," << x.px
522  << "," << x.qty << "}";
523 }
524 
525 } // namespace itch5
526 } // namespace jb
527 
528 #endif // jb_itch5_compute_book_hpp
void handle_message(time_point recvts, long msgcnt, std::size_t msgoffset, add_order_message const &msg)
Handle a new order message.
callback_type callback_
Store the callback function, this is invoked on each event that changes a book.
std::vector< stock_t > symbols() const
Return the symbols known in the order book.
price4_t px
The price of the order.
bool handle_order_reduced(buy_sell_indicator_t side, price4_t px, int reduced_qty)
Handle an order reduction, which includes executions, cancels and replaces.
Definition: order_book.hpp:134
Compute the book and call a user-defined callback on each change.
bool operator!=(book_update const &a, book_update const &b)
Define the header common to all ITCH 5.0 messages.
book_type_config const & cfg_
reference to the order book config
void handle_message(time_point recvts, long msgcnt, std::size_t msgoffset, stock_directory_message const &msg)
Pre-populate the books based on the symbol directory.
std::unordered_map< stock_t, order_book< book_type >, boost::hash< stock_t > > books_by_security
Represent the collection of order books.
void handle_message(time_point recvts, long msgcnt, std::size_t msgoffset, order_replace_message const &msg)
Handle an order replace.
clock_type::time_point time_point
A convenience alias for clock_type::time_point.
Represent an 'Order Executed' message in the ITCH-5.0 protocol.
bool operator==(book_update const &a, book_update const &b)
void handle_message(time_point recvts, long msgcnt, std::size_t msgoffset, order_cancel_message const &msg)
Handle a partial cancel.
void handle_order_reduction(time_point recvts, long msgcnt, std::size_t msgoffset, message_header const &header, std::uint64_t order_reference_number, std::uint32_t shares)
Refactor code to handle order reductions, i.e., cancels and executions.
void handle_message(time_point recvts, long msgcnt, std::size_t msgoffset, order_executed_price_message const &msg)
Handle an order execution with a different price than the order's.
STL namespace.
A convenient container for per-order data.
orders_by_id orders_
The live orders indexed by the "order reference number".
void handle_message(time_point, long, std::size_t, message_type const &)
Ignore all other message types.
jb::itch5::time_point time_point
time_point is used as a compute_book<book_type> type in some modules
compute_book(callback_type const &cb, book_type_config const &cfg)
Represent an 'Order Delete' message in the ITCH-5.0 protocol.
void handle_message(time_point recvts, long msgcnt, std::size_t msgoffset, order_executed_message const &msg)
Handle an order execution.
Represent an 'Order Executed with Price' message in the ITCH-5.0 protocol.
typename book_type::config book_type_config
config type is used to construct the order_book
typename orders_by_id::iterator orders_iterator
Represent an 'Add Order with MPID' message in the ITCH-5.0 protocol.
std::chrono::steady_clock clock_type
A convenience alias for clock_type.
int oldqty
How many shares were removed in the old order.
stock_t stock
The symbol for this particular order.
bool cxlreplx
If true, this was a cancel replace and an old order was modified too...
std::function< void(message_header const &header, order_book< book_type > const &updated_book, book_update const &update)> callback_type
Define the callback type.
void handle_message(time_point recvts, long msgcnt, std::size_t msgoffset, add_order_mpid_message const &msg)
Handle a new order with MPID.
std::unordered_map< std::uint64_t, order_data > orders_by_id
Represent the collection of all orders.
#define JB_ASSERT_THROW(PRED)
compute_book(callback_type &&cb, book_type_config const &cfg)
Constructor.
time_point now() const
Return the current timestamp for delay measurements.
buy_sell_indicator_t buy_sell_indicator
What side of the book is being updated.
void handle_unknown(time_point recvts, unknown_message const &msg)
Log any unknown message types.
stock_t stock
The security updated by this order.
A flat struct to represent updates to an order book.
std::ostream & operator<<(std::ostream &os, add_order_message const &x)
Streaming operator for jb::itch5::add_order_message.
int qty
The remaining quantity in the order.
void handle_message(time_point recvts, long msgcnt, std::size_t msgoffset, order_delete_message const &msg)
Handle a full cancel.
Represent an 'Order Replace' message in the ITCH-5.0 protocol.
price4_t oldpx
Old price for the order.
book_update do_reduce(orders_iterator position, order_book< book_type > &book, time_point recvts, long msgcnt, std::size_t msgoffset, message_header const &header, std::uint64_t order_reference_number, std::uint32_t shares)
Refactor code common to handle_order_reduction() and handle_message(order_replace_message).
jb::itch5::clock_type clock_type
clock_type is used as a compute_book<book_type> type in some modules
int qty
How many shares are being added (if positive) or removed (if negative) from the book.
time_point recvts
When was the message that triggered this update received.
Maintain the ITCH-5.0 order book for a single security.
Definition: order_book.hpp:57
#define JB_LOG(lvl)
Definition: log.hpp:70
buy_sell_indicator_t buy_sell_indicator
Whether the order is a BUY or SELL.
void const * buf() const
books_by_security books_
The order books indexed by security.
Represent a 'Stock Directory' message in the ITCH-5.0 protocol.
Represent an 'Add Order' message in the ITCH-5.0 protocol.
price4_t px
What price level is being updated.
The top-level namespace for the JayBeams library.
Definition: as_hhmmss.hpp:7
std::uint32_t count() const
std::uint64_t offset() const
Represent an 'Order Cancel' message in the ITCH-5.0 protocol.