request.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. package core
  2. import (
  3. "os"
  4. "io"
  5. "fmt"
  6. "bufio"
  7. "bytes"
  8. "errors"
  9. "strings"
  10. "strconv"
  11. "net/url"
  12. "net/http"
  13. "sync/atomic"
  14. "kumachan/standalone/util"
  15. )
  16. type Request struct {
  17. Method RequestMethod
  18. Endpoint *url.URL
  19. AuthToken string
  20. BodyContent [] byte
  21. }
  22. func (req *Request) Observe(lg Logger) Observable {
  23. return Observable(func(pub DataPublisher) {
  24. go sendRequest(req, pub, lg)
  25. })
  26. }
  27. type RequestMethod string
  28. const (
  29. GET RequestMethod = "GET"
  30. POST = "POST"
  31. PUT = "PUT"
  32. DELETE = "DELETE"
  33. SUBSCRIBE = "SUBSCRIBE"
  34. )
  35. func (m RequestMethod) ToHttpMethod() (string, error) {
  36. switch m {
  37. case GET: return http.MethodGet, nil
  38. case POST: return http.MethodPost, nil
  39. case PUT: return http.MethodPut, nil
  40. case DELETE: return http.MethodDelete, nil
  41. default: return "", errors.New("unsupported method: " + string(m))
  42. }
  43. }
  44. type RequestPipe struct {
  45. name string
  46. sourceFile *os.File
  47. sinkFile *os.File
  48. readError error
  49. commandQueue chan func()
  50. nextRequestId int64
  51. activeRequests map[int64] pipeRespChan
  52. }
  53. type pipeRespChan chan(func() ([] byte, error))
  54. func pipeResp(content ([] byte)) func() ([] byte, error) {
  55. return func() ([] byte, error) { return content, nil }
  56. }
  57. func pipeRespError(err error) func() ([] byte, error) {
  58. return func() ([] byte, error) { return nil, err }
  59. }
  60. var singletonStdioPipe = (*RequestPipe)(nil)
  61. func stdioPipe() *RequestPipe {
  62. if singletonStdioPipe == nil {
  63. singletonStdioPipe = createRequestPipe("stdio", os.Stdin, os.Stdout)
  64. }
  65. return singletonStdioPipe
  66. }
  67. func (req *Request) Pipe() (*RequestPipe, error, bool) {
  68. var scheme = req.Endpoint.Scheme
  69. var host = req.Endpoint.Host
  70. if scheme == "pipe" {
  71. if host == "stdio" {
  72. return stdioPipe(), nil, true
  73. } else {
  74. var err = errors.New("unknown pipe: " + host)
  75. return nil, err, true
  76. }
  77. }
  78. return nil, nil, false
  79. }
  80. func createRequestPipe(name string, source *os.File, sink *os.File) *RequestPipe {
  81. var pipe = &RequestPipe {
  82. name: name,
  83. sourceFile: source,
  84. sinkFile: sink,
  85. readError: nil,
  86. commandQueue: make(chan func(), 256),
  87. nextRequestId: 0,
  88. activeRequests: make(map[int64] pipeRespChan),
  89. }
  90. go (func() {
  91. for k := range pipe.commandQueue {
  92. k()
  93. }
  94. })()
  95. pipe.commandQueue <- func() {
  96. var read = func() error {
  97. var r = bufio.NewReader(pipe.sourceFile)
  98. for {
  99. var line, _, err = util.WellBehavedFscanln(r)
  100. if err != nil { return err }
  101. if line == "" {
  102. continue
  103. }
  104. var t = strings.Split(line, " ")
  105. if !(2 < len(t)) { goto invalid }
  106. { var kind = t[0]
  107. var id_str = t[1]
  108. var length_str = t[2]
  109. var id, err1 = strconv.ParseInt(id_str, 10, 64)
  110. if err1 != nil { goto invalid }
  111. var length, err2 = strconv.Atoi(length_str)
  112. if err2 != nil { goto invalid }
  113. var content = make([] byte, length)
  114. if length > 0 {
  115. var _, err = io.ReadFull(r, content)
  116. if err != nil { return err }
  117. }
  118. switch kind {
  119. case "OK":
  120. pipe.commandQueue <- func() {
  121. if resp, ok := pipe.activeRequests[id]; ok {
  122. resp <- pipeResp(content)
  123. } else {
  124. // no-op
  125. }
  126. }
  127. continue
  128. case "ERR":
  129. pipe.commandQueue <- func() {
  130. if resp, ok := pipe.activeRequests[id]; ok {
  131. var msg = util.WellBehavedDecodeUtf8(content)
  132. var err = errors.New(msg)
  133. resp <- pipeRespError(err)
  134. } else {
  135. // no-op
  136. }
  137. }
  138. continue
  139. default:
  140. goto invalid
  141. }}
  142. invalid:
  143. return errors.New("invalid response header: " + line)
  144. }
  145. }
  146. go (func() {
  147. var err = read()
  148. if err != nil {
  149. var err = fmt.Errorf("pipe read error: %w", err)
  150. pipe.commandQueue <- func() {
  151. pipe.readError = err
  152. for _, resp := range pipe.activeRequests {
  153. resp <- pipeRespError(err)
  154. }
  155. }
  156. }
  157. })()
  158. }
  159. return pipe
  160. }
  161. func (pipe *RequestPipe) addRequest(req *Request) (pipeRespChan, int64, func()) {
  162. var resp = make(pipeRespChan, 256)
  163. var id = atomic.AddInt64(&(pipe.nextRequestId), 1)
  164. pipe.commandQueue <- func() {
  165. pipe.activeRequests[id] = resp
  166. var method = req.Method
  167. var path = req.Endpoint.Path
  168. if path == "" { path = "/" }
  169. var token = strconv.Quote(req.AuthToken)
  170. var length = len(req.BodyContent)
  171. var write = func() error {
  172. if pipe.readError != nil {
  173. return pipe.readError
  174. }
  175. var _, err = fmt.Fprintf(pipe.sinkFile,
  176. "REQ %d %s %s %s %d\n", id, method, path, token, length)
  177. if err != nil { return err }
  178. if length > 0 {
  179. var _, err = pipe.sinkFile.Write(req.BodyContent)
  180. if err != nil { return err }
  181. }
  182. return nil
  183. }
  184. var err = write()
  185. if err != nil {
  186. resp <- pipeRespError(err)
  187. }
  188. }
  189. var remove = func() {
  190. pipe.commandQueue <- func() {
  191. delete(pipe.activeRequests, id)
  192. }
  193. }
  194. return resp, id, remove
  195. }
  196. func (pipe *RequestPipe) cancelRequest(id int64) {
  197. pipe.commandQueue <- func() {
  198. var _, err = fmt.Fprintf(pipe.sinkFile,
  199. "CANCEL %d\n", id)
  200. if err != nil {
  201. var err = fmt.Errorf(
  202. "%s: request %d: error sending cancel signal: %w",
  203. pipe.name, id, err,
  204. )
  205. println(err.Error())
  206. }
  207. }
  208. }
  209. func sendRequest(req *Request, pub DataPublisher, lg Logger) {
  210. lg.LogRequest(req)
  211. if pipe, err, ok := req.Pipe(); ok {
  212. if err != nil { pub.AsyncThrow(err); return }
  213. var resp, id, remove = pipe.addRequest(req)
  214. defer remove()
  215. if req.Method == SUBSCRIBE {
  216. var yield, complete = pub.AsyncGenerate()
  217. var throw = pub.AsyncThrow
  218. loop: for {
  219. select {
  220. case resp_ := <- resp:
  221. var resp, err = resp_()
  222. if err != nil {
  223. throw(err)
  224. break loop
  225. }
  226. if len(resp) > 0 {
  227. yield(ObjBytes(resp))
  228. continue loop
  229. } else {
  230. complete()
  231. break loop
  232. }
  233. case <- pub.AsyncContext().Done():
  234. pipe.cancelRequest(id)
  235. break loop
  236. }
  237. }
  238. } else {
  239. select {
  240. case resp_ := <- resp:
  241. var resp, err = resp_()
  242. if err != nil { pub.AsyncThrow(err); return }
  243. pub.AsyncReturn(ObjBytes(resp))
  244. case <- pub.AsyncContext().Done():
  245. pipe.cancelRequest(id)
  246. }
  247. }
  248. } else {
  249. var ctx = pub.AsyncContext()
  250. var method, err = req.Method.ToHttpMethod()
  251. if err != nil { pub.AsyncThrow(err); return }
  252. var endpoint = req.Endpoint.String()
  253. var body = bytes.NewReader(req.BodyContent)
  254. var token = req.AuthToken
  255. { var req, err = http.NewRequestWithContext (
  256. ctx, method, endpoint, body,
  257. )
  258. if err != nil { pub.AsyncThrow(err); return }
  259. if token != "" {
  260. req.Header.Set("X-Auth-Token", token)
  261. }
  262. { var res, err = http.DefaultClient.Do(req)
  263. if err != nil { pub.AsyncThrow(err); return }
  264. defer (func() {
  265. _ = res.Body.Close()
  266. })()
  267. var status = res.StatusCode
  268. var ok = (200 <= status && status < 300)
  269. if !(ok) {
  270. var err = errors.New(fmt.Sprintf("HTTP %d", status))
  271. { pub.AsyncThrow(err); return }
  272. }
  273. { var binary, err = io.ReadAll(res.Body)
  274. if err != nil { pub.AsyncThrow(err); return }
  275. pub.AsyncReturn(ObjBytes(binary)) }}}
  276. }
  277. }