rpc_stream.lua 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. ---
  2. --- Reading/writing of msgpack over any of the stream types from `uv_stream.lua`.
  3. --- Does not implement the RPC protocol, see `session.lua` for that.
  4. ---
  5. local mpack = vim.mpack
  --- Reply handle for a single incoming request.
  ---
  --- Keeps the owning RPC stream together with the request id so the answer
  --- can be written back later through `Response:send()`.
  local Response = {}
  Response.__index = Response

  --- Create a reply handle.
  ---
  --- @param rpc_stream table object exposing `_session`, `_pack` and `_stream`
  --- @param request_id any id later passed to `_session:reply()`
  --- @return table response
  function Response.new(rpc_stream, request_id)
    local response = {
      _rpc_stream = rpc_stream,
      _request_id = request_id,
    }
    return setmetatable(response, Response)
  end

  --- Write the reply for this request to the underlying stream.
  ---
  --- The bytes returned by `_session:reply()` are followed by two packed
  --- values: `value` then `mpack.NIL` when `is_error` is truthy, otherwise
  --- `mpack.NIL` then `value`.
  ---
  --- @param value any payload to send (error object or result)
  --- @param is_error any truthy when `value` is an error rather than a result
  function Response:send(value, is_error)
    local rpc = self._rpc_stream
    local header = rpc._session:reply(self._request_id)

    -- Decide which of the two trailing slots carries the payload.
    local first, second
    if is_error then
      first, second = value, mpack.NIL
    else
      first, second = mpack.NIL, value
    end

    local payload = header .. rpc._pack(first) .. rpc._pack(second)
    rpc._stream:write(payload)
  end
  --- Nvim msgpack RPC stream.
  ---
  --- Wraps a byte stream with a msgpack packer and an `mpack.Session`, so that
  --- messages can be written to and read from the stream.
  ---
  --- @class test.RpcStream
  --- @field private _stream test.Stream underlying byte stream
  --- @field private _pack fun(obj: any): string callable packer from `mpack.Packer()`
  --- @field private _session any `mpack.Session` instance used to frame and parse messages
  local RpcStream = {}
  RpcStream.__index = RpcStream

  --- Create an RPC stream on top of `stream`.
  ---
  --- @param stream test.Stream
  --- @return test.RpcStream
  function RpcStream.new(stream)
    -- Ext types 0, 1 and 2 (Buffer, Window, Tabpage) are all handled the same
    -- way: the ext payload is itself msgpack-decoded and the result returned.
    local function decode_handle(_c, s)
      return mpack.decode(s)
    end

    return setmetatable({
      _stream = stream,
      _pack = mpack.Packer(),
      _session = mpack.Session({
        unpack = mpack.Unpacker({
          ext = {
            [0] = decode_handle, -- Buffer
            [1] = decode_handle, -- Window
            [2] = decode_handle, -- Tabpage
          },
        }),
      }),
    }, RpcStream)
  end
  --- Send a message over the stream.
  ---
  --- With `response_cb` the message header comes from `_session:request()`,
  --- which is handed the callback; without it the header comes from
  --- `_session:notify()`. The packed `method` and `args` follow the header.
  ---
  --- @param method any value packed right after the header
  --- @param args any value packed after `method`
  --- @param response_cb? function must be a function when given (asserted)
  function RpcStream:write(method, args, response_cb)
    local header
    if not response_cb then
      header = self._session:notify()
    else
      assert(type(response_cb) == 'function')
      header = self._session:request(response_cb)
    end

    local payload = header .. self._pack(method) .. self._pack(args)
    self._stream:write(payload)
  end
  --- Start reading from the underlying stream and dispatch decoded messages.
  ---
  --- Each chunk delivered by the stream is fed to `_session:receive()` until
  --- the whole chunk has been consumed. Depending on the message type reported
  --- by the session:
  ---   * 'request'      -> `on_request(method, args, response)`, where
  ---                       `response` is a `Response` bound to the request id
  ---   * 'notification' -> `on_notification(method, args)`
  ---   * 'response'     -> the value returned in the id/callback position is
  ---                       called as `cb(nil, result)` when the error field is
  ---                       `mpack.NIL`, otherwise as `cb(error, nil)`
  ---                       (presumably the callback given to `write()`; confirm
  ---                       against the mpack Session documentation)
  --- Any other type value is ignored.
  ---
  --- @param on_request function
  --- @param on_notification function
  --- @param on_eof function called with no arguments when the stream delivers
  ---   a falsy chunk
  function RpcStream:read_start(on_request, on_notification, on_eof)
    self._stream:read_start(function(chunk)
      if not chunk then
        return on_eof()
      end

      local offset, total = 1, #chunk
      while offset <= total do
        local kind, id_or_cb, method_or_error, args_or_result
        -- `receive` also returns the position to continue parsing from.
        kind, id_or_cb, method_or_error, args_or_result, offset =
          self._session:receive(chunk, offset)

        if kind == 'request' then
          on_request(method_or_error, args_or_result, Response.new(self, id_or_cb))
        elseif kind == 'notification' then
          on_notification(method_or_error, args_or_result)
        elseif kind == 'response' then
          if method_or_error == mpack.NIL then
            -- No error: hand over the result only.
            id_or_cb(nil, args_or_result)
          else
            -- Error present: the result is dropped.
            id_or_cb(method_or_error, nil)
          end
        end
      end
    end)
  end
  --- Forward to the underlying stream's `read_stop()`.
  function RpcStream:read_stop()
    local stream = self._stream
    stream:read_stop()
  end
  --- Close the underlying stream.
  ---
  --- @param signal any passed through unchanged to the stream's `close()`
  function RpcStream:close(signal)
    local stream = self._stream
    stream:close(signal)
  end
  100. return RpcStream