__init__.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. #!/usr/bin/env python
  2. # License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
  3. import fcntl
  4. import io
  5. import os
  6. import select
  7. import shlex
  8. import shutil
  9. import signal
  10. import struct
  11. import sys
  12. import termios
  13. import time
  14. from contextlib import contextmanager, suppress
  15. from functools import wraps
  16. from pty import CHILD, STDIN_FILENO, STDOUT_FILENO, fork
  17. from unittest import TestCase
  18. from kitty.config import finalize_keys, finalize_mouse_mappings
  19. from kitty.fast_data_types import TEXT_SIZE_CODE, Cursor, HistoryBuf, LineBuf, Screen, get_options, monotonic, set_options
  20. from kitty.options.parse import merge_result_dicts
  21. from kitty.options.types import Options, defaults
  22. from kitty.rgb import to_color
  23. from kitty.types import MouseEvent
  24. from kitty.utils import read_screen_size
  25. from kitty.window import decode_cmdline, process_remote_print, process_title_from_child
  26. def parse_bytes(screen, data, dump_callback=None):
  27. data = memoryview(data)
  28. while data:
  29. dest = screen.test_create_write_buffer()
  30. s = screen.test_commit_write_buffer(data, dest)
  31. data = data[s:]
  32. screen.test_parse_written_data(dump_callback)
  33. def draw_multicell(screen: Screen, text: str, width: int = 0, scale: int = 1, subscale_n: int = 0, subscale_d: int = 0, vertical_align: int = 0) -> None:
  34. cmd = f'\x1b]{TEXT_SIZE_CODE};w={width}:s={scale}:n={subscale_n}:d={subscale_d}:v={vertical_align};{text}\a'
  35. parse_bytes(screen, cmd.encode())
  36. class Callbacks:
  37. def __init__(self, pty=None) -> None:
  38. self.clear()
  39. self.pty = pty
  40. self.ftc = None
  41. self.set_pointer_shape = lambda data: None
  42. self.last_cmd_at = 0
  43. self.last_cmd_cmdline = ''
  44. self.last_cmd_exit_status = sys.maxsize
  45. def write(self, data) -> None:
  46. self.wtcbuf += bytes(data)
  47. def notify_child_of_resize(self):
  48. self.num_of_resize_events += 1
  49. def color_control(self, code, data) -> None:
  50. from kitty.window import color_control
  51. response = color_control(self.color_profile, code, data)
  52. if response:
  53. def p(x):
  54. if '@' in x:
  55. return (to_color(x.partition('@')[0]), int(255 * float(x.partition('@')[2])))
  56. ans = to_color(x)
  57. if ans is None:
  58. ans = x
  59. return ans
  60. parts = {x.partition('=')[0]:p(x.partition('=')[2]) for x in response.split(';')[1:]}
  61. self.color_control_responses.append(parts)
  62. def title_changed(self, data, is_base64=False) -> None:
  63. self.titlebuf.append(process_title_from_child(data, is_base64, ''))
  64. def icon_changed(self, data) -> None:
  65. self.iconbuf += str(data, 'utf-8')
  66. def set_dynamic_color(self, code, data='') -> None:
  67. if code == 22:
  68. self.set_pointer_shape(data)
  69. else:
  70. self.colorbuf += str(data or b'', 'utf-8')
  71. def set_color_table_color(self, code, data='') -> None:
  72. self.ctbuf += ''
  73. def color_profile_popped(self, x) -> None:
  74. pass
  75. def cmd_output_marking(self, is_start: bool | None, data: str = '') -> None:
  76. if is_start:
  77. self.last_cmd_at = monotonic()
  78. self.last_cmd_cmdline = decode_cmdline(data) if data else data
  79. else:
  80. if self.last_cmd_at != 0:
  81. self.last_cmd_at = 0
  82. with suppress(Exception):
  83. self.last_cmd_exit_status = int(data)
  84. def request_capabilities(self, q) -> None:
  85. from kitty.terminfo import get_capabilities
  86. for c in get_capabilities(q, None):
  87. self.write(c.encode('ascii'))
  88. def desktop_notify(self, osc_code: int, raw_data: memoryview) -> None:
  89. self.notifications.append((osc_code, str(raw_data, 'utf-8')))
  90. def open_url(self, url: str, hyperlink_id: int) -> None:
  91. self.open_urls.append((url, hyperlink_id))
  92. def clipboard_control(self, data: memoryview, is_partial: bool = False) -> None:
  93. self.cc_buf.append((str(data, 'utf-8'), is_partial))
  94. def clear(self) -> None:
  95. self.wtcbuf = b''
  96. self.iconbuf = self.colorbuf = self.ctbuf = ''
  97. self.titlebuf = []
  98. self.printbuf = []
  99. self.color_control_responses = []
  100. self.notifications = []
  101. self.open_urls = []
  102. self.cc_buf = []
  103. self.bell_count = 0
  104. self.clone_cmds = []
  105. self.current_clone_data = ''
  106. self.last_cmd_exit_status = sys.maxsize
  107. self.last_cmd_cmdline = ''
  108. self.last_cmd_at = 0
  109. self.num_of_resize_events = 0
  110. def on_bell(self) -> None:
  111. self.bell_count += 1
  112. def on_activity_since_last_focus(self) -> None:
  113. pass
  114. def on_mouse_event(self, event):
  115. ev = MouseEvent(**event)
  116. opts = get_options()
  117. action_def = opts.mousemap.get(ev)
  118. if not action_def:
  119. return False
  120. self.current_mouse_button = ev.button
  121. for action in opts.alias_map.resolve_aliases(action_def, 'mouse_map'):
  122. getattr(self, action.func)(*action.args)
  123. self.current_mouse_button = 0
  124. return True
  125. def handle_remote_print(self, msg):
  126. text = process_remote_print(msg)
  127. self.printbuf.append(text)
  128. def handle_remote_cmd(self, msg):
  129. pass
  130. def handle_remote_clone(self, msg):
  131. msg = str(msg, 'utf-8')
  132. if not msg:
  133. if self.current_clone_data:
  134. cdata, self.current_clone_data = self.current_clone_data, ''
  135. from kitty.launch import CloneCmd
  136. self.clone_cmds.append(CloneCmd(cdata))
  137. self.current_clone_data = ''
  138. return
  139. num, rest = msg.split(':', 1)
  140. if num == '0' or len(self.current_clone_data) > 1024 * 1024:
  141. self.current_clone_data = ''
  142. self.current_clone_data += rest
  143. def handle_remote_ssh(self, msg):
  144. from kittens.ssh.utils import get_ssh_data
  145. if self.pty:
  146. for line in get_ssh_data(msg, "testing"):
  147. self.pty.write_to_child(line)
  148. def handle_remote_echo(self, msg):
  149. from base64 import standard_b64decode
  150. if self.pty:
  151. data = standard_b64decode(msg)
  152. self.pty.write_to_child(data)
  153. def file_transmission(self, data):
  154. if self.ftc:
  155. self.ftc.handle_serialized_command(data)
  156. def filled_line_buf(ynum=5, xnum=5, cursor=Cursor()):
  157. ans = LineBuf(ynum, xnum)
  158. cursor.x = 0
  159. for i in range(ynum):
  160. t = (f'{i}') * xnum
  161. ans.line(i).set_text(t, 0, xnum, cursor)
  162. return ans
  163. def filled_cursor():
  164. ans = Cursor()
  165. ans.bold = ans.italic = ans.reverse = ans.strikethrough = ans.dim = True
  166. ans.fg = 0x101
  167. ans.bg = 0x201
  168. ans.decoration_fg = 0x301
  169. return ans
  170. def filled_history_buf(ynum=5, xnum=5, cursor=Cursor()):
  171. lb = filled_line_buf(ynum, xnum, cursor)
  172. ans = HistoryBuf(ynum, xnum)
  173. for i in range(ynum):
  174. ans.push(lb.line(i))
  175. return ans
  176. def retry_on_failure(max_attempts=2, sleep_duration=2):
  177. def decorator(func):
  178. @wraps(func)
  179. def wrapper(*args, **kwargs):
  180. for attempt in range(max_attempts):
  181. try:
  182. return func(*args, **kwargs)
  183. except Exception as e:
  184. if attempt < max_attempts - 1: # Don't sleep on the last attempt
  185. time.sleep(sleep_duration)
  186. else:
  187. raise e # Re-raise the last exception
  188. return wrapper
  189. return decorator
  190. class BaseTest(TestCase):
  191. ae = TestCase.assertEqual
  192. maxDiff = 2048
  193. is_ci = os.environ.get('CI') == 'true'
  194. def rmtree_ignoring_errors(self, tdir):
  195. try:
  196. shutil.rmtree(tdir)
  197. except FileNotFoundError as err:
  198. print('Failed to delete the directory:', tdir, 'with error:', err, file=sys.stderr)
  199. def tearDown(self):
  200. set_options(None)
  201. def set_options(self, options=None):
  202. final_options = {'scrollback_pager_history_size': 1024, 'click_interval': 0.5}
  203. if options:
  204. final_options.update(options)
  205. options = Options(merge_result_dicts(defaults._asdict(), final_options))
  206. finalize_keys(options, {})
  207. finalize_mouse_mappings(options, {})
  208. set_options(options)
  209. return options
  210. def cmd_to_run_python_code(self, code):
  211. from kitty.constants import kitty_exe
  212. return [kitty_exe(), '+runpy', code]
  213. def create_screen(self, cols=5, lines=5, scrollback=5, cell_width=10, cell_height=20, options=None):
  214. self.set_options(options)
  215. c = Callbacks()
  216. s = Screen(c, lines, cols, scrollback, cell_width, cell_height, 0, c)
  217. c.color_profile = s.color_profile
  218. return s
  219. def create_pty(
  220. self, argv=None, cols=80, lines=100, scrollback=100, cell_width=10, cell_height=20,
  221. options=None, cwd=None, env=None, stdin_fd=None, stdout_fd=None
  222. ):
  223. self.set_options(options)
  224. return PTY(argv, lines, cols, scrollback, cell_width, cell_height, cwd, env, stdin_fd=stdin_fd, stdout_fd=stdout_fd)
  225. def assertEqualAttributes(self, c1, c2):
  226. x1, y1, c1.x, c1.y = c1.x, c1.y, 0, 0
  227. x2, y2, c2.x, c2.y = c2.x, c2.y, 0, 0
  228. try:
  229. self.assertEqual(c1, c2)
  230. finally:
  231. c1.x, c1.y, c2.x, c2.y = x1, y1, x2, y2
  232. debug_stdout = debug_stderr = -1
  233. @contextmanager
  234. def forwardable_stdio():
  235. global debug_stderr, debug_stdout
  236. debug_stdout = fd = os.dup(sys.stdout.fileno())
  237. os.set_inheritable(fd, True)
  238. debug_stderr = fd = os.dup(sys.stderr.fileno())
  239. os.set_inheritable(fd, True)
  240. try:
  241. yield
  242. finally:
  243. os.close(debug_stderr)
  244. os.close(debug_stdout)
  245. debug_stderr = debug_stdout = -1
  246. class PTY:
  247. def __init__(
  248. self, argv=None, rows=25, columns=80, scrollback=100, cell_width=10, cell_height=20,
  249. cwd=None, env=None, stdin_fd=None, stdout_fd=None
  250. ):
  251. self.is_child = False
  252. if isinstance(argv, str):
  253. argv = shlex.split(argv)
  254. self.write_buf = b''
  255. if argv is None:
  256. from kitty.child import openpty
  257. self.master_fd, self.slave_fd = openpty()
  258. self.child_pid = 0
  259. else:
  260. self.child_pid, self.master_fd = fork()
  261. self.is_child = self.child_pid == CHILD
  262. self.child_waited_for = False
  263. if self.is_child:
  264. while read_screen_size().width != columns * cell_width:
  265. time.sleep(0.01)
  266. if cwd:
  267. os.chdir(cwd)
  268. if stdin_fd is not None:
  269. os.dup2(stdin_fd, STDIN_FILENO)
  270. os.close(stdin_fd)
  271. if stdout_fd is not None:
  272. os.dup2(stdout_fd, STDOUT_FILENO)
  273. os.close(stdout_fd)
  274. signal.pthread_sigmask(signal.SIG_SETMASK, ())
  275. env = os.environ if env is None else env
  276. if debug_stdout > -1:
  277. env['KITTY_STDIO_FORWARDED'] = str(debug_stdout)
  278. os.execvpe(argv[0], argv, env)
  279. if stdin_fd is not None:
  280. os.close(stdin_fd)
  281. if stdout_fd is not None:
  282. os.close(stdout_fd)
  283. os.set_blocking(self.master_fd, False)
  284. self.cell_width = cell_width
  285. self.cell_height = cell_height
  286. self.set_window_size(rows=rows, columns=columns)
  287. self.callbacks = Callbacks(self)
  288. self.screen = Screen(self.callbacks, rows, columns, scrollback, cell_width, cell_height, 0, self.callbacks)
  289. self.received_bytes = b''
  290. def turn_off_echo(self):
  291. s = termios.tcgetattr(self.master_fd)
  292. s[3] &= ~termios.ECHO
  293. termios.tcsetattr(self.master_fd, termios.TCSANOW, s)
  294. def is_echo_on(self):
  295. s = termios.tcgetattr(self.master_fd)
  296. return True if s[3] & termios.ECHO else False
  297. def __del__(self):
  298. if not self.is_child:
  299. if hasattr(self, 'master_fd'):
  300. os.close(self.master_fd)
  301. del self.master_fd
  302. if hasattr(self, 'slave_fd'):
  303. os.close(self.slave_fd)
  304. del self.slave_fd
  305. if self.child_pid > 0 and not self.child_waited_for:
  306. os.waitpid(self.child_pid, 0)
  307. self.child_waited_for = True
  308. def write_to_child(self, data, flush=False):
  309. if isinstance(data, str):
  310. data = data.encode('utf-8')
  311. self.write_buf += data
  312. if flush:
  313. self.process_input_from_child(0)
  314. def send_cmd_to_child(self, cmd, flush=False):
  315. self.callbacks.last_cmd_exit_status = sys.maxsize
  316. self.last_cmd = cmd
  317. self.write_to_child(cmd + '\r', flush=flush)
  318. def process_input_from_child(self, timeout=10):
  319. rd, wd, _ = select.select([self.master_fd], [self.master_fd] if self.write_buf else [], [], max(0, timeout))
  320. if wd:
  321. n = os.write(self.master_fd, self.write_buf)
  322. self.write_buf = self.write_buf[n:]
  323. bytes_read = 0
  324. if rd:
  325. data = os.read(self.master_fd, io.DEFAULT_BUFFER_SIZE)
  326. bytes_read += len(data)
  327. self.received_bytes += data
  328. parse_bytes(self.screen, data)
  329. return bytes_read
  330. def wait_till(self, q, timeout=10, timeout_msg=None):
  331. end_time = time.monotonic() + timeout
  332. while not q() and time.monotonic() <= end_time:
  333. try:
  334. self.process_input_from_child(timeout=end_time - time.monotonic())
  335. except OSError as e:
  336. if not q():
  337. raise Exception(f'Failed to read from pty with error: {e}. Screen contents: \n {repr(self.screen_contents())}') from e
  338. return
  339. if not q():
  340. msg = 'The condition was not met'
  341. if timeout_msg is not None:
  342. msg = timeout_msg()
  343. raise TimeoutError(f'Timed out: {msg}. Screen contents: \n {repr(self.screen_contents())}')
  344. def wait_till_child_exits(self, timeout=30 if BaseTest.is_ci else 10, require_exit_code=None):
  345. end_time = time.monotonic() + timeout
  346. while time.monotonic() <= end_time:
  347. si_pid, status = os.waitpid(self.child_pid, os.WNOHANG)
  348. if si_pid == self.child_pid and os.WIFEXITED(status):
  349. ec = os.waitstatus_to_exitcode(status) if hasattr(os, 'waitstatus_to_exitcode') else require_exit_code
  350. self.child_waited_for = True
  351. if require_exit_code is not None and ec != require_exit_code:
  352. raise AssertionError(
  353. f'Child exited with exit status: {status} code: {ec} != {require_exit_code}.'
  354. f' Screen contents:\n{self.screen_contents()}')
  355. return status
  356. with suppress(OSError):
  357. self.process_input_from_child(timeout=0.02)
  358. raise AssertionError(f'Child did not exit in {timeout} seconds. Screen contents:\n{self.screen_contents()}')
  359. def set_window_size(self, rows=25, columns=80, send_signal=True):
  360. if hasattr(self, 'screen'):
  361. self.screen.resize(rows, columns)
  362. if send_signal:
  363. x_pixels = columns * self.cell_width
  364. y_pixels = rows * self.cell_height
  365. s = struct.pack('HHHH', rows, columns, x_pixels, y_pixels)
  366. fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, s)
  367. def screen_contents(self):
  368. lines = []
  369. for i in range(self.screen.lines):
  370. x = str(self.screen.line(i))
  371. if x:
  372. lines.append(x)
  373. return '\n'.join(lines)
  374. def last_cmd_output(self, as_ansi=False, add_wrap_markers=False):
  375. from kitty.window import cmd_output
  376. return cmd_output(self.screen, as_ansi=as_ansi, add_wrap_markers=add_wrap_markers)