handler.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. #!/usr/bin/env python
  2. # License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
  3. from collections import deque
  4. from contextlib import suppress
  5. from types import TracebackType
  6. from typing import TYPE_CHECKING, Any, Callable, ContextManager, Deque, Dict, NamedTuple, Optional, Sequence, Type, Union, cast
  7. from kitty.fast_data_types import monotonic
  8. from kitty.types import DecoratedFunc, ParsedShortcut
  9. from kitty.typing import (
  10. AbstractEventLoop,
  11. BossType,
  12. Debug,
  13. ImageManagerType,
  14. KeyActionType,
  15. KeyEventType,
  16. LoopType,
  17. MouseButton,
  18. MouseEvent,
  19. ScreenSize,
  20. TermManagerType,
  21. WindowType,
  22. )
  23. from .operations import MouseTracking, pending_update
  24. if TYPE_CHECKING:
  25. from kitty.file_transmission import FileTransmissionCommand
  26. OpenUrlHandler = Optional[Callable[[BossType, WindowType, str, int, str], bool]]
  27. class ButtonEvent(NamedTuple):
  28. mouse_event: MouseEvent
  29. timestamp: float
  30. def is_click(a: ButtonEvent, b: ButtonEvent) -> bool:
  31. from .loop import EventType
  32. if a.mouse_event.type is not EventType.PRESS or b.mouse_event.type is not EventType.RELEASE:
  33. return False
  34. x = a.mouse_event.cell_x - b.mouse_event.cell_x
  35. y = a.mouse_event.cell_y - b.mouse_event.cell_y
  36. return x*x + y*y <= 4
  37. class Handler:
  38. image_manager_class: Optional[Type[ImageManagerType]] = None
  39. use_alternate_screen = True
  40. mouse_tracking = MouseTracking.none
  41. terminal_io_ended = False
  42. overlay_ready_report_needed = False
  43. def _initialize(
  44. self,
  45. screen_size: ScreenSize,
  46. term_manager: TermManagerType,
  47. schedule_write: Callable[[bytes], None],
  48. tui_loop: LoopType,
  49. debug: Debug,
  50. image_manager: Optional[ImageManagerType] = None
  51. ) -> None:
  52. from .operations import commander
  53. self.screen_size = screen_size
  54. self._term_manager = term_manager
  55. self._tui_loop = tui_loop
  56. self._schedule_write = schedule_write
  57. self.debug = debug
  58. self.cmd = commander(self)
  59. self._image_manager = image_manager
  60. self._button_events: Dict[MouseButton, Deque[ButtonEvent]] = {}
  61. @property
  62. def image_manager(self) -> ImageManagerType:
  63. assert self._image_manager is not None
  64. return self._image_manager
  65. @property
  66. def asyncio_loop(self) -> AbstractEventLoop:
  67. return self._tui_loop.asyncio_loop
  68. def add_shortcut(self, action: KeyActionType, spec: Union[str, ParsedShortcut]) -> None:
  69. if not hasattr(self, '_key_shortcuts'):
  70. self._key_shortcuts: Dict[ParsedShortcut, KeyActionType] = {}
  71. if isinstance(spec, str):
  72. from kitty.key_encoding import parse_shortcut
  73. spec = parse_shortcut(spec)
  74. self._key_shortcuts[spec] = action
  75. def shortcut_action(self, key_event: KeyEventType) -> Optional[KeyActionType]:
  76. for sc, action in self._key_shortcuts.items():
  77. if key_event.matches(sc):
  78. return action
  79. return None
  80. def __enter__(self) -> None:
  81. if self._image_manager is not None:
  82. self._image_manager.__enter__()
  83. self.debug.fobj = self
  84. self.initialize()
  85. def __exit__(self, etype: type, value: Exception, tb: TracebackType) -> None:
  86. del self.debug.fobj
  87. with suppress(Exception):
  88. self.finalize()
  89. if self._image_manager is not None:
  90. self._image_manager.__exit__(etype, value, tb)
  91. def initialize(self) -> None:
  92. pass
  93. def finalize(self) -> None:
  94. pass
  95. def on_resize(self, screen_size: ScreenSize) -> None:
  96. self.screen_size = screen_size
  97. def quit_loop(self, return_code: Optional[int] = None) -> None:
  98. self._tui_loop.quit(return_code)
  99. def on_term(self) -> None:
  100. self._tui_loop.quit(1)
  101. def on_hup(self) -> None:
  102. self.terminal_io_ended = True
  103. self._tui_loop.quit(1)
  104. def on_key_event(self, key_event: KeyEventType, in_bracketed_paste: bool = False) -> None:
  105. ' Override this method and perform_default_key_action() to handle all key events '
  106. if key_event.text:
  107. self.on_text(key_event.text, in_bracketed_paste)
  108. else:
  109. self.on_key(key_event)
  110. def perform_default_key_action(self, key_event: KeyEventType) -> bool:
  111. ' Override in sub-class if you want to handle these key events yourself '
  112. if key_event.matches('ctrl+c'):
  113. self.on_interrupt()
  114. return True
  115. if key_event.matches('ctrl+d'):
  116. self.on_eot()
  117. return True
  118. return False
  119. def on_text(self, text: str, in_bracketed_paste: bool = False) -> None:
  120. pass
  121. def on_key(self, key_event: KeyEventType) -> None:
  122. pass
  123. def on_mouse_event(self, mouse_event: MouseEvent) -> None:
  124. from .loop import EventType
  125. if mouse_event.type is EventType.MOVE:
  126. self.on_mouse_move(mouse_event)
  127. elif mouse_event.type is EventType.PRESS:
  128. q = self._button_events.setdefault(mouse_event.buttons, deque())
  129. q.append(ButtonEvent(mouse_event, monotonic()))
  130. if len(q) > 5:
  131. q.popleft()
  132. elif mouse_event.type is EventType.RELEASE:
  133. q = self._button_events.setdefault(mouse_event.buttons, deque())
  134. q.append(ButtonEvent(mouse_event, monotonic()))
  135. if len(q) > 5:
  136. q.popleft()
  137. if len(q) > 1 and is_click(q[-2], q[-1]):
  138. self.on_click(mouse_event)
  139. def on_mouse_move(self, mouse_event: MouseEvent) -> None:
  140. pass
  141. def on_click(self, mouse_event: MouseEvent) -> None:
  142. pass
  143. def on_interrupt(self) -> None:
  144. pass
  145. def on_eot(self) -> None:
  146. pass
  147. def on_writing_finished(self) -> None:
  148. pass
  149. def on_kitty_cmd_response(self, response: Dict[str, Any]) -> None:
  150. pass
  151. def on_clipboard_response(self, text: str, from_primary: bool = False) -> None:
  152. pass
  153. def on_file_transfer_response(self, ftc: 'FileTransmissionCommand') -> None:
  154. pass
  155. def on_capability_response(self, name: str, val: str) -> None:
  156. pass
  157. def write(self, data: Union[bytes, str]) -> None:
  158. if isinstance(data, str):
  159. data = data.encode('utf-8')
  160. self._schedule_write(data)
  161. def flush(self) -> None:
  162. pass
  163. def print(self, *args: object, sep: str = ' ', end: str = '\r\n') -> None:
  164. data = sep.join(map(str, args)) + end
  165. self.write(data)
  166. def suspend(self) -> ContextManager[TermManagerType]:
  167. return self._term_manager.suspend()
  168. @classmethod
  169. def atomic_update(cls, func: DecoratedFunc) -> DecoratedFunc:
  170. from functools import wraps
  171. @wraps(func)
  172. def f(*a: Any, **kw: Any) -> Any:
  173. with pending_update(a[0].write):
  174. return func(*a, **kw)
  175. return cast(DecoratedFunc, f)
  176. class HandleResult:
  177. type_of_input: Optional[str] = None
  178. no_ui: bool = False
  179. def __init__(self, impl: Callable[..., Any], type_of_input: Optional[str], no_ui: bool, has_ready_notification: bool, open_url_handler: OpenUrlHandler):
  180. self.impl = impl
  181. self.no_ui = no_ui
  182. self.type_of_input = type_of_input
  183. self.has_ready_notification = has_ready_notification
  184. self.open_url_handler = open_url_handler
  185. def __call__(self, args: Sequence[str], data: Any, target_window_id: int, boss: BossType) -> Any:
  186. return self.impl(args, data, target_window_id, boss)
  187. def result_handler(
  188. type_of_input: Optional[str] = None,
  189. no_ui: bool = False,
  190. has_ready_notification: bool = Handler.overlay_ready_report_needed,
  191. open_url_handler: OpenUrlHandler = None,
  192. ) -> Callable[[Callable[..., Any]], HandleResult]:
  193. def wrapper(impl: Callable[..., Any]) -> HandleResult:
  194. return HandleResult(impl, type_of_input, no_ui, has_ready_notification, open_url_handler)
  195. return wrapper