runner.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. #!/usr/bin/env python
  2. # License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
  3. import importlib
  4. import os
  5. import sys
  6. from contextlib import contextmanager
  7. from functools import partial
  8. from typing import TYPE_CHECKING, Any, Dict, FrozenSet, Generator, List, Optional, cast
  9. from kitty.constants import list_kitty_resources
  10. from kitty.types import run_once
  11. from kitty.utils import resolve_abs_or_config_path
  12. aliases = {'url_hints': 'hints'}
  13. if TYPE_CHECKING:
  14. from kitty.conf.types import Definition
  15. else:
  16. Definition = object
  17. def resolved_kitten(k: str) -> str:
  18. ans = aliases.get(k, k)
  19. head, tail = os.path.split(ans)
  20. tail = tail.replace('-', '_')
  21. return os.path.join(head, tail)
  22. def path_to_custom_kitten(config_dir: str, kitten: str) -> str:
  23. path = resolve_abs_or_config_path(kitten, conf_dir=config_dir)
  24. return os.path.abspath(path)
  25. @contextmanager
  26. def preserve_sys_path() -> Generator[None, None, None]:
  27. orig = sys.path[:]
  28. try:
  29. yield
  30. finally:
  31. if sys.path != orig:
  32. del sys.path[:]
  33. sys.path.extend(orig)
  34. def import_kitten_main_module(config_dir: str, kitten: str) -> Dict[str, Any]:
  35. if kitten.endswith('.py'):
  36. with preserve_sys_path():
  37. path = path_to_custom_kitten(config_dir, kitten)
  38. if os.path.dirname(path):
  39. sys.path.insert(0, os.path.dirname(path))
  40. with open(path) as f:
  41. src = f.read()
  42. code = compile(src, path, 'exec')
  43. g = {'__name__': 'kitten'}
  44. exec(code, g)
  45. hr = g.get('handle_result', lambda *a, **kw: None)
  46. return {'start': g['main'], 'end': hr}
  47. kitten = resolved_kitten(kitten)
  48. m = importlib.import_module(f'kittens.{kitten}.main')
  49. return {'start': getattr(m, 'main'), 'end': getattr(m, 'handle_result', lambda *a, **k: None)}
  50. def create_kitten_handler(kitten: str, orig_args: List[str]) -> Any:
  51. from kitty.constants import config_dir
  52. kitten = resolved_kitten(kitten)
  53. m = import_kitten_main_module(config_dir, kitten)
  54. ans = partial(m['end'], [kitten] + orig_args)
  55. setattr(ans, 'type_of_input', getattr(m['end'], 'type_of_input', None))
  56. setattr(ans, 'no_ui', getattr(m['end'], 'no_ui', False))
  57. setattr(ans, 'has_ready_notification', getattr(m['end'], 'has_ready_notification', False))
  58. return ans
  59. def set_debug(kitten: str) -> None:
  60. import builtins
  61. from kittens.tui.loop import debug
  62. setattr(builtins, 'debug', debug)
  63. def launch(args: List[str]) -> None:
  64. config_dir, kitten = args[:2]
  65. kitten = resolved_kitten(kitten)
  66. del args[:2]
  67. args = [kitten] + args
  68. os.environ['KITTY_CONFIG_DIRECTORY'] = config_dir
  69. set_debug(kitten)
  70. m = import_kitten_main_module(config_dir, kitten)
  71. try:
  72. result = m['start'](args)
  73. finally:
  74. sys.stdin = sys.__stdin__
  75. if result is not None:
  76. import base64
  77. import json
  78. data = base64.b85encode(json.dumps(result).encode('utf-8'))
  79. sys.stdout.buffer.write(b'\x1bP@kitty-kitten-result|')
  80. sys.stdout.buffer.write(data)
  81. sys.stdout.buffer.write(b'\x1b\\')
  82. sys.stderr.flush()
  83. sys.stdout.flush()
  84. def run_kitten(kitten: str, run_name: str = '__main__') -> None:
  85. import runpy
  86. original_kitten_name = kitten
  87. kitten = resolved_kitten(kitten)
  88. set_debug(kitten)
  89. if kitten in all_kitten_names():
  90. runpy.run_module(f'kittens.{kitten}.main', run_name=run_name)
  91. return
  92. # Look for a custom kitten
  93. if not kitten.endswith('.py'):
  94. kitten += '.py'
  95. from kitty.constants import config_dir
  96. path = path_to_custom_kitten(config_dir, kitten)
  97. if not os.path.exists(path):
  98. print('Available builtin kittens:', file=sys.stderr)
  99. for kitten in all_kitten_names():
  100. print(kitten, file=sys.stderr)
  101. raise SystemExit(f'No kitten named {original_kitten_name}')
  102. m = runpy.run_path(path, init_globals={'sys': sys, 'os': os}, run_name='__run_kitten__')
  103. from kitty.fast_data_types import set_options
  104. try:
  105. m['main'](sys.argv)
  106. finally:
  107. set_options(None)
  108. @run_once
  109. def all_kitten_names() -> FrozenSet[str]:
  110. ans = []
  111. for name in list_kitty_resources('kittens'):
  112. if '__' not in name and '.' not in name and name != 'tui':
  113. ans.append(name)
  114. return frozenset(ans)
  115. def list_kittens() -> None:
  116. print('You must specify the name of a kitten to run')
  117. print('Choose from:')
  118. print()
  119. for kitten in all_kitten_names():
  120. print(kitten)
  121. def get_kitten_cli_docs(kitten: str) -> Any:
  122. setattr(sys, 'cli_docs', {})
  123. run_kitten(kitten, run_name='__doc__')
  124. ans = getattr(sys, 'cli_docs')
  125. delattr(sys, 'cli_docs')
  126. if 'help_text' in ans and 'usage' in ans and 'options' in ans:
  127. return ans
  128. def get_kitten_wrapper_of(kitten: str) -> str:
  129. setattr(sys, 'cli_docs', {})
  130. run_kitten(kitten, run_name='__wrapper_of__')
  131. ans = getattr(sys, 'cli_docs')
  132. delattr(sys, 'cli_docs')
  133. return ans.get('wrapper_of') or ''
  134. def get_kitten_completer(kitten: str) -> Any:
  135. run_kitten(kitten, run_name='__completer__')
  136. ans = getattr(sys, 'kitten_completer', None)
  137. if ans is not None:
  138. delattr(sys, 'kitten_completer')
  139. return ans
  140. def get_kitten_conf_docs(kitten: str) -> Optional[Definition]:
  141. setattr(sys, 'options_definition', None)
  142. run_kitten(kitten, run_name='__conf__')
  143. ans = getattr(sys, 'options_definition')
  144. delattr(sys, 'options_definition')
  145. return cast(Definition, ans)
  146. def get_kitten_extra_cli_parsers(kitten: str) -> Dict[str,str]:
  147. setattr(sys, 'extra_cli_parsers', {})
  148. run_kitten(kitten, run_name='__extra_cli_parsers__')
  149. ans = getattr(sys, 'extra_cli_parsers')
  150. delattr(sys, 'extra_cli_parsers')
  151. return cast(Dict[str, str], ans)
  152. def main() -> None:
  153. try:
  154. args = sys.argv[1:]
  155. launch(args)
  156. except Exception:
  157. print('Unhandled exception running kitten:')
  158. import traceback
  159. traceback.print_exc()
  160. input('Press Enter to quit')