muxmetrics.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. package h2mux
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/golang-collections/collections/queue"
  6. "github.com/rs/zerolog"
  7. )
  8. // data points used to compute average receive window and send window size
  9. const (
  10. // data points used to compute average receive window and send window size
  11. dataPoints = 100
  12. // updateFreq is set to 1 sec so we can get inbound & outbound byes/sec
  13. updateFreq = time.Second
  14. )
// muxMetricsUpdater is the interface through which measurements are reported
// and the aggregated metrics are read back.
type muxMetricsUpdater interface {
	// metrics returns the latest metrics
	metrics() *MuxerMetrics
	// run is a blocking call to start the event loop
	run(log *zerolog.Logger) error
	// updateRTT is called by muxReader to report new RTT measurements
	updateRTT(rtt *roundTripMeasurement)
	// updateReceiveWindow is called by muxReader and muxWriter when receiveWindow size is updated
	updateReceiveWindow(receiveWindow uint32)
	// updateSendWindow is called by muxReader and muxWriter when sendWindow size is updated
	updateSendWindow(sendWindow uint32)
	// updateInBoundBytes is called periodically by muxReader to report bytesRead
	updateInBoundBytes(inBoundBytes uint64)
	// updateOutBoundBytes is called periodically by muxWriter to report bytesWrote
	updateOutBoundBytes(outBoundBytes uint64)
}
  31. type muxMetricsUpdaterImpl struct {
  32. // rttData keeps record of rtt, rttMin, rttMax and last measured time
  33. rttData *rttData
  34. // receiveWindowData keeps record of receive window measurement
  35. receiveWindowData *flowControlData
  36. // sendWindowData keeps record of send window measurement
  37. sendWindowData *flowControlData
  38. // inBoundRate is incoming bytes/sec
  39. inBoundRate *rate
  40. // outBoundRate is outgoing bytes/sec
  41. outBoundRate *rate
  42. // updateRTTChan is the channel to receive new RTT measurement
  43. updateRTTChan chan *roundTripMeasurement
  44. //updateReceiveWindowChan is the channel to receive updated receiveWindow size
  45. updateReceiveWindowChan chan uint32
  46. //updateSendWindowChan is the channel to receive updated sendWindow size
  47. updateSendWindowChan chan uint32
  48. // updateInBoundBytesChan us the channel to receive bytesRead
  49. updateInBoundBytesChan chan uint64
  50. // updateOutBoundBytesChan us the channel to receive bytesWrote
  51. updateOutBoundBytesChan chan uint64
  52. // shutdownC is to signal the muxerMetricsUpdater to shutdown
  53. abortChan <-chan struct{}
  54. compBytesBefore, compBytesAfter *AtomicCounter
  55. }
  56. type MuxerMetrics struct {
  57. RTT, RTTMin, RTTMax time.Duration
  58. ReceiveWindowAve, SendWindowAve float64
  59. ReceiveWindowMin, ReceiveWindowMax, SendWindowMin, SendWindowMax uint32
  60. InBoundRateCurr, InBoundRateMin, InBoundRateMax uint64
  61. OutBoundRateCurr, OutBoundRateMin, OutBoundRateMax uint64
  62. CompBytesBefore, CompBytesAfter *AtomicCounter
  63. }
  64. func (m *MuxerMetrics) CompRateAve() float64 {
  65. if m.CompBytesBefore.Value() == 0 {
  66. return 1.
  67. }
  68. return float64(m.CompBytesAfter.Value()) / float64(m.CompBytesBefore.Value())
  69. }
  70. type roundTripMeasurement struct {
  71. receiveTime, sendTime time.Time
  72. }
  73. type rttData struct {
  74. rtt, rttMin, rttMax time.Duration
  75. lastMeasurementTime time.Time
  76. lock sync.RWMutex
  77. }
  78. type flowControlData struct {
  79. sum uint64
  80. min, max uint32
  81. queue *queue.Queue
  82. lock sync.RWMutex
  83. }
  84. type rate struct {
  85. curr uint64
  86. min, max uint64
  87. lock sync.RWMutex
  88. }
  89. func newMuxMetricsUpdater(
  90. abortChan <-chan struct{},
  91. compBytesBefore, compBytesAfter *AtomicCounter,
  92. ) muxMetricsUpdater {
  93. updateRTTChan := make(chan *roundTripMeasurement, 1)
  94. updateReceiveWindowChan := make(chan uint32, 1)
  95. updateSendWindowChan := make(chan uint32, 1)
  96. updateInBoundBytesChan := make(chan uint64)
  97. updateOutBoundBytesChan := make(chan uint64)
  98. return &muxMetricsUpdaterImpl{
  99. rttData: newRTTData(),
  100. receiveWindowData: newFlowControlData(),
  101. sendWindowData: newFlowControlData(),
  102. inBoundRate: newRate(),
  103. outBoundRate: newRate(),
  104. updateRTTChan: updateRTTChan,
  105. updateReceiveWindowChan: updateReceiveWindowChan,
  106. updateSendWindowChan: updateSendWindowChan,
  107. updateInBoundBytesChan: updateInBoundBytesChan,
  108. updateOutBoundBytesChan: updateOutBoundBytesChan,
  109. abortChan: abortChan,
  110. compBytesBefore: compBytesBefore,
  111. compBytesAfter: compBytesAfter,
  112. }
  113. }
  114. func (updater *muxMetricsUpdaterImpl) metrics() *MuxerMetrics {
  115. m := &MuxerMetrics{}
  116. m.RTT, m.RTTMin, m.RTTMax = updater.rttData.metrics()
  117. m.ReceiveWindowAve, m.ReceiveWindowMin, m.ReceiveWindowMax = updater.receiveWindowData.metrics()
  118. m.SendWindowAve, m.SendWindowMin, m.SendWindowMax = updater.sendWindowData.metrics()
  119. m.InBoundRateCurr, m.InBoundRateMin, m.InBoundRateMax = updater.inBoundRate.get()
  120. m.OutBoundRateCurr, m.OutBoundRateMin, m.OutBoundRateMax = updater.outBoundRate.get()
  121. m.CompBytesBefore, m.CompBytesAfter = updater.compBytesBefore, updater.compBytesAfter
  122. return m
  123. }
  124. func (updater *muxMetricsUpdaterImpl) run(log *zerolog.Logger) error {
  125. defer log.Debug().Msg("mux - metrics: event loop finished")
  126. for {
  127. select {
  128. case <-updater.abortChan:
  129. log.Debug().Msgf("mux - metrics: Stopping mux metrics updater")
  130. return nil
  131. case roundTripMeasurement := <-updater.updateRTTChan:
  132. go updater.rttData.update(roundTripMeasurement)
  133. log.Debug().Msg("mux - metrics: Update rtt")
  134. case receiveWindow := <-updater.updateReceiveWindowChan:
  135. go updater.receiveWindowData.update(receiveWindow)
  136. log.Debug().Msg("mux - metrics: Update receive window")
  137. case sendWindow := <-updater.updateSendWindowChan:
  138. go updater.sendWindowData.update(sendWindow)
  139. log.Debug().Msg("mux - metrics: Update send window")
  140. case inBoundBytes := <-updater.updateInBoundBytesChan:
  141. // inBoundBytes is bytes/sec because the update interval is 1 sec
  142. go updater.inBoundRate.update(inBoundBytes)
  143. log.Debug().Msgf("mux - metrics: Inbound bytes %d", inBoundBytes)
  144. case outBoundBytes := <-updater.updateOutBoundBytesChan:
  145. // outBoundBytes is bytes/sec because the update interval is 1 sec
  146. go updater.outBoundRate.update(outBoundBytes)
  147. log.Debug().Msgf("mux - metrics: Outbound bytes %d", outBoundBytes)
  148. }
  149. }
  150. }
  151. func (updater *muxMetricsUpdaterImpl) updateRTT(rtt *roundTripMeasurement) {
  152. select {
  153. case updater.updateRTTChan <- rtt:
  154. case <-updater.abortChan:
  155. }
  156. }
  157. func (updater *muxMetricsUpdaterImpl) updateReceiveWindow(receiveWindow uint32) {
  158. select {
  159. case updater.updateReceiveWindowChan <- receiveWindow:
  160. case <-updater.abortChan:
  161. }
  162. }
  163. func (updater *muxMetricsUpdaterImpl) updateSendWindow(sendWindow uint32) {
  164. select {
  165. case updater.updateSendWindowChan <- sendWindow:
  166. case <-updater.abortChan:
  167. }
  168. }
  169. func (updater *muxMetricsUpdaterImpl) updateInBoundBytes(inBoundBytes uint64) {
  170. select {
  171. case updater.updateInBoundBytesChan <- inBoundBytes:
  172. case <-updater.abortChan:
  173. }
  174. }
  175. func (updater *muxMetricsUpdaterImpl) updateOutBoundBytes(outBoundBytes uint64) {
  176. select {
  177. case updater.updateOutBoundBytesChan <- outBoundBytes:
  178. case <-updater.abortChan:
  179. }
  180. }
  181. func newRTTData() *rttData {
  182. return &rttData{}
  183. }
  184. func (r *rttData) update(measurement *roundTripMeasurement) {
  185. r.lock.Lock()
  186. defer r.lock.Unlock()
  187. // discard pings before lastMeasurementTime
  188. if r.lastMeasurementTime.After(measurement.sendTime) {
  189. return
  190. }
  191. r.lastMeasurementTime = measurement.sendTime
  192. r.rtt = measurement.receiveTime.Sub(measurement.sendTime)
  193. if r.rttMax < r.rtt {
  194. r.rttMax = r.rtt
  195. }
  196. if r.rttMin == 0 || r.rttMin > r.rtt {
  197. r.rttMin = r.rtt
  198. }
  199. }
  200. func (r *rttData) metrics() (rtt, rttMin, rttMax time.Duration) {
  201. r.lock.RLock()
  202. defer r.lock.RUnlock()
  203. return r.rtt, r.rttMin, r.rttMax
  204. }
  205. func newFlowControlData() *flowControlData {
  206. return &flowControlData{queue: queue.New()}
  207. }
  208. func (f *flowControlData) update(measurement uint32) {
  209. f.lock.Lock()
  210. defer f.lock.Unlock()
  211. var firstItem uint32
  212. // store new data into queue, remove oldest data if queue is full
  213. f.queue.Enqueue(measurement)
  214. if f.queue.Len() > dataPoints {
  215. // data type should always be uint32
  216. firstItem = f.queue.Dequeue().(uint32)
  217. }
  218. // if (measurement - firstItem) < 0, uint64(measurement - firstItem)
  219. // will overflow and become a large positive number
  220. f.sum += uint64(measurement)
  221. f.sum -= uint64(firstItem)
  222. if measurement > f.max {
  223. f.max = measurement
  224. }
  225. if f.min == 0 || measurement < f.min {
  226. f.min = measurement
  227. }
  228. }
  229. // caller of ave() should acquire lock first
  230. func (f *flowControlData) ave() float64 {
  231. if f.queue.Len() == 0 {
  232. return 0
  233. }
  234. return float64(f.sum) / float64(f.queue.Len())
  235. }
  236. func (f *flowControlData) metrics() (ave float64, min, max uint32) {
  237. f.lock.RLock()
  238. defer f.lock.RUnlock()
  239. return f.ave(), f.min, f.max
  240. }
  241. func newRate() *rate {
  242. return &rate{}
  243. }
  244. func (r *rate) update(measurement uint64) {
  245. r.lock.Lock()
  246. defer r.lock.Unlock()
  247. r.curr = measurement
  248. // if measurement is 0, then there is no incoming/outgoing connection, don't update min/max
  249. if r.curr == 0 {
  250. return
  251. }
  252. if measurement > r.max {
  253. r.max = measurement
  254. }
  255. if r.min == 0 || measurement < r.min {
  256. r.min = measurement
  257. }
  258. }
  259. func (r *rate) get() (curr, min, max uint64) {
  260. r.lock.RLock()
  261. defer r.lock.RUnlock()
  262. return r.curr, r.min, r.max
  263. }