__init__.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. #!/usr/bin/env python
  2. # License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
  3. import fcntl
  4. import io
  5. import os
  6. import select
  7. import shlex
  8. import shutil
  9. import signal
  10. import struct
  11. import sys
  12. import termios
  13. import time
  14. from contextlib import contextmanager, suppress
  15. from functools import wraps
  16. from pty import CHILD, STDIN_FILENO, STDOUT_FILENO, fork
  17. from typing import Optional
  18. from unittest import TestCase
  19. from kitty.config import finalize_keys, finalize_mouse_mappings
  20. from kitty.fast_data_types import Cursor, HistoryBuf, LineBuf, Screen, get_options, monotonic, set_options
  21. from kitty.options.parse import merge_result_dicts
  22. from kitty.options.types import Options, defaults
  23. from kitty.types import MouseEvent
  24. from kitty.utils import read_screen_size
  25. from kitty.window import decode_cmdline, process_remote_print, process_title_from_child
  26. def parse_bytes(screen, data, dump_callback=None):
  27. data = memoryview(data)
  28. while data:
  29. dest = screen.test_create_write_buffer()
  30. s = screen.test_commit_write_buffer(data, dest)
  31. data = data[s:]
  32. screen.test_parse_written_data(dump_callback)
  33. class Callbacks:
  34. def __init__(self, pty=None) -> None:
  35. self.clear()
  36. self.pty = pty
  37. self.ftc = None
  38. self.set_pointer_shape = lambda data: None
  39. self.last_cmd_at = 0
  40. self.last_cmd_cmdline = ''
  41. self.last_cmd_exit_status = sys.maxsize
  42. def write(self, data) -> None:
  43. self.wtcbuf += bytes(data)
  44. def notify_child_of_resize(self):
  45. self.num_of_resize_events += 1
  46. def title_changed(self, data, is_base64=False) -> None:
  47. self.titlebuf.append(process_title_from_child(data, is_base64, ''))
  48. def icon_changed(self, data) -> None:
  49. self.iconbuf += str(data, 'utf-8')
  50. def set_dynamic_color(self, code, data='') -> None:
  51. if code == 22:
  52. self.set_pointer_shape(data)
  53. else:
  54. self.colorbuf += str(data or b'', 'utf-8')
  55. def set_color_table_color(self, code, data='') -> None:
  56. self.ctbuf += ''
  57. def color_profile_popped(self, x) -> None:
  58. pass
  59. def cmd_output_marking(self, is_start: Optional[bool], data: str = '') -> None:
  60. if is_start:
  61. self.last_cmd_at = monotonic()
  62. self.last_cmd_cmdline = decode_cmdline(data) if data else data
  63. else:
  64. if self.last_cmd_at != 0:
  65. self.last_cmd_at = 0
  66. with suppress(Exception):
  67. self.last_cmd_exit_status = int(data)
  68. def request_capabilities(self, q) -> None:
  69. from kitty.terminfo import get_capabilities
  70. for c in get_capabilities(q, None):
  71. self.write(c.encode('ascii'))
  72. def desktop_notify(self, osc_code: int, raw_data: memoryview) -> None:
  73. self.notifications.append((osc_code, str(raw_data, 'utf-8')))
  74. def open_url(self, url: str, hyperlink_id: int) -> None:
  75. self.open_urls.append((url, hyperlink_id))
  76. def clipboard_control(self, data: memoryview, is_partial: bool = False) -> None:
  77. self.cc_buf.append((str(data, 'utf-8'), is_partial))
  78. def clear(self) -> None:
  79. self.wtcbuf = b''
  80. self.iconbuf = self.colorbuf = self.ctbuf = ''
  81. self.titlebuf = []
  82. self.printbuf = []
  83. self.notifications = []
  84. self.open_urls = []
  85. self.cc_buf = []
  86. self.bell_count = 0
  87. self.clone_cmds = []
  88. self.current_clone_data = ''
  89. self.last_cmd_exit_status = sys.maxsize
  90. self.last_cmd_cmdline = ''
  91. self.last_cmd_at = 0
  92. self.num_of_resize_events = 0
  93. def on_bell(self) -> None:
  94. self.bell_count += 1
  95. def on_activity_since_last_focus(self) -> None:
  96. pass
  97. def on_mouse_event(self, event):
  98. ev = MouseEvent(**event)
  99. opts = get_options()
  100. action_def = opts.mousemap.get(ev)
  101. if not action_def:
  102. return False
  103. self.current_mouse_button = ev.button
  104. for action in opts.alias_map.resolve_aliases(action_def, 'mouse_map'):
  105. getattr(self, action.func)(*action.args)
  106. self.current_mouse_button = 0
  107. return True
  108. def handle_remote_print(self, msg):
  109. text = process_remote_print(msg)
  110. self.printbuf.append(text)
  111. def handle_remote_cmd(self, msg):
  112. pass
  113. def handle_remote_clone(self, msg):
  114. msg = str(msg, 'utf-8')
  115. if not msg:
  116. if self.current_clone_data:
  117. cdata, self.current_clone_data = self.current_clone_data, ''
  118. from kitty.launch import CloneCmd
  119. self.clone_cmds.append(CloneCmd(cdata))
  120. self.current_clone_data = ''
  121. return
  122. num, rest = msg.split(':', 1)
  123. if num == '0' or len(self.current_clone_data) > 1024 * 1024:
  124. self.current_clone_data = ''
  125. self.current_clone_data += rest
  126. def handle_remote_ssh(self, msg):
  127. from kittens.ssh.utils import get_ssh_data
  128. if self.pty:
  129. for line in get_ssh_data(msg, "testing"):
  130. self.pty.write_to_child(line)
  131. def handle_remote_echo(self, msg):
  132. from base64 import standard_b64decode
  133. if self.pty:
  134. data = standard_b64decode(msg)
  135. self.pty.write_to_child(data)
  136. def file_transmission(self, data):
  137. if self.ftc:
  138. self.ftc.handle_serialized_command(data)
  139. def filled_line_buf(ynum=5, xnum=5, cursor=Cursor()):
  140. ans = LineBuf(ynum, xnum)
  141. cursor.x = 0
  142. for i in range(ynum):
  143. t = (f'{i}') * xnum
  144. ans.line(i).set_text(t, 0, xnum, cursor)
  145. return ans
  146. def filled_cursor():
  147. ans = Cursor()
  148. ans.bold = ans.italic = ans.reverse = ans.strikethrough = ans.dim = True
  149. ans.fg = 0x101
  150. ans.bg = 0x201
  151. ans.decoration_fg = 0x301
  152. return ans
  153. def filled_history_buf(ynum=5, xnum=5, cursor=Cursor()):
  154. lb = filled_line_buf(ynum, xnum, cursor)
  155. ans = HistoryBuf(ynum, xnum)
  156. for i in range(ynum):
  157. ans.push(lb.line(i))
  158. return ans
  159. def retry_on_failure(max_attempts=2, sleep_duration=2):
  160. def decorator(func):
  161. @wraps(func)
  162. def wrapper(*args, **kwargs):
  163. for attempt in range(max_attempts):
  164. try:
  165. return func(*args, **kwargs)
  166. except Exception as e:
  167. if attempt < max_attempts - 1: # Don't sleep on the last attempt
  168. time.sleep(sleep_duration)
  169. else:
  170. raise e # Re-raise the last exception
  171. return wrapper
  172. return decorator
  173. class BaseTest(TestCase):
  174. ae = TestCase.assertEqual
  175. maxDiff = 2048
  176. is_ci = os.environ.get('CI') == 'true'
  177. def rmtree_ignoring_errors(self, tdir):
  178. try:
  179. shutil.rmtree(tdir)
  180. except FileNotFoundError as err:
  181. print('Failed to delete the directory:', tdir, 'with error:', err, file=sys.stderr)
  182. def tearDown(self):
  183. set_options(None)
  184. def set_options(self, options=None):
  185. final_options = {'scrollback_pager_history_size': 1024, 'click_interval': 0.5}
  186. if options:
  187. final_options.update(options)
  188. options = Options(merge_result_dicts(defaults._asdict(), final_options))
  189. finalize_keys(options, {})
  190. finalize_mouse_mappings(options, {})
  191. set_options(options)
  192. return options
  193. def cmd_to_run_python_code(self, code):
  194. from kitty.constants import kitty_exe
  195. return [kitty_exe(), '+runpy', code]
  196. def create_screen(self, cols=5, lines=5, scrollback=5, cell_width=10, cell_height=20, options=None):
  197. self.set_options(options)
  198. c = Callbacks()
  199. s = Screen(c, lines, cols, scrollback, cell_width, cell_height, 0, c)
  200. return s
  201. def create_pty(
  202. self, argv=None, cols=80, lines=100, scrollback=100, cell_width=10, cell_height=20,
  203. options=None, cwd=None, env=None, stdin_fd=None, stdout_fd=None
  204. ):
  205. self.set_options(options)
  206. return PTY(argv, lines, cols, scrollback, cell_width, cell_height, cwd, env, stdin_fd=stdin_fd, stdout_fd=stdout_fd)
  207. def assertEqualAttributes(self, c1, c2):
  208. x1, y1, c1.x, c1.y = c1.x, c1.y, 0, 0
  209. x2, y2, c2.x, c2.y = c2.x, c2.y, 0, 0
  210. try:
  211. self.assertEqual(c1, c2)
  212. finally:
  213. c1.x, c1.y, c2.x, c2.y = x1, y1, x2, y2
  214. debug_stdout = debug_stderr = -1
  215. @contextmanager
  216. def forwardable_stdio():
  217. global debug_stderr, debug_stdout
  218. debug_stdout = fd = os.dup(sys.stdout.fileno())
  219. os.set_inheritable(fd, True)
  220. debug_stderr = fd = os.dup(sys.stderr.fileno())
  221. os.set_inheritable(fd, True)
  222. try:
  223. yield
  224. finally:
  225. os.close(debug_stderr)
  226. os.close(debug_stdout)
  227. debug_stderr = debug_stdout = -1
  228. class PTY:
  229. def __init__(
  230. self, argv=None, rows=25, columns=80, scrollback=100, cell_width=10, cell_height=20,
  231. cwd=None, env=None, stdin_fd=None, stdout_fd=None
  232. ):
  233. self.is_child = False
  234. if isinstance(argv, str):
  235. argv = shlex.split(argv)
  236. self.write_buf = b''
  237. if argv is None:
  238. from kitty.child import openpty
  239. self.master_fd, self.slave_fd = openpty()
  240. else:
  241. self.child_pid, self.master_fd = fork()
  242. self.is_child = self.child_pid == CHILD
  243. self.child_waited_for = False
  244. if self.is_child:
  245. while read_screen_size().width != columns * cell_width:
  246. time.sleep(0.01)
  247. if cwd:
  248. os.chdir(cwd)
  249. if stdin_fd is not None:
  250. os.dup2(stdin_fd, STDIN_FILENO)
  251. os.close(stdin_fd)
  252. if stdout_fd is not None:
  253. os.dup2(stdout_fd, STDOUT_FILENO)
  254. os.close(stdout_fd)
  255. signal.pthread_sigmask(signal.SIG_SETMASK, ())
  256. env = os.environ if env is None else env
  257. if debug_stdout > -1:
  258. env['KITTY_STDIO_FORWARDED'] = str(debug_stdout)
  259. os.execvpe(argv[0], argv, env)
  260. if stdin_fd is not None:
  261. os.close(stdin_fd)
  262. if stdout_fd is not None:
  263. os.close(stdout_fd)
  264. os.set_blocking(self.master_fd, False)
  265. self.cell_width = cell_width
  266. self.cell_height = cell_height
  267. self.set_window_size(rows=rows, columns=columns)
  268. self.callbacks = Callbacks(self)
  269. self.screen = Screen(self.callbacks, rows, columns, scrollback, cell_width, cell_height, 0, self.callbacks)
  270. self.received_bytes = b''
  271. def turn_off_echo(self):
  272. s = termios.tcgetattr(self.master_fd)
  273. s[3] &= ~termios.ECHO
  274. termios.tcsetattr(self.master_fd, termios.TCSANOW, s)
  275. def is_echo_on(self):
  276. s = termios.tcgetattr(self.master_fd)
  277. return True if s[3] & termios.ECHO else False
  278. def __del__(self):
  279. if not self.is_child:
  280. if hasattr(self, 'master_fd'):
  281. os.close(self.master_fd)
  282. del self.master_fd
  283. if hasattr(self, 'slave_fd'):
  284. os.close(self.slave_fd)
  285. del self.slave_fd
  286. if self.child_pid > 0 and not self.child_waited_for:
  287. os.waitpid(self.child_pid, 0)
  288. self.child_waited_for = True
  289. def write_to_child(self, data, flush=False):
  290. if isinstance(data, str):
  291. data = data.encode('utf-8')
  292. self.write_buf += data
  293. if flush:
  294. self.process_input_from_child(0)
  295. def send_cmd_to_child(self, cmd, flush=False):
  296. self.callbacks.last_cmd_exit_status = sys.maxsize
  297. self.last_cmd = cmd
  298. self.write_to_child(cmd + '\r', flush=flush)
  299. def process_input_from_child(self, timeout=10):
  300. rd, wd, _ = select.select([self.master_fd], [self.master_fd] if self.write_buf else [], [], max(0, timeout))
  301. if wd:
  302. n = os.write(self.master_fd, self.write_buf)
  303. self.write_buf = self.write_buf[n:]
  304. bytes_read = 0
  305. if rd:
  306. data = os.read(self.master_fd, io.DEFAULT_BUFFER_SIZE)
  307. bytes_read += len(data)
  308. self.received_bytes += data
  309. parse_bytes(self.screen, data)
  310. return bytes_read
  311. def wait_till(self, q, timeout=10, timeout_msg=None):
  312. end_time = time.monotonic() + timeout
  313. while not q() and time.monotonic() <= end_time:
  314. self.process_input_from_child(timeout=end_time - time.monotonic())
  315. if not q():
  316. msg = 'The condition was not met'
  317. if timeout_msg is not None:
  318. msg = timeout_msg()
  319. raise TimeoutError(f'Timed out: {msg}. Screen contents: \n {repr(self.screen_contents())}')
  320. def wait_till_child_exits(self, timeout=30 if BaseTest.is_ci else 10, require_exit_code=None):
  321. end_time = time.monotonic() + timeout
  322. while time.monotonic() <= end_time:
  323. si_pid, status = os.waitpid(self.child_pid, os.WNOHANG)
  324. if si_pid == self.child_pid and os.WIFEXITED(status):
  325. ec = os.waitstatus_to_exitcode(status) if hasattr(os, 'waitstatus_to_exitcode') else require_exit_code
  326. self.child_waited_for = True
  327. if require_exit_code is not None and ec != require_exit_code:
  328. raise AssertionError(
  329. f'Child exited with exit status: {status} code: {ec} != {require_exit_code}.'
  330. f' Screen contents:\n{self.screen_contents()}')
  331. return status
  332. with suppress(OSError):
  333. self.process_input_from_child(timeout=0.02)
  334. raise AssertionError(f'Child did not exit in {timeout} seconds. Screen contents:\n{self.screen_contents()}')
  335. def set_window_size(self, rows=25, columns=80, send_signal=True):
  336. if hasattr(self, 'screen'):
  337. self.screen.resize(rows, columns)
  338. if send_signal:
  339. x_pixels = columns * self.cell_width
  340. y_pixels = rows * self.cell_height
  341. s = struct.pack('HHHH', rows, columns, x_pixels, y_pixels)
  342. fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, s)
  343. def screen_contents(self):
  344. lines = []
  345. for i in range(self.screen.lines):
  346. x = str(self.screen.line(i))
  347. if x:
  348. lines.append(x)
  349. return '\n'.join(lines)
  350. def last_cmd_output(self, as_ansi=False, add_wrap_markers=False):
  351. from kitty.window import cmd_output
  352. return cmd_output(self.screen, as_ansi=as_ansi, add_wrap_markers=add_wrap_markers)