# rolling_number.py
  1. from __future__ import absolute_import
  2. from multiprocessing import RLock
  3. from collections import deque
  4. import logging
  5. import types
  6. import time
  7. import six
  8. from atomos.multiprocessing.atomic import AtomicLong
  9. log = logging.getLogger(__name__)
  10. class RollingNumber(object):
  11. """ A **number** which can be used to track **counters** (increment) or set
  12. values over time.
  13. It is *rolling* in the sense that a :attr:`milliseconds` is
  14. given that you want to track (such as 10 seconds) and then that is broken
  15. into **buckets** (defaults to 10) so that the 10 second window doesn't
  16. empty out and restart every 10 seconds, but instead every 1 second you
  17. have a new :class:`Bucket` added and one dropped so that 9 of the buckets
  18. remain and only the newest starts from scratch.
  19. This is done so that the statistics are gathered over a *rolling* 10
  20. second window with data being added/dropped in 1 second intervals
  21. (or whatever granularity is defined by the arguments) rather than
  22. each 10 second window starting at 0 again.
  23. Performance-wise this class is optimized for writes, not reads. This is
  24. done because it expects far higher write volume (thousands/second) than
  25. reads (a few per second).
  26. For example, on each read to getSum/getCount it will iterate buckets to
  27. sum the data so that on writes we don't need to maintain the overall sum
  28. and pay the synchronization cost at each write to ensure the sum is
  29. up-to-date when the read can easily iterate each bucket to get the sum
  30. when it needs it.
  31. See test module :mod:`tests.test_rolling_number` for usage and expected
  32. behavior examples.
  33. """
  34. def __init__(self, milliseconds, bucket_numbers, _time=None):
  35. self.time = _time or ActualTime() # Create a instance of time here
  36. self.milliseconds = milliseconds
  37. self.buckets = BucketCircular(bucket_numbers)
  38. self.bucket_numbers = bucket_numbers
  39. self.cumulative = CumulativeSum()
  40. self._new_bucket_lock = RLock()
  41. if self.milliseconds % self.bucket_numbers != 0:
  42. raise Exception('The milliseconds must divide equally into '
  43. 'bucket_numbers. For example 1000/10 is ok, '
  44. '1000/11 is not.')
  45. def buckets_size_in_milliseconds(self):
  46. return self.milliseconds / self.bucket_numbers
  47. def increment(self, event):
  48. """ Increment the **counter** in the current bucket by one for the
  49. given :class:`RollingNumberEvent` type.
  50. The :class:`RollingNumberEvent` must be a **counter** type
  51. >>> RollingNumberEvent.isCounter()
  52. True
  53. Args:
  54. event (:class:`RollingNumberEvent`): Event defining which
  55. **counter** to increment.
  56. """
  57. self.current_bucket().adder(event).increment()
  58. def update_rolling_max(self, event, value):
  59. """ Update a value and retain the max value.
  60. The :class:`RollingNumberEvent` must be a **max updater** type
  61. >>> RollingNumberEvent.isMaxUpdater()
  62. True
  63. Args:
  64. value (int): Max value to update.
  65. event (:class:`RollingNumberEvent`): Event defining which
  66. **counter** to increment.
  67. """
  68. self.current_bucket().max_updater(event).update(value)
  69. def current_bucket(self):
  70. """ Retrieve the current :class:`Bucket`
  71. Retrieve the latest :class:`Bucket` if the given time is **BEFORE**
  72. the end of the **bucket** window, otherwise it returns ``None``.
  73. The following needs to be synchronized/locked even with a
  74. synchronized/thread-safe data structure such as LinkedBlockingDeque
  75. because the logic involves multiple steps to check existence,
  76. create an object then insert the object. The 'check' or 'insertion'
  77. themselves are thread-safe by themselves but not the aggregate
  78. algorithm, thus we put this entire block of logic inside
  79. synchronized.
  80. I am using a :class:`multiprocessing.RLock` if/then
  81. so that a single thread will get the lock and as soon as one thread
  82. gets the lock all others will go the 'else' block and just return
  83. the currentBucket until the newBucket is created. This should allow
  84. the throughput to be far higher and only slow down 1 thread instead
  85. of blocking all of them in each cycle of creating a new bucket based
  86. on some testing (and it makes sense that it should as well).
  87. This means the timing won't be exact to the millisecond as to what
  88. data ends up in a bucket, but that's acceptable. It's not critical
  89. to have exact precision to the millisecond, as long as it's rolling,
  90. if we can instead reduce the impact synchronization.
  91. More importantly though it means that the 'if' block within the
  92. lock needs to be careful about what it changes that can still
  93. be accessed concurrently in the 'else' block since we're not
  94. completely synchronizing access.
  95. For example, we can't have a multi-step process to add a bucket,
  96. remove a bucket, then update the sum since the 'else' block of code
  97. can retrieve the sum while this is all happening. The trade-off is
  98. that we don't maintain the rolling sum and let readers just iterate
  99. bucket to calculate the sum themselves. This is an example of
  100. favoring write-performance instead of read-performance and how the
  101. tryLock versus a synchronized block needs to be accommodated.
  102. Returns:
  103. bucket: Returns the latest :class:`Bucket` or ``None``.
  104. """
  105. # TODO: Check the doc string above^.
  106. current_time = self.time.current_time_in_millis()
  107. # a shortcut to try and get the most common result of immediately
  108. # finding the current bucket
  109. # Retrieve the latest bucket if the given time is BEFORE the end of
  110. # the bucket window, otherwise it returns None.
  111. # NOTE: This is thread-safe because it's accessing 'buckets' which is
  112. # a ?LinkedBlockingDeque?
  113. current_bucket = self.buckets.peek_last()
  114. if current_bucket is not None and current_time < (current_bucket.window_start + self.buckets_size_in_milliseconds()):
  115. return current_bucket
  116. with self._new_bucket_lock:
  117. # If we didn't find the current bucket above, then we have to
  118. # create one.
  119. if self.buckets.peek_last() is None:
  120. new_bucket = Bucket(current_time)
  121. self.buckets.add_last(new_bucket)
  122. return new_bucket
  123. else:
  124. for i in range(self.bucket_numbers):
  125. last_bucket = self.buckets.peek_last()
  126. if current_time < (last_bucket.window_start + self.buckets_size_in_milliseconds()):
  127. return last_bucket
  128. elif current_time - (last_bucket.window_start + self.buckets_size_in_milliseconds()) > self.milliseconds:
  129. self.reset()
  130. return self.current_bucket()
  131. else:
  132. self.buckets.add_last(Bucket(last_bucket.window_start + self.buckets_size_in_milliseconds()))
  133. self.cumulative.add_bucket(last_bucket)
  134. return self.buckets.peek_last()
  135. # we didn't get the lock so just return the latest bucket while
  136. # another thread creates the next one
  137. current_bucket = self.buckets.peek_last()
  138. if current_bucket is not None:
  139. return current_bucket
  140. else:
  141. # The rare scenario where multiple threads raced to create the
  142. # very first bucket wait slightly and then use recursion while
  143. # the other thread finishes creating a bucket
  144. time.sleep(5)
  145. self.current_bucket()
  146. def reset(self):
  147. """ Reset all rolling **counters**
  148. Force a reset of all rolling **counters** (clear all **buckets**) so
  149. that statistics start being gathered from scratch.
  150. This does NOT reset the :class:`CumulativeSum` values.
  151. """
  152. last_bucket = self.buckets.peek_last()
  153. if last_bucket:
  154. self.cumulative.add_bucket(last_bucket)
  155. self.buckets.clear()
  156. def rolling_sum(self, event):
  157. """ Rolling sum
  158. Get the sum of all buckets in the rolling counter for the given
  159. :class:`RollingNumberEvent`.
  160. The :class:`RollingNumberEvent` must be a **counter** type
  161. >>> RollingNumberEvent.isCounter()
  162. True
  163. Args:
  164. event (:class:`RollingNumberEvent`): Event defining which counter
  165. to retrieve values from.
  166. Returns:
  167. long: Return value from the given :class:`RollingNumberEvent`
  168. counter type.
  169. """
  170. last_bucket = self.current_bucket()
  171. if not last_bucket:
  172. return 0
  173. sum = 0
  174. for bucket in self.buckets:
  175. sum += bucket.adder(event).sum()
  176. return sum
  177. def rolling_max(self, event):
  178. values = self.values(event)
  179. if not values:
  180. return 0
  181. else:
  182. return values[len(values) - 1]
  183. def values(self, event):
  184. last_bucket = self.current_bucket()
  185. if not last_bucket:
  186. return 0
  187. values = []
  188. for bucket in self.buckets:
  189. if event.is_counter():
  190. values.append(bucket.adder(event).sum())
  191. if event.is_max_updater():
  192. values.append(bucket.max_updater(event).max())
  193. return values
  194. def value_of_latest_bucket(self, event):
  195. last_bucket = self.current_bucket()
  196. if not last_bucket:
  197. return 0
  198. return last_bucket.get(event)
  199. def cumulative_sum(self, event):
  200. """ Cumulative sum
  201. The cumulative sum of all buckets ever since the start without
  202. rolling for the given :class`RollingNumberEvent` type.
  203. See :meth:`rolling_sum` for the rolling sum.
  204. The :class:`RollingNumberEvent` must be a **counter** type
  205. >>> RollingNumberEvent.isCounter()
  206. True
  207. Args:
  208. event (:class:`RollingNumberEvent`): Event defining which
  209. **counter** to increment.
  210. Returns:
  211. long: Returns the cumulative sum of all **increments** and
  212. **adds** for the given :class:`RollingNumberEvent` **counter**
  213. type.
  214. """
  215. return self.value_of_latest_bucket(event) + self.cumulative.get(event)
  216. class BucketCircular(deque):
  217. ''' This is a circular array acting as a FIFO queue. '''
  218. def __init__(self, size):
  219. super(BucketCircular, self).__init__(maxlen=size)
  220. @property
  221. def size(self):
  222. return len(self)
  223. def last(self):
  224. return self.peek_last()
  225. def peek_last(self):
  226. try:
  227. return self[0]
  228. except IndexError:
  229. return None
  230. def add_last(self, bucket):
  231. self.appendleft(bucket)
  232. class Bucket(object):
  233. """ Counters for a given `bucket` of time
  234. We support both :class:`LongAdder` and :class:`LongMaxUpdater` in a
  235. :class:`Bucket` but don't want the memory allocation of all types for each
  236. so we only allocate the objects if the :class:`RollingNumberEvent` matches
  237. the correct **type** - though we still have the allocation of empty arrays
  238. to the given length as we want to keep using the **type** value for fast
  239. random access.
  240. """
  241. def __init__(self, start_time):
  242. self.window_start = start_time
  243. self._adder = {}
  244. self._max_updater = {}
  245. # TODO: Change this to use a metaclass
  246. for name, event in RollingNumberEvent.__members__.items():
  247. if event.is_counter():
  248. self._adder[event.name] = LongAdder()
  249. for name, event in RollingNumberEvent.__members__.items():
  250. if event.is_max_updater():
  251. self._max_updater[event.name] = LongMaxUpdater()
  252. def get(self, event):
  253. if event.is_counter():
  254. return self.adder(event).sum()
  255. if event.is_max_updater():
  256. return self.max_updater(event).max()
  257. raise Exception('Unknown type of event.')
  258. # TODO: Rename to add
  259. def adder(self, event):
  260. if event.is_counter():
  261. return self._adder[event.name]
  262. raise Exception('Type is not a LongAdder.')
  263. # TODO: Rename to update_max
  264. def max_updater(self, event):
  265. if event.is_max_updater():
  266. return self._max_updater[event.name]
  267. raise Exception('Type is not a LongMaxUpdater.')
  268. # TODO: Move this to hystrix/util/long_adder.py
  269. class LongAdder(object):
  270. def __init__(self, min_value=0):
  271. self._count = AtomicLong(value=min_value)
  272. def increment(self):
  273. self._count.add_and_get(1)
  274. def decrement(self):
  275. self._count.subtract_and_get(1)
  276. def sum(self):
  277. return self._count.get()
  278. def add(self, value):
  279. self._count.add_and_get(value)
  280. # TODO: Move this to hystrix/util/long_max_updater.py
  281. class LongMaxUpdater(object):
  282. def __init__(self, min_value=0):
  283. self._count = AtomicLong(value=min_value)
  284. def max(self):
  285. return self._count.get()
  286. def update(self, value):
  287. if value > self.max():
  288. self._count.set(value)
  289. class CumulativeSum(object):
  290. def __init__(self):
  291. self._adder = {}
  292. self._max_updater = {}
  293. # TODO: Change this to use a metaclass
  294. for name, event in RollingNumberEvent.__members__.items():
  295. if event.is_counter():
  296. self._adder[event.name] = LongAdder()
  297. for name, event in RollingNumberEvent.__members__.items():
  298. if event.is_max_updater():
  299. self._max_updater[event.name] = LongMaxUpdater()
  300. def add_bucket(self, bucket):
  301. for name, event in RollingNumberEvent.__members__.items():
  302. if event.is_counter():
  303. self.adder(event).add(bucket.adder(event).sum())
  304. if event.is_max_updater():
  305. self.max_updater(event).update(bucket.max_updater(event).max())
  306. def get(self, event):
  307. if event.is_counter():
  308. return self.adder(event).sum()
  309. if event.is_max_updater():
  310. return self.max_updater(event).max()
  311. raise Exception('Unknown type of event.')
  312. def adder(self, event):
  313. if event.is_counter():
  314. return self._adder[event.name]
  315. raise Exception('Unknown type of event.')
  316. def max_updater(self, event):
  317. if event.is_max_updater():
  318. return self._max_updater[event.name]
  319. raise Exception('Unknown type of event.')
  320. def _is_function(obj):
  321. return isinstance(obj, types.FunctionType)
  322. def _is_dunder(name):
  323. return (name[:2] == name[-2:] == '__' and
  324. name[2:3] != '_' and
  325. name[-3:-2] != '_' and
  326. len(name) > 4)
  327. class Event(object):
  328. def __init__(self, name, value):
  329. self._name = name
  330. self._value = value
  331. def is_counter(self):
  332. return self._value == 1
  333. def is_max_updater(self):
  334. return self._value == 2
  335. @property
  336. def name(self):
  337. return self._name
  338. @property
  339. def value(self):
  340. return self._value
  341. class EventMetaclass(type):
  342. def __new__(cls, name, bases, attrs):
  343. __members = {}
  344. for name, value in attrs.items():
  345. if not _is_dunder(name) and not _is_function(value):
  346. __members[name] = Event(name, value)
  347. for name, value in __members.items():
  348. attrs[name] = __members[name]
  349. new_class = super(EventMetaclass, cls).__new__(cls, name,
  350. bases, attrs)
  351. setattr(new_class, '__members__', __members)
  352. return new_class
  353. # TODO: Move this to hystrix/util/rolling_number_event.py
