origin_connection.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package ingress
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "io"
  6. "net"
  7. "net/http"
  8. "github.com/cloudflare/cloudflared/ipaccess"
  9. "github.com/cloudflare/cloudflared/socks"
  10. "github.com/cloudflare/cloudflared/websocket"
  11. gws "github.com/gorilla/websocket"
  12. "github.com/rs/zerolog"
  13. )
  14. // OriginConnection is a way to stream to a service running on the user's origin.
  15. // Different concrete implementations will stream different protocols as long as they are io.ReadWriters.
  16. type OriginConnection interface {
  17. // Stream should generally be implemented as a bidirectional io.Copy.
  18. Stream(ctx context.Context, tunnelConn io.ReadWriter, log *zerolog.Logger)
  19. Close()
  20. }
  21. type streamHandlerFunc func(originConn io.ReadWriter, remoteConn net.Conn, log *zerolog.Logger)
  22. // Stream copies copy data to & from provided io.ReadWriters.
  23. func Stream(conn, backendConn io.ReadWriter, log *zerolog.Logger) {
  24. proxyDone := make(chan struct{}, 2)
  25. go func() {
  26. _, err := io.Copy(conn, backendConn)
  27. if err != nil {
  28. log.Debug().Msgf("conn to backendConn copy: %v", err)
  29. }
  30. proxyDone <- struct{}{}
  31. }()
  32. go func() {
  33. _, err := io.Copy(backendConn, conn)
  34. if err != nil {
  35. log.Debug().Msgf("backendConn to conn copy: %v", err)
  36. }
  37. proxyDone <- struct{}{}
  38. }()
  39. // If one side is done, we are done.
  40. <-proxyDone
  41. }
  42. // DefaultStreamHandler is an implementation of streamHandlerFunc that
  43. // performs a two way io.Copy between originConn and remoteConn.
  44. func DefaultStreamHandler(originConn io.ReadWriter, remoteConn net.Conn, log *zerolog.Logger) {
  45. Stream(originConn, remoteConn, log)
  46. }
  47. // tcpConnection is an OriginConnection that directly streams to raw TCP.
  48. type tcpConnection struct {
  49. conn net.Conn
  50. }
  51. func (tc *tcpConnection) Stream(ctx context.Context, tunnelConn io.ReadWriter, log *zerolog.Logger) {
  52. Stream(tunnelConn, tc.conn, log)
  53. }
  54. func (tc *tcpConnection) Close() {
  55. tc.conn.Close()
  56. }
  57. // tcpOverWSConnection is an OriginConnection that streams to TCP over WS.
  58. type tcpOverWSConnection struct {
  59. conn net.Conn
  60. streamHandler streamHandlerFunc
  61. }
  62. func (wc *tcpOverWSConnection) Stream(ctx context.Context, tunnelConn io.ReadWriter, log *zerolog.Logger) {
  63. wc.streamHandler(websocket.NewConn(ctx, tunnelConn, log), wc.conn, log)
  64. }
  65. func (wc *tcpOverWSConnection) Close() {
  66. wc.conn.Close()
  67. }
  68. // wsConnection is an OriginConnection that streams WS between eyeball and origin.
  69. type wsConnection struct {
  70. wsConn *gws.Conn
  71. resp *http.Response
  72. }
  73. func (wsc *wsConnection) Stream(ctx context.Context, tunnelConn io.ReadWriter, log *zerolog.Logger) {
  74. Stream(tunnelConn, wsc.wsConn.UnderlyingConn(), log)
  75. }
  76. func (wsc *wsConnection) Close() {
  77. wsc.resp.Body.Close()
  78. wsc.wsConn.Close()
  79. }
  80. func newWSConnection(clientTLSConfig *tls.Config, r *http.Request) (OriginConnection, *http.Response, error) {
  81. d := &gws.Dialer{
  82. TLSClientConfig: clientTLSConfig,
  83. }
  84. wsConn, resp, err := websocket.ClientConnect(r, d)
  85. if err != nil {
  86. return nil, nil, err
  87. }
  88. return &wsConnection{
  89. wsConn,
  90. resp,
  91. }, resp, nil
  92. }
  93. // socksProxyOverWSConnection is an OriginConnection that streams SOCKS connections over WS.
  94. // The connection to the origin happens inside the SOCKS code as the client specifies the origin
  95. // details in the packet.
  96. type socksProxyOverWSConnection struct {
  97. accessPolicy *ipaccess.Policy
  98. }
  99. func (sp *socksProxyOverWSConnection) Stream(ctx context.Context, tunnelConn io.ReadWriter, log *zerolog.Logger) {
  100. socks.StreamNetHandler(websocket.NewConn(ctx, tunnelConn, log), sp.accessPolicy, log)
  101. }
  102. func (sp *socksProxyOverWSConnection) Close() {
  103. }