counting.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. // Copyright (C) 2014 The Protocol Authors.
  2. package protocol
  3. import (
  4. "io"
  5. "sync/atomic"
  6. "time"
  7. )
  8. type countingReader struct {
  9. io.Reader
  10. idString string
  11. tot atomic.Int64 // bytes
  12. last atomic.Int64 // unix nanos
  13. }
  14. var (
  15. totalIncoming atomic.Int64
  16. totalOutgoing atomic.Int64
  17. )
  18. func (c *countingReader) Read(bs []byte) (int, error) {
  19. n, err := c.Reader.Read(bs)
  20. c.tot.Add(int64(n))
  21. totalIncoming.Add(int64(n))
  22. c.last.Store(time.Now().UnixNano())
  23. metricDeviceRecvBytes.WithLabelValues(c.idString).Add(float64(n))
  24. return n, err
  25. }
  26. func (c *countingReader) Tot() int64 { return c.tot.Load() }
  27. func (c *countingReader) Last() time.Time {
  28. return time.Unix(0, c.last.Load())
  29. }
  30. type countingWriter struct {
  31. io.Writer
  32. idString string
  33. tot atomic.Int64 // bytes
  34. last atomic.Int64 // unix nanos
  35. }
  36. func (c *countingWriter) Write(bs []byte) (int, error) {
  37. n, err := c.Writer.Write(bs)
  38. c.tot.Add(int64(n))
  39. totalOutgoing.Add(int64(n))
  40. c.last.Store(time.Now().UnixNano())
  41. metricDeviceSentBytes.WithLabelValues(c.idString).Add(float64(n))
  42. return n, err
  43. }
  44. func (c *countingWriter) Tot() int64 { return c.tot.Load() }
  45. func (c *countingWriter) Last() time.Time {
  46. return time.Unix(0, c.last.Load())
  47. }
  48. func TotalInOut() (int64, int64) {
  49. return totalIncoming.Load(), totalOutgoing.Load()
  50. }