command_metrics.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. from __future__ import absolute_import
  2. import logging
  3. import six
  4. from atomos.multiprocessing.atomic import AtomicLong
  5. from hystrix.metrics import Metrics
  6. from hystrix.event_type import EventType
  7. from hystrix.rolling_number import (RollingNumber, RollingNumberEvent,
  8. ActualTime)
  9. log = logging.getLogger(__name__)
  10. class CommandMetricsMetaclass(type):
  11. """ Metaclass for :class:`CommandMetrics`
  12. Return a cached or create the :class:`CommandMetrics` instance for a given
  13. :class:`hystrix.command.Command` name.
  14. This ensures only 1 :class:`CommandMetrics` instance per
  15. :class:`hystrix.command.Command` name.
  16. """
  17. __instances__ = dict()
  18. __blacklist__ = ('CommandMetrics', 'CommandMetricsMetaclass')
  19. def __new__(cls, name, bases, attrs):
  20. # Do not use cache for black listed classes.
  21. if name in cls.__blacklist__:
  22. return super(CommandMetricsMetaclass, cls).__new__(cls, name,
  23. bases, attrs)
  24. # User defined class name or create a default.
  25. command_metrics_key = attrs.get('command_metrics_key') or \
  26. '{}CommandMetrics'.format(name)
  27. # Check for CommandMetrics class instance
  28. if command_metrics_key not in cls.__instances__:
  29. new_class = super(CommandMetricsMetaclass, cls).__new__(cls,
  30. command_metrics_key,
  31. bases,
  32. attrs)
  33. setattr(new_class, 'command_metrics_key', command_metrics_key)
  34. cls.__instances__[command_metrics_key] = new_class
  35. return cls.__instances__[command_metrics_key]
  36. class CommandMetrics(six.with_metaclass(CommandMetricsMetaclass, Metrics)):
  37. """ Used by :class:`hystrix.command.Command` to record metrics.
  38. """
  39. command_metrics_key = None
  40. # TODO: Review default value None here
  41. def __init__(self, command_metrics_key=None, group_key=None,
  42. pool_key=None, properties=None, event_notifier=None):
  43. counter = RollingNumber(
  44. properties.metrics_rolling_statistical_window_in_milliseconds(),
  45. properties.metrics_rolling_statistical_window_buckets())
  46. super(CommandMetrics, self).__init__(counter)
  47. self.properties = properties
  48. self.actual_time = ActualTime()
  49. self.group_key = group_key
  50. self.event_notifier = event_notifier
  51. self.health_counts_snapshot = None
  52. self.last_health_counts_snapshot = AtomicLong(value=self.actual_time.current_time_in_millis())
  53. def mark_success(self, duration):
  54. """ Mark success incrementing counter and emiting event
  55. When a :class:`hystrix.command.Command` successfully completes it will
  56. call this method to report its success along with how long the
  57. execution took.
  58. Args:
  59. duration: Command duration
  60. """
  61. # TODO: Why this receive a parameter and do nothing with it?
  62. self.event_notifier.mark_event(EventType.SUCCESS, self.command_metrics_key)
  63. self.counter.increment(RollingNumberEvent.SUCCESS)
  64. def mark_failure(self, duration):
  65. """ Mark failure incrementing counter and emiting event
  66. When a :class:`hystrix.command.Command` fail to completes it will
  67. call this method to report its failure along with how long the
  68. execution took.
  69. Args:
  70. duration: Command duration
  71. """
  72. # TODO: Why this receive a parameter and do nothing with it?
  73. self.event_notifier.mark_event(EventType.FAILURE, self.command_metrics_key)
  74. self.counter.increment(RollingNumberEvent.FAILURE)
  75. def mark_timeout(self, duration):
  76. """ Mark timeout incrementing counter and emiting event
  77. When a :class:`hystrix.command.Command` times out (fails to complete)
  78. it will call this method to report its failure along with how long the
  79. command waited (this time should equal or be very close to the timeout
  80. value).
  81. Args:
  82. duration: Command duration
  83. """
  84. # TODO: Why this receive a parameter and do nothing with it?
  85. self.event_notifier.mark_event(EventType.TIMEOUT, self.command_metrics_key)
  86. self.counter.increment(RollingNumberEvent.TIMEOUT)
  87. def mark_bad_request(self, duration):
  88. """ Mark bad request incrementing counter and emiting event
  89. When a :class:`hystrix.command.Command` is executed and triggers a
  90. :class:`hystrix.BadRequestException` during its execution it willi
  91. call this method to report its failure along with how long the
  92. command waited (this time should equal or be very close to the timeout
  93. value).
  94. Args:
  95. duration: Command duration
  96. """
  97. # TODO: Why this receive a parameter and do nothing with it?
  98. self.event_notifier.mark_event(EventType.BAD_REQUEST, self.command_metrics_key)
  99. self.counter.increment(RollingNumberEvent.BAD_REQUEST)
  100. def health_counts(self):
  101. """ Health counts
  102. Retrieve a snapshot of total requests, error count and error percentage.
  103. Returns:
  104. instance: :class:`hystrix.command_metrics.HealthCounts`
  105. """
  106. # we put an interval between snapshots so high-volume commands don't
  107. # spend too much unnecessary time calculating metrics in very small time periods
  108. last_time = self.last_health_counts_snapshot.get()
  109. current_time = ActualTime().current_time_in_millis()
  110. if (current_time - last_time) >= self.properties.metrics_health_snapshot_interval_in_milliseconds() or self.health_counts_snapshot is None:
  111. if self.last_health_counts_snapshot.compare_and_set(last_time, current_time):
  112. # Our thread won setting the snapshot time so we will
  113. # proceed with generating a new snapshot
  114. # losing threads will continue using the old snapshot
  115. success = self.counter.rolling_sum(RollingNumberEvent.SUCCESS)
  116. failure = self.counter.rolling_sum(RollingNumberEvent.FAILURE)
  117. timeout = self.counter.rolling_sum(RollingNumberEvent.TIMEOUT)
  118. thread_pool_rejected = self.counter.rolling_sum(RollingNumberEvent.THREAD_POOL_REJECTED)
  119. semaphore_rejected = self.counter.rolling_sum(RollingNumberEvent.SEMAPHORE_REJECTED)
  120. short_circuited = self.counter.rolling_sum(RollingNumberEvent.SHORT_CIRCUITED)
  121. total_count = failure + success + timeout + thread_pool_rejected + short_circuited + semaphore_rejected
  122. error_count = failure + timeout + thread_pool_rejected + short_circuited + semaphore_rejected
  123. error_percentage = 0
  124. if total_count > 0:
  125. error_percentage = int(error_count / total_count * 100)
  126. self.health_counts_snapshot = HealthCounts(total_count, error_count, error_percentage)
  127. return self.health_counts_snapshot
  128. class HealthCounts(object):
  129. """ Number of requests during rolling window.
  130. Number that failed (failure + success + timeout + thread pool rejected +
  131. short circuited + semaphore rejected).
  132. Error percentage;
  133. """
  134. def __init__(self, total, error, error_percentage):
  135. self._total_count = total
  136. self._error_count = error
  137. self._error_percentage = error_percentage
  138. def total_requests(self):
  139. """ Total reqeust
  140. Returns:
  141. int: Returns total request count.
  142. """
  143. return self._total_count
  144. def error_count(self):
  145. """ Error count
  146. Returns:
  147. int: Returns error count.
  148. """
  149. return self._error_count
  150. def error_percentage(self):
  151. """ Error percentage
  152. Returns:
  153. int: Returns error percentage.
  154. """
  155. return self._error_percentage