__init__.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. #!/usr/bin/env python
  2. # License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
  3. import fcntl
  4. import io
  5. import os
  6. import select
  7. import shlex
  8. import shutil
  9. import signal
  10. import struct
  11. import sys
  12. import termios
  13. import time
  14. from contextlib import contextmanager, suppress
  15. from functools import wraps
  16. from pty import CHILD, STDIN_FILENO, STDOUT_FILENO, fork
  17. from typing import Optional
  18. from unittest import TestCase
  19. from kitty.config import finalize_keys, finalize_mouse_mappings
  20. from kitty.fast_data_types import Cursor, HistoryBuf, LineBuf, Screen, get_options, monotonic, set_options
  21. from kitty.options.parse import merge_result_dicts
  22. from kitty.options.types import Options, defaults
  23. from kitty.rgb import to_color
  24. from kitty.types import MouseEvent
  25. from kitty.utils import read_screen_size
  26. from kitty.window import decode_cmdline, process_remote_print, process_title_from_child
  27. def parse_bytes(screen, data, dump_callback=None):
  28. data = memoryview(data)
  29. while data:
  30. dest = screen.test_create_write_buffer()
  31. s = screen.test_commit_write_buffer(data, dest)
  32. data = data[s:]
  33. screen.test_parse_written_data(dump_callback)
  34. class Callbacks:
  35. def __init__(self, pty=None) -> None:
  36. self.clear()
  37. self.pty = pty
  38. self.ftc = None
  39. self.set_pointer_shape = lambda data: None
  40. self.last_cmd_at = 0
  41. self.last_cmd_cmdline = ''
  42. self.last_cmd_exit_status = sys.maxsize
  43. def write(self, data) -> None:
  44. self.wtcbuf += bytes(data)
  45. def notify_child_of_resize(self):
  46. self.num_of_resize_events += 1
  47. def color_control(self, code, data) -> None:
  48. from kitty.window import color_control
  49. response = color_control(self.color_profile, code, data)
  50. if response:
  51. def p(x):
  52. ans = to_color(x)
  53. if ans is None:
  54. ans = x
  55. return ans
  56. parts = {x.partition('=')[0]:p(x.partition('=')[2]) for x in response.split(';')[1:]}
  57. self.color_control_responses.append(parts)
  58. def title_changed(self, data, is_base64=False) -> None:
  59. self.titlebuf.append(process_title_from_child(data, is_base64, ''))
  60. def icon_changed(self, data) -> None:
  61. self.iconbuf += str(data, 'utf-8')
  62. def set_dynamic_color(self, code, data='') -> None:
  63. if code == 22:
  64. self.set_pointer_shape(data)
  65. else:
  66. self.colorbuf += str(data or b'', 'utf-8')
  67. def set_color_table_color(self, code, data='') -> None:
  68. self.ctbuf += ''
  69. def color_profile_popped(self, x) -> None:
  70. pass
  71. def cmd_output_marking(self, is_start: Optional[bool], data: str = '') -> None:
  72. if is_start:
  73. self.last_cmd_at = monotonic()
  74. self.last_cmd_cmdline = decode_cmdline(data) if data else data
  75. else:
  76. if self.last_cmd_at != 0:
  77. self.last_cmd_at = 0
  78. with suppress(Exception):
  79. self.last_cmd_exit_status = int(data)
  80. def request_capabilities(self, q) -> None:
  81. from kitty.terminfo import get_capabilities
  82. for c in get_capabilities(q, None):
  83. self.write(c.encode('ascii'))
  84. def desktop_notify(self, osc_code: int, raw_data: memoryview) -> None:
  85. self.notifications.append((osc_code, str(raw_data, 'utf-8')))
  86. def open_url(self, url: str, hyperlink_id: int) -> None:
  87. self.open_urls.append((url, hyperlink_id))
  88. def clipboard_control(self, data: memoryview, is_partial: bool = False) -> None:
  89. self.cc_buf.append((str(data, 'utf-8'), is_partial))
  90. def clear(self) -> None:
  91. self.wtcbuf = b''
  92. self.iconbuf = self.colorbuf = self.ctbuf = ''
  93. self.titlebuf = []
  94. self.printbuf = []
  95. self.color_control_responses = []
  96. self.notifications = []
  97. self.open_urls = []
  98. self.cc_buf = []
  99. self.bell_count = 0
  100. self.clone_cmds = []
  101. self.current_clone_data = ''
  102. self.last_cmd_exit_status = sys.maxsize
  103. self.last_cmd_cmdline = ''
  104. self.last_cmd_at = 0
  105. self.num_of_resize_events = 0
  106. def on_bell(self) -> None:
  107. self.bell_count += 1
  108. def on_activity_since_last_focus(self) -> None:
  109. pass
  110. def on_mouse_event(self, event):
  111. ev = MouseEvent(**event)
  112. opts = get_options()
  113. action_def = opts.mousemap.get(ev)
  114. if not action_def:
  115. return False
  116. self.current_mouse_button = ev.button
  117. for action in opts.alias_map.resolve_aliases(action_def, 'mouse_map'):
  118. getattr(self, action.func)(*action.args)
  119. self.current_mouse_button = 0
  120. return True
  121. def handle_remote_print(self, msg):
  122. text = process_remote_print(msg)
  123. self.printbuf.append(text)
  124. def handle_remote_cmd(self, msg):
  125. pass
  126. def handle_remote_clone(self, msg):
  127. msg = str(msg, 'utf-8')
  128. if not msg:
  129. if self.current_clone_data:
  130. cdata, self.current_clone_data = self.current_clone_data, ''
  131. from kitty.launch import CloneCmd
  132. self.clone_cmds.append(CloneCmd(cdata))
  133. self.current_clone_data = ''
  134. return
  135. num, rest = msg.split(':', 1)
  136. if num == '0' or len(self.current_clone_data) > 1024 * 1024:
  137. self.current_clone_data = ''
  138. self.current_clone_data += rest
  139. def handle_remote_ssh(self, msg):
  140. from kittens.ssh.utils import get_ssh_data
  141. if self.pty:
  142. for line in get_ssh_data(msg, "testing"):
  143. self.pty.write_to_child(line)
  144. def handle_remote_echo(self, msg):
  145. from base64 import standard_b64decode
  146. if self.pty:
  147. data = standard_b64decode(msg)
  148. self.pty.write_to_child(data)
  149. def file_transmission(self, data):
  150. if self.ftc:
  151. self.ftc.handle_serialized_command(data)
  152. def filled_line_buf(ynum=5, xnum=5, cursor=Cursor()):
  153. ans = LineBuf(ynum, xnum)
  154. cursor.x = 0
  155. for i in range(ynum):
  156. t = (f'{i}') * xnum
  157. ans.line(i).set_text(t, 0, xnum, cursor)
  158. return ans
  159. def filled_cursor():
  160. ans = Cursor()
  161. ans.bold = ans.italic = ans.reverse = ans.strikethrough = ans.dim = True
  162. ans.fg = 0x101
  163. ans.bg = 0x201
  164. ans.decoration_fg = 0x301
  165. return ans
  166. def filled_history_buf(ynum=5, xnum=5, cursor=Cursor()):
  167. lb = filled_line_buf(ynum, xnum, cursor)
  168. ans = HistoryBuf(ynum, xnum)
  169. for i in range(ynum):
  170. ans.push(lb.line(i))
  171. return ans
  172. def retry_on_failure(max_attempts=2, sleep_duration=2):
  173. def decorator(func):
  174. @wraps(func)
  175. def wrapper(*args, **kwargs):
  176. for attempt in range(max_attempts):
  177. try:
  178. return func(*args, **kwargs)
  179. except Exception as e:
  180. if attempt < max_attempts - 1: # Don't sleep on the last attempt
  181. time.sleep(sleep_duration)
  182. else:
  183. raise e # Re-raise the last exception
  184. return wrapper
  185. return decorator
  186. class BaseTest(TestCase):
  187. ae = TestCase.assertEqual
  188. maxDiff = 2048
  189. is_ci = os.environ.get('CI') == 'true'
  190. def rmtree_ignoring_errors(self, tdir):
  191. try:
  192. shutil.rmtree(tdir)
  193. except FileNotFoundError as err:
  194. print('Failed to delete the directory:', tdir, 'with error:', err, file=sys.stderr)
  195. def tearDown(self):
  196. set_options(None)
  197. def set_options(self, options=None):
  198. final_options = {'scrollback_pager_history_size': 1024, 'click_interval': 0.5}
  199. if options:
  200. final_options.update(options)
  201. options = Options(merge_result_dicts(defaults._asdict(), final_options))
  202. finalize_keys(options, {})
  203. finalize_mouse_mappings(options, {})
  204. set_options(options)
  205. return options
  206. def cmd_to_run_python_code(self, code):
  207. from kitty.constants import kitty_exe
  208. return [kitty_exe(), '+runpy', code]
  209. def create_screen(self, cols=5, lines=5, scrollback=5, cell_width=10, cell_height=20, options=None):
  210. self.set_options(options)
  211. c = Callbacks()
  212. s = Screen(c, lines, cols, scrollback, cell_width, cell_height, 0, c)
  213. c.color_profile = s.color_profile
  214. return s
  215. def create_pty(
  216. self, argv=None, cols=80, lines=100, scrollback=100, cell_width=10, cell_height=20,
  217. options=None, cwd=None, env=None, stdin_fd=None, stdout_fd=None
  218. ):
  219. self.set_options(options)
  220. return PTY(argv, lines, cols, scrollback, cell_width, cell_height, cwd, env, stdin_fd=stdin_fd, stdout_fd=stdout_fd)
  221. def assertEqualAttributes(self, c1, c2):
  222. x1, y1, c1.x, c1.y = c1.x, c1.y, 0, 0
  223. x2, y2, c2.x, c2.y = c2.x, c2.y, 0, 0
  224. try:
  225. self.assertEqual(c1, c2)
  226. finally:
  227. c1.x, c1.y, c2.x, c2.y = x1, y1, x2, y2
  228. debug_stdout = debug_stderr = -1
  229. @contextmanager
  230. def forwardable_stdio():
  231. global debug_stderr, debug_stdout
  232. debug_stdout = fd = os.dup(sys.stdout.fileno())
  233. os.set_inheritable(fd, True)
  234. debug_stderr = fd = os.dup(sys.stderr.fileno())
  235. os.set_inheritable(fd, True)
  236. try:
  237. yield
  238. finally:
  239. os.close(debug_stderr)
  240. os.close(debug_stdout)
  241. debug_stderr = debug_stdout = -1
  242. class PTY:
  243. def __init__(
  244. self, argv=None, rows=25, columns=80, scrollback=100, cell_width=10, cell_height=20,
  245. cwd=None, env=None, stdin_fd=None, stdout_fd=None
  246. ):
  247. self.is_child = False
  248. if isinstance(argv, str):
  249. argv = shlex.split(argv)
  250. self.write_buf = b''
  251. if argv is None:
  252. from kitty.child import openpty
  253. self.master_fd, self.slave_fd = openpty()
  254. else:
  255. self.child_pid, self.master_fd = fork()
  256. self.is_child = self.child_pid == CHILD
  257. self.child_waited_for = False
  258. if self.is_child:
  259. while read_screen_size().width != columns * cell_width:
  260. time.sleep(0.01)
  261. if cwd:
  262. os.chdir(cwd)
  263. if stdin_fd is not None:
  264. os.dup2(stdin_fd, STDIN_FILENO)
  265. os.close(stdin_fd)
  266. if stdout_fd is not None:
  267. os.dup2(stdout_fd, STDOUT_FILENO)
  268. os.close(stdout_fd)
  269. signal.pthread_sigmask(signal.SIG_SETMASK, ())
  270. env = os.environ if env is None else env
  271. if debug_stdout > -1:
  272. env['KITTY_STDIO_FORWARDED'] = str(debug_stdout)
  273. os.execvpe(argv[0], argv, env)
  274. if stdin_fd is not None:
  275. os.close(stdin_fd)
  276. if stdout_fd is not None:
  277. os.close(stdout_fd)
  278. os.set_blocking(self.master_fd, False)
  279. self.cell_width = cell_width
  280. self.cell_height = cell_height
  281. self.set_window_size(rows=rows, columns=columns)
  282. self.callbacks = Callbacks(self)
  283. self.screen = Screen(self.callbacks, rows, columns, scrollback, cell_width, cell_height, 0, self.callbacks)
  284. self.received_bytes = b''
  285. def turn_off_echo(self):
  286. s = termios.tcgetattr(self.master_fd)
  287. s[3] &= ~termios.ECHO
  288. termios.tcsetattr(self.master_fd, termios.TCSANOW, s)
  289. def is_echo_on(self):
  290. s = termios.tcgetattr(self.master_fd)
  291. return True if s[3] & termios.ECHO else False
  292. def __del__(self):
  293. if not self.is_child:
  294. if hasattr(self, 'master_fd'):
  295. os.close(self.master_fd)
  296. del self.master_fd
  297. if hasattr(self, 'slave_fd'):
  298. os.close(self.slave_fd)
  299. del self.slave_fd
  300. if self.child_pid > 0 and not self.child_waited_for:
  301. os.waitpid(self.child_pid, 0)
  302. self.child_waited_for = True
  303. def write_to_child(self, data, flush=False):
  304. if isinstance(data, str):
  305. data = data.encode('utf-8')
  306. self.write_buf += data
  307. if flush:
  308. self.process_input_from_child(0)
  309. def send_cmd_to_child(self, cmd, flush=False):
  310. self.callbacks.last_cmd_exit_status = sys.maxsize
  311. self.last_cmd = cmd
  312. self.write_to_child(cmd + '\r', flush=flush)
  313. def process_input_from_child(self, timeout=10):
  314. rd, wd, _ = select.select([self.master_fd], [self.master_fd] if self.write_buf else [], [], max(0, timeout))
  315. if wd:
  316. n = os.write(self.master_fd, self.write_buf)
  317. self.write_buf = self.write_buf[n:]
  318. bytes_read = 0
  319. if rd:
  320. data = os.read(self.master_fd, io.DEFAULT_BUFFER_SIZE)
  321. bytes_read += len(data)
  322. self.received_bytes += data
  323. parse_bytes(self.screen, data)
  324. return bytes_read
  325. def wait_till(self, q, timeout=10, timeout_msg=None):
  326. end_time = time.monotonic() + timeout
  327. while not q() and time.monotonic() <= end_time:
  328. self.process_input_from_child(timeout=end_time - time.monotonic())
  329. if not q():
  330. msg = 'The condition was not met'
  331. if timeout_msg is not None:
  332. msg = timeout_msg()
  333. raise TimeoutError(f'Timed out: {msg}. Screen contents: \n {repr(self.screen_contents())}')
  334. def wait_till_child_exits(self, timeout=30 if BaseTest.is_ci else 10, require_exit_code=None):
  335. end_time = time.monotonic() + timeout
  336. while time.monotonic() <= end_time:
  337. si_pid, status = os.waitpid(self.child_pid, os.WNOHANG)
  338. if si_pid == self.child_pid and os.WIFEXITED(status):
  339. ec = os.waitstatus_to_exitcode(status) if hasattr(os, 'waitstatus_to_exitcode') else require_exit_code
  340. self.child_waited_for = True
  341. if require_exit_code is not None and ec != require_exit_code:
  342. raise AssertionError(
  343. f'Child exited with exit status: {status} code: {ec} != {require_exit_code}.'
  344. f' Screen contents:\n{self.screen_contents()}')
  345. return status
  346. with suppress(OSError):
  347. self.process_input_from_child(timeout=0.02)
  348. raise AssertionError(f'Child did not exit in {timeout} seconds. Screen contents:\n{self.screen_contents()}')
  349. def set_window_size(self, rows=25, columns=80, send_signal=True):
  350. if hasattr(self, 'screen'):
  351. self.screen.resize(rows, columns)
  352. if send_signal:
  353. x_pixels = columns * self.cell_width
  354. y_pixels = rows * self.cell_height
  355. s = struct.pack('HHHH', rows, columns, x_pixels, y_pixels)
  356. fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, s)
  357. def screen_contents(self):
  358. lines = []
  359. for i in range(self.screen.lines):
  360. x = str(self.screen.line(i))
  361. if x:
  362. lines.append(x)
  363. return '\n'.join(lines)
  364. def last_cmd_output(self, as_ansi=False, add_wrap_markers=False):
  365. from kitty.window import cmd_output
  366. return cmd_output(self.screen, as_ansi=as_ansi, add_wrap_markers=add_wrap_markers)