20 KB

  1. #!/usr/bin/env python
  2. # License: GPLv3 Copyright: 2022, Kovid Goyal <kovid at>
  3. import os
  4. import shlex
  5. import shutil
  6. import subprocess
  7. import tempfile
  8. import unittest
  9. from contextlib import contextmanager
  10. from functools import lru_cache, partial
  11. from kitty.bash import decode_ansi_c_quoted_string
  12. from kitty.constants import is_macos, kitten_exe, kitty_base_dir, shell_integration_dir, terminfo_dir
  13. from kitty.fast_data_types import CURSOR_BEAM, CURSOR_BLOCK, CURSOR_UNDERLINE
  14. from kitty.shell_integration import setup_bash_env, setup_fish_env, setup_zsh_env
  15. from . import BaseTest
  16. @lru_cache
  17. def bash_ok():
  18. v = shutil.which('bash')
  19. if not v:
  20. return False
  21. o = subprocess.check_output([v, '-c', 'echo "${BASH_VERSINFO[0]}\n${BASH_VERSINFO[4]}"']).decode('utf-8').splitlines()
  22. if not o:
  23. return False
  24. major_ver, relstatus = o[0], o[-1]
  25. return int(major_ver) >= 5 and relstatus == 'release'
  26. def basic_shell_env(home_dir):
  27. ans = {
  28. 'PATH': os.environ.get('PATH', '/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin'),
  29. 'HOME': home_dir,
  30. 'TERM': 'xterm-kitty',
  31. 'TERMINFO': terminfo_dir,
  32. 'KITTY_SHELL_INTEGRATION': 'enabled',
  33. 'KITTY_INSTALLATION_DIR': kitty_base_dir,
  36. 'WEZTERM_SHELL_SKIP_ALL': '1', # dont fail if WezTerm's system wide, default on (why?) shell integration is installed
  37. }
  38. for x in ('USER', 'LANG'):
  39. if os.environ.get(x):
  40. ans[x] = os.environ[x]
  41. return ans
  42. def safe_env_for_running_shell(argv, home_dir, rc='', shell='zsh', with_kitten=False):
  43. ans = basic_shell_env(home_dir)
  44. if shell == 'zsh':
  45. argv.insert(1, '--noglobalrcs')
  46. with open(os.path.join(home_dir, '.zshrc'), 'w') as f:
  47. print(rc + '\nZLE_RPROMPT_INDENT=0', file=f)
  48. setup_zsh_env(ans, argv)
  49. elif shell == 'fish':
  50. conf_dir = os.path.join(home_dir, '.config', 'fish')
  51. os.makedirs(conf_dir, exist_ok=True)
  52. # Avoid generating unneeded completion scripts
  53. os.makedirs(os.path.join(home_dir, '.local', 'share', 'fish', 'generated_completions'), exist_ok=True)
  54. with open(os.path.join(conf_dir, ''), 'w') as f:
  55. print(rc + '\n', file=f)
  56. setup_fish_env(ans, argv)
  57. elif shell == 'bash':
  58. bashrc = os.path.join(home_dir, '.bashrc')
  59. if with_kitten:
  61. else:
  62. setup_bash_env(ans, argv)
  63. ans['KITTY_BASH_INJECT'] += ' posix'
  64. ans['KITTY_BASH_POSIX_ENV'] = bashrc
  65. with open(bashrc, 'w') as f:
  66. # ensure LINES and COLUMNS are kept up to date
  67. print('shopt -s checkwinsize', file=f)
  68. if rc:
  69. print(rc, file=f)
  70. return ans
  71. class ShellIntegration(BaseTest):
  72. with_kitten = False
  73. @contextmanager
  74. def run_shell(self, shell='zsh', rc='', cmd='', setup_env=None):
  75. home_dir = self.home_dir = os.path.realpath(tempfile.mkdtemp())
  76. cmd = cmd or shell
  77. cmd = shlex.split(cmd.format(**locals()))
  78. env = (setup_env or safe_env_for_running_shell)(cmd, home_dir, rc=rc, shell=shell, with_kitten=self.with_kitten)
  80. try:
  81. if self.with_kitten:
  82. cmd = [kitten_exe(), 'run-shell', '--shell', shlex.join(cmd)]
  83. pty = self.create_pty(cmd, cwd=home_dir, env=env, cols=180)
  84. i = 10
  85. while i > 0 and not pty.screen_contents().strip():
  86. pty.process_input_from_child()
  87. i -= 1
  88. yield pty
  89. finally:
  90. if os.path.exists(home_dir):
  91. shutil.rmtree(home_dir)
  92. @unittest.skipUnless(shutil.which('zsh'), 'zsh not installed')
  93. def test_zsh_integration(self):
  94. ps1, rps1 = 'left>', '<right'
  95. with self.run_shell(
  96. rc=f'''
  97. PS1="{ps1}"
  98. RPS1="{rps1}"
  99. ''') as pty:
  100. q = ps1 + ' ' * (pty.screen.columns - len(ps1) - len(rps1)) + rps1
  101. try:
  102. pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BEAM)
  103. except TimeoutError as e:
  104. raise AssertionError(f'Cursor was not changed to beam. Screen contents: {repr(pty.screen_contents())}') from e
  105. pty.wait_till(lambda: pty.screen_contents() == q)
  106.[-1], '~')
  107. pty.callbacks.clear()
  108. pty.send_cmd_to_child('mkdir test && ls -a')
  109. self.assert_command(pty)
  110. pty.wait_till(lambda: pty.screen_contents().count(rps1) == 2)
  111.[-2:], ['mkdir test && ls -a', '~'])
  112. q = '\n'.join(str(pty.screen.line(i)) for i in range(1, pty.screen.cursor.y))
  113., q)
  114. # shrink the screen
  115. pty.write_to_child(r'echo $COLUMNS')
  116. pty.set_window_size(rows=20, columns=40)
  117. q = ps1 + 'echo $COLUMNS' + ' ' * (40 - len(ps1) - len(rps1) - len('echo $COLUMNS')) + rps1
  118. pty.process_input_from_child()
  119. def redrawn():
  120. q = pty.screen_contents()
  121. return '$COLUMNS' in q and q.count(rps1) == 2 and q.count(ps1) == 2
  122. pty.wait_till(redrawn)
  123., str(pty.screen.line(pty.screen.cursor.y)))
  124. pty.write_to_child('\r')
  125. self.assert_command(pty, 'echo $COLUMNS')
  126. pty.wait_till(lambda: pty.screen_contents().count(rps1) == 3)
  127.'40', str(pty.screen.line(pty.screen.cursor.y - 1)))
  128., str(pty.screen.line(pty.screen.cursor.y - 2)))
  129. pty.send_cmd_to_child('clear')
  130. self.assert_command(pty)
  131. q = ps1 + ' ' * (pty.screen.columns - len(ps1) - len(rps1)) + rps1
  132. pty.wait_till(lambda: pty.screen_contents() == q)
  133. pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BEAM)
  134. pty.send_cmd_to_child('cat')
  135. pty.wait_till(lambda: pty.screen.cursor.shape == 0)
  136. pty.write_to_child('\x04')
  137. pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BEAM)
  138. self.assert_command(pty)
  139. with self.run_shell(rc=f'''PS1="{ps1}"''') as pty:
  140. pty.callbacks.clear()
  141. pty.send_cmd_to_child('printf "%s\x16\a%s" "a" "b"')
  142. pty.wait_till(lambda: 'ab' in pty.screen_contents())
  143. self.assertTrue(pty.screen.last_reported_cwd.decode().endswith(self.home_dir))
  144. self.assertIn('%s^G%s', pty.screen_contents())
  145. q = os.path.join(self.home_dir, 'testing-cwd-notification-🐱')
  146. os.mkdir(q)
  147. pty.send_cmd_to_child(f'cd {q}')
  148. pty.wait_till(lambda: pty.screen.last_reported_cwd.decode().endswith(q))
  149. if not is_macos: # Fails on older macOS like the one used to build kitty binary because of unicode encoding issues
  150. self.assert_command(pty)
  151. with self.run_shell(rc=f'''PS1="{ps1}"\nexport ES="a\n b c\nd"''') as pty:
  152. pty.callbacks.clear()
  153. pty.send_cmd_to_child('clone-in-kitty')
  154. pty.wait_till(lambda: len(pty.callbacks.clone_cmds) == 1)
  155. self.assert_command(pty)
  156. env = pty.callbacks.clone_cmds[0].env
  157.'ES'), 'a\n b c\nd')
  158. @unittest.skipUnless(shutil.which('fish'), 'fish not installed')
  159. def test_fish_integration(self):
  160. fish_prompt, right_prompt = 'left>', '<right'
  161. completions_dir = os.path.join(kitty_base_dir, 'shell-integration', 'fish', 'vendor_completions.d')
  162. with self.run_shell(
  163. shell='fish',
  164. rc=f'''
  165. set -g fish_greeting
  166. function fish_prompt; echo -n "{fish_prompt}"; end
  167. function fish_right_prompt; echo -n "{right_prompt}"; end
  168. function _test_comp_path; contains "{completions_dir}" $fish_complete_path; and echo ok; end
  169. function _set_key; set -g fish_key_bindings fish_$argv[1]_key_bindings; end
  170. function _set_status_prompt; function fish_prompt; echo -n "$pipestatus $status {fish_prompt}"; end; end
  171. ''') as pty:
  172. q = fish_prompt + ' ' * (pty.screen.columns - len(fish_prompt) - len(right_prompt)) + right_prompt
  173. pty.wait_till(lambda: pty.screen_contents().count(right_prompt) == 1)
  174., q)
  175. # shell integration dir must no be in XDG_DATA_DIRS
  176. cmd = f'string match -q -- "*{shell_integration_dir}*" "$XDG_DATA_DIRS" || echo "XDD_OK"'
  177. pty.send_cmd_to_child(cmd)
  178. pty.wait_till(lambda: 'XDD_OK' in pty.screen_contents())
  179. self.assert_command(pty, cmd)
  180. # CWD reporting
  181. self.assertTrue(pty.screen.last_reported_cwd.decode().endswith(self.home_dir))
  182. q = os.path.join(self.home_dir, 'testing-cwd-notification-🐱')
  183. os.mkdir(q)
  184. pty.send_cmd_to_child(f'cd {q}')
  185. self.assert_command(pty)
  186. pty.wait_till(lambda: pty.screen.last_reported_cwd.decode().endswith(q))
  187. pty.send_cmd_to_child('cd -')
  188. pty.wait_till(lambda: pty.screen.last_reported_cwd.decode().endswith(self.home_dir))
  189. # completion and prompt marking
  190. pty.wait_till(lambda: 'cd -' not in pty.screen_contents().splitlines()[-1])
  191. pty.send_cmd_to_child('clear')
  192. pty.wait_till(lambda: pty.screen_contents().count(right_prompt) == 1)
  193. pty.send_cmd_to_child('_test_comp_path')
  194. self.assert_command(pty)
  195. pty.wait_till(lambda: pty.screen_contents().count(right_prompt) == 2)
  196. q = '\n'.join(str(pty.screen.line(i)) for i in range(1, pty.screen.cursor.y))
  197., 'ok')
  198., q)
  199. # resize and redraw (fish_handle_reflow)
  200. pty.write_to_child(r'echo $COLUMNS')
  201. pty.set_window_size(rows=20, columns=40)
  202. q = fish_prompt + 'echo $COLUMNS' + ' ' * (40 - len(fish_prompt) - len(right_prompt) - len('echo $COLUMNS')) + right_prompt
  203. pty.process_input_from_child()
  204. def redrawn():
  205. q = pty.screen_contents()
  206. return '$COLUMNS' in q and q.count(right_prompt) == 2 and q.count(fish_prompt) == 2
  207. pty.wait_till(redrawn)
  208., str(pty.screen.line(pty.screen.cursor.y)))
  209. pty.write_to_child('\r')
  210. pty.wait_till(lambda: pty.screen_contents().count(right_prompt) == 3)
  211. self.assert_command(pty, 'echo $COLUMNS')
  212.'40', str(pty.screen.line(pty.screen.cursor.y - 1)))
  213., str(pty.screen.line(pty.screen.cursor.y - 2)))
  214. # cursor shapes
  215. pty.send_cmd_to_child('clear')
  216. q = fish_prompt + ' ' * (pty.screen.columns - len(fish_prompt) - len(right_prompt)) + right_prompt
  217. pty.wait_till(lambda: pty.screen_contents() == q)
  218. pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BEAM)
  219. pty.send_cmd_to_child('echo; cat')
  220. pty.wait_till(lambda: pty.screen.cursor.shape == 0 and pty.screen.cursor.y > 1)
  221. pty.write_to_child('\x04')
  222. pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BEAM)
  223. pty.send_cmd_to_child('_set_key vi')
  224. pty.wait_till(lambda: pty.screen_contents().count(right_prompt) == 3)
  225. pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BEAM)
  226. pty.write_to_child('\x1b')
  227. pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BLOCK)
  228. pty.write_to_child('r')
  229. pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_UNDERLINE)
  230. pty.write_to_child('\x1b')
  231. pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BLOCK)
  232. pty.write_to_child('i')
  233. pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BEAM)
  234. pty.send_cmd_to_child('_set_key default')
  235. self.assert_command(pty)
  236. pty.wait_till(lambda: pty.screen_contents().count(right_prompt) == 4)
  237. pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BEAM)
  238. pty.send_cmd_to_child('exit')
  239. def assert_command(self, pty, cmd='', exit_status=0):
  240. cmd = cmd or pty.last_cmd
  241. pty.wait_till(lambda: pty.callbacks.last_cmd_exit_status == 0, timeout_msg=lambda: f'{pty.callbacks.last_cmd_exit_status=} != {exit_status}')
  242. pty.wait_till(lambda: pty.callbacks.last_cmd_cmdline == cmd, timeout_msg=lambda: f'{pty.callbacks.last_cmd_cmdline=!r} != {cmd!r}')
  243. @unittest.skipUnless(bash_ok(), 'bash not installed, too old, or debug build')
  244. def test_bash_integration(self):
  245. ps1 = 'prompt> '
  246. with self.run_shell(
  247. shell='bash', rc=f'''
  248. PS1="{ps1}"
  249. ''') as pty:
  250. try:
  251. pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BEAM)
  252. except TimeoutError as e:
  253. raise AssertionError(f'Cursor was not changed to beam. Screen contents: {repr(pty.screen_contents())}') from e
  254. pty.wait_till(lambda: pty.screen_contents().count(ps1) == 1)
  255., ps1)
  256. pty.wait_till(lambda: pty.callbacks.titlebuf[-1:] == ['~'])
  257.[-1], '~')
  258. pty.callbacks.clear()
  259. cmd = 'mkdir test && ls -a'
  260. pty.send_cmd_to_child(cmd)
  261. pty.wait_till(lambda: pty.callbacks.titlebuf[-2:] == [cmd, '~'])
  262. pty.wait_till(lambda: pty.screen_contents().count(ps1) == 2)
  263. self.assert_command(pty, cmd)
  264. q = '\n'.join(str(pty.screen.line(i)) for i in range(1, pty.screen.cursor.y))
  265., q)
  266. # shrink the screen
  267. pty.write_to_child(r'echo $COLUMNS')
  268. pty.set_window_size(rows=20, columns=40)
  269. pty.process_input_from_child()
  270. def redrawn():
  271. q = pty.screen_contents()
  272. return '$COLUMNS' in q and q.count(ps1) == 2
  273. pty.wait_till(redrawn)
  274. + 'echo $COLUMNS', str(pty.screen.line(pty.screen.cursor.y)))
  275. pty.write_to_child('\r')
  276. pty.wait_till(lambda: pty.screen_contents().count(ps1) == 3)
  277.'40', str(pty.screen.line(pty.screen.cursor.y - 1)))
  278. + 'echo $COLUMNS', str(pty.screen.line(pty.screen.cursor.y - 2)))
  279. pty.send_cmd_to_child('clear')
  280. pty.wait_till(lambda: pty.screen_contents() == ps1)
  281. pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BEAM)
  282. pty.send_cmd_to_child('cat')
  283. pty.wait_till(lambda: pty.screen.cursor.shape == 0)
  284. pty.write_to_child('\x04')
  285. pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BEAM)
  286. pty.write_to_child('\x04')
  287. pty.send_cmd_to_child('clear')
  288. pty.wait_till(lambda: pty.callbacks.titlebuf)
  289. with self.run_shell(shell='bash', rc=f'''PS1="{ps1}"\ndeclare LOCAL_KSI_VAR=1''') as pty:
  290. pty.callbacks.clear()
  291. pty.send_cmd_to_child('declare')
  292. pty.wait_till(lambda: 'LOCAL_KSI_VAR' in pty.screen_contents())
  293. self.assert_command(pty, 'declare')
  294. with self.run_shell(shell='bash', rc=f'''PS1="{ps1}"''') as pty:
  295. pty.callbacks.clear()
  296. pty.send_cmd_to_child('printf "%s\x16\a%s" "a" "b"')
  297. pty.wait_till(lambda: pty.screen_contents().count(ps1) == 2)
  298., f'{ps1}printf "%s^G%s" "a" "b"\nab{ps1}')
  299. self.assertTrue(pty.screen.last_reported_cwd.decode().endswith(self.home_dir))
  300. pty.send_cmd_to_child('echo $HISTFILE')
  301. pty.wait_till(lambda: '.bash_history' in pty.screen_contents().replace('\n', ''))
  302. q = os.path.join(self.home_dir, 'testing-cwd-notification-🐱')
  303. os.mkdir(q)
  304. pty.send_cmd_to_child(f'cd {q}')
  305. pty.wait_till(lambda: pty.screen.last_reported_cwd.decode().endswith(q))
  306. for ps1 in ('line1\\nline\\2\\prompt> ', 'line1\nprompt> ', 'line1\\nprompt> ',):
  307. with self.subTest(ps1=ps1), self.run_shell(
  308. shell='bash', rc=f'''
  309. PS1="{ps1}"
  310. ''') as pty:
  311. ps1 = ps1.replace('\\n', '\n')
  312. pty.wait_till(lambda: pty.screen_contents().count(ps1) == 1)
  313. pty.send_cmd_to_child('echo test')
  314. pty.wait_till(lambda: pty.screen_contents().count(ps1) == 2)
  315., f'{ps1}echo test\ntest\n{ps1}')
  316. pty.write_to_child(r'echo $COLUMNS')
  317. pty.set_window_size(rows=20, columns=40)
  318. pty.process_input_from_child()
  319. pty.wait_till(redrawn)
  320.[-1] + 'echo $COLUMNS', str(pty.screen.line(pty.screen.cursor.y)))
  321. pty.write_to_child('\r')
  322. pty.wait_till(lambda: pty.screen_contents().count(ps1) == 3)
  323.'40', str(pty.screen.line(pty.screen.cursor.y - len(ps1.splitlines()))))
  324.[-1] + 'echo $COLUMNS', str(pty.screen.line(pty.screen.cursor.y - 1 - len(ps1.splitlines()))))
  325. self.assert_command(pty, 'echo $COLUMNS')
  326. # test startup file sourcing
  327. def setup_env(excluded, argv, home_dir, rc='', shell='bash', with_kitten=self.with_kitten):
  328. ans = basic_shell_env(home_dir)
  329. if not with_kitten:
  330. setup_bash_env(ans, argv)
  331. for x in {'profile', 'bash.bashrc', '.bash_profile', '.bash_login', '.profile', '.bashrc', 'rcfile'} - excluded:
  332. with open(os.path.join(home_dir, x), 'w') as f:
  333. if x == '.bashrc' and rc:
  334. print(rc, file=f)
  335. else:
  336. print(f'echo [{x}]', file=f)
  337. ans['KITTY_BASH_ETC_LOCATION'] = home_dir
  338. ans['PS1'] = 'PROMPT $ '
  339. return ans
  340. def run_test(argv, *expected, excluded=(), rc='', wait_string='PROMPT $', assert_not_in=False):
  341. with self.subTest(argv=argv), self.run_shell(shell='bash', setup_env=partial(setup_env, set(excluded)), cmd=argv, rc=rc) as pty:
  342. pty.wait_till(lambda: wait_string in pty.screen_contents())
  343. q = pty.screen_contents()
  344. for x in expected:
  345. if assert_not_in:
  346. self.assertNotIn(f'[{x}]', q)
  347. else:
  348. self.assertIn(f'[{x}]', q)
  349. run_test('bash', 'bash.bashrc', '.bashrc')
  350. run_test('bash --rcfile rcfile', 'bash.bashrc', 'rcfile')
  351. run_test('bash --init-file rcfile', 'bash.bashrc', 'rcfile')
  352. run_test('bash --norc')
  353. run_test('bash -l', 'profile', '.bash_profile')
  354. run_test('bash --noprofile -l')
  355. run_test('bash -l', 'profile', '.bash_login', excluded=('.bash_profile',))
  356. run_test('bash -l', 'profile', '.profile', excluded=('.bash_profile', '.bash_login'))
  357. # test argument parsing and non-interactive shell
  358. run_test('bash -s arg1 --rcfile rcfile', 'rcfile', rc='echo ok;read', wait_string='ok', assert_not_in=True)
  359. run_test('bash +O login_shell -ic "echo ok;read"', 'bash.bashrc', excluded=('.bash_profile'), wait_string='ok', assert_not_in=True)
  360. run_test('bash -l .bashrc', 'profile', rc='echo ok;read', wait_string='ok', assert_not_in=True)
  361. run_test('bash -il -- .bashrc', 'profile', rc='echo ok;read', wait_string='ok')
  362. with self.run_shell(shell='bash', setup_env=partial(setup_env, set()), cmd='bash',
  363. rc=f'''PS1="{ps1}"\nexport ES=$'a\n `b` c\n$d'\nexport ES2="XXX" ''') as pty:
  364. pty.callbacks.clear()
  365. pty.send_cmd_to_child('clone-in-kitty')
  366. pty.wait_till(lambda: len(pty.callbacks.clone_cmds) == 1)
  367. env = pty.callbacks.clone_cmds[0].env
  368.'ES'), 'a\n `b` c\n$d', f'Screen contents: {pty.screen_contents()!r}')
  369.'ES2'), 'XXX', f'Screen contents: {pty.screen_contents()!r}')
  370. for q, e in {
  371. 'a': 'a',
  372. r'a\ab': 'a\ab',
  373. r'a\x7z': 'a\x07z',
  374. r'a\7b': 'a\007b',
  375. r'a\U1f345x': 'a🍅x',
  376. r'a\c b': 'a\0b',
  377. }.items():
  378."$'{q}'"), e, f'Failed to decode: {q!r}')
  379. class ShellIntegrationWithKitten(ShellIntegration):
  380. with_kitten = True