stream_decompressor.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. // License: GPLv3 Copyright: 2023, Kovid Goyal, <kovid at kovidgoyal.net>
  2. package utils
  3. import (
  4. "errors"
  5. "fmt"
  6. "io"
  7. )
  8. var _ = fmt.Print
  9. type StreamDecompressor = func(chunk []byte, is_last bool) error
  10. type pipe_reader struct {
  11. pr *io.PipeReader
  12. }
  13. func (self *pipe_reader) Read(b []byte) (n int, err error) {
  14. // ensure the decompressor code never gets a zero byte read with no error
  15. for len(b) > 0 {
  16. n, err = self.pr.Read(b)
  17. if err != nil || n > 0 {
  18. return
  19. }
  20. }
  21. return
  22. }
  23. // Wrap Go's awful decompressor routines to allow feeding them
  24. // data in chunks. For example:
  25. // sd := NewStreamDecompressor(zlib.NewReader, output)
  26. // sd(chunk, false)
  27. // ...
  28. // sd(last_chunk, true)
  29. // after this call, calling sd() further will just return io.EOF.
  30. // To close the decompressor at any time, call sd(nil, true).
  31. // Note: output.Write() may be called from a different thread, but only while the main thread is in sd()
  32. func NewStreamDecompressor(constructor func(io.Reader) (io.ReadCloser, error), output io.Writer) StreamDecompressor {
  33. if constructor == nil { // identity decompressor
  34. var err error
  35. return func(chunk []byte, is_last bool) error {
  36. if err != nil {
  37. return err
  38. }
  39. if len(chunk) > 0 {
  40. _, err = output.Write(chunk)
  41. }
  42. if is_last {
  43. if err == nil {
  44. err = io.EOF
  45. return nil
  46. }
  47. }
  48. return err
  49. }
  50. }
  51. pipe_r, pipe_w := io.Pipe()
  52. pr := pipe_reader{pr: pipe_r}
  53. finished := make(chan error, 1)
  54. finished_err := errors.New("finished")
  55. go func() {
  56. var err error
  57. defer func() {
  58. finished <- err
  59. }()
  60. var impl io.ReadCloser
  61. impl, err = constructor(&pr)
  62. if err != nil {
  63. pipe_r.CloseWithError(err)
  64. return
  65. }
  66. _, err = io.Copy(output, impl)
  67. cerr := impl.Close()
  68. if err == nil {
  69. err = cerr
  70. }
  71. if err == nil {
  72. err = finished_err
  73. }
  74. pipe_r.CloseWithError(err)
  75. }()
  76. var iter_err error
  77. return func(chunk []byte, is_last bool) error {
  78. if iter_err != nil {
  79. if iter_err == finished_err {
  80. iter_err = io.EOF
  81. }
  82. return iter_err
  83. }
  84. if len(chunk) > 0 {
  85. var n int
  86. n, iter_err = pipe_w.Write(chunk)
  87. if iter_err != nil && iter_err != finished_err {
  88. return iter_err
  89. }
  90. if n < len(chunk) {
  91. iter_err = io.ErrShortWrite
  92. return iter_err
  93. }
  94. // wait for output to finish
  95. if iter_err == nil {
  96. // after a zero byte read, pipe_reader.Read() calls pipe_r.Read() again so
  97. // we know it is either blocked waiting for a write to pipe_w or has finished
  98. _, iter_err = pipe_w.Write(nil)
  99. if iter_err != nil && iter_err != finished_err {
  100. return iter_err
  101. }
  102. }
  103. }
  104. if is_last {
  105. pipe_w.CloseWithError(io.EOF)
  106. err := <-finished
  107. if err != nil && err != io.EOF && err != finished_err {
  108. iter_err = err
  109. return err
  110. }
  111. iter_err = io.EOF
  112. return nil
  113. }
  114. return nil
  115. }
  116. }