JayBeams  0.1
Another project to have fun coding.
mold_udp_channel.cpp
Go to the documentation of this file.
2 
7 #include <jb/log.hpp>
8 
9 #include <boost/asio/ip/multicast.hpp>
10 
11 #include <utility>
12 
13 namespace jb {
14 namespace itch5 {
15 
17  boost::asio::io_service& io, buffer_handler const& handler,
18  udp_receiver_config const& cfg)
19  : mold_udp_channel(io, buffer_handler(handler), cfg) {
20 }
21 
23  boost::asio::io_service& io, buffer_handler&& handler,
24  udp_receiver_config const& cfg)
25  : handler_(std::move(handler))
26  , socket_(make_socket_udp_recv<>(io, cfg))
28  , message_offset_(0) {
30 }
31 
33  socket_.async_receive_from(
34  boost::asio::buffer(buffer_, buflen), sender_endpoint_,
35  [this](boost::system::error_code const& ec, size_t bytes_received) {
36  handle_received(ec, bytes_received);
37  });
38 }
39 
41  boost::system::error_code const& ec, size_t bytes_received) {
42  if (ec) {
43  // If we get an error from the socket simply report it and
44  // return. No more callbacks will be registered in this case ...
45  JB_LOG(info) << "error received in mold_udp_channel::handle_received: "
46  << ec.message() << " (" << ec << ")";
47  return;
48  }
49 
50  if (bytes_received <= 0) {
51  // ... if we get notified for an empty packet, simply re-register
52  // and return ...
54  return;
55  }
56 
57  // ... we have no errors and at least some data to process, get the
58  // current timestamp, all the messages in the MoldUDP64 packet share
59  // the same timestamp ...
60  auto recv_ts = std::chrono::steady_clock::now();
61  // ... parse the sequence number of the first message in the
62  // MoldUDP64 packet ...
63  auto sequence_number = jb::itch5::decoder<true, std::uint64_t>::r(
64  bytes_received, buffer_,
66  // ... and parse the number of blocks in the MoldUDP64 packet ...
68  bytes_received, buffer_,
70 
71  // ... if the message is out of order we simply print the problem,
72  // in a more realistic application we would need to reorder them
73  // and gap fill if needed, and sometimes do even more complicated
74  // things ...
75  if (sequence_number != expected_sequence_number_) {
76  JB_LOG(info) << "Mismatched sequence number, expected="
77  << expected_sequence_number_ << ", got=" << sequence_number;
78  }
79 
80  // Keep track of where each ITCH-5.0 message starts in the
81  // MoldUDP64 block ...
82  std::size_t offset = jb::itch5::mold_udp_protocol::header_size;
83  // ... process each message in the MoldUDP64 packet, in order ...
84  for (std::size_t block = 0; block != block_count; ++block) {
85  // ... parse the block size ...
87  bytes_received, buffer_, offset);
88  // ... increment the offset into the MoldUDP64 packet, this is
89  // the start of the ITCH-5.x message ...
90  offset += 2;
91  // ... process the buffer ...
92  handler_(
94  message_size);
95 
96  // ... increment counters to reflect that this message was
97  // proceesed ...
98  sequence_number++;
99  message_offset_ += message_size;
100  offset += message_size;
101  }
102  // ... since we are not dealing with gaps, or message reordering
103  // just reset the next expected number ...
104  expected_sequence_number_ = sequence_number;
105 
106  // ... and register for a new IO callback ...
108 }
109 
110 } // namespace itch5
111 } // namespace jb
static T r(std::size_t size, void const *msg, std::size_t offset)
Read a single message or field.
mold_udp_channel(boost::asio::io_service &io, buffer_handler const &handler, udp_receiver_config const &cfg)
Constructor, create a socket and register for IO notifications.
socket_t make_socket_udp_recv(boost::asio::io_service &io, udp_receiver_config const &cfg)
Create a socket given the configuration parameters.
static std::size_t const buflen
std::function< void(std::chrono::steady_clock::time_point, std::uint64_t, std::size_t, char const *, std::size_t)> buffer_handler
A callback function type to process any received ITCH-5.0 messages.
constexpr std::size_t sequence_number_offset
The location of the sequence number field within the header.
STL namespace.
void handle_received(boost::system::error_code const &ec, size_t bytes_received)
The Boost.ASIO callback for I/O events.
void restart_async_receive_from()
Refactor code to register (and reregister) for Boost.ASIO notifications.
boost::asio::ip::udp::endpoint sender_endpoint_
constexpr std::size_t header_size
The total size of the MoldUDP64 header.
boost::asio::ip::udp::socket socket_
constexpr std::size_t block_count_offset
The location of the block count field within the header.
#define JB_LOG(lvl)
Definition: log.hpp:70
A configuration object for UDP receivers.
Create and manage a socket to receive MoldUDP64 packets.
The top-level namespace for the JayBeams library.
Definition: as_hhmmss.hpp:7