class RollingNumberEvent(six.with_metaclass(EventMetaclass, object)):
    """ Various states/events that can be captured in the
    :class:`RollingNumber`.

    Each constant below is declared with a numeric kind: ``1`` for a
    **counter** event and ``2`` for a **max updater** event.
    :class:`EventMetaclass` replaces every constant with an
    :class:`Event` carrying the constant's name and that kind, and collects
    them in ``__members__``.

    The **counter** type events can be used with
    :meth:`RollingNumber.increment` and :meth:`RollingNumber.rolling_sum`.

    The **max updater** type events can be used with
    :meth:`RollingNumber.update_rolling_max` and
    :meth:`RollingNumber.rolling_max`.

    NOTE(review): since the constants become :class:`Event` objects,
    ``RollingNumberEvent.SUCCESS.is_counter()`` resolves to
    :meth:`Event.is_counter`. The methods defined on this class only apply
    to explicitly constructed ``RollingNumberEvent(event)`` instances.
    """

    # Kind 1: counter events.
    SUCCESS = 1
    FAILURE = 1
    TIMEOUT = 1
    SHORT_CIRCUITED = 1
    THREAD_POOL_REJECTED = 1
    SEMAPHORE_REJECTED = 1
    BAD_REQUEST = 1
    FALLBACK_SUCCESS = 1
    FALLBACK_FAILURE = 1
    FALLBACK_REJECTION = 1
    EXCEPTION_THROWN = 1
    THREAD_EXECUTION = 1
    # Kind 2: the only max-updater event.
    THREAD_MAX_ACTIVE = 2
    COLLAPSED = 1
    RESPONSE_FROM_CACHE = 1

    def __init__(self, event):
        """ Wrap ``event``, an object exposing a ``value`` attribute. """
        self._event = event

    def is_counter(self):
        """ Is counter

        Returns:
            bool: Returns ``True`` if the wrapped event's value is ``1``
            (**counter**), otherwise it returns ``False``.
        """
        return self._event.value == 1

    def is_max_updater(self):
        """ Is max updater

        Returns:
            bool: Returns ``True`` if the wrapped event's value is ``2``
            (**max updater**), otherwise it returns ``False``.
        """
        return self._event.value == 2
  400. class ActualTime(object):
  401. """ Actual time
  402. """
  403. def current_time_in_millis(self):
  404. """ Current time in milliseconds
  405. Returns:
  406. int: Returns :func:`time.time()` converted to milliseconds
  407. """
  408. return int(round(time.time() * 1000))