runner.py 7.0 KB


  1. #!/usr/bin/env python
  2. # License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
  3. import importlib
  4. import os
  5. import sys
  6. from collections.abc import Callable, Generator
  7. from contextlib import contextmanager
  8. from functools import partial
  9. from typing import TYPE_CHECKING, Any, NamedTuple, cast
  10. from kitty.constants import list_kitty_resources
  11. from kitty.types import run_once
  12. from kitty.typing import BossType, WindowType
  13. from kitty.utils import resolve_abs_or_config_path
# Alternate kitten names mapped to the name actually used for lookup;
# consulted by resolved_kitten().
aliases: dict[str, str] = {'url_hints': 'hints'}
if TYPE_CHECKING:
    # Imported only for static type checking.
    from kitty.conf.types import Definition
else:
    # Runtime stand-in so annotations that mention Definition still evaluate.
    Definition = object
  19. def resolved_kitten(k: str) -> str:
  20. ans = aliases.get(k, k)
  21. head, tail = os.path.split(ans)
  22. tail = tail.replace('-', '_')
  23. return os.path.join(head, tail)
  24. def path_to_custom_kitten(config_dir: str, kitten: str) -> str:
  25. path = resolve_abs_or_config_path(kitten, conf_dir=config_dir)
  26. return os.path.abspath(path)
  27. @contextmanager
  28. def preserve_sys_path() -> Generator[None, None, None]:
  29. orig = sys.path[:]
  30. try:
  31. yield
  32. finally:
  33. if sys.path != orig:
  34. del sys.path[:]
  35. sys.path.extend(orig)
  36. class CLIOnlyKitten(TypeError):
  37. def __init__(self, kitten: str):
  38. super().__init__(f'The {kitten} kitten must be run only at the commandline, as: kitten {kitten}')
  39. def import_kitten_main_module(config_dir: str, kitten: str) -> dict[str, Any]:
  40. if kitten.endswith('.py'):
  41. with preserve_sys_path():
  42. path = path_to_custom_kitten(config_dir, kitten)
  43. if os.path.dirname(path):
  44. sys.path.insert(0, os.path.dirname(path))
  45. with open(path) as f:
  46. src = f.read()
  47. code = compile(src, path, 'exec')
  48. g = {'__name__': 'kitten'}
  49. exec(code, g)
  50. hr = g.get('handle_result', lambda *a, **kw: None)
  51. return {'start': g['main'], 'end': hr}
  52. kitten = resolved_kitten(kitten)
  53. m = importlib.import_module(f'kittens.{kitten}.main')
  54. if not hasattr(m, 'main'):
  55. raise CLIOnlyKitten(kitten)
  56. return {
  57. 'start': getattr(m, 'main'),
  58. 'end': getattr(m, 'handle_result', lambda *a, **k: None),
  59. }
