write.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. // License: GPLv3 Copyright: 2022, Kovid Goyal, <kovid at kovidgoyal.net>
  2. package loop
  3. import (
  4. "fmt"
  5. "io"
  6. "os"
  7. "time"
  8. "golang.org/x/sys/unix"
  9. "kitty/tools/tty"
  10. "kitty/tools/utils"
  11. )
  12. type write_msg struct {
  13. id IdType
  14. bytes []byte
  15. str string
  16. }
  17. func (self *write_msg) String() string {
  18. return fmt.Sprintf("write_msg{%v %#v %#v}", self.id, string(self.bytes), self.str)
  19. }
  20. func write_ignoring_temporary_errors(f *tty.Term, buf []byte) (int, error) {
  21. n, err := f.Write(buf)
  22. if err != nil {
  23. if is_temporary_error(err) {
  24. err = nil
  25. }
  26. return n, err
  27. }
  28. if n == 0 {
  29. return 0, io.EOF
  30. }
  31. return n, err
  32. }
  33. func writestring_ignoring_temporary_errors(f *tty.Term, buf string) (int, error) {
  34. n, err := f.WriteString(buf)
  35. if err != nil {
  36. if is_temporary_error(err) {
  37. err = nil
  38. }
  39. return n, err
  40. }
  41. if n == 0 {
  42. return 0, io.EOF
  43. }
  44. return n, err
  45. }
  46. func (self *Loop) flush_pending_writes(tty_write_channel chan<- write_msg) (num_sent int) {
  47. defer func() {
  48. if num_sent > 0 {
  49. self.pending_writes = utils.ShiftLeft(self.pending_writes, num_sent)
  50. }
  51. }()
  52. for len(self.pending_writes) > num_sent {
  53. select {
  54. case tty_write_channel <- self.pending_writes[num_sent]:
  55. num_sent++
  56. default:
  57. return
  58. }
  59. }
  60. return
  61. }
  62. func (self *Loop) wait_for_write_to_complete(sentinel IdType, tty_write_channel chan<- write_msg, write_done_channel <-chan IdType, timeout time.Duration) error {
  63. num_sent := 0
  64. defer func() {
  65. if num_sent > 0 {
  66. self.pending_writes = utils.ShiftLeft(self.pending_writes, num_sent)
  67. }
  68. }()
  69. end_time := time.Now().Add(timeout)
  70. for num_sent < len(self.pending_writes) {
  71. timeout = time.Until(end_time)
  72. if timeout <= 0 {
  73. return os.ErrDeadlineExceeded
  74. }
  75. select {
  76. case tty_write_channel <- self.pending_writes[num_sent]:
  77. num_sent++
  78. case write_id, more := <-write_done_channel:
  79. if self.OnWriteComplete != nil {
  80. err := self.OnWriteComplete(write_id, write_id < self.write_msg_id_counter)
  81. if err != nil {
  82. return err
  83. }
  84. }
  85. if write_id == sentinel {
  86. return nil
  87. }
  88. if !more {
  89. return fmt.Errorf("The write_done_channel was unexpectedly closed")
  90. }
  91. case <-time.After(timeout):
  92. return os.ErrDeadlineExceeded
  93. }
  94. }
  95. for {
  96. timeout = time.Until(end_time)
  97. if timeout <= 0 {
  98. return os.ErrDeadlineExceeded
  99. }
  100. select {
  101. case write_id, more := <-write_done_channel:
  102. if self.OnWriteComplete != nil {
  103. err := self.OnWriteComplete(write_id, write_id < self.write_msg_id_counter)
  104. if err != nil {
  105. return err
  106. }
  107. }
  108. if write_id == sentinel {
  109. return nil
  110. }
  111. if !more {
  112. return fmt.Errorf("The write_done_channel was unexpectedly closed")
  113. }
  114. case <-time.After(timeout):
  115. return os.ErrDeadlineExceeded
  116. }
  117. }
  118. }
  119. func (self *Loop) add_write_to_pending_queue(data write_msg) {
  120. if len(self.pending_writes) > 0 || self.tty_write_channel == nil {
  121. self.pending_writes = append(self.pending_writes, data)
  122. } else {
  123. select {
  124. case self.tty_write_channel <- data:
  125. default:
  126. self.pending_writes = append(self.pending_writes, data)
  127. }
  128. }
  129. }
  130. func (self write_msg) is_empty() bool {
  131. if self.bytes == nil {
  132. return self.str == ""
  133. }
  134. return len(self.bytes) == 0
  135. }
  136. func (self *write_msg) write(f *tty.Term) (err error) {
  137. n := 0
  138. if self.bytes == nil {
  139. n, err = writestring_ignoring_temporary_errors(f, self.str)
  140. } else {
  141. n, err = write_ignoring_temporary_errors(f, self.bytes)
  142. }
  143. if n > 0 {
  144. if self.bytes == nil {
  145. self.str = self.str[n:]
  146. } else {
  147. self.bytes = self.bytes[n:]
  148. }
  149. }
  150. return
  151. }
  152. func write_to_tty(
  153. pipe_r *os.File, term *tty.Term,
  154. job_channel <-chan write_msg, err_channel chan<- error, write_done_channel chan<- IdType,
  155. ) {
  156. keep_going := true
  157. defer func() {
  158. pipe_r.Close()
  159. close(write_done_channel)
  160. }()
  161. selector := utils.CreateSelect(2)
  162. pipe_fd := int(pipe_r.Fd())
  163. tty_fd := term.Fd()
  164. selector.RegisterRead(pipe_fd)
  165. selector.RegisterWrite(tty_fd)
  166. wait_for_write_available := func() {
  167. for {
  168. n, err := selector.WaitForever()
  169. if err != nil && err != unix.EINTR {
  170. err_channel <- err
  171. keep_going = false
  172. return
  173. }
  174. if n > 0 {
  175. break
  176. }
  177. }
  178. if selector.IsReadyToRead(pipe_fd) {
  179. keep_going = false
  180. }
  181. }
  182. write_data := func(msg write_msg) {
  183. for !msg.is_empty() {
  184. wait_for_write_available()
  185. if !keep_going {
  186. return
  187. }
  188. if err := msg.write(term); err != nil {
  189. err_channel <- err
  190. keep_going = false
  191. return
  192. }
  193. }
  194. }
  195. for {
  196. data, more := <-job_channel
  197. if !more {
  198. keep_going = false
  199. break
  200. }
  201. write_data(data)
  202. if keep_going {
  203. write_done_channel <- data.id
  204. } else {
  205. break
  206. }
  207. }
  208. }
  209. func flush_writer(pipe_w *os.File, tty_write_channel chan<- write_msg, write_done_channel <-chan IdType, pending_writes []write_msg, timeout time.Duration) {
  210. writer_quit := false
  211. defer func() {
  212. if tty_write_channel != nil {
  213. close(tty_write_channel)
  214. tty_write_channel = nil
  215. }
  216. pipe_w.Close()
  217. if !writer_quit {
  218. for {
  219. _, more := <-write_done_channel
  220. if !more {
  221. writer_quit = true
  222. break
  223. }
  224. }
  225. }
  226. }()
  227. deadline := time.Now().Add(timeout)
  228. for len(pending_writes) > 0 && !writer_quit {
  229. timeout = time.Until(deadline)
  230. if timeout <= 0 {
  231. return
  232. }
  233. select {
  234. case <-time.After(timeout):
  235. return
  236. case _, more := <-write_done_channel:
  237. if !more {
  238. writer_quit = true
  239. }
  240. case tty_write_channel <- pending_writes[0]:
  241. pending_writes = pending_writes[1:]
  242. }
  243. }
  244. close(tty_write_channel)
  245. tty_write_channel = nil
  246. timeout = time.Until(deadline)
  247. if timeout <= 0 {
  248. return
  249. }
  250. for !writer_quit {
  251. select {
  252. case _, more := <-write_done_channel:
  253. if !more {
  254. writer_quit = true
  255. }
  256. case <-time.After(timeout):
  257. return
  258. }
  259. }
  260. }