quic.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. package connection
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "fmt"
  6. "io"
  7. "net"
  8. "net/http"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/google/uuid"
  13. "github.com/lucas-clemente/quic-go"
  14. "github.com/pkg/errors"
  15. "github.com/rs/zerolog"
  16. "golang.org/x/sync/errgroup"
  17. "github.com/cloudflare/cloudflared/datagramsession"
  18. "github.com/cloudflare/cloudflared/ingress"
  19. quicpogs "github.com/cloudflare/cloudflared/quic"
  20. tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  21. )
  22. const (
  23. // HTTPHeaderKey is used to get or set http headers in QUIC ALPN if the underlying proxy connection type is HTTP.
  24. HTTPHeaderKey = "HttpHeader"
  25. // HTTPMethodKey is used to get or set http method in QUIC ALPN if the underlying proxy connection type is HTTP.
  26. HTTPMethodKey = "HttpMethod"
  27. // HTTPHostKey is used to get or set http Method in QUIC ALPN if the underlying proxy connection type is HTTP.
  28. HTTPHostKey = "HttpHost"
  29. )
  30. // QUICConnection represents the type that facilitates Proxying via QUIC streams.
  31. type QUICConnection struct {
  32. session quic.Session
  33. logger *zerolog.Logger
  34. orchestrator Orchestrator
  35. sessionManager datagramsession.Manager
  36. controlStreamHandler ControlStreamHandler
  37. connOptions *tunnelpogs.ConnectionOptions
  38. }
  39. // NewQUICConnection returns a new instance of QUICConnection.
  40. func NewQUICConnection(
  41. quicConfig *quic.Config,
  42. edgeAddr net.Addr,
  43. tlsConfig *tls.Config,
  44. orchestrator Orchestrator,
  45. connOptions *tunnelpogs.ConnectionOptions,
  46. controlStreamHandler ControlStreamHandler,
  47. logger *zerolog.Logger,
  48. ) (*QUICConnection, error) {
  49. session, err := quic.DialAddr(edgeAddr.String(), tlsConfig, quicConfig)
  50. if err != nil {
  51. return nil, fmt.Errorf("failed to dial to edge: %w", err)
  52. }
  53. datagramMuxer, err := quicpogs.NewDatagramMuxer(session)
  54. if err != nil {
  55. return nil, err
  56. }
  57. sessionManager := datagramsession.NewManager(datagramMuxer, logger)
  58. return &QUICConnection{
  59. session: session,
  60. orchestrator: orchestrator,
  61. logger: logger,
  62. sessionManager: sessionManager,
  63. controlStreamHandler: controlStreamHandler,
  64. connOptions: connOptions,
  65. }, nil
  66. }
  67. // Serve starts a QUIC session that begins accepting streams.
  68. func (q *QUICConnection) Serve(ctx context.Context) error {
  69. // origintunneld assumes the first stream is used for the control plane
  70. controlStream, err := q.session.OpenStream()
  71. if err != nil {
  72. return fmt.Errorf("failed to open a registration control stream: %w", err)
  73. }
  74. // If either goroutine returns nil error, we rely on this cancellation to make sure the other goroutine exits
  75. // as fast as possible as well. Nil error means we want to exit for good (caller code won't retry serving this
  76. // connection).
  77. // If either goroutine returns a non nil error, then the error group cancels the context, thus also canceling the
  78. // other goroutine as fast as possible.
  79. ctx, cancel := context.WithCancel(ctx)
  80. errGroup, ctx := errgroup.WithContext(ctx)
  81. // In the future, if cloudflared can autonomously push traffic to the edge, we have to make sure the control
  82. // stream is already fully registered before the other goroutines can proceed.
  83. errGroup.Go(func() error {
  84. defer cancel()
  85. return q.serveControlStream(ctx, controlStream)
  86. })
  87. errGroup.Go(func() error {
  88. defer cancel()
  89. return q.acceptStream(ctx)
  90. })
  91. errGroup.Go(func() error {
  92. defer cancel()
  93. return q.sessionManager.Serve(ctx)
  94. })
  95. return errGroup.Wait()
  96. }
  97. func (q *QUICConnection) serveControlStream(ctx context.Context, controlStream quic.Stream) error {
  98. // This blocks until the control plane is done.
  99. err := q.controlStreamHandler.ServeControlStream(ctx, controlStream, q.connOptions)
  100. if err != nil {
  101. // Not wrapping error here to be consistent with the http2 message.
  102. return err
  103. }
  104. return nil
  105. }
  106. func (q *QUICConnection) acceptStream(ctx context.Context) error {
  107. defer q.Close()
  108. for {
  109. quicStream, err := q.session.AcceptStream(ctx)
  110. if err != nil {
  111. // context.Canceled is usually a user ctrl+c. We don't want to log an error here as it's intentional.
  112. if errors.Is(err, context.Canceled) || q.controlStreamHandler.IsStopped() {
  113. return nil
  114. }
  115. return fmt.Errorf("failed to accept QUIC stream: %w", err)
  116. }
  117. go func() {
  118. stream := quicpogs.NewSafeStreamCloser(quicStream)
  119. defer stream.Close()
  120. if err = q.handleStream(stream); err != nil {
  121. q.logger.Err(err).Msg("Failed to handle QUIC stream")
  122. }
  123. }()
  124. }
  125. }
  126. // Close closes the session with no errors specified.
  127. func (q *QUICConnection) Close() {
  128. q.session.CloseWithError(0, "")
  129. }
  130. func (q *QUICConnection) handleStream(stream io.ReadWriteCloser) error {
  131. signature, err := quicpogs.DetermineProtocol(stream)
  132. if err != nil {
  133. return err
  134. }
  135. switch signature {
  136. case quicpogs.DataStreamProtocolSignature:
  137. reqServerStream, err := quicpogs.NewRequestServerStream(stream, signature)
  138. if err != nil {
  139. return nil
  140. }
  141. return q.handleDataStream(reqServerStream)
  142. case quicpogs.RPCStreamProtocolSignature:
  143. rpcStream, err := quicpogs.NewRPCServerStream(stream, signature)
  144. if err != nil {
  145. return err
  146. }
  147. return q.handleRPCStream(rpcStream)
  148. default:
  149. return fmt.Errorf("unknown protocol %v", signature)
  150. }
  151. }
  152. func (q *QUICConnection) handleDataStream(stream *quicpogs.RequestServerStream) error {
  153. connectRequest, err := stream.ReadConnectRequestData()
  154. if err != nil {
  155. return err
  156. }
  157. originProxy, err := q.orchestrator.GetOriginProxy()
  158. if err != nil {
  159. return err
  160. }
  161. switch connectRequest.Type {
  162. case quicpogs.ConnectionTypeHTTP, quicpogs.ConnectionTypeWebsocket:
  163. req, err := buildHTTPRequest(connectRequest, stream)
  164. if err != nil {
  165. return err
  166. }
  167. w := newHTTPResponseAdapter(stream)
  168. return originProxy.ProxyHTTP(w, req, connectRequest.Type == quicpogs.ConnectionTypeWebsocket)
  169. case quicpogs.ConnectionTypeTCP:
  170. rwa := &streamReadWriteAcker{stream}
  171. return originProxy.ProxyTCP(context.Background(), rwa, &TCPRequest{Dest: connectRequest.Dest})
  172. }
  173. return nil
  174. }
  175. func (q *QUICConnection) handleRPCStream(rpcStream *quicpogs.RPCServerStream) error {
  176. return rpcStream.Serve(q, q, q.logger)
  177. }
  178. // RegisterUdpSession is the RPC method invoked by edge to register and run a session
  179. func (q *QUICConnection) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeAfterIdleHint time.Duration) error {
  180. // Each session is a series of datagram from an eyeball to a dstIP:dstPort.
  181. // (src port, dst IP, dst port) uniquely identifies a session, so it needs a dedicated connected socket.
  182. originProxy, err := ingress.DialUDP(dstIP, dstPort)
  183. if err != nil {
  184. q.logger.Err(err).Msgf("Failed to create udp proxy to %s:%d", dstIP, dstPort)
  185. return err
  186. }
  187. session, err := q.sessionManager.RegisterSession(ctx, sessionID, originProxy)
  188. if err != nil {
  189. q.logger.Err(err).Str("sessionID", sessionID.String()).Msgf("Failed to register udp session")
  190. return err
  191. }
  192. go q.serveUDPSession(session, closeAfterIdleHint)
  193. q.logger.Debug().Msgf("Registered session %v, %v, %v", sessionID, dstIP, dstPort)
  194. return nil
  195. }
  196. func (q *QUICConnection) serveUDPSession(session *datagramsession.Session, closeAfterIdleHint time.Duration) {
  197. ctx := q.session.Context()
  198. closedByRemote, err := session.Serve(ctx, closeAfterIdleHint)
  199. // If session is terminated by remote, then we know it has been unregistered from session manager and edge
  200. if !closedByRemote {
  201. if err != nil {
  202. q.closeUDPSession(ctx, session.ID, err.Error())
  203. } else {
  204. q.closeUDPSession(ctx, session.ID, "terminated without error")
  205. }
  206. }
  207. q.logger.Debug().Err(err).Str("sessionID", session.ID.String()).Msg("Session terminated")
  208. }
  209. // closeUDPSession first unregisters the session from session manager, then it tries to unregister from edge
  210. func (q *QUICConnection) closeUDPSession(ctx context.Context, sessionID uuid.UUID, message string) {
  211. q.sessionManager.UnregisterSession(ctx, sessionID, message, false)
  212. stream, err := q.session.OpenStream()
  213. if err != nil {
  214. // Log this at debug because this is not an error if session was closed due to lost connection
  215. // with edge
  216. q.logger.Debug().Err(err).Str("sessionID", sessionID.String()).
  217. Msgf("Failed to open quic stream to unregister udp session with edge")
  218. return
  219. }
  220. rpcClientStream, err := quicpogs.NewRPCClientStream(ctx, stream, q.logger)
  221. if err != nil {
  222. // Log this at debug because this is not an error if session was closed due to lost connection
  223. // with edge
  224. q.logger.Err(err).Str("sessionID", sessionID.String()).
  225. Msgf("Failed to open rpc stream to unregister udp session with edge")
  226. return
  227. }
  228. if err := rpcClientStream.UnregisterUdpSession(ctx, sessionID, message); err != nil {
  229. q.logger.Err(err).Str("sessionID", sessionID.String()).
  230. Msgf("Failed to unregister udp session with edge")
  231. }
  232. }
  233. // UnregisterUdpSession is the RPC method invoked by edge to unregister and terminate a sesssion
  234. func (q *QUICConnection) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error {
  235. return q.sessionManager.UnregisterSession(ctx, sessionID, message, true)
  236. }
  237. // UpdateConfiguration is the RPC method invoked by edge when there is a new configuration
  238. func (q *QUICConnection) UpdateConfiguration(ctx context.Context, version int32, config []byte) *tunnelpogs.UpdateConfigurationResponse {
  239. return q.orchestrator.UpdateConfig(version, config)
  240. }
  241. // streamReadWriteAcker is a light wrapper over QUIC streams with a callback to send response back to
  242. // the client.
  243. type streamReadWriteAcker struct {
  244. *quicpogs.RequestServerStream
  245. }
  246. // AckConnection acks response back to the proxy.
  247. func (s *streamReadWriteAcker) AckConnection() error {
  248. return s.WriteConnectResponseData(nil)
  249. }
  250. // httpResponseAdapter translates responses written by the HTTP Proxy into ones that can be used in QUIC.
  251. type httpResponseAdapter struct {
  252. *quicpogs.RequestServerStream
  253. }
  254. func newHTTPResponseAdapter(s *quicpogs.RequestServerStream) httpResponseAdapter {
  255. return httpResponseAdapter{s}
  256. }
  257. func (hrw httpResponseAdapter) WriteRespHeaders(status int, header http.Header) error {
  258. metadata := make([]quicpogs.Metadata, 0)
  259. metadata = append(metadata, quicpogs.Metadata{Key: "HttpStatus", Val: strconv.Itoa(status)})
  260. for k, vv := range header {
  261. for _, v := range vv {
  262. httpHeaderKey := fmt.Sprintf("%s:%s", HTTPHeaderKey, k)
  263. metadata = append(metadata, quicpogs.Metadata{Key: httpHeaderKey, Val: v})
  264. }
  265. }
  266. return hrw.WriteConnectResponseData(nil, metadata...)
  267. }
  268. func (hrw httpResponseAdapter) WriteErrorResponse(err error) {
  269. hrw.WriteConnectResponseData(err, quicpogs.Metadata{Key: "HttpStatus", Val: strconv.Itoa(http.StatusBadGateway)})
  270. }
  271. func buildHTTPRequest(connectRequest *quicpogs.ConnectRequest, body io.ReadCloser) (*http.Request, error) {
  272. metadata := connectRequest.MetadataMap()
  273. dest := connectRequest.Dest
  274. method := metadata[HTTPMethodKey]
  275. host := metadata[HTTPHostKey]
  276. isWebsocket := connectRequest.Type == quicpogs.ConnectionTypeWebsocket
  277. req, err := http.NewRequest(method, dest, body)
  278. if err != nil {
  279. return nil, err
  280. }
  281. req.Host = host
  282. for _, metadata := range connectRequest.Metadata {
  283. if strings.Contains(metadata.Key, HTTPHeaderKey) {
  284. // metadata.Key is off the format httpHeaderKey:<HTTPHeader>
  285. httpHeaderKey := strings.Split(metadata.Key, ":")
  286. if len(httpHeaderKey) != 2 {
  287. return nil, fmt.Errorf("header Key: %s malformed", metadata.Key)
  288. }
  289. req.Header.Add(httpHeaderKey[1], metadata.Val)
  290. }
  291. }
  292. // Go's http.Client automatically sends chunked request body if this value is not set on the
  293. // *http.Request struct regardless of header:
  294. // https://go.googlesource.com/go/+/go1.8rc2/src/net/http/transfer.go#154.
  295. if err := setContentLength(req); err != nil {
  296. return nil, fmt.Errorf("Error setting content-length: %w", err)
  297. }
  298. // Go's client defaults to chunked encoding after a 200ms delay if the following cases are true:
  299. // * the request body blocks
  300. // * the content length is not set (or set to -1)
  301. // * the method doesn't usually have a body (GET, HEAD, DELETE, ...)
  302. // * there is no transfer-encoding=chunked already set.
  303. // So, if transfer cannot be chunked and content length is 0, we dont set a request body.
  304. if !isWebsocket && !isTransferEncodingChunked(req) && req.ContentLength == 0 {
  305. req.Body = http.NoBody
  306. }
  307. stripWebsocketUpgradeHeader(req)
  308. return req, err
  309. }
  310. func setContentLength(req *http.Request) error {
  311. var err error
  312. if contentLengthStr := req.Header.Get("Content-Length"); contentLengthStr != "" {
  313. req.ContentLength, err = strconv.ParseInt(contentLengthStr, 10, 64)
  314. }
  315. return err
  316. }
  317. func isTransferEncodingChunked(req *http.Request) bool {
  318. transferEncodingVal := req.Header.Get("Transfer-Encoding")
  319. // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding suggests that this can be a comma
  320. // separated value as well.
  321. return strings.Contains(strings.ToLower(transferEncodingVal), "chunked")
  322. }