uv_stream.lua 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. ---
  2. --- Basic stream types.
  3. --- See `rpc_stream.lua` for the msgpack layer.
  4. ---
  5. local uv = vim.uv
  6. --- @class test.Stream
  7. --- @field write fun(self, data: string|string[])
  8. --- @field read_start fun(self, cb: fun(chunk: string))
  9. --- @field read_stop fun(self)
  10. --- @field close fun(self, signal?: string)
  11. --- Stream over given pipes.
  12. ---
  13. --- @class vim.StdioStream : test.Stream
  14. --- @field private _in uv.uv_pipe_t
  15. --- @field private _out uv.uv_pipe_t
  16. local StdioStream = {}
  17. StdioStream.__index = StdioStream
  18. function StdioStream.open()
  19. local self = setmetatable({
  20. _in = assert(uv.new_pipe(false)),
  21. _out = assert(uv.new_pipe(false)),
  22. }, StdioStream)
  23. self._in:open(0)
  24. self._out:open(1)
  25. return self
  26. end
  27. --- @param data string|string[]
  28. function StdioStream:write(data)
  29. self._out:write(data)
  30. end
  31. function StdioStream:read_start(cb)
  32. self._in:read_start(function(err, chunk)
  33. if err then
  34. error(err)
  35. end
  36. cb(chunk)
  37. end)
  38. end
  39. function StdioStream:read_stop()
  40. self._in:read_stop()
  41. end
  42. function StdioStream:close()
  43. self._in:close()
  44. self._out:close()
  45. end
  46. --- Stream over a named pipe or TCP socket.
  47. ---
  48. --- @class test.SocketStream : test.Stream
  49. --- @field package _stream_error? string
  50. --- @field package _socket uv.uv_pipe_t
  51. local SocketStream = {}
  52. SocketStream.__index = SocketStream
  53. function SocketStream.open(file)
  54. local socket = assert(uv.new_pipe(false))
  55. local self = setmetatable({
  56. _socket = socket,
  57. _stream_error = nil,
  58. }, SocketStream)
  59. uv.pipe_connect(socket, file, function(err)
  60. self._stream_error = self._stream_error or err
  61. end)
  62. return self
  63. end
  64. function SocketStream.connect(host, port)
  65. local socket = assert(uv.new_tcp())
  66. local self = setmetatable({
  67. _socket = socket,
  68. _stream_error = nil,
  69. }, SocketStream)
  70. uv.tcp_connect(socket, host, port, function(err)
  71. self._stream_error = self._stream_error or err
  72. end)
  73. return self
  74. end
  75. function SocketStream:write(data)
  76. if self._stream_error then
  77. error(self._stream_error)
  78. end
  79. uv.write(self._socket, data, function(err)
  80. if err then
  81. error(self._stream_error or err)
  82. end
  83. end)
  84. end
  85. function SocketStream:read_start(cb)
  86. if self._stream_error then
  87. error(self._stream_error)
  88. end
  89. uv.read_start(self._socket, function(err, chunk)
  90. if err then
  91. error(err)
  92. end
  93. cb(chunk)
  94. end)
  95. end
  96. function SocketStream:read_stop()
  97. if self._stream_error then
  98. error(self._stream_error)
  99. end
  100. uv.read_stop(self._socket)
  101. end
  102. function SocketStream:close()
  103. uv.close(self._socket)
  104. end
  105. --- Stream over child process stdio.
  106. ---
  107. --- @class test.ProcStream : test.Stream
  108. --- @field private _proc uv.uv_process_t
  109. --- @field private _pid integer
  110. --- @field private _child_stdin uv.uv_pipe_t
  111. --- @field private _child_stdout uv.uv_pipe_t
  112. --- @field private _child_stderr uv.uv_pipe_t
  113. --- Collects stdout (if `collect_text=true`). Treats data as text (CRLF converted to LF).
  114. --- @field stdout string
  115. --- Collects stderr as raw data.
  116. --- @field stderr string
  117. --- Gets stderr+stdout as text (CRLF converted to LF).
  118. --- @field output fun(): string
  119. --- @field stdout_eof boolean
  120. --- @field stderr_eof boolean
  121. --- Collects text into the `stdout` field.
  122. --- @field collect_text boolean
  123. --- Exit code
  124. --- @field status integer
  125. --- @field signal integer
  126. local ProcStream = {}
  127. ProcStream.__index = ProcStream
  128. --- Starts child process specified by `argv`.
  129. ---
  130. --- @param argv string[]
  131. --- @param env string[]?
  132. --- @param io_extra uv.uv_pipe_t?
  133. --- @return test.ProcStream
  134. function ProcStream.spawn(argv, env, io_extra)
  135. local self = setmetatable({
  136. collect_text = false,
  137. output = function(self)
  138. if not self.collect_text then
  139. error('set collect_text=true')
  140. end
  141. return (self.stderr .. self.stdout):gsub('\r\n', '\n')
  142. end,
  143. stdout = '',
  144. stderr = '',
  145. stdout_eof = false,
  146. stderr_eof = false,
  147. _child_stdin = assert(uv.new_pipe(false)),
  148. _child_stdout = assert(uv.new_pipe(false)),
  149. _child_stderr = assert(uv.new_pipe(false)),
  150. _exiting = false,
  151. }, ProcStream)
  152. local prog = argv[1]
  153. local args = {} --- @type string[]
  154. for i = 2, #argv do
  155. args[#args + 1] = argv[i]
  156. end
  157. --- @diagnostic disable-next-line:missing-fields
  158. self._proc, self._pid = uv.spawn(prog, {
  159. stdio = { self._child_stdin, self._child_stdout, self._child_stderr, io_extra },
  160. args = args,
  161. --- @diagnostic disable-next-line:assign-type-mismatch
  162. env = env,
  163. }, function(status, signal)
  164. self.signal = signal
  165. -- "Abort" exit may not set status; force to nonzero in that case.
  166. self.status = (0 ~= (status or 0) or 0 == (signal or 0)) and status or (128 + (signal or 0))
  167. end)
  168. if not self._proc then
  169. local err = self._pid
  170. error(err)
  171. end
  172. return self
  173. end
  174. function ProcStream:write(data)
  175. self._child_stdin:write(data)
  176. end
  177. function ProcStream:on_read(stream, cb, err, chunk)
  178. if err then
  179. error(err) -- stream read failed?
  180. elseif chunk then
  181. -- Always collect stderr, in case it gives useful info on failure.
  182. if stream == 'stderr' then
  183. self.stderr = self.stderr .. chunk --[[@as string]]
  184. elseif stream == 'stdout' and self.collect_text then
  185. -- Set `stdout` and convert CRLF => LF.
  186. self.stdout = (self.stdout .. chunk):gsub('\r\n', '\n')
  187. end
  188. else
  189. -- stderr_eof/stdout_eof
  190. self[stream .. '_eof'] = true ---@type boolean
  191. end
  192. -- Handler provided by the caller.
  193. if cb then
  194. cb(chunk)
  195. end
  196. end
  197. --- Collects output until the process exits.
  198. function ProcStream:wait()
  199. while not (self.stdout_eof and self.stderr_eof and (self.status or self.signal)) do
  200. uv.run('once')
  201. end
  202. end
  203. function ProcStream:read_start(on_stdout, on_stderr)
  204. self._child_stdout:read_start(function(err, chunk)
  205. self:on_read('stdout', on_stdout, err, chunk)
  206. end)
  207. self._child_stderr:read_start(function(err, chunk)
  208. self:on_read('stderr', on_stderr, err, chunk)
  209. end)
  210. end
  211. function ProcStream:read_stop()
  212. self._child_stdout:read_stop()
  213. self._child_stderr:read_stop()
  214. end
  215. function ProcStream:close(signal)
  216. if self._closed then
  217. return
  218. end
  219. self._closed = true
  220. self:read_stop()
  221. self._child_stdin:close()
  222. self._child_stdout:close()
  223. self._child_stderr:close()
  224. if type(signal) == 'string' then
  225. self._proc:kill('sig' .. signal)
  226. end
  227. while self.status == nil do
  228. uv.run 'once'
  229. end
  230. return self.status, self.signal
  231. end
  232. return {
  233. StdioStream = StdioStream,
  234. ProcStream = ProcStream,
  235. SocketStream = SocketStream,
  236. }