class KittenMetadata(NamedTuple):
    # Description of a loaded kitten, as built by create_kitten_handler().
    # Apart from handle_result itself, every field is read from an attribute
    # of the same name on one of the kitten's two entry-point functions.

    # Result callback; defaults to a no-op. create_kitten_handler() stores the
    # kitten's handle_result with its first argument (the kitten name followed
    # by the original args) already bound.
    handle_result: Callable[[Any, int, BossType], None] = lambda *a: None
    # Read from the kitten's handle_result function.
    type_of_input: str | None = None
    # Read from the kitten's handle_result function.
    no_ui: bool = False
    # Read from the kitten's handle_result function.
    has_ready_notification: bool = False
    # Read from the kitten's handle_result function.
    open_url_handler: Callable[[BossType, WindowType, str, int, str], bool] | None = None
    # Read from the kitten's main function.
    allow_remote_control: bool = False
    # Read from the kitten's main function. NOTE(review): the default here is
    # False while create_kitten_handler() falls back to True -- confirm intended.
    remote_control_password: str | bool = False
  68. def create_kitten_handler(kitten: str, orig_args: list[str]) -> KittenMetadata:
  69. from kitty.constants import config_dir
  70. kitten = resolved_kitten(kitten)
  71. m = import_kitten_main_module(config_dir, kitten)
  72. main = m['start']
  73. handle_result = m['end']
  74. return KittenMetadata(
  75. handle_result=partial(handle_result, [kitten] + orig_args),
  76. type_of_input=getattr(handle_result, 'type_of_input', None),
  77. no_ui=getattr(handle_result, 'no_ui', False),
  78. allow_remote_control=getattr(main, 'allow_remote_control', False),
  79. remote_control_password=getattr(main, 'remote_control_password', True),
  80. has_ready_notification=getattr(handle_result, 'has_ready_notification', False),
  81. open_url_handler=getattr(handle_result, 'open_url_handler', None))
  82. def set_debug(kitten: str) -> None:
  83. import builtins
  84. from kittens.tui.loop import debug
  85. setattr(builtins, 'debug', debug)
  86. def launch(args: list[str]) -> None:
  87. config_dir, kitten = args[:2]
  88. kitten = resolved_kitten(kitten)
  89. del args[:2]
  90. args = [kitten] + args
  91. os.environ['KITTY_CONFIG_DIRECTORY'] = config_dir
  92. set_debug(kitten)
  93. m = import_kitten_main_module(config_dir, kitten)
  94. try:
  95. result = m['start'](args)
  96. finally:
  97. sys.stdin = sys.__stdin__
  98. if result is not None:
  99. import base64
  100. import json
  101. data = base64.b85encode(json.dumps(result).encode('utf-8'))
  102. sys.stdout.buffer.write(b'\x1bP@kitty-kitten-result|')
  103. sys.stdout.buffer.write(data)
  104. sys.stdout.buffer.write(b'\x1b\\')
  105. sys.stderr.flush()
  106. sys.stdout.flush()
  107. def run_kitten(kitten: str, run_name: str = '__main__') -> None:
  108. import runpy
  109. original_kitten_name = kitten
  110. kitten = resolved_kitten(kitten)
  111. set_debug(kitten)
  112. if kitten in all_kitten_names():
  113. runpy.run_module(f'kittens.{kitten}.main', run_name=run_name)
  114. return
  115. # Look for a custom kitten
  116. if not kitten.endswith('.py'):
  117. kitten += '.py'
  118. from kitty.constants import config_dir
  119. path = path_to_custom_kitten(config_dir, kitten)
  120. if not os.path.exists(path):
  121. print('Available builtin kittens:', file=sys.stderr)
  122. for kitten in all_kitten_names():
  123. print(kitten, file=sys.stderr)
  124. raise SystemExit(f'No kitten named {original_kitten_name}')
  125. m = runpy.run_path(path, init_globals={'sys': sys, 'os': os}, run_name='__run_kitten__')
  126. from kitty.fast_data_types import set_options
  127. try:
  128. m['main'](sys.argv)
  129. finally:
  130. set_options(None)
  131. @run_once
  132. def all_kitten_names() -> frozenset[str]:
  133. ans = []
  134. for name in list_kitty_resources('kittens'):
  135. if '__' not in name and '.' not in name and name != 'tui':
  136. ans.append(name)
  137. return frozenset(ans)
  138. def list_kittens() -> None:
  139. print('You must specify the name of a kitten to run')
  140. print('Choose from:')
  141. print()
  142. for kitten in all_kitten_names():
  143. print(kitten)
  144. def get_kitten_cli_docs(kitten: str) -> Any:
  145. setattr(sys, 'cli_docs', {})
  146. run_kitten(kitten, run_name='__doc__')
  147. ans = getattr(sys, 'cli_docs')
  148. delattr(sys, 'cli_docs')
  149. if 'help_text' in ans and 'usage' in ans and 'options' in ans:
  150. return ans
  151. def get_kitten_wrapper_of(kitten: str) -> str:
  152. setattr(sys, 'cli_docs', {})
  153. run_kitten(kitten, run_name='__wrapper_of__')
  154. ans = getattr(sys, 'cli_docs')
  155. delattr(sys, 'cli_docs')
  156. return ans.get('wrapper_of') or ''
  157. def get_kitten_completer(kitten: str) -> Any:
  158. run_kitten(kitten, run_name='__completer__')
  159. ans = getattr(sys, 'kitten_completer', None)
  160. if ans is not None:
  161. delattr(sys, 'kitten_completer')
  162. return ans
  163. def get_kitten_conf_docs(kitten: str) -> Definition | None:
  164. setattr(sys, 'options_definition', None)
  165. run_kitten(kitten, run_name='__conf__')
  166. ans = getattr(sys, 'options_definition')
  167. delattr(sys, 'options_definition')
  168. return cast(Definition, ans)
  169. def get_kitten_extra_cli_parsers(kitten: str) -> dict[str,str]:
  170. setattr(sys, 'extra_cli_parsers', {})
  171. run_kitten(kitten, run_name='__extra_cli_parsers__')
  172. ans = getattr(sys, 'extra_cli_parsers')
  173. delattr(sys, 'extra_cli_parsers')
  174. return cast(dict[str, str], ans)
  175. def main() -> None:
  176. try:
  177. args = sys.argv[1:]
  178. launch(args)
  179. except Exception:
  180. print('Unhandled exception running kitten:')
  181. import traceback
  182. traceback.print_exc()
  183. input('Press Enter to quit')