JayBeams  0.1
Another project to have fun coding.
offline_feed_statistics.cpp
Go to the documentation of this file.
2 
3 #include <jb/as_hhmmss.hpp>
4 #include <jb/log.hpp>
5 
6 #include <iostream>
7 
8 namespace {
9 
10 /**
11  * Log a histogram used to capture message rates.
12  *
13  * @param ts the logical timestamp of the data processed. Typically
14  * this would be the timestamp recorded in a file being replayed, or
15  * the logical timestamp in an experiment. It is not the wall clock
16  * time when the program was running.
17  * @param period_name a human readable string representing the time
18  * period over which the rate is being measured.
19  * @param histo the histogram that has captured the message rates
20  *
21  * @tparam the type of histogram used, typical an instantiation of
22  * jb::event_rate_histogram<>
23  */
24 template <typename event_rate_histogram_t>
25 void report_rate(
26  std::chrono::nanoseconds ts, char const* period_name,
27  event_rate_histogram_t const& histo) {
28  JB_LOG(info) << "events/" << period_name << ": " << jb::as_hhmmss(ts)
29  << ", min=" << histo.observed_min()
30  << ", p25=" << histo.estimated_quantile(0.25)
31  << ", p50=" << histo.estimated_quantile(0.50)
32  << ", p75=" << histo.estimated_quantile(0.75)
33  << ", p90=" << histo.estimated_quantile(0.90)
34  << ", p99=" << histo.estimated_quantile(0.99)
35  << ", p99.9=" << histo.estimated_quantile(0.999)
36  << ", p99.99=" << histo.estimated_quantile(0.9999)
37  << ", max=" << histo.observed_max()
38  << ", N=" << histo.nsamples();
39 }
40 
/**
 * Print a summary of an event rate histogram as CSV fields.
 *
 * Each field is preceded by a comma.  The fields are the observed
 * minimum, the estimated 0.25, 0.50, 0.75, 0.90, 0.99, 0.999 and 0.9999
 * quantiles, and the observed maximum.  No newline is written.
 *
 * @param os the output stream where the CSV fields are printed
 * @param histo the histogram that has captured the event rates
 *
 * @tparam event_rate_histogram_t the type of histogram used, typically
 * an instantiation of jb::event_rate_histogram<>
 */
template <typename event_rate_histogram_t>
void csv_rate(std::ostream& os, event_rate_histogram_t const& histo) {
  // ... keep this list in sync with the per-rate columns printed by
  // print_csv_header() ...
  static constexpr double quantiles[] = {0.25, 0.50, 0.75,  0.90,
                                         0.99, 0.999, 0.9999};
  os << "," << histo.observed_min();
  for (double q : quantiles) {
    os << "," << histo.estimated_quantile(q);
  }
  os << "," << histo.observed_max();
}
59 
60 /**
61  * Log a histogram used to capture message inter-arrival times.
62  *
63  * @param ts the logical timestamp of the data processed. Typically
64  * this would be the timestamp recorded in a file being replayed, or
65  * the logical timestamp in an experiment. It is not the wall clock
66  * time when the program was running.
67  * @param name a human readable string representing the measurement
68  * @param histo the histogram that has captured the message rates
69  *
70  * @tparam the type of histogram used, typical an instantiation of
71  * jb::histogram<>
72  */
73 template <typename latency_histogram_t>
74 void report_arrival(
75  std::chrono::nanoseconds ts, char const* name,
76  latency_histogram_t const& histo) {
77  JB_LOG(info) << name << ": " << jb::as_hhmmss(ts)
78  << ", min=" << histo.observed_min()
79  << "ns, p0.01=" << histo.estimated_quantile(0.0001)
80  << "ns, p0.1=" << histo.estimated_quantile(0.001)
81  << "ns, p01=" << histo.estimated_quantile(0.01)
82  << "ns, p05=" << histo.estimated_quantile(0.05)
83  << "ns, p10=" << histo.estimated_quantile(0.10)
84  << "ns, p25=" << histo.estimated_quantile(0.25)
85  << "ns, p50=" << histo.estimated_quantile(0.50)
86  << "ns, p75=" << histo.estimated_quantile(0.75)
87  << "ns, p90=" << histo.estimated_quantile(0.90)
88  << "ns, p99=" << histo.estimated_quantile(0.99)
89  << "ns, max=" << histo.observed_max()
90  << "ns, N=" << histo.nsamples();
91 }
92 
/**
 * Print a summary of a message inter-arrival time histogram as CSV fields.
 *
 * Each field is preceded by a comma.  The fields are the observed
 * minimum, the estimated 0.0001, 0.001, 0.01, 0.05, 0.10, 0.25, 0.50,
 * 0.75, 0.90 and 0.99 quantiles, and the observed maximum.  No newline
 * is written.
 *
 * @param os the output stream where the CSV fields are printed
 * @param histo the histogram that has captured the inter-arrival times
 *
 * @tparam latency_histogram_t the type of histogram used, typically an
 * instantiation of jb::histogram<>
 */
template <typename latency_histogram_t>
void csv_arrival(std::ostream& os, latency_histogram_t const& histo) {
  // ... keep this list in sync with the *Arrival columns printed by
  // print_csv_header() ...
  static constexpr double quantiles[] = {0.0001, 0.001, 0.01, 0.05, 0.10,
                                         0.25,   0.50,  0.75, 0.90, 0.99};
  os << "," << histo.observed_min();
  for (double q : quantiles) {
    os << "," << histo.estimated_quantile(q);
  }
  os << "," << histo.observed_max();
}
113 
114 /**
115  * Log a histogram used to capture latencies (such as processing latency)
116  *
117  * @param ts the logical timestamp of the data processed. Typically
118  * this would be the timestamp recorded in a file being replayed, or
119  * the logical timestamp in an experiment. It is not the wall clock
120  * time when the program was running.
121  * @param name a human readable string representing the measurement
122  * @param histo the histogram that has captured the message rates
123  *
124  * @tparam the type of histogram used, typical an instantiation of
125  * jb::histogram<> where the samples are nanoseconds
126  */
127 template <typename latency_histogram_t>
128 void report_latency(
129  std::chrono::nanoseconds ts, char const* name,
130  latency_histogram_t const& histo) {
131  JB_LOG(info) << name << ": " << jb::as_hhmmss(ts)
132  << ", min=" << histo.observed_min()
133  << "ns, p10=" << histo.estimated_quantile(0.10)
134  << "ns, p25=" << histo.estimated_quantile(0.25)
135  << "ns, p50=" << histo.estimated_quantile(0.50)
136  << "ns, p75=" << histo.estimated_quantile(0.75)
137  << "ns, p90=" << histo.estimated_quantile(0.90)
138  << "ns, p99=" << histo.estimated_quantile(0.99)
139  << "ns, p99.9=" << histo.estimated_quantile(0.999)
140  << "ns, p99.99=" << histo.estimated_quantile(0.9999)
141  << "ns, max=" << histo.observed_max()
142  << "ns, N=" << histo.nsamples();
143 }
144 
/**
 * Print a summary of latency measurements as CSV fields.
 *
 * Each field is preceded by a comma.  The fields are the observed
 * minimum, the estimated 0.10, 0.25, 0.50, 0.75, 0.90, 0.99, 0.999 and
 * 0.9999 quantiles, and the observed maximum.  No newline is written.
 *
 * @param os the output stream where the CSV fields are printed
 * @param histo the histogram that has captured the latencies
 *
 * @tparam latency_histogram_t the type of histogram used, typically an
 * instantiation of jb::histogram<> where the samples are nanoseconds
 */
template <typename latency_histogram_t>
void csv_latency(std::ostream& os, latency_histogram_t const& histo) {
  // ... keep this list in sync with the *Processing columns printed by
  // print_csv_header() ...
  static constexpr double quantiles[] = {0.10, 0.25, 0.50,  0.75,
                                         0.90, 0.99, 0.999, 0.9999};
  os << "," << histo.observed_min();
  for (double q : quantiles) {
    os << "," << histo.estimated_quantile(q);
  }
  os << "," << histo.observed_max();
}
163 
164 } // anonymous namespace
165 
// NOTE(review): this rendered listing omits the constructor's signature
// line and the binning_strategy(...) argument lines for interarrival_ and
// processing_latency_; only the visible initializers are annotated here.
// ... each rate histogram uses the matching cfg.max_messages_per_*()
// limit, a period of one unit (1s, 1ms, 1us) and a second duration 1000x
// finer (1ms, 1us, 1ns); presumably the arguments are (max count, period,
// resolution) -- confirm against jb::event_rate_histogram ...
167  : per_sec_rate_(
168  cfg.max_messages_per_second(), std::chrono::seconds(1),
169  std::chrono::milliseconds(1))
170  , per_msec_rate_(
171  cfg.max_messages_per_millisecond(), std::chrono::milliseconds(1),
172  std::chrono::microseconds(1))
173  , per_usec_rate_(
174  cfg.max_messages_per_microsecond(), std::chrono::microseconds(1),
175  std::chrono::nanoseconds(1))
176  , interarrival_(interarrival_histogram_t::binning_strategy(
178  , processing_latency_(processing_latency_histogram_t::binning_strategy(
// ... the reporting interval is configured in whole seconds ...
180  , reporting_interval_(
181  std::chrono::seconds(cfg.reporting_interval_seconds()))
// ... both timestamps start at zero; record_sample() updates them ...
182  , last_ts_(0)
183  , last_report_ts_(0) {
184 }
185 
// ... per-rate column names, in the same order csv_rate() prints its
// fields; the digits encode the quantile's fractional part, e.g. p999 is
// the 0.999 quantile ...
187  char const* fields[] = {"min", "p25", "p50", "p75", "p90",
188  "p99", "p999", "p9999", "max"};
// ... one group of per-rate columns for each rate histogram, in the same
// order print_csv() emits them: per-second, per-millisecond,
// per-microsecond ...
189  char const* tracked[] = {"RatePerSec", "RatePerMSec", "RatePerUSec"};
190  os << "Name,NSamples";
191  for (auto t : tracked) {
192  for (auto f : fields) {
193  os << "," << f << t;
194  }
195  }
// ... inter-arrival columns, matching the fields printed by csv_arrival();
// p0001 is the 0.0001 quantile, p01 the 0.01 quantile ...
196  os << ",minArrival,p0001Arrival,p001Arrival,p01Arrival"
197  << ",p05Arrival,p10Arrival,p25Arrival,p50Arrival,p75Arrival"
198  << ",p90Arrival,p99Arrival,maxArrival";
// ... processing latency columns, matching the fields printed by
// csv_latency() ...
199  os << ",minProcessing,p10Processing,p25Processing,p50Processing"
200  << ",p75Processing,p90Processing,p99Processing,p999Processing"
201  << ",p9999Processing,maxProcessing";
202  os << "\n";
203 }
204 
// ... report using a logical timestamp of 24 hours; like any other
// log_progress() call this prints nothing if the interarrival histogram
// is empty.  NOTE(review): 24h is presumably chosen to lie past the end of
// a single-day feed -- confirm ...
206  using namespace std::chrono;
207  log_progress(duration_cast<nanoseconds>(std::chrono::hours(24)));
208 }
209 
211  std::chrono::nanoseconds ts) const {
212  // ... if we have no samples in the interarrival histogram logging
213  // progress does not work, so don't do it ...
214  if (interarrival_.nsamples() == 0) {
215  return;
216  }
// ... one log line per histogram, all stamped with the same logical
// timestamp ts ...
217  report_rate(ts, "sec ", per_sec_rate_);
218  report_rate(ts, "msec", per_msec_rate_);
219  report_rate(ts, "usec", per_usec_rate_);
220  report_arrival(ts, "arrival ", interarrival_);
221  report_latency(ts, "processing ", processing_latency_);
222 }
223 
225  std::chrono::nanoseconds ts, std::chrono::nanoseconds pl) {
226  // ... first record the sample in the per-second, per-millisecond,
227  // and per-microsecond histograms ...
228  per_sec_rate_.sample(ts);
// NOTE(review): only the per-second sample call is visible in this
// listing; the per-millisecond and per-microsecond calls described above
// are elided here -- confirm they are present in the source.
231 
232  // ... we need at least two samples to start recording interarrival
233  // times, check if there was a previous sample
234  if (processing_latency_.nsamples() > 0) {
235  // ... measure the delay and record it ...
236  std::chrono::nanoseconds d = ts - last_ts_;
237  interarrival_.sample(d.count());
238  }
239  // ... save the timestamp so we can record the interarrival latency
240  // next time ...
241  last_ts_ = ts;
242 
243  // ... and record the processing latency, must be after recording
244  // the interarrival time, as this also serves as a sentinel ...
245  processing_latency_.sample(pl.count());
246 
247  // ... log progress every so many seconds ...
// NOTE(review): the condition that opens this block is elided from the
// listing; presumably it compares ts - last_report_ts_ against
// reporting_interval_ -- confirm.
249  log_progress(ts);
250  last_report_ts_ = ts;
251  }
252 }
253 
255  std::string const& name, std::ostream& os) const {
// ... if any histogram is empty, emit a row with NSamples=0 and one empty
// field per header column.  NOTE(review): after exactly one record_sample()
// call interarrival_ is still empty (it needs two samples), so this branch
// reports 0 samples even though one was recorded ...
256  if (per_sec_rate_.nsamples() == 0 or per_msec_rate_.nsamples() == 0 or
257  per_usec_rate_.nsamples() == 0 or interarrival_.nsamples() == 0 or
258  processing_latency_.nsamples() == 0) {
259  os << name << ",0";
260  os << ",,,,,,,,,"; // per-second rate
261  os << ",,,,,,,,,"; // per-millisecond rate
262  os << ",,,,,,,,,"; // per-microsecond rate
263  os << ",,,,,,,,,,,,"; // interarrival
264  os << ",,,,,,,,,,"; // processing latency
265  os << "\n";
266  return;
267  }
// ... NSamples is taken from the processing latency histogram, which
// receives one sample per record_sample() call ...
268  os << name << "," << processing_latency_.nsamples();
269  csv_rate(os, per_sec_rate_);
270  csv_rate(os, per_msec_rate_);
271  csv_rate(os, per_usec_rate_);
272  csv_arrival(os, interarrival_);
273  csv_latency(os, processing_latency_);
// NOTE(review): this path flushes via std::endl while the empty-row path
// above writes "\n" without flushing; consider making them consistent.
274  os << std::endl;
275 }
276 
277 namespace jb {
278 namespace defaults {
279 
// Each default is only defined when the macro is not already defined, so
// a build can override it (e.g. with -DJB_OFS_DEFAULTS_...=value).
280 #ifndef JB_OFS_DEFAULTS_max_messages_per_second
281 #define JB_OFS_DEFAULTS_max_messages_per_second 1000000
282 #endif
283 #ifndef JB_OFS_DEFAULTS_max_messages_per_millisecond
284 #define JB_OFS_DEFAULTS_max_messages_per_millisecond 100000
285 #endif
// NOTE(review): the per-microsecond default equals the per-millisecond
// default (100000); confirm this is intentional.
286 #ifndef JB_OFS_DEFAULTS_max_messages_per_microsecond
287 #define JB_OFS_DEFAULTS_max_messages_per_microsecond 100000
288 #endif
289 #ifndef JB_OFS_DEFAULTS_max_interarrival_time_nanoseconds
290 #define JB_OFS_DEFAULTS_max_interarrival_time_nanoseconds 100000
291 #endif
292 #ifndef JB_OFS_DEFAULTS_max_processing_latency_nanoseconds
293 #define JB_OFS_DEFAULTS_max_processing_latency_nanoseconds 1000000
294 #endif
295 #ifndef JB_OFS_DEFAULTS_reporting_interval_seconds
296 #define JB_OFS_DEFAULTS_reporting_interval_seconds 600
297 #endif
298 
// NOTE(review): the declarations that use these macros are elided from
// this listing.
307 
308 } // namespace defaults
309 } // namespace jb
310 
// NOTE(review): the constructor signature and the attribute-initializer
// lines around each desc(...) are elided from this listing.
313  desc("max-messages-per-second")
314  .help(
315  "Configure the per-second messages rate histogram to expect"
316  " no more than this number of messages per second."
317  " Higher values consume more memory, but give more accurate"
318  " results for high percentiles."),
321  desc("max-messages-per-millisecond")
322  .help(
323  "Configure the per-millisecond messages rate histogram to "
324  "expect"
325  " no more than this number of messages per millisecond."
326  " Higher values consume more memory, but give more accurate"
327  " results for high percentiles."),
330  desc("max-messages-per-microsecond")
331  .help(
332  "Configure the per-microsecond messages rate histogram to "
333  "expect"
334  " no more than this number of messages per microsecond."
335  " Higher values consume more memory, but give more accurate"
336  " results for high percentiles."),
339  desc("max-interarrival-time-nanoseconds")
340  .help(
341  "Configure the interarrival time histogram to expect"
342  " no more than this time between messages."
343  " Higher values consume more memory, but give more accurate"
344  " results for high percentiles."),
// NOTE(review): the help text below reads "no more that no processing
// time is higher than this value"; probably intended "that no processing
// time is higher than this value".
347  desc("max-processing-time-nanoseconds")
348  .help(
349  "Configure the processing latency histogram to expect"
350  " no more that no processing time is higher than this value."
351  " Higher values consume more memory, but give more accurate"
352  " results for high percentiles."),
// NOTE(review): "even timestamps" below is probably a typo for "event
// timestamps".
355  desc("reporting-interval-seconds")
356  .help("Configure how often the statistics are logged."
357  " Use 0 to suppress all logging."
358  " The time is measured using the even timestamps,"
359  " for feeds using recorded or simulated timestamps the"
360  " reporting interval will not match the wall time."),
362 }
363 
// NOTE(review): the function signature, the lines that stream each
// offending value after "value=", and the if-conditions for the
// interarrival and processing checks are elided from this listing.
// Each check below throws jb::usage(message, 1) on an invalid value.
365  if (max_messages_per_second() <= 1) {
366  std::ostringstream os;
367  os << "max-messages-per-second must be > 1, value="
369  throw jb::usage(os.str(), 1);
370  }
371 
372  if (max_messages_per_millisecond() <= 1) {
373  std::ostringstream os;
374  os << "max-messages-per-millisecond must be > 1, value="
376  throw jb::usage(os.str(), 1);
377  }
378 
379  if (max_messages_per_microsecond() <= 1) {
380  std::ostringstream os;
381  os << "max-messages-per-microsecond must be > 1, value="
383  throw jb::usage(os.str(), 1);
384  }
385 
387  std::ostringstream os;
388  os << "max-interarrival-time-nanoseconds must be > 1, value="
390  throw jb::usage(os.str(), 1);
391  }
392 
// NOTE(review): the message below names "max-processing_latency-nanoseconds",
// but the command-line option is registered as
// "max-processing-time-nanoseconds".
394  std::ostringstream os;
395  os << "max-processing_latency-nanoseconds must be > 1, value="
397  throw jb::usage(os.str(), 1);
398  }
399 
// NOTE(review): this check rejects only negative values (0 is allowed to
// suppress logging), but the message says "must be > 1"; it should
// probably read ">= 0".
400  if (reporting_interval_seconds() < 0) {
401  std::ostringstream os;
402  os << "reporting-interval-seconds must be > 1, value="
404  throw jb::usage(os.str(), 1);
405  }
406 }
jb::config_attribute< config, int > reporting_interval_seconds
void record_sample(std::chrono::nanoseconds ts, std::chrono::nanoseconds processing_latency)
Refactor non-template portions of sample()
void sample(sample_type const &t)
Record a new sample.
Definition: histogram.hpp:197
Define defaults for program parameters.
#define JB_OFS_DEFAULTS_max_processing_latency_nanoseconds
void log_progress(std::chrono::nanoseconds ts) const
Report progress up to a certain point in the input.
std::int64_t max_interarrival_time_nanoseconds
#define JB_OFS_DEFAULTS_max_messages_per_second
#define JB_OFS_DEFAULTS_max_messages_per_millisecond
STL namespace.
Helper class to print time durations in HHMMSS format.
Definition: as_hhmmss.hpp:35
std::chrono::nanoseconds last_report_ts_
void sample(duration_type ts)
Record a new sample.
jb::config_attribute< config, int > max_messages_per_microsecond
jb::config_attribute< config, int > max_processing_latency_nanoseconds
#define JB_OFS_DEFAULTS_reporting_interval_seconds
jb::config_attribute< config, std::int64_t > max_interarrival_time_nanoseconds
offline_feed_statistics(config const &cfg)
Constructor.
#define JB_OFS_DEFAULTS_max_messages_per_microsecond
jb::config_attribute< config, int > max_messages_per_millisecond
jb::config_attribute< config, int > max_messages_per_second
void print_csv(std::string const &name, std::ostream &os) const
Print all the measurements in CSV format.
A simple class to communicate the result of parsing the options.
Definition: usage.hpp:11
#define JB_OFS_DEFAULTS_max_interarrival_time_nanoseconds
void log_final_progress() const
Final progress report at the end of the input.
interarrival_histogram_t interarrival_
processing_latency_histogram_t processing_latency_
static void print_csv_header(std::ostream &os)
Print a CSV header.
std::uint64_t nsamples() const
Return the number of samples observed to this point.
Definition: histogram.hpp:79
Configure an offline_feed_statistics object.
#define JB_LOG(lvl)
Definition: log.hpp:70
The top-level namespace for the JayBeams library.
Definition: as_hhmmss.hpp:7
void validate() const override
Validate the configuration.