origin_service.go 8.2 KB


  1. package ingress
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "fmt"
  6. "io"
  7. "net"
  8. "net/http"
  9. "net/url"
  10. "sync"
  11. "time"
  12. "github.com/cloudflare/cloudflared/hello"
  13. "github.com/cloudflare/cloudflared/ipaccess"
  14. "github.com/cloudflare/cloudflared/socks"
  15. "github.com/cloudflare/cloudflared/tlsconfig"
  16. "github.com/cloudflare/cloudflared/websocket"
  17. gws "github.com/gorilla/websocket"
  18. "github.com/pkg/errors"
  19. "github.com/rs/zerolog"
  20. )
  21. // originService is something a tunnel can proxy traffic to.
  22. type originService interface {
  23. String() string
  24. // Start the origin service if it's managed by cloudflared, e.g. proxy servers or Hello World.
  25. // If it's not managed by cloudflared, this is a no-op because the user is responsible for
  26. // starting the origin service.
  27. start(wg *sync.WaitGroup, log *zerolog.Logger, shutdownC <-chan struct{}, errC chan error, cfg OriginRequestConfig) error
  28. }
  29. // unixSocketPath is an OriginService representing a unix socket (which accepts HTTP)
  30. type unixSocketPath struct {
  31. path string
  32. transport *http.Transport
  33. }
  34. func (o *unixSocketPath) String() string {
  35. return "unix socket: " + o.path
  36. }
  37. func (o *unixSocketPath) start(wg *sync.WaitGroup, log *zerolog.Logger, shutdownC <-chan struct{}, errC chan error, cfg OriginRequestConfig) error {
  38. transport, err := newHTTPTransport(o, cfg, log)
  39. if err != nil {
  40. return err
  41. }
  42. o.transport = transport
  43. return nil
  44. }
  45. func (o *unixSocketPath) Dial(reqURL *url.URL, headers http.Header) (*gws.Conn, *http.Response, error) {
  46. d := &gws.Dialer{
  47. NetDial: o.transport.Dial,
  48. NetDialContext: o.transport.DialContext,
  49. TLSClientConfig: o.transport.TLSClientConfig,
  50. }
  51. reqURL.Scheme = websocket.ChangeRequestScheme(reqURL)
  52. return d.Dial(reqURL.String(), headers)
  53. }
  54. type httpService struct {
  55. url *url.URL
  56. hostHeader string
  57. transport *http.Transport
  58. }
  59. func (o *httpService) start(wg *sync.WaitGroup, log *zerolog.Logger, shutdownC <-chan struct{}, errC chan error, cfg OriginRequestConfig) error {
  60. transport, err := newHTTPTransport(o, cfg, log)
  61. if err != nil {
  62. return err
  63. }
  64. o.hostHeader = cfg.HTTPHostHeader
  65. o.transport = transport
  66. return nil
  67. }
  68. func (o *httpService) String() string {
  69. return o.url.String()
  70. }
  71. // rawTCPService dials TCP to the destination specified by the client
  72. // It's used by warp routing
  73. type rawTCPService struct {
  74. name string
  75. }
  76. func (o *rawTCPService) String() string {
  77. return o.name
  78. }
  79. func (o *rawTCPService) start(wg *sync.WaitGroup, log *zerolog.Logger, shutdownC <-chan struct{}, errC chan error, cfg OriginRequestConfig) error {
  80. return nil
  81. }
  82. // tcpOverWSService models TCP origins serving eyeballs connecting over websocket, such as
  83. // cloudflared access commands.
  84. type tcpOverWSService struct {
  85. dest string
  86. isBastion bool
  87. streamHandler streamHandlerFunc
  88. }
  89. type socksProxyOverWSService struct {
  90. conn *socksProxyOverWSConnection
  91. }
  92. func newTCPOverWSService(url *url.URL) *tcpOverWSService {
  93. switch url.Scheme {
  94. case "ssh":
  95. addPortIfMissing(url, 22)
  96. case "rdp":
  97. addPortIfMissing(url, 3389)
  98. case "smb":
  99. addPortIfMissing(url, 445)
  100. case "tcp":
  101. addPortIfMissing(url, 7864) // just a random port since there isn't a default in this case
  102. }
  103. return &tcpOverWSService{
  104. dest: url.Host,
  105. }
  106. }
  107. func newBastionService() *tcpOverWSService {
  108. return &tcpOverWSService{
  109. isBastion: true,
  110. }
  111. }
  112. func newSocksProxyOverWSService(accessPolicy *ipaccess.Policy) *socksProxyOverWSService {
  113. proxy := socksProxyOverWSService{
  114. conn: &socksProxyOverWSConnection{
  115. accessPolicy: accessPolicy,
  116. },
  117. }
  118. return &proxy
  119. }
  120. func addPortIfMissing(uri *url.URL, port int) {
  121. if uri.Port() == "" {
  122. uri.Host = fmt.Sprintf("%s:%d", uri.Hostname(), port)
  123. }
  124. }
  125. func (o *tcpOverWSService) String() string {
  126. if o.isBastion {
  127. return ServiceBastion
  128. }
  129. return o.dest
  130. }
  131. func (o *tcpOverWSService) start(wg *sync.WaitGroup, log *zerolog.Logger, shutdownC <-chan struct{}, errC chan error, cfg OriginRequestConfig) error {
  132. if cfg.ProxyType == socksProxy {
  133. o.streamHandler = socks.StreamHandler
  134. } else {
  135. o.streamHandler = DefaultStreamHandler
  136. }
  137. return nil
  138. }
  139. func (o *socksProxyOverWSService) start(wg *sync.WaitGroup, log *zerolog.Logger, shutdownC <-chan struct{}, errC chan error, cfg OriginRequestConfig) error {
  140. return nil
  141. }
  142. func (o *socksProxyOverWSService) String() string {
  143. return ServiceSocksProxy
  144. }
  145. // HelloWorld is an OriginService for the built-in Hello World server.
  146. // Users only use this for testing and experimenting with cloudflared.
  147. type helloWorld struct {
  148. server net.Listener
  149. transport *http.Transport
  150. }
  151. func (o *helloWorld) String() string {
  152. return "Hello World test origin"
  153. }
  154. // Start starts a HelloWorld server and stores its address in the Service receiver.
  155. func (o *helloWorld) start(
  156. wg *sync.WaitGroup,
  157. log *zerolog.Logger,
  158. shutdownC <-chan struct{},
  159. errC chan error,
  160. cfg OriginRequestConfig,
  161. ) error {
  162. transport, err := newHTTPTransport(o, cfg, log)
  163. if err != nil {
  164. return err
  165. }
  166. o.transport = transport
  167. helloListener, err := hello.CreateTLSListener("127.0.0.1:")
  168. if err != nil {
  169. return errors.Wrap(err, "Cannot start Hello World Server")
  170. }
  171. wg.Add(1)
  172. go func() {
  173. defer wg.Done()
  174. _ = hello.StartHelloWorldServer(log, helloListener, shutdownC)
  175. }()
  176. o.server = helloListener
  177. return nil
  178. }
  179. // statusCode is an OriginService that just responds with a given HTTP status.
  180. // Typical use-case is "user wants the catch-all rule to just respond 404".
  181. type statusCode struct {
  182. resp *http.Response
  183. }
  184. func newStatusCode(status int) statusCode {
  185. resp := &http.Response{
  186. StatusCode: status,
  187. Status: fmt.Sprintf("%d %s", status, http.StatusText(status)),
  188. Body: new(NopReadCloser),
  189. }
  190. return statusCode{resp: resp}
  191. }
  192. func (o *statusCode) String() string {
  193. return fmt.Sprintf("HTTP %d", o.resp.StatusCode)
  194. }
  195. func (o *statusCode) start(
  196. wg *sync.WaitGroup,
  197. log *zerolog.Logger,
  198. shutdownC <-chan struct{},
  199. errC chan error,
  200. cfg OriginRequestConfig,
  201. ) error {
  202. return nil
  203. }
  204. type NopReadCloser struct{}
  205. // Read always returns EOF to signal end of input
  206. func (nrc *NopReadCloser) Read(buf []byte) (int, error) {
  207. return 0, io.EOF
  208. }
  209. func (nrc *NopReadCloser) Close() error {
  210. return nil
  211. }
  212. func newHTTPTransport(service originService, cfg OriginRequestConfig, log *zerolog.Logger) (*http.Transport, error) {
  213. originCertPool, err := tlsconfig.LoadOriginCA(cfg.CAPool, log)
  214. if err != nil {
  215. return nil, errors.Wrap(err, "Error loading cert pool")
  216. }
  217. httpTransport := http.Transport{
  218. Proxy: http.ProxyFromEnvironment,
  219. MaxIdleConns: cfg.KeepAliveConnections,
  220. MaxIdleConnsPerHost: cfg.KeepAliveConnections,
  221. IdleConnTimeout: cfg.KeepAliveTimeout,
  222. TLSHandshakeTimeout: cfg.TLSTimeout,
  223. ExpectContinueTimeout: 1 * time.Second,
  224. TLSClientConfig: &tls.Config{RootCAs: originCertPool, InsecureSkipVerify: cfg.NoTLSVerify},
  225. }
  226. if _, isHelloWorld := service.(*helloWorld); !isHelloWorld && cfg.OriginServerName != "" {
  227. httpTransport.TLSClientConfig.ServerName = cfg.OriginServerName
  228. }
  229. dialer := &net.Dialer{
  230. Timeout: cfg.ConnectTimeout,
  231. KeepAlive: cfg.TCPKeepAlive,
  232. }
  233. if cfg.NoHappyEyeballs {
  234. dialer.FallbackDelay = -1 // As of Golang 1.12, a negative delay disables "happy eyeballs"
  235. }
  236. // DialContext depends on which kind of origin is being used.
  237. dialContext := dialer.DialContext
  238. switch service := service.(type) {
  239. // If this origin is a unix socket, enforce network type "unix".
  240. case *unixSocketPath:
  241. httpTransport.DialContext = func(ctx context.Context, _, _ string) (net.Conn, error) {
  242. return dialContext(ctx, "unix", service.path)
  243. }
  244. // Otherwise, use the regular network config.
  245. default:
  246. httpTransport.DialContext = dialContext
  247. }
  248. return &httpTransport, nil
  249. }
  250. // MockOriginHTTPService should only be used by other packages to mock OriginService. Set Transport to configure desired RoundTripper behavior.
  251. type MockOriginHTTPService struct {
  252. Transport http.RoundTripper
  253. }
  254. func (mos MockOriginHTTPService) RoundTrip(req *http.Request) (*http.Response, error) {
  255. return mos.Transport.RoundTrip(req)
  256. }
  257. func (mos MockOriginHTTPService) String() string {
  258. return "MockOriginService"
  259. }
  260. func (mos MockOriginHTTPService) start(wg *sync.WaitGroup, log *zerolog.Logger, shutdownC <-chan struct{}, errC chan error, cfg OriginRequestConfig) error {
  261. return nil
  262. }