JayBeams  0.1
Another project to have fun coding.
mold_udp_pacer.hpp
Go to the documentation of this file.
1 #ifndef jb_itch5_mold_udp_pacer_hpp
2 #define jb_itch5_mold_udp_pacer_hpp
3 
4 #include <jb/itch5/encoder.hpp>
10 #include <jb/assert_throw.hpp>
11 
12 #include <boost/asio/buffer.hpp>
13 
14 namespace jb {
15 namespace itch5 {
16 
17 /**
18  * Send a sequence of raw ITCH-5.x messages as MoldUDP64 packets, trying
19  * to match the original time interval between messages.
20  *
21  * The MoldUDP64 protocol (see link below) allows transmission of
22  * ITCH-5.x messages over UDP. Multiple ITCH-5.x messages are packed
23  * into a single MoldUDP64 packet, which includes enough information to
24  * request retransmissions if needed.
25  *
26  * This class receives a stream of raw ITCH-5.x messages and creates a
27  * stream of MoldUDP64 packets. It examines the original timestamps of
28  * the raw ITCH-5.x messages to pace the outgoing stream. When the
29  * original messages are sufficiently close in time they are assembled
30  * into a single large packet. If the messages are separated in time
31  * the class blocks until enough wall-clock time has elapsed.
32  *
33  * @tparam clock_type a dependency injection point to make this class
34  * testable. Normally the class is simply used with a
35  * std::chrono::steady_clock. Under test, it is convenient to be able
36  * to modify the results of the clock_type::now() function to exercise
37  * multiple scenarios.
38  *
39  * References:
40  * http://www.nasdaqtrader.com/content/technicalsupport/specifications/dataproducts/moldudp64.pdf
41  */
42 template <typename clock_type = std::chrono::steady_clock>
44 public:
45  //@{
46  /**
47  * @name Type traits
48  */
49  /// The time point (specific time with respect to some epoch) type.
51 
52  /// The duration (the difference between two time points) type.
53  typedef typename clock_type::duration duration;
54 
55  /// The configuration file
57 
58  /**
59  * The type used to represent session ids.
60  *
61  * The MoldUDP64 protocol uses a 10-character identifier for the
62  * session id, different streams can be distinguished using this
63  * field in the protocol.
64  */
67  //@}
68 
69  /**
70  * Initialize a MoldUDP pacer object.
71  */
73  config const& cfg = config(),
74  session_id_type const& session_id = session_id_type())
75  : last_send_{std::chrono::microseconds(0)}
76  , max_delay_(std::chrono::microseconds(cfg.maximum_delay_microseconds()))
77  , mtu_(cfg.maximum_transmission_unit())
80  , first_block_(0)
81  , first_block_ts_{std::chrono::microseconds(0)}
82  , block_count_(0) {
83  boost::asio::buffer_copy(
84  packet_, boost::asio::buffer(session_id.c_str(), session_id.wire_size));
85  }
86 
87  /**
88  * Process a raw ITCH-5.x message.
89  *
90  * @param ts the wall-clock when the message was received, as
91  * defined by @a clock_type
92  * @param msg the message received, the timestamp in the message is
93  * used to pace the outgoing MoldUDP64 packets
94  * @param sink a functor to send the MoldUDP64 packets
95  * @param sleeper a functor to sleep and effectively pace the
96  * messages
97  *
98  * @tparam message_sink_type the type of the @a sink functor. The
99  * signature must be compatible with void(auto buffers) where
100  * buffers meets the requirements of a Boost.Asio ConstBufferSequence.
101  * @tparam sleep_functor_type the type of the sleeper function, the
102  * signature must be compatible with void(clock_type::duration const&)
103  */
104  template <typename message_sink_type, typename sleep_functor_type>
106  time_point ts, unknown_message const& msg, message_sink_type& sink,
107  sleep_functor_type& sleeper) {
108  message_header msghdr = msg.decode_header<false>();
109 
110  // how long since the last send() call...
111  if (msg.count() == 0) {
112  // ... on the first message initialize the timestamp, otherwise
113  // we would likely flush the first message always ...
114  last_send_ = msghdr.timestamp;
115  }
116  auto elapsed = msghdr.timestamp.ts - last_send_.ts;
117  if (elapsed < max_delay_) {
118  // ... save the message to send later, potentially flushing if
119  // the queue is big enough ...
120  coalesce(ts, msg, msghdr.timestamp, sink);
121  return;
122  }
123  // ... flush whatever is in the queue ...
124  flush(msghdr.timestamp, sink);
125  // ... until the timer has expired ...
126  sleeper(elapsed);
127  // ... send the message immediately ...
128  coalesce(ts, msg, msghdr.timestamp, sink);
129  }
130 
131  /**
132  * Flush the current messages, if any
133  *
134  * @param ts the wall clock time when the message was received
135  * @param sink the destination for the MoldUDP64 packets
136  *
137  * @tparam message_sink_type please see handle_message() for details
138  */
139  template <typename message_sink_type>
140  void flush(timestamp ts, message_sink_type& sink) {
141  if (block_count_ == 0) {
142  return;
143  }
144  flush_impl(ts, sink);
145  }
146 
147  /**
148  * Send a heartbeat packet.
149  *
150  * If there are any pending messages those messages are flushed and
151  * the resulting packet constitutes the heartbeat.
152  *
153  * @param sink the destination for the MoldUDP64 packets
154  *
155  * @tparam message_sink_type please see handle_message() for details
156  */
157  template <typename message_sink_type>
158  void heartbeat(message_sink_type& sink) {
160  }
161 
162 private:
163  /**
164  * Add another message to the current queue, flushing first if
165  * necessary.
166  *
167  * @param recv_ts the timestamp when the message was received
168  * @param msg the message contents and location
169  * @param ts the timestamp when the last message was sent
170  * @param sink the destination for the MoldUDP64 packets
171  */
172  template <typename message_sink_type>
173  void coalesce(
174  time_point recv_ts, unknown_message const& msg, timestamp ts,
175  message_sink_type& sink) {
176  // Make sure the message is small enough to be represented in a
177  // single MoldUDP64 block ...
178  JB_ASSERT_THROW(msg.len() < (1 << 16));
179  // ... make sure the message is small enough to fit in a single
180  // MoldUDP64 packet given the current MTU ...
182 
183  // ... if the packet is too full to accept the current message,
184  // flush first ...
185  if (packet_full(msg.len())) {
186  flush(ts, sink);
187  }
188  if (block_count_ == 0) {
189  first_block_ = msg.count();
190  first_block_ts_ = ts;
191  }
192  // ... append the message as a new block in the MoldUDP packet,
193  // first update the block header ...
194  boost::asio::mutable_buffer block_header = packet_ + packet_size_;
196  buffer_size(block_header),
197  boost::asio::buffer_cast<void*>(block_header), 0, msg.len());
198  // ... the copy the message into the block payload ...
199  boost::asio::mutable_buffer block_payload = packet_ + packet_size_ + 2;
200  boost::asio::buffer_copy(
201  block_payload, boost::asio::buffer(msg.buf(), msg.len()));
202  packet_size_ += msg.len() + 2;
203 
204  // ... and update the number of blocks ...
205  block_count_++;
206  }
207 
208  /// Fill up the header for the MoldUDP64 packet
210  // ... we assume that the session was initialized in the
211  // constructor, and simply reuse that portion of the packet over
212  // and over.
213  // ... write down the sequence number field of the packet header,
214  // this is the number of the first block ...
217  boost::asio::buffer_size(seqno), boost::asio::buffer_cast<void*>(seqno),
218  0, first_block_);
219  // ... then write down the block field ...
222  boost::asio::buffer_size(blkcnt),
223  boost::asio::buffer_cast<void*>(blkcnt), 0, block_count_);
224  }
225 
226  /**
227  * Implement the flush() and hearbeat() member functions.
228  */
229  template <typename message_sink_type>
230  void flush_impl(timestamp ts, message_sink_type& sink) {
232  sink(boost::asio::buffer(packet_, packet_size_));
233  last_send_ = ts;
235  block_count_ = 0;
237  }
238 
239  /**
240  * Return true if the packet is too full to accept a new block of
241  * size @a block_size.
242  *
243  * @param block_size the size of the next block that we intend to
244  * add to the packet
245  */
246  bool packet_full(std::uint16_t block_size) const {
247  if (block_size + 2 + packet_size_ >= std::size_t(mtu_)) {
248  return true;
249  }
250  return block_count_ == std::numeric_limits<std::uint16_t>::max();
251  }
252 
253 private:
255  duration max_delay_;
256  int mtu_;
257 
258  // Use a simple raw buffer to hold the packet, this is good enough
259  // because MoldUDP64 can only operate on UDP packets, which never
260  // exceed 64KiB.
261  static constexpr std::size_t rawbufsize = 65536;
262  char rawbuf[rawbufsize] = {0};
263 
264  boost::asio::mutable_buffer packet_;
265  std::size_t packet_size_;
266 
267  std::uint32_t first_block_;
269  std::uint16_t block_count_;
270 };
271 
272 } // namespace itch5
273 } // namespace jb
274 
275 #endif // jb_itch5_mold_udp_pacer_hpp
mold_udp_pacer_config config
The configuration file.
Define the header common to all ITCH 5.0 messages.
void handle_message(time_point ts, unknown_message const &msg, message_sink_type &sink, sleep_functor_type &sleeper)
Process a raw ITCH-5.x message.
clock_type::time_point time_point
A convenience alias for clock_type::time_point.
static void w(std::size_t size, void *msg, std::size_t offset, T const &x)
Write a single message or field to a buffer.
void heartbeat(message_sink_type &sink)
Send a heartbeat packet.
void fillup_header_fields()
Fill up the header for the MoldUDP64 packet.
mold_udp_pacer(config const &cfg=config(), session_id_type const &session_id=session_id_type())
Initialize a MoldUDP pacer object.
constexpr std::size_t sequence_number_offset
The location of the sequence number field within the header.
Send a sequence of raw ITCH-5.x messages as MoldUDP64 packets, trying to match the original time inte...
void coalesce(time_point recv_ts, unknown_message const &msg, timestamp ts, message_sink_type &sink)
Add another message to the current queue, flushing first if necessary.
jb::itch5::timestamp last_send_
static constexpr std::size_t rawbufsize
clock_type::duration duration
The duration (the difference between two time points) type.
boost::asio::mutable_buffer packet_
message_header decode_header() const
Extract the message header.
bool packet_full(std::uint16_t block_size) const
Return true if the packet is too full to accept a new block of size block_size.
Represent a ITCH-5.0 timestamp.
Definition: timestamp.hpp:17
void flush(timestamp ts, message_sink_type &sink)
Flush the current messages, if any.
std::chrono::nanoseconds ts
Definition: timestamp.hpp:18
#define JB_ASSERT_THROW(PRED)
A helper type to define short (and fixed sized) string fields.
jb::itch5::short_string_field< mold_udp_protocol::session_id_size > session_id_type
The type used to represent session ids.
void flush_impl(timestamp ts, message_sink_type &sink)
Implement the flush() and hearbeat() member functions.
Configuration object for the jb::itch5::mold_udp_pacer class.
constexpr std::size_t header_size
The total size of the MoldUDP64 header.
clock_type::time_point time_point
The time point (specific time with respect to some epoch) type.
constexpr std::size_t block_count_offset
The location of the block count field within the header.
jb::itch5::timestamp timestamp
The message timestamp, in nanoseconds since midnight.
void const * buf() const
The top-level namespace for the JayBeams library.
Definition: as_hhmmss.hpp:7
std::uint32_t count() const