JayBeams  0.1
Another project to have fun coding.
event_rate_estimator.hpp
Go to the documentation of this file.
1 #ifndef jb_event_rate_estimator_hpp
2 #define jb_event_rate_estimator_hpp
3 
4 #include <chrono>
5 #include <cstdint>
6 #include <limits>
7 #include <sstream>
8 #include <stdexcept>
9 #include <type_traits>
10 #include <vector>
11 
12 namespace jb {
13 
14 /**
15  * Estimate event rates over a trailing measurement period.
16  *
17  * Assume you are interested in statistics about events per
18  * millisecond. This class estimates the number of events per
19  * trailing millisecond, using an arbitrary sampling period. For
20  * example, you could set the sampling period to 1 microsecond, this
21  * class would then estimate the event rate over the previous
22  * millisecond for every microsecond. Or you could set the sampling
23  * period to 1 millisecond, which would then estimate the event rate
24  * on each millisecond. Admittedly, for most practical purposes
25  * measuring event rates over an exact millisecond boundary or over
26  * a trailing millisecond has limited interest, but it was fun to
27  * write this class.
28  *
29  * Given a @a measurement_period: the time over which we want to
30  * measure event rates, and @a sampling_period: how often we want to
31  * measure the event rates, the class keeps a circular buffer of N
32  * counters, representing the trailing sampling periods. N is chosen
33  * such that
34  * @f$ N = {{measurement\_period} \over {sampling\_period}} @f$.
35  *
36  * As new events arrive the counter for the current sampling period is
37  * incremented, once an event in a new sampling period is observed the
38  * class emits updates (via a functor) to update the estimated event
39  * rate.
40  *
41  * @tparam duration_t Defines class used to measure time, this
42  * class must be compatible with std::chrono::duration. The
43  * class assumes that event timestamps are measured as durations
44  * with respect to some well-defined (by convention) epoch. For
45  * example, some data feeds use timestamps as nanoseconds since
46  * midnight, while others use timestamps as microseconds since the
47  * Unix Epoch.
48  * @tparam counter_t The type of the counters, most of the time a
49  * simple integer would work well, but if you need to create a large
50  * number of instances of this class, and the values are expected to
51  * be very small, you might consider using a 16-bit or even an
52  * 8-bit counter. Likewise, if you expect very large values for the
53  * counters, such as when measuring event rate per minute, you
54  * should consider using a 64-bit counter.
55  */
template <
    typename duration_t = std::chrono::microseconds, typename counter_t = int>
class event_rate_estimator {
public:
  //@{
  /**
   * @name Type traits.
   */
  using duration_type = duration_t;
  using counter_type = counter_t;
  //@}

  /// The container used to hold the per-sampling-period counters.
  using buckets = std::vector<counter_type>;

  /**
   * Build an accumulator to estimate event rates.
   *
   * @param measurement_period over what time period we want to
   *   measure message rates.
   * @param sampling_period how often do we want to measure message
   *   rates.  Must be positive, must not exceed @a measurement_period,
   *   and must divide it exactly.
   * @throw std::invalid_argument if the sampling period is not
   *   positive, if it is larger than the measurement period, if the
   *   measurement period is not a multiple of the sampling period, or
   *   if the resulting number of buckets does not fit in a
   *   std::size_t.
   */
  event_rate_estimator(
      duration_type measurement_period,
      duration_type sampling_period = duration_type(1))
      : buckets_(bucket_count(measurement_period, sampling_period))
      , sampling_period_(sampling_period)
      , running_total_()
      , last_bucket_()
      // ... end_pos_ == buckets_.size() is the "no sample seen yet"
      // sentinel checked by sample() ...
      , end_pos_(buckets_.size()) {
  }

  /**
   * Record a sample.
   *
   * This class keeps the number of events observed over the last N
   * sampling periods.  New events in the same sampling period are
   * simply recorded, but no rate estimate is made.  When a sample
   * falls in a later sampling period, the @a update functor is called
   * once for every sampling period that elapsed, before the new
   * sample is recorded.
   *
   * @param ts the timestamp of the sample.
   * @param update a functor invoked as @c update(count, repeats):
   *   @a count is the number of events in the trailing measurement
   *   period, and @a repeats is how many consecutive sampling periods
   *   had that same value.
   */
  template <typename functor>
  void sample(duration_type ts, functor update) {
    if (end_pos_ >= buckets_.size()) {
      // ... this is the first event sample, initialize the circular
      // buffer and just return, there is no rate with a single sample
      // ...
      init(ts);
      return;
    }
    // ... get the bucket number for the timestamp ...
    auto bucket = ts / sampling_period_;
    if (last_bucket_ == bucket) {
      // ... a new sample in the same sampling period, simply
      // increment the counters and continue ...
      running_total_++;
      buckets_[end_pos_]++;
      return;
    }

    // ... new sampling period, rotate the buffer until we get to the
    // new timestamp, or until the buffer is empty ...
    while (last_bucket_ < bucket && running_total_ > 0) {
      // ... before rotating, issue an estimate based on the number of
      // samples contained in the circular buffer ...
      update(running_total_, 1);
      rotate();
    }

    // ... the loop stops when running_total_ reaches 0 because every
    // remaining update would carry the value 0, so a single call with
    // the number of repeats replaces them ...
    if (last_bucket_ < bucket) {
      std::uint64_t repeats = bucket - last_bucket_;
      update(0, repeats);
      // ... all the buckets are 0 at this point, so the buffer can be
      // restarted from any position ...
      end_pos_ = 0;
      last_bucket_ = bucket;
    }

    // ... lastly, since this is a new sampling period, record the new
    // event ...
    running_total_++;
    buckets_[end_pos_]++;
  }

private:
  /// Initialize the circular buffer with the first sample.
  void init(duration_type ts) {
    end_pos_ = 0;
    running_total_ = 1;
    buckets_[end_pos_] = 1;
    // ... remember which sampling period the first sample belongs to,
    // otherwise the next sample would rotate starting from period 0 ...
    last_bucket_ = ts / sampling_period_;
  }

  /// Advance the circular buffer by one sampling period.
  void rotate() {
    end_pos_++;
    if (end_pos_ == buckets_.size()) {
      end_pos_ = 0;
    }
    // ... the bucket being recycled falls out of the measurement
    // period, so its events must leave the running total too ...
    running_total_ -= buckets_[end_pos_];
    buckets_[end_pos_] = 0;
    last_bucket_++;
  }

  /**
   * Validate the periods and compute the necessary number of buckets.
   *
   * Declared static because it runs from the member-initializer list,
   * before any data member has been constructed.
   *
   * @throw std::invalid_argument if the periods are not usable, see
   *   the constructor documentation.
   */
  static std::size_t bucket_count(
      duration_type measurement_period, duration_type sampling_period) {
    if (sampling_period <= duration_type(0)) {
      std::ostringstream os;
      os << "jb::event_rate_estimate - sampling period ("
         << sampling_period.count() << ") must be a positive number";
      throw std::invalid_argument(os.str());
    }
    if (sampling_period > measurement_period) {
      std::ostringstream os;
      os << "jb::event_rate_estimate - measurement period ("
         << measurement_period.count() << ") is smaller than sampling period ("
         << sampling_period.count() << ")";
      throw std::invalid_argument(os.str());
    }

    if ((measurement_period % sampling_period).count() != 0) {
      std::ostringstream os;
      os << "jb::event_rate_estimate - measurement period ("
         << measurement_period.count()
         << ") must be a multiple of the sampling period ("
         << sampling_period.count() << ")";
      throw std::invalid_argument(os.str());
    }

    // ... because measurement_period and sampling_period are positive
    // numbers N will be a positive number ...
    using rep = typename duration_type::rep;
    using unsigned_rep = typename std::make_unsigned<rep>::type;
    unsigned_rep N =
        static_cast<unsigned_rep>(measurement_period / sampling_period);
    // ... beware, the type may be larger than what can be stored in
    // an std::size_t (weird segmented memory platforms, yuck) ...
    if (N >= std::numeric_limits<std::size_t>::max()) {
      std::ostringstream os;
      os << "jb::event_rate_estimate - measurement period ("
         << measurement_period.count() << ") is too large for sampling period ("
         << sampling_period.count() << ")";
      throw std::invalid_argument(os.str());
    }
    return static_cast<std::size_t>(N);
  }

private:
  /// The time period is bucketized in intervals of 1 sampling period.
  buckets buckets_;

  /// The sampling period
  duration_type sampling_period_;

  /// Current number of events across all buckets.
  std::uint64_t running_total_;

  /// Index (timestamp / sampling period) of the current sampling period
  typename duration_type::rep last_bucket_;

  /// We treat the buckets as a circular buffer, this is the position
  /// of the bucket for the current sampling period.  A value equal to
  /// buckets_.size() means no sample has been recorded yet.
  std::size_t end_pos_;
};
228 
229 } // namespace jb
230 
231 #endif // jb_event_rate_estimator_hpp
Estimate event rates over a trailing measurement period.
buckets buckets_
The time period is bucketized in intervals of 1 sampling period.
std::size_t bucket_count(duration_type measurement_period, duration_type sampling_period)
Estimate the necessary number of buckets.
duration_type::rep last_bucket_
Current number for the sampling period.
std::vector< counter_type > buckets
void init(duration_type ts)
Just initialize the circular buffer.
event_rate_estimator(duration_type measurement_period, duration_type sampling_period=duration_type(1))
Build an accumulator to estimate event rates.
std::uint64_t running_total_
Current number of events across all buckets.
void sample(duration_type ts, functor update)
Record a sample.
std::size_t end_pos_
We treat the buckets as a circular buffer, this is the pointer to the end of the buffer.
void rotate()
Rotate the circular buffer.
duration_type sampling_period_
The sampling period.
The top-level namespace for the JayBeams library.
Definition: as_hhmmss.hpp:7