minicurses.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. import functools
  2. from threading import Lock
  3. from .utils import supports_terminal_sequences, write_string
  4. CONTROL_SEQUENCES = {
  5. 'DOWN': '\n',
  6. 'UP': '\033[A',
  7. 'ERASE_LINE': '\033[K',
  8. 'RESET': '\033[0m',
  9. }
  10. _COLORS = {
  11. 'BLACK': '0',
  12. 'RED': '1',
  13. 'GREEN': '2',
  14. 'YELLOW': '3',
  15. 'BLUE': '4',
  16. 'PURPLE': '5',
  17. 'CYAN': '6',
  18. 'WHITE': '7',
  19. }
  20. _TEXT_STYLES = {
  21. 'NORMAL': '0',
  22. 'BOLD': '1',
  23. 'UNDERLINED': '4',
  24. }
  25. def format_text(text, f):
  26. '''
  27. @param f String representation of formatting to apply in the form:
  28. [style] [light] font_color [on [light] bg_color]
  29. E.g. "red", "bold green on light blue"
  30. '''
  31. f = f.upper()
  32. tokens = f.strip().split()
  33. bg_color = ''
  34. if 'ON' in tokens:
  35. if tokens[-1] == 'ON':
  36. raise SyntaxError(f'Empty background format specified in {f!r}')
  37. if tokens[-1] not in _COLORS:
  38. raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
  39. bg_color = f'4{_COLORS[tokens.pop()]}'
  40. if tokens[-1] == 'LIGHT':
  41. bg_color = f'0;10{bg_color[1:]}'
  42. tokens.pop()
  43. if tokens[-1] != 'ON':
  44. raise SyntaxError(f'Invalid format {f.split(" ON ", 1)[1]!r} in {f!r}')
  45. bg_color = f'\033[{bg_color}m'
  46. tokens.pop()
  47. if not tokens:
  48. fg_color = ''
  49. elif tokens[-1] not in _COLORS:
  50. raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
  51. else:
  52. fg_color = f'3{_COLORS[tokens.pop()]}'
  53. if tokens and tokens[-1] == 'LIGHT':
  54. fg_color = f'9{fg_color[1:]}'
  55. tokens.pop()
  56. fg_style = tokens.pop() if tokens and tokens[-1] in _TEXT_STYLES else 'NORMAL'
  57. fg_color = f'\033[{_TEXT_STYLES[fg_style]};{fg_color}m'
  58. if tokens:
  59. raise SyntaxError(f'Invalid format {" ".join(tokens)!r} in {f!r}')
  60. if fg_color or bg_color:
  61. text = text.replace(CONTROL_SEQUENCES['RESET'], f'{fg_color}{bg_color}')
  62. return f'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}'
  63. else:
  64. return text
  65. class MultilinePrinterBase:
  66. def __init__(self, stream=None, lines=1):
  67. self.stream = stream
  68. self.maximum = lines - 1
  69. self._HAVE_FULLCAP = supports_terminal_sequences(stream)
  70. def __enter__(self):
  71. return self
  72. def __exit__(self, *args):
  73. self.end()
  74. def print_at_line(self, text, pos):
  75. pass
  76. def end(self):
  77. pass
  78. def _add_line_number(self, text, line):
  79. if self.maximum:
  80. return f'{line + 1}: {text}'
  81. return text
  82. def write(self, *text):
  83. write_string(''.join(text), self.stream)
  84. class QuietMultilinePrinter(MultilinePrinterBase):
  85. pass
  86. class MultilineLogger(MultilinePrinterBase):
  87. def write(self, *text):
  88. self.stream.debug(''.join(text))
  89. def print_at_line(self, text, pos):
  90. # stream is the logger object, not an actual stream
  91. self.write(self._add_line_number(text, pos))
  92. class BreaklineStatusPrinter(MultilinePrinterBase):
  93. def print_at_line(self, text, pos):
  94. self.write(self._add_line_number(text, pos), '\n')
  95. class MultilinePrinter(MultilinePrinterBase):
  96. def __init__(self, stream=None, lines=1, preserve_output=True):
  97. super().__init__(stream, lines)
  98. self.preserve_output = preserve_output
  99. self._lastline = self._lastlength = 0
  100. self._movelock = Lock()
  101. def lock(func):
  102. @functools.wraps(func)
  103. def wrapper(self, *args, **kwargs):
  104. with self._movelock:
  105. return func(self, *args, **kwargs)
  106. return wrapper
  107. def _move_cursor(self, dest):
  108. current = min(self._lastline, self.maximum)
  109. yield '\r'
  110. distance = dest - current
  111. if distance < 0:
  112. yield CONTROL_SEQUENCES['UP'] * -distance
  113. elif distance > 0:
  114. yield CONTROL_SEQUENCES['DOWN'] * distance
  115. self._lastline = dest
  116. @lock
  117. def print_at_line(self, text, pos):
  118. if self._HAVE_FULLCAP:
  119. self.write(*self._move_cursor(pos), CONTROL_SEQUENCES['ERASE_LINE'], text)
  120. return
  121. text = self._add_line_number(text, pos)
  122. textlen = len(text)
  123. if self._lastline == pos:
  124. # move cursor at the start of progress when writing to same line
  125. prefix = '\r'
  126. if self._lastlength > textlen:
  127. text += ' ' * (self._lastlength - textlen)
  128. self._lastlength = textlen
  129. else:
  130. # otherwise, break the line
  131. prefix = '\n'
  132. self._lastlength = textlen
  133. self.write(prefix, text)
  134. self._lastline = pos
  135. @lock
  136. def end(self):
  137. # move cursor to the end of the last line, and write line break
  138. # so that other to_screen calls can precede
  139. text = self._move_cursor(self.maximum) if self._HAVE_FULLCAP else []
  140. if self.preserve_output:
  141. self.write(*text, '\n')
  142. return
  143. if self._HAVE_FULLCAP:
  144. self.write(
  145. *text, CONTROL_SEQUENCES['ERASE_LINE'],
  146. f'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}' * self.maximum)
  147. else:
  148. self.write('\r', ' ' * self._lastlength, '\r')