# rolling_percentile.py (7.2 KB)
  1. from __future__ import absolute_import
  2. from multiprocessing import RLock, Array
  3. import itertools
  4. import logging
  5. import time
  6. import math
  7. from hystrix.rolling_number import BucketCircular
  8. log = logging.getLogger(__name__)
  9. class RollingPercentile(object):
  10. def __init__(self, _time, milliseconds, bucket_numbers,
  11. bucket_data_length, enabled):
  12. self.time = _time
  13. self.milliseconds = milliseconds
  14. self.buckets = BucketCircular(bucket_numbers)
  15. self.bucket_numbers = bucket_numbers
  16. self.bucket_data_length = bucket_data_length
  17. self.enabled = enabled
  18. self.snapshot = PercentileSnapshot(0)
  19. self._new_bucket_lock = RLock()
  20. def buckets_size_in_milliseconds(self):
  21. return self.milliseconds / self.bucket_numbers
  22. def current_bucket(self):
  23. current_time = self.time.current_time_in_millis()
  24. current_bucket = self.buckets.peek_last()
  25. if current_bucket is not None and current_time < (current_bucket.window_start + self.buckets_size_in_milliseconds()):
  26. return current_bucket
  27. with self._new_bucket_lock:
  28. # If we didn't find the current bucket above, then we have to
  29. # create one.
  30. if self.buckets.peek_last() is None:
  31. new_bucket = Bucket(current_time, self.bucket_data_length)
  32. self.buckets.add_last(new_bucket)
  33. return new_bucket
  34. else:
  35. for i in range(self.bucket_numbers):
  36. last_bucket = self.buckets.peek_last()
  37. if current_time < (last_bucket.window_start + self.buckets_size_in_milliseconds()):
  38. return last_bucket
  39. elif current_time - (last_bucket.window_start + self.buckets_size_in_milliseconds()) > self.milliseconds:
  40. self.reset()
  41. return self.current_bucket()
  42. else:
  43. all_buckets = [b for b in self.buckets]
  44. self.buckets.add_last(Bucket(last_bucket.window_start + self.buckets_size_in_milliseconds(), self.bucket_data_length))
  45. self.snapshot = PercentileSnapshot(*all_buckets)
  46. return self.buckets.peek_last()
  47. # we didn't get the lock so just return the latest bucket while
  48. # another thread creates the next one
  49. current_bucket = self.buckets.peek_last()
  50. if current_bucket is not None:
  51. return current_bucket
  52. else:
  53. # The rare scenario where multiple threads raced to create the
  54. # very first bucket wait slightly and then use recursion while
  55. # the other thread finishes creating a bucket
  56. time.sleep(5)
  57. self.current_bucket()
  58. def add_value(self, *values):
  59. ''' Add value (or values) to current bucket.
  60. '''
  61. if not self.enabled:
  62. return
  63. for value in values:
  64. self.current_bucket().data.add_value(value)
  65. def percentile(self, percentile):
  66. if not self.enabled:
  67. return -1
  68. # Force logic to move buckets forward in case other requests aren't
  69. # making it happen
  70. self.current_bucket()
  71. # Fetch the current snapshot
  72. return self.current_percentile_snapshot().percentile(percentile)
  73. def current_percentile_snapshot(self):
  74. return self.snapshot
  75. def mean(self):
  76. if not self.enabled:
  77. return -1
  78. # Force logic to move buckets forward in case other requests aren't
  79. # making it happen
  80. self.current_bucket()
  81. # Fetch the current snapshot
  82. return self.current_percentile_snapshot().mean()
  83. class Bucket(object):
  84. ''' Counters for a given 'bucket' of time. '''
  85. def __init__(self, start_time, bucket_data_length):
  86. self.window_start = start_time
  87. self.data = PercentileBucketData(bucket_data_length)
  88. class PercentileBucketData(object):
  89. def __init__(self, data_length):
  90. self.data_length = data_length
  91. self.list = Array('i', self.data_length, lock=RLock())
  92. # TODO: Change this to use a generator
  93. self.index = itertools.count()
  94. self.number = 0
  95. def add_value(self, *latencies):
  96. # We just wrap around the beginning and over-write if we go past
  97. # 'data_length' as that will effectively cause us to "sample" the
  98. # most recent data
  99. for latency in latencies:
  100. self.number = next(self.index)
  101. self.list[self.number % self.data_length] = latency
  102. self.number = self.number + 1
  103. def length(self):
  104. if self.number > len(self.list):
  105. return len(self.list)
  106. else:
  107. return self.number
  108. class PercentileSnapshot(object):
  109. def __init__(self, *args):
  110. self.data = Array('i', 0, lock=RLock())
  111. self._mean = 0
  112. self.length = 0
  113. if isinstance(args[0], int):
  114. self.data = list(args)
  115. self.length = len(args)
  116. self.buckets = []
  117. _sum = 0
  118. for d in self.data:
  119. _sum += d
  120. self._mean = _sum / self.length
  121. self.data = Array('i', sorted(sorted(self.data), key=bool,
  122. reverse=True), lock=RLock())
  123. elif isinstance(args[0], Bucket):
  124. self.length_from_buckets = 0
  125. self.buckets = args
  126. for bucket in self.buckets:
  127. self.length_from_buckets += bucket.data.data_length
  128. self.data = Array('i', self.length_from_buckets, lock=RLock())
  129. _sum = 0
  130. index = 0
  131. for bucket in self.buckets:
  132. pbd = bucket.data
  133. length = pbd.length()
  134. for i in range(length):
  135. v = pbd.list[i]
  136. self.data[index] = v
  137. index += 1
  138. _sum += v
  139. self.length = index
  140. if self.length == 0:
  141. self._mean = 0
  142. else:
  143. self._mean = _sum / self.length
  144. self.data = Array('i', sorted(sorted(self.data), key=bool,
  145. reverse=True), lock=RLock())
  146. def percentile(self, percentile):
  147. if self.length == 0:
  148. return 0
  149. return self.compute_percentile(percentile)
  150. def compute_percentile(self, percent):
  151. if self.length <= 0:
  152. return 0
  153. elif percent <= 0.0:
  154. return self.data[0]
  155. elif percent >= 100.0:
  156. return self.data[self.length - 1]
  157. rank = (percent / 100.0) * self.length
  158. # Linear interpolation between closest ranks
  159. ilow = int(math.floor(rank))
  160. ihigh = int(math.ceil(rank))
  161. assert 0 <= ilow and ilow <= rank and rank <= ihigh and ihigh <= self.length
  162. assert (ihigh - ilow) <= 1
  163. if ihigh >= self.length:
  164. # Another edge case
  165. return self.data[self.length - 1]
  166. elif ilow == ihigh:
  167. return self.data[ilow]
  168. else:
  169. # Interpolate between the two bounding values
  170. return int(self.data[ilow] + (rank - ilow) * (self.data[ihigh] - self.data[ilow]))
  171. def mean(self):
  172. return int(self._mean)