loop.py 17 KB


  1. #!/usr/bin/env python
  2. # License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
  3. import asyncio
  4. import codecs
  5. import io
  6. import os
  7. import re
  8. import selectors
  9. import signal
  10. import sys
  11. import termios
  12. from collections.abc import Callable, Generator
  13. from contextlib import contextmanager, suppress
  14. from enum import Enum, IntFlag, auto
  15. from functools import partial
  16. from typing import Any, NamedTuple
  17. from kitty.constants import is_macos
  18. from kitty.fast_data_types import FILE_TRANSFER_CODE, close_tty, normal_tty, open_tty, parse_input_from_terminal, raw_tty
  19. from kitty.key_encoding import ALT, CTRL, SHIFT, backspace_key, decode_key_event, enter_key
  20. from kitty.typing import ImageManagerType, KeyEventType, Protocol
  21. from kitty.utils import ScreenSize, ScreenSizeGetter, screen_size_function, write_all
  22. from .handler import Handler
  23. from .operations import MouseTracking, init_state, reset_state
  24. class BinaryWrite(Protocol):
  25. def write(self, data: bytes) -> None:
  26. pass
  27. def flush(self) -> None:
  28. pass
  29. def debug_write(*a: Any, **kw: Any) -> None:
  30. from base64 import standard_b64encode
  31. fobj = kw.pop('file', sys.stderr.buffer)
  32. buf = io.StringIO()
  33. kw['file'] = buf
  34. print(*a, **kw)
  35. stext = buf.getvalue()
  36. for i in range(0, len(stext), 256):
  37. chunk = stext[i:i + 256]
  38. text = b'\x1bP@kitty-print|' + standard_b64encode(chunk.encode('utf-8')) + b'\x1b\\'
  39. fobj.write(text)
  40. fobj.flush()
  41. class Debug:
  42. fobj: BinaryWrite | None = None
  43. def __call__(self, *a: Any, **kw: Any) -> None:
  44. kw['file'] = self.fobj or sys.stdout.buffer
  45. debug_write(*a, **kw)
  46. debug = Debug()
  47. ftc_code = str(FILE_TRANSFER_CODE)
  48. class TermManager:
  49. def __init__(
  50. self, optional_actions: int = termios.TCSANOW, use_alternate_screen: bool = True,
  51. mouse_tracking: MouseTracking = MouseTracking.none
  52. ) -> None:
  53. self.extra_finalize: str | None = None
  54. self.optional_actions = optional_actions
  55. self.use_alternate_screen = use_alternate_screen
  56. self.mouse_tracking = mouse_tracking
  57. def set_state_for_loop(self, set_raw: bool = True) -> None:
  58. if set_raw:
  59. raw_tty(self.tty_fd, self.original_termios)
  60. write_all(self.tty_fd, init_state(self.use_alternate_screen, self.mouse_tracking))
  61. def reset_state_to_original(self) -> None:
  62. normal_tty(self.tty_fd, self.original_termios)
  63. if self.extra_finalize:
  64. write_all(self.tty_fd, self.extra_finalize)
  65. write_all(self.tty_fd, reset_state(self.use_alternate_screen))
  66. @contextmanager
  67. def suspend(self) -> Generator['TermManager', None, None]:
  68. self.reset_state_to_original()
  69. yield self
  70. self.set_state_for_loop()
  71. def __enter__(self) -> 'TermManager':
  72. self.tty_fd, self.original_termios = open_tty(False, self.optional_actions)
  73. self.set_state_for_loop(set_raw=False)
  74. return self
  75. def __exit__(self, *a: object) -> None:
  76. with suppress(Exception):
  77. self.reset_state_to_original()
  78. close_tty(self.tty_fd, self.original_termios)
  79. del self.tty_fd, self.original_termios
  80. class MouseButton(IntFlag):
  81. NONE, LEFT, MIDDLE, RIGHT, FOURTH, FIFTH, SIXTH, SEVENTH = 0, 1, 2, 4, 8, 16, 32, 64
  82. WHEEL_UP, WHEEL_DOWN, WHEEL_LEFT, WHEEL_RIGHT = -1, -2, -4, -8
  83. bmap = MouseButton.LEFT, MouseButton.MIDDLE, MouseButton.RIGHT
  84. ebmap = MouseButton.FOURTH, MouseButton.FIFTH, MouseButton.SIXTH, MouseButton.SEVENTH
  85. wbmap = MouseButton.WHEEL_UP, MouseButton.WHEEL_DOWN, MouseButton.WHEEL_LEFT, MouseButton.WHEEL_RIGHT
  86. SHIFT_INDICATOR = 1 << 2
  87. ALT_INDICATOR = 1 << 3
  88. CTRL_INDICATOR = 1 << 4
  89. MOTION_INDICATOR = 1 << 5
  90. class EventType(Enum):
  91. PRESS = auto()
  92. RELEASE = auto()
  93. MOVE = auto()
  94. class MouseEvent(NamedTuple):
  95. cell_x: int
  96. cell_y: int
  97. pixel_x: int
  98. pixel_y: int
  99. type: EventType
  100. buttons: MouseButton
  101. mods: int
  102. def pixel_to_cell(px: int, length: int, cell_length: int) -> int:
  103. px = max(0, min(px, length - 1))
  104. return px // cell_length
  105. def decode_sgr_mouse(text: str, screen_size: ScreenSize) -> MouseEvent:
  106. cb_, x_, y_ = text.split(';')
  107. m, y_ = y_[-1], y_[:-1]
  108. cb, x, y = map(int, (cb_, x_, y_))
  109. typ = EventType.RELEASE if m == 'm' else (EventType.MOVE if cb & MOTION_INDICATOR else EventType.PRESS)
  110. buttons: MouseButton = MouseButton.NONE
  111. cb3 = cb & 3
  112. if cb >= 128:
  113. buttons |= ebmap[cb3]
  114. elif cb >= 64:
  115. buttons |= wbmap[cb3]
  116. elif cb3 < 3:
  117. buttons |= bmap[cb3]
  118. mods = 0
  119. if cb & SHIFT_INDICATOR:
  120. mods |= SHIFT
  121. if cb & ALT_INDICATOR:
  122. mods |= ALT
  123. if cb & CTRL_INDICATOR:
  124. mods |= CTRL
  125. return MouseEvent(
  126. pixel_to_cell(x, screen_size.width, screen_size.cell_width), pixel_to_cell(y, screen_size.height, screen_size.cell_height),
  127. x, y, typ, buttons, mods
  128. )
  129. class UnhandledException(Handler):
  130. def __init__(self, tb: str) -> None:
  131. self.tb = tb
  132. def initialize(self) -> None:
  133. self.cmd.clear_screen()
  134. self.cmd.set_scrolling_region()
  135. self.cmd.set_cursor_visible(True)
  136. self.cmd.set_default_colors()
  137. self.write(self.tb.replace('\n', '\r\n'))
  138. self.write('\r\n')
  139. self.write('Press Enter to quit')
  140. def on_key(self, key_event: KeyEventType) -> None:
  141. if key_event.key == 'ENTER':
  142. self.quit_loop(1)
  143. def on_interrupt(self) -> None:
  144. self.quit_loop(1)
  145. on_eot = on_term = on_interrupt
  146. class SignalManager:
  147. def __init__(
  148. self,
  149. loop: asyncio.AbstractEventLoop,
  150. on_winch: Callable[[], None],
  151. on_interrupt: Callable[[], None],
  152. on_term: Callable[[], None],
  153. on_hup: Callable[[], None],
  154. ) -> None:
  155. self.asyncio_loop = loop
  156. self.on_winch, self.on_interrupt, self.on_term = on_winch, on_interrupt, on_term
  157. self.on_hup = on_hup
  158. def __enter__(self) -> None:
  159. self.asyncio_loop.add_signal_handler(signal.SIGWINCH, self.on_winch)
  160. self.asyncio_loop.add_signal_handler(signal.SIGINT, self.on_interrupt)
  161. self.asyncio_loop.add_signal_handler(signal.SIGTERM, self.on_term)
  162. self.asyncio_loop.add_signal_handler(signal.SIGHUP, self.on_hup)
  163. def __exit__(self, *a: Any) -> None:
  164. tuple(map(self.asyncio_loop.remove_signal_handler, (
  165. signal.SIGWINCH, signal.SIGINT, signal.SIGTERM, signal.SIGHUP)))
  166. sanitize_bracketed_paste: str = '[\x03\x04\x0e\x0f\r\x07\x7f\x8d\x8e\x8f\x90\x9b\x9d\x9e\x9f]'
  167. class Loop:
  168. def __init__(
  169. self,
  170. sanitize_bracketed_paste: str = sanitize_bracketed_paste,
  171. optional_actions: int = termios.TCSADRAIN
  172. ):
  173. if is_macos:
  174. # On macOS PTY devices are not supported by the KqueueSelector and
  175. # the PollSelector is broken, causes 100% CPU usage
  176. self.asyncio_loop: asyncio.AbstractEventLoop = asyncio.SelectorEventLoop(selectors.SelectSelector())
  177. asyncio.set_event_loop(self.asyncio_loop)
  178. else:
  179. self.asyncio_loop = asyncio.get_event_loop()
  180. self.return_code = 0
  181. self.overlay_ready_reported = False
  182. self.optional_actions = optional_actions
  183. self.read_buf = ''
  184. self.decoder = codecs.getincrementaldecoder('utf-8')('ignore')
  185. try:
  186. self.iov_limit = max(os.sysconf('SC_IOV_MAX') - 1, 255)
  187. except Exception:
  188. self.iov_limit = 255
  189. self.parse_input_from_terminal = partial(parse_input_from_terminal, self._on_text, self._on_dcs, self._on_csi, self._on_osc, self._on_pm, self._on_apc)
  190. self.ebs_pat = re.compile('([\177\r\x03\x04])')
  191. self.in_bracketed_paste = False
  192. self.sanitize_bracketed_paste = bool(sanitize_bracketed_paste)
  193. if self.sanitize_bracketed_paste:
  194. self.sanitize_ibp_pat = re.compile(sanitize_bracketed_paste)
  195. def _read_ready(self, handler: Handler, fd: int) -> None:
  196. try:
  197. bdata = os.read(fd, io.DEFAULT_BUFFER_SIZE)
  198. except BlockingIOError:
  199. return
  200. if not bdata:
  201. handler.terminal_io_ended = True
  202. self.quit(1)
  203. return
  204. data = self.decoder.decode(bdata)
  205. if self.read_buf:
  206. data = self.read_buf + data
  207. self.read_buf = data
  208. self.handler = handler
  209. try:
  210. self.read_buf = self.parse_input_from_terminal(self.read_buf, self.in_bracketed_paste)
  211. except Exception:
  212. self.read_buf = ''
  213. raise
  214. finally:
  215. del self.handler
  216. # terminal input callbacks {{{
  217. def _on_text(self, text: str) -> None:
  218. if self.in_bracketed_paste and self.sanitize_bracketed_paste:
  219. text = self.sanitize_ibp_pat.sub('', text)
  220. for chunk in self.ebs_pat.split(text):
  221. if len(chunk) == 1:
  222. if chunk == '\r':
  223. self.handler.on_key(enter_key)
  224. elif chunk == '\177':
  225. self.handler.on_key(backspace_key)
  226. elif chunk == '\x03':
  227. self.handler.on_interrupt()
  228. elif chunk == '\x04':
  229. self.handler.on_eot()
  230. else:
  231. self.handler.on_text(chunk, self.in_bracketed_paste)
  232. elif chunk:
  233. self.handler.on_text(chunk, self.in_bracketed_paste)
  234. def _on_dcs(self, dcs: str) -> None:
  235. if dcs.startswith('@kitty-cmd'):
  236. import json
  237. self.handler.on_kitty_cmd_response(json.loads(dcs[len('@kitty-cmd'):]))
  238. elif dcs.startswith('1+r'):
  239. from binascii import unhexlify
  240. vals = dcs[3:].split(';')
  241. for q in vals:
  242. parts = q.split('=', 1)
  243. try:
  244. name, val = parts[0], unhexlify(parts[1]).decode('utf-8', 'replace')
  245. except Exception:
  246. continue
  247. self.handler.on_capability_response(name, val)
  248. def _on_csi(self, csi: str) -> None:
  249. q = csi[-1]
  250. if q in 'mM':
  251. if csi.startswith('<'):
  252. # SGR mouse event
  253. try:
  254. ev = decode_sgr_mouse(csi[1:], self.handler.screen_size)
  255. except Exception:
  256. pass
  257. else:
  258. self.handler.on_mouse_event(ev)
  259. elif q in 'u~ABCDEHFPQRS':
  260. if csi == '200~':
  261. self.in_bracketed_paste = True
  262. return
  263. elif csi == '201~':
  264. self.in_bracketed_paste = False
  265. return
  266. try:
  267. k = decode_key_event(csi[:-1], q)
  268. except Exception:
  269. pass
  270. else:
  271. if not self.handler.perform_default_key_action(k):
  272. self.handler.on_key_event(k)
  273. def _on_pm(self, pm: str) -> None:
  274. pass
  275. def _on_osc(self, osc: str) -> None:
  276. idx = osc.find(';')
  277. if idx <= 0:
  278. return
  279. q = osc[:idx]
  280. if q == '52':
  281. widx = osc.find(';', idx + 1)
  282. if widx < idx:
  283. from_primary = osc.find('p', idx + 1) > -1
  284. payload = ''
  285. else:
  286. from base64 import standard_b64decode
  287. from_primary = osc.find('p', idx+1, widx) > -1
  288. data = memoryview(osc.encode('ascii'))
  289. payload = standard_b64decode(data[widx+1:]).decode('utf-8')
  290. self.handler.on_clipboard_response(payload, from_primary)
  291. elif q == ftc_code:
  292. from kitty.file_transmission import FileTransmissionCommand
  293. data = memoryview(osc.encode('ascii'))
  294. self.handler.on_file_transfer_response(FileTransmissionCommand.deserialize(data[idx+1:]))
  295. def _on_apc(self, apc: str) -> None:
  296. if apc.startswith('G'):
  297. if self.handler.image_manager is not None:
  298. self.handler.image_manager.handle_response(apc)
  299. # }}}
  300. @property
  301. def total_pending_bytes_to_write(self) -> int:
  302. return sum(map(len, self.write_buf))
  303. def _write_ready(self, handler: Handler, fd: int) -> None:
  304. if len(self.write_buf) > self.iov_limit:
  305. self.write_buf[self.iov_limit - 1] = b''.join(self.write_buf[self.iov_limit - 1:])
  306. del self.write_buf[self.iov_limit:]
  307. total_size = self.total_pending_bytes_to_write
  308. if total_size:
  309. try:
  310. written = os.writev(fd, self.write_buf)
  311. except BlockingIOError:
  312. return
  313. if not written:
  314. handler.terminal_io_ended = True
  315. self.quit(1)
  316. return
  317. else:
  318. written = 0
  319. if written >= total_size:
  320. self.write_buf: list[bytes] = []
  321. self.asyncio_loop.remove_writer(fd)
  322. self.waiting_for_writes = False
  323. handler.on_writing_finished()
  324. else:
  325. consumed = 0
  326. for i, buf in enumerate(self.write_buf):
  327. if not written:
  328. break
  329. if len(buf) <= written:
  330. written -= len(buf)
  331. consumed += 1
  332. continue
  333. self.write_buf[i] = buf[written:]
  334. break
  335. del self.write_buf[:consumed]
  336. def quit(self, return_code: int | None = None) -> None:
  337. if return_code is not None:
  338. self.return_code = return_code
  339. self.asyncio_loop.stop()
  340. def loop_impl(self, handler: Handler, term_manager: TermManager, image_manager: ImageManagerType | None = None) -> str | None:
  341. self.write_buf = []
  342. tty_fd = term_manager.tty_fd
  343. tb = None
  344. self.waiting_for_writes = True
  345. def schedule_write(data: bytes) -> None:
  346. self.write_buf.append(data)
  347. if not self.waiting_for_writes:
  348. self.asyncio_loop.add_writer(tty_fd, self._write_ready, handler, tty_fd)
  349. self.waiting_for_writes = True
  350. def handle_exception(loop: asyncio.AbstractEventLoop, context: dict[str, Any]) -> None:
  351. nonlocal tb
  352. loop.stop()
  353. tb = context['message']
  354. exc = context.get('exception')
  355. if exc is not None:
  356. import traceback
  357. tb += '\n' + ''.join(traceback.format_exception(exc.__class__, exc, exc.__traceback__))
  358. self.asyncio_loop.set_exception_handler(handle_exception)
  359. handler._initialize(self._get_screen_size(), term_manager, schedule_write, self, debug, image_manager)
  360. with handler:
  361. if handler.overlay_ready_report_needed:
  362. handler.cmd.overlay_ready()
  363. self.asyncio_loop.add_reader(
  364. tty_fd, self._read_ready, handler, tty_fd)
  365. self.asyncio_loop.add_writer(
  366. tty_fd, self._write_ready, handler, tty_fd)
  367. self.asyncio_loop.run_forever()
  368. self.asyncio_loop.remove_reader(tty_fd)
  369. if self.waiting_for_writes:
  370. self.asyncio_loop.remove_writer(tty_fd)
  371. return tb
  372. def loop(self, handler: Handler) -> None:
  373. tb: str | None = None
  374. def _on_sigwinch() -> None:
  375. self._get_screen_size.changed = True
  376. handler.screen_size = self._get_screen_size()
  377. handler.on_resize(handler.screen_size)
  378. signal_manager = SignalManager(self.asyncio_loop, _on_sigwinch, handler.on_interrupt, handler.on_term, handler.on_hup)
  379. with TermManager(self.optional_actions, handler.use_alternate_screen, handler.mouse_tracking) as term_manager, signal_manager:
  380. self._get_screen_size: ScreenSizeGetter = screen_size_function(term_manager.tty_fd)
  381. image_manager = None
  382. if handler.image_manager_class is not None:
  383. image_manager = handler.image_manager_class(handler)
  384. try:
  385. tb = self.loop_impl(handler, term_manager, image_manager)
  386. except Exception:
  387. import traceback
  388. tb = traceback.format_exc()
  389. term_manager.extra_finalize = b''.join(self.write_buf).decode('utf-8')
  390. if tb is not None:
  391. report_overlay_ready = handler.overlay_ready_report_needed and not self.overlay_ready_reported
  392. self.return_code = 1
  393. if not handler.terminal_io_ended:
  394. self._report_error_loop(tb, term_manager, report_overlay_ready)
  395. def _report_error_loop(self, tb: str, term_manager: TermManager, overlay_ready_report_needed: bool) -> None:
  396. handler = UnhandledException(tb)
  397. handler.overlay_ready_report_needed = overlay_ready_report_needed
  398. self.loop_impl(handler, term_manager)