atexit.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. #!/usr/bin/env python
  2. # License: GPLv3 Copyright: 2025, Kovid Goyal <kovid at kovidgoyal.net>
  3. import os
  4. import select
  5. import shutil
  6. import signal
  7. import subprocess
  8. import tempfile
  9. from kitty.constants import kitten_exe, kitty_exe
  10. from kitty.shm import SharedMemory
  11. from . import BaseTest
  12. class Atexit(BaseTest):
  13. def setUp(self):
  14. self.tdir = tempfile.mkdtemp()
  15. def tearDown(self):
  16. shutil.rmtree(self.tdir)
  17. def test_atexit(self):
  18. def r(action='close'):
  19. p = subprocess.Popen([kitty_exe(), '+runpy', f'''\
  20. import subprocess
  21. p = subprocess.Popen(['{kitten_exe()}', '__atexit__'])
  22. print(p.pid, flush=True)
  23. raise SystemExit(p.wait())
  24. '''], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
  25. readers = [p.stdout.fileno()]
  26. def read():
  27. r, _, _ = select.select(readers, [], [], 10)
  28. if not r:
  29. raise TimeoutError('Timed out waiting for read from child')
  30. return p.stdout.readline().rstrip().decode()
  31. atexit_pid = int(read())
  32. for i in range(2):
  33. with open(os.path.join(self.tdir, str(i)), 'w') as f:
  34. p.stdin.write(f'unlink {f.name}\n'.encode())
  35. p.stdin.flush()
  36. select.select(readers, [], [], 10)
  37. self.ae(read(), str(i+1))
  38. sdir = os.path.join(self.tdir, 'd')
  39. os.mkdir(sdir)
  40. p.stdin.write(f'rmtree {sdir}\n'.encode())
  41. p.stdin.flush()
  42. open(os.path.join(sdir, 'f'), 'w').close()
  43. select.select(readers, [], [], 10)
  44. self.ae(read(), str(i+2))
  45. shm = SharedMemory(size=64)
  46. shm.write(b'1' * 64)
  47. shm.flush()
  48. p.stdin.write(f'shm_unlink {shm.name}\n'.encode())
  49. p.stdin.flush()
  50. self.ae(read(), str(i+3))
  51. self.assertTrue(os.listdir(self.tdir))
  52. shm2 = SharedMemory(shm.name)
  53. self.ae(shm2.read()[:64], b'1' * 64)
  54. # Ensure child is ignoring signals
  55. os.kill(atexit_pid, signal.SIGINT)
  56. os.kill(atexit_pid, signal.SIGTERM)
  57. if action == 'close':
  58. p.stdin.close()
  59. elif action == 'terminate':
  60. p.terminate()
  61. else:
  62. p.kill()
  63. p.wait(10)
  64. if action != 'close':
  65. p.stdin.close()
  66. select.select(readers, [], [], 10)
  67. self.assertFalse(read())
  68. p.stdout.close()
  69. self.assertFalse(os.listdir(self.tdir))
  70. try:
  71. os.waitpid(atexit_pid, 0)
  72. except ChildProcessError:
  73. pass
  74. self.assertRaises(FileNotFoundError, lambda: SharedMemory(shm.name))
  75. r('close')
  76. r('terminate')
  77. r('kill')