18 #include <beast/http.hpp> 19 #include <boost/asio/io_service.hpp> 20 #include <boost/asio/ip/address.hpp> 21 #include <boost/asio/ip/multicast.hpp> 22 #include <boost/asio/ip/tcp.hpp> 23 #include <boost/asio/ip/udp.hpp> 54 class session :
public std::enable_shared_from_this<session> {
64 session(config
const& cfg);
73 std::uint32_t last_message_count()
const {
74 return last_message_count_.load(std::memory_order_relaxed);
78 std::uint64_t last_message_offset()
const {
79 return last_message_offset_.load(std::memory_order_relaxed);
87 time_point now()
const {
88 return std::chrono::steady_clock::now();
93 std::atomic_bool stop_;
95 std::atomic<std::uint32_t> last_message_count_;
96 std::atomic<std::uint64_t> last_message_offset_;
97 boost::asio::io_service io_;
98 boost::asio::ip::udp::socket s0_;
99 boost::asio::ip::udp::endpoint ep0_;
100 boost::asio::ip::udp::socket s1_;
101 boost::asio::ip::udp::endpoint ep1_;
105 class replayer_control {
107 explicit replayer_control(config
const& cfg);
109 enum class state { idle, starting, replaying, stopping };
121 mutable std::mutex mu_;
122 state current_state_;
123 std::thread session_thread_;
124 std::shared_ptr<session> session_;
129 int main(
int argc,
char* argv[])
try {
132 cfg.load_overrides(argc, argv, std::string(
"moldreplay.yaml"),
"JB_ROOT");
135 boost::asio::io_service io_service;
137 using endpoint = boost::asio::ip::tcp::endpoint;
138 using address = boost::asio::ip::address;
139 endpoint ep{address::from_string(cfg.control_host()), cfg.control_port()};
143 auto replayer = std::make_shared<replayer_control>(cfg);
149 auto dispatcher = std::make_shared<jb::ehs::request_dispatcher>(
"moldreplay");
151 res.insert(
"Content-type",
"text/plain");
152 res.body =
"Server running...\r\n";
154 dispatcher->add_handler(
156 res.insert(
"Content-type",
"text/plain");
157 std::ostringstream os;
162 std::weak_ptr<jb::ehs::request_dispatcher> disp = dispatcher;
163 dispatcher->add_handler(
165 std::shared_ptr<jb::ehs::request_dispatcher> d(disp);
167 res.result(beast::http::status::internal_server_error);
168 res.body = std::string(
"An internal error occurred\r\n" 169 "Null request handler in /metrics\r\n");
172 res.set(
"content-type",
"text/plain; version=0.0.4");
173 d->append_metrics(res);
175 dispatcher->add_handler(
177 replayer->status(res);
179 dispatcher->add_handler(
181 replayer->start(req, res);
183 dispatcher->add_handler(
185 replayer->stop(req, res);
198 std::cout << u.what() << std::endl;
200 }
catch (std::exception
const& ex) {
201 std::cerr <<
"Standard exception raised: " << ex.what() << std::endl;
204 std::cerr <<
"Unknown exception raised" << std::endl;
212 std::string
const primary_destination =
"127.0.0.1";
213 std::string
const secondary_destination =
"127.0.0.1";
214 int const primary_port = 12300;
215 int const secondary_port = 12301;
217 std::string control_host =
"0.0.0.0";
218 int const control_port = 23000;
222 : primary_destination(
223 desc(
"primary-destination")
224 .help(
"The destination for the UDP messages. " 225 "The destination can be a unicast or multicast address."),
226 this, defaults::primary_destination)
229 .help(
"The destination port for the UDP messages."),
230 this, defaults::primary_port)
231 , secondary_destination(
232 desc(
"secondary-destination")
233 .help(
"The destination for the UDP messages. " 234 "The destination can be empty, a unicast, or a multicast " 236 this, defaults::secondary_destination)
238 desc(
"secondary-port")
239 .help(
"The destination port for the UDP messages."),
240 this, defaults::secondary_port)
243 .help(
"Where does the server listen for control connections." 244 "Typically this is an address for the current host," 245 "for example: 'localhost', '0.0.0.0', or '::1'."),
246 this, defaults::control_host)
248 desc(
"control-port").help(
"The port to receive control connections."),
249 this, defaults::control_port)
251 desc(
"input-file").help(
"The file to replay when requested."),
this)
253 desc(
"replay-session",
"thread-config")
254 .help(
"Configure the replay session threads."),
257 desc(
"pacer",
"mold-udp-pacer").help(
"Configure the ITCH-5.x pacer"),
259 , log(desc(
"log",
"logging"),
this) {
262 void config::validate()
const {
263 if (primary_destination() ==
"") {
264 throw jb::usage(
"Missing primary-destination argument or setting.", 1);
266 if (input_file() ==
"") {
267 throw jb::usage(
"Missing input-file argument or setting.", 1);
272 session::session(config
const& cfg)
275 , pacer_(cfg.pacer())
276 , last_message_count_(0)
277 , last_message_offset_(0)
283 , ep1_enabled_(
false) {
284 #ifndef ATOMIC_BOOL_LOCK_FREE 285 #error "Missing ATOMIC_BOOL_LOCK_FREE required by C++11 standard" 286 #endif // ATOMIC_BOOL_LOCK_FREE 288 ATOMIC_BOOL_LOCK_FREE == 2,
"Class requires lock-free std::atomic<bool>");
291 boost::asio::ip::address::from_string(cfg_.primary_destination());
292 boost::asio::ip::udp::endpoint ep0(address0, cfg_.primary_port());
293 boost::asio::ip::udp::socket s0(io_, ep0.protocol());
294 if (ep0.address().is_multicast()) {
295 s0.set_option(boost::asio::ip::multicast::enable_loopback(
true));
298 ep0_ = std::move(ep0);
300 if (cfg_.secondary_destination() !=
"") {
302 boost::asio::ip::address::from_string(cfg_.secondary_destination());
303 boost::asio::ip::udp::endpoint ep1(address1, cfg_.secondary_port());
304 boost::asio::ip::udp::socket s1(io_, ep1.protocol());
305 if (ep1.address().is_multicast()) {
306 s1.set_option(boost::asio::ip::multicast::enable_loopback(
true));
309 ep1_ = std::move(ep1);
314 void session::start() {
315 auto self = shared_from_this();
317 boost::iostreams::filtering_istream in;
319 jb::itch5::process_iostream_mlist<session>(in, *
self);
322 void session::stop() {
323 stop_.store(
true, std::memory_order_release);
326 void session::handle_unknown(
328 if (stop_.load(std::memory_order_consume)) {
329 throw std::runtime_error(
"stopping replay thread");
331 last_message_count_.store(msg.
count(), std::memory_order_relaxed);
332 last_message_offset_.store(msg.
offset(), std::memory_order_relaxed);
333 auto sink = [
this](
auto buffers) {
334 s0_.send_to(buffers, ep0_);
336 s1_.send_to(buffers, ep1_);
343 if (d > std::chrono::seconds(10)) {
344 d = std::chrono::seconds(10);
346 std::this_thread::sleep_for(d);
348 pacer_.handle_message(recv_ts, msg, sink, sleeper);
351 replayer_control::replayer_control(config
const& cfg)
354 , current_state_(state::idle) {
358 res.set(
"content-type",
"text/plain");
360 std::lock_guard<std::mutex> guard(mu_);
361 std::ostringstream os;
362 switch (current_state_) {
364 res.body =
"idle\nNothing to see here folks\n";
366 case state::starting:
367 os <<
"starting\nMessages arriving shortly\n";
369 case state::stopping:
370 os <<
"stopping\nMessages will stop flowing\n";
372 case state::replaying:
376 res.result(beast::http::status::internal_server_error);
377 res.body =
"Unkown state\n";
381 os <<
" last-count: " << session_->last_message_count() <<
"\n" 382 <<
" last-offset: " << session_->last_message_offset() <<
"\n" 387 void replayer_control::start(
389 std::lock_guard<std::mutex> guard(mu_);
390 if (current_state_ != state::idle) {
391 res.result(beast::http::status::precondition_required);
392 res.body =
"request rejected, current status is not idle\n";
398 res.result(beast::http::status::ok);
399 res.body =
"request succeeded, started new session\n";
400 auto s = std::make_shared<session>(cfg_);
404 current_state_ = state::starting;
408 if (not start_check()) {
420 session_thread_.detach();
424 void replayer_control::stop(
426 std::lock_guard<std::mutex> guard(mu_);
427 if (current_state_ != state::replaying and
428 current_state_ != state::starting) {
429 res.result(beast::http::status::precondition_required);
430 res.body =
"request rejected, current status is not valid\n";
433 current_state_ = state::stopping;
434 res.result(beast::http::status::ok);
435 res.body =
"request succeeded, stopping current session\n";
440 bool replayer_control::start_check() {
441 std::lock_guard<std::mutex> guard(mu_);
442 if (current_state_ != state::starting) {
445 current_state_ = state::replaying;
449 void replayer_control::replay_done() {
450 std::lock_guard<std::mutex> guard(mu_);
451 current_state_ = state::idle;
Define defaults for program parameters.
clock_type::time_point time_point
A convenience alias for clock_type::time_point.
Base class for all configuration objects.
beast::http::request< beast::http::string_body > request_type
The request type used for JayBeams Embedded HTTP Servers.
virtual void validate() const
Validate the settings.
Send a sequence of raw ITCH-5.x messages as MoldUDP64 packets, trying to match the original time inte...
clock_type::duration duration
The duration (the difference between two time points) type.
beast::http::response< beast::http::string_body > response_type
The response type used for JayBeams Embedded HTTP Servers.
void init(config const &cfg)
Initialize the logging functions using the configuration provided.
Hold the configuration to initialize threads.
#define config_object_constructors(NAME)
Helper class to easily define configuration attributes.
#define JB_ASSERT_THROW(PRED)
A simple class to communicate the result of parsing the options.
void open_input_file(boost::iostreams::filtering_istream &in, std::string const &filename)
Open a file for reading.
Create a control server for the program.
int main(int argc, char *argv[])
void launch_thread(std::thread &t, thread_config const &config, Function &&f, A &&... a)
Create a new thread, configure it as desired, and then call a user-defined function.
std::uint32_t count() const
std::uint64_t offset() const