tunnel.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912
  1. package origin
  2. import (
  3. "bufio"
  4. "context"
  5. "crypto/tls"
  6. "fmt"
  7. "io"
  8. "net"
  9. "net/http"
  10. "net/url"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "time"
  15. "github.com/google/uuid"
  16. "github.com/pkg/errors"
  17. "github.com/prometheus/client_golang/prometheus"
  18. "golang.org/x/sync/errgroup"
  19. "github.com/cloudflare/cloudflared/buffer"
  20. "github.com/cloudflare/cloudflared/cmd/cloudflared/buildinfo"
  21. "github.com/cloudflare/cloudflared/cmd/cloudflared/ui"
  22. "github.com/cloudflare/cloudflared/connection"
  23. "github.com/cloudflare/cloudflared/h2mux"
  24. "github.com/cloudflare/cloudflared/logger"
  25. "github.com/cloudflare/cloudflared/signal"
  26. "github.com/cloudflare/cloudflared/tunnelrpc"
  27. "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  28. tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  29. "github.com/cloudflare/cloudflared/validation"
  30. "github.com/cloudflare/cloudflared/websocket"
  31. )
  32. const (
  33. dialTimeout = 15 * time.Second
  34. openStreamTimeout = 30 * time.Second
  35. muxerTimeout = 5 * time.Second
  36. lbProbeUserAgentPrefix = "Mozilla/5.0 (compatible; Cloudflare-Traffic-Manager/1.0; +https://www.cloudflare.com/traffic-manager/;"
  37. TagHeaderNamePrefix = "Cf-Warp-Tag-"
  38. DuplicateConnectionError = "EDUPCONN"
  39. FeatureSerializedHeaders = "serialized_headers"
  40. FeatureQuickReconnects = "quick_reconnects"
  41. )
  42. type registerRPCName string
  43. const (
  44. register registerRPCName = "register"
  45. reconnect registerRPCName = "reconnect"
  46. )
  47. type TunnelConfig struct {
  48. BuildInfo *buildinfo.BuildInfo
  49. ClientID string
  50. ClientTlsConfig *tls.Config
  51. CloseConnOnce *sync.Once // Used to close connectedSignal no more than once
  52. CompressionQuality uint64
  53. EdgeAddrs []string
  54. GracePeriod time.Duration
  55. HAConnections int
  56. HTTPTransport http.RoundTripper
  57. HeartbeatInterval time.Duration
  58. Hostname string
  59. HTTPHostHeader string
  60. IncidentLookup IncidentLookup
  61. IsAutoupdated bool
  62. IsFreeTunnel bool
  63. LBPool string
  64. Logger logger.Service
  65. TransportLogger logger.Service
  66. MaxHeartbeats uint64
  67. Metrics *TunnelMetrics
  68. MetricsUpdateFreq time.Duration
  69. NoChunkedEncoding bool
  70. OriginCert []byte
  71. ReportedVersion string
  72. Retries uint
  73. RunFromTerminal bool
  74. Tags []tunnelpogs.Tag
  75. TlsConfig *tls.Config
  76. WSGI bool
  77. // OriginUrl may not be used if a user specifies a unix socket.
  78. OriginUrl string
  79. // feature-flag to use new edge reconnect tokens
  80. UseReconnectToken bool
  81. NamedTunnel *NamedTunnelConfig
  82. ReplaceExisting bool
  83. TunnelEventChan chan<- ui.TunnelEvent
  84. }
  85. type dupConnRegisterTunnelError struct{}
  86. var errDuplicationConnection = &dupConnRegisterTunnelError{}
  87. func (e dupConnRegisterTunnelError) Error() string {
  88. return "already connected to this server, trying another address"
  89. }
  90. type muxerShutdownError struct{}
  91. func (e muxerShutdownError) Error() string {
  92. return "muxer shutdown"
  93. }
  94. // RegisterTunnel error from server
  95. type serverRegisterTunnelError struct {
  96. cause error
  97. permanent bool
  98. }
  99. func (e serverRegisterTunnelError) Error() string {
  100. return e.cause.Error()
  101. }
  102. // RegisterTunnel error from client
  103. type clientRegisterTunnelError struct {
  104. cause error
  105. }
  106. func newClientRegisterTunnelError(cause error, counter *prometheus.CounterVec, name registerRPCName) clientRegisterTunnelError {
  107. counter.WithLabelValues(cause.Error(), string(name)).Inc()
  108. return clientRegisterTunnelError{cause: cause}
  109. }
  110. func (e clientRegisterTunnelError) Error() string {
  111. return e.cause.Error()
  112. }
  113. func (c *TunnelConfig) muxerConfig(handler h2mux.MuxedStreamHandler) h2mux.MuxerConfig {
  114. return h2mux.MuxerConfig{
  115. Timeout: muxerTimeout,
  116. Handler: handler,
  117. IsClient: true,
  118. HeartbeatInterval: c.HeartbeatInterval,
  119. MaxHeartbeats: c.MaxHeartbeats,
  120. Logger: c.TransportLogger,
  121. CompressionQuality: h2mux.CompressionSetting(c.CompressionQuality),
  122. }
  123. }
  124. func (c *TunnelConfig) RegistrationOptions(connectionID uint8, OriginLocalIP string, uuid uuid.UUID) *tunnelpogs.RegistrationOptions {
  125. policy := tunnelrpc.ExistingTunnelPolicy_balance
  126. if c.HAConnections <= 1 && c.LBPool == "" {
  127. policy = tunnelrpc.ExistingTunnelPolicy_disconnect
  128. }
  129. return &tunnelpogs.RegistrationOptions{
  130. ClientID: c.ClientID,
  131. Version: c.ReportedVersion,
  132. OS: fmt.Sprintf("%s_%s", c.BuildInfo.GoOS, c.BuildInfo.GoArch),
  133. ExistingTunnelPolicy: policy,
  134. PoolName: c.LBPool,
  135. Tags: c.Tags,
  136. ConnectionID: connectionID,
  137. OriginLocalIP: OriginLocalIP,
  138. IsAutoupdated: c.IsAutoupdated,
  139. RunFromTerminal: c.RunFromTerminal,
  140. CompressionQuality: c.CompressionQuality,
  141. UUID: uuid.String(),
  142. Features: c.SupportedFeatures(),
  143. }
  144. }
  145. func (c *TunnelConfig) ConnectionOptions(originLocalAddr string, numPreviousAttempts uint8) *tunnelpogs.ConnectionOptions {
  146. // attempt to parse out origin IP, but don't fail since it's informational field
  147. host, _, _ := net.SplitHostPort(originLocalAddr)
  148. originIP := net.ParseIP(host)
  149. return &tunnelpogs.ConnectionOptions{
  150. Client: c.NamedTunnel.Client,
  151. OriginLocalIP: originIP,
  152. ReplaceExisting: c.ReplaceExisting,
  153. CompressionQuality: uint8(c.CompressionQuality),
  154. NumPreviousAttempts: numPreviousAttempts,
  155. }
  156. }
  157. func (c *TunnelConfig) SupportedFeatures() []string {
  158. features := []string{FeatureSerializedHeaders}
  159. if c.NamedTunnel == nil {
  160. features = append(features, FeatureQuickReconnects)
  161. }
  162. return features
  163. }
  164. func (c *TunnelConfig) IsTrialTunnel() bool {
  165. return c.Hostname == ""
  166. }
  167. type NamedTunnelConfig struct {
  168. Auth pogs.TunnelAuth
  169. ID uuid.UUID
  170. Client pogs.ClientInfo
  171. }
  172. func StartTunnelDaemon(ctx context.Context, config *TunnelConfig, connectedSignal *signal.Signal, cloudflaredID uuid.UUID, reconnectCh chan ReconnectSignal) error {
  173. s, err := NewSupervisor(config, cloudflaredID)
  174. if err != nil {
  175. return err
  176. }
  177. return s.Run(ctx, connectedSignal, reconnectCh)
  178. }
  179. func ServeTunnelLoop(ctx context.Context,
  180. credentialManager *reconnectCredentialManager,
  181. config *TunnelConfig,
  182. addr *net.TCPAddr,
  183. connectionIndex uint8,
  184. connectedSignal *signal.Signal,
  185. cloudflaredUUID uuid.UUID,
  186. bufferPool *buffer.Pool,
  187. reconnectCh chan ReconnectSignal,
  188. ) error {
  189. config.Metrics.incrementHaConnections()
  190. defer config.Metrics.decrementHaConnections()
  191. backoff := BackoffHandler{MaxRetries: config.Retries}
  192. connectedFuse := h2mux.NewBooleanFuse()
  193. go func() {
  194. if connectedFuse.Await() {
  195. connectedSignal.Notify()
  196. }
  197. }()
  198. // Ensure the above goroutine will terminate if we return without connecting
  199. defer connectedFuse.Fuse(false)
  200. for {
  201. err, recoverable := ServeTunnel(
  202. ctx,
  203. credentialManager,
  204. config,
  205. config.Logger,
  206. addr, connectionIndex,
  207. connectedFuse,
  208. &backoff,
  209. cloudflaredUUID,
  210. bufferPool,
  211. reconnectCh,
  212. )
  213. if recoverable {
  214. if duration, ok := backoff.GetBackoffDuration(ctx); ok {
  215. if config.TunnelEventChan != nil {
  216. config.TunnelEventChan <- ui.TunnelEvent{Index: connectionIndex, EventType: ui.Reconnecting}
  217. }
  218. config.Logger.Infof("Retrying connection %d in %s seconds", connectionIndex, duration)
  219. backoff.Backoff(ctx)
  220. continue
  221. }
  222. }
  223. return err
  224. }
  225. }
  226. func ServeTunnel(
  227. ctx context.Context,
  228. credentialManager *reconnectCredentialManager,
  229. config *TunnelConfig,
  230. logger logger.Service,
  231. addr *net.TCPAddr,
  232. connectionIndex uint8,
  233. connectedFuse *h2mux.BooleanFuse,
  234. backoff *BackoffHandler,
  235. cloudflaredUUID uuid.UUID,
  236. bufferPool *buffer.Pool,
  237. reconnectCh chan ReconnectSignal,
  238. ) (err error, recoverable bool) {
  239. // Treat panics as recoverable errors
  240. defer func() {
  241. if r := recover(); r != nil {
  242. var ok bool
  243. err, ok = r.(error)
  244. if !ok {
  245. err = fmt.Errorf("ServeTunnel: %v", r)
  246. }
  247. recoverable = true
  248. }
  249. }()
  250. // If launch-ui flag is set, send disconnect msg
  251. if config.TunnelEventChan != nil {
  252. defer func() {
  253. config.TunnelEventChan <- ui.TunnelEvent{Index: connectionIndex, EventType: ui.Disconnected}
  254. }()
  255. }
  256. connectionTag := uint8ToString(connectionIndex)
  257. // Returns error from parsing the origin URL or handshake errors
  258. handler, originLocalAddr, err := NewTunnelHandler(ctx, config, addr, connectionIndex, bufferPool)
  259. if err != nil {
  260. switch err.(type) {
  261. case connection.DialError:
  262. logger.Errorf("Connection %d unable to dial edge: %s", connectionIndex, err)
  263. case h2mux.MuxerHandshakeError:
  264. logger.Errorf("Connection %d handshake with edge server failed: %s", connectionIndex, err)
  265. default:
  266. logger.Errorf("Connection %d failed: %s", connectionIndex, err)
  267. return err, false
  268. }
  269. return err, true
  270. }
  271. errGroup, serveCtx := errgroup.WithContext(ctx)
  272. errGroup.Go(func() (err error) {
  273. defer func() {
  274. if err == nil {
  275. connectedFuse.Fuse(true)
  276. backoff.SetGracePeriod()
  277. }
  278. }()
  279. if config.NamedTunnel != nil {
  280. return RegisterConnection(ctx, handler.muxer, config, connectionIndex, originLocalAddr, uint8(backoff.retries))
  281. }
  282. if config.UseReconnectToken && connectedFuse.Value() {
  283. err := ReconnectTunnel(serveCtx, handler.muxer, config, logger, connectionIndex, originLocalAddr, cloudflaredUUID, credentialManager)
  284. if err == nil {
  285. return nil
  286. }
  287. // log errors and proceed to RegisterTunnel
  288. logger.Errorf("Couldn't reconnect connection %d. Reregistering it instead. Error was: %v", connectionIndex, err)
  289. }
  290. return RegisterTunnel(serveCtx, credentialManager, handler.muxer, config, logger, connectionIndex, originLocalAddr, cloudflaredUUID)
  291. })
  292. errGroup.Go(func() error {
  293. updateMetricsTickC := time.Tick(config.MetricsUpdateFreq)
  294. for {
  295. select {
  296. case <-serveCtx.Done():
  297. // UnregisterTunnel blocks until the RPC call returns
  298. if connectedFuse.Value() {
  299. if config.NamedTunnel != nil {
  300. _ = UnregisterConnection(ctx, handler.muxer, config)
  301. } else {
  302. _ = UnregisterTunnel(handler.muxer, config.GracePeriod, config.TransportLogger)
  303. }
  304. }
  305. handler.muxer.Shutdown()
  306. return nil
  307. case <-updateMetricsTickC:
  308. handler.UpdateMetrics(connectionTag)
  309. }
  310. }
  311. })
  312. errGroup.Go(func() error {
  313. for {
  314. select {
  315. case reconnect := <-reconnectCh:
  316. return &reconnect
  317. case <-serveCtx.Done():
  318. return nil
  319. }
  320. }
  321. })
  322. errGroup.Go(func() error {
  323. // All routines should stop when muxer finish serving. When muxer is shutdown
  324. // gracefully, it doesn't return an error, so we need to return errMuxerShutdown
  325. // here to notify other routines to stop
  326. err := handler.muxer.Serve(serveCtx)
  327. if err == nil {
  328. return muxerShutdownError{}
  329. }
  330. return err
  331. })
  332. err = errGroup.Wait()
  333. if err != nil {
  334. switch err := err.(type) {
  335. case *dupConnRegisterTunnelError:
  336. // don't retry this connection anymore, let supervisor pick new a address
  337. return err, false
  338. case *serverRegisterTunnelError:
  339. logger.Errorf("Register tunnel error from server side: %s", err.cause)
  340. // Don't send registration error return from server to Sentry. They are
  341. // logged on server side
  342. if incidents := config.IncidentLookup.ActiveIncidents(); len(incidents) > 0 {
  343. logger.Error(activeIncidentsMsg(incidents))
  344. }
  345. return err.cause, !err.permanent
  346. case *clientRegisterTunnelError:
  347. logger.Errorf("Register tunnel error on client side: %s", err.cause)
  348. return err, true
  349. case *muxerShutdownError:
  350. logger.Info("Muxer shutdown")
  351. return err, true
  352. case *ReconnectSignal:
  353. logger.Infof("Restarting connection %d due to reconnect signal in %d seconds", connectionIndex, err.Delay)
  354. err.DelayBeforeReconnect()
  355. return err, true
  356. default:
  357. if err == context.Canceled {
  358. logger.Debugf("Serve tunnel error: %s", err)
  359. return err, false
  360. }
  361. logger.Errorf("Serve tunnel error: %s", err)
  362. return err, true
  363. }
  364. }
  365. return nil, true
  366. }
  367. func RegisterConnection(
  368. ctx context.Context,
  369. muxer *h2mux.Muxer,
  370. config *TunnelConfig,
  371. connectionIndex uint8,
  372. originLocalAddr string,
  373. numPreviousAttempts uint8,
  374. ) error {
  375. const registerConnection = "registerConnection"
  376. config.TransportLogger.Debug("initiating RPC stream for RegisterConnection")
  377. rpc, err := connection.NewRPCClient(ctx, muxer, config.TransportLogger, openStreamTimeout)
  378. if err != nil {
  379. // RPC stream open error
  380. return newClientRegisterTunnelError(err, config.Metrics.rpcFail, registerConnection)
  381. }
  382. defer rpc.Close()
  383. conn, err := rpc.RegisterConnection(
  384. ctx,
  385. config.NamedTunnel.Auth,
  386. config.NamedTunnel.ID,
  387. connectionIndex,
  388. config.ConnectionOptions(originLocalAddr, numPreviousAttempts),
  389. )
  390. if err != nil {
  391. if err.Error() == DuplicateConnectionError {
  392. config.Metrics.regFail.WithLabelValues("dup_edge_conn", registerConnection).Inc()
  393. return errDuplicationConnection
  394. }
  395. config.Metrics.regFail.WithLabelValues("server_error", registerConnection).Inc()
  396. return serverRegistrationErrorFromRPC(err)
  397. }
  398. config.Metrics.regSuccess.WithLabelValues(registerConnection).Inc()
  399. config.Logger.Infof("Connection %d registered with %s using ID %s", connectionIndex, conn.Location, conn.UUID)
  400. // If launch-ui flag is set, send connect msg
  401. if config.TunnelEventChan != nil {
  402. config.TunnelEventChan <- ui.TunnelEvent{Index: connectionIndex, EventType: ui.Connected, Location: conn.Location}
  403. }
  404. return nil
  405. }
  406. func serverRegistrationErrorFromRPC(err error) *serverRegisterTunnelError {
  407. if retryable, ok := err.(*tunnelpogs.RetryableError); ok {
  408. return &serverRegisterTunnelError{
  409. cause: retryable.Unwrap(),
  410. permanent: false,
  411. }
  412. }
  413. return &serverRegisterTunnelError{
  414. cause: err,
  415. permanent: true,
  416. }
  417. }
  418. func UnregisterConnection(
  419. ctx context.Context,
  420. muxer *h2mux.Muxer,
  421. config *TunnelConfig,
  422. ) error {
  423. config.TransportLogger.Debug("initiating RPC stream for UnregisterConnection")
  424. rpc, err := connection.NewRPCClient(ctx, muxer, config.TransportLogger, openStreamTimeout)
  425. if err != nil {
  426. // RPC stream open error
  427. return newClientRegisterTunnelError(err, config.Metrics.rpcFail, register)
  428. }
  429. defer rpc.Close()
  430. return rpc.UnregisterConnection(ctx)
  431. }
  432. func RegisterTunnel(
  433. ctx context.Context,
  434. credentialManager *reconnectCredentialManager,
  435. muxer *h2mux.Muxer,
  436. config *TunnelConfig,
  437. logger logger.Service,
  438. connectionID uint8,
  439. originLocalIP string,
  440. uuid uuid.UUID,
  441. ) error {
  442. config.TransportLogger.Debug("initiating RPC stream to register")
  443. if config.TunnelEventChan != nil {
  444. config.TunnelEventChan <- ui.TunnelEvent{EventType: ui.RegisteringTunnel}
  445. }
  446. tunnelServer, err := connection.NewRPCClient(ctx, muxer, config.TransportLogger, openStreamTimeout)
  447. if err != nil {
  448. // RPC stream open error
  449. return newClientRegisterTunnelError(err, config.Metrics.rpcFail, register)
  450. }
  451. defer tunnelServer.Close()
  452. // Request server info without blocking tunnel registration; must use capnp library directly.
  453. serverInfoPromise := tunnelrpc.TunnelServer{Client: tunnelServer.Client}.GetServerInfo(ctx, func(tunnelrpc.TunnelServer_getServerInfo_Params) error {
  454. return nil
  455. })
  456. LogServerInfo(serverInfoPromise.Result(), connectionID, config.Metrics, logger, config.TunnelEventChan)
  457. registration := tunnelServer.RegisterTunnel(
  458. ctx,
  459. config.OriginCert,
  460. config.Hostname,
  461. config.RegistrationOptions(connectionID, originLocalIP, uuid),
  462. )
  463. if registrationErr := registration.DeserializeError(); registrationErr != nil {
  464. // RegisterTunnel RPC failure
  465. return processRegisterTunnelError(registrationErr, config.Metrics, register)
  466. }
  467. // Send free tunnel URL to UI
  468. if config.TunnelEventChan != nil {
  469. config.TunnelEventChan <- ui.TunnelEvent{EventType: ui.SetUrl, Url: registration.Url}
  470. }
  471. credentialManager.SetEventDigest(connectionID, registration.EventDigest)
  472. return processRegistrationSuccess(config, logger, connectionID, registration, register, credentialManager)
  473. }
  474. func processRegistrationSuccess(
  475. config *TunnelConfig,
  476. logger logger.Service,
  477. connectionID uint8,
  478. registration *tunnelpogs.TunnelRegistration,
  479. name registerRPCName,
  480. credentialManager *reconnectCredentialManager,
  481. ) error {
  482. for _, logLine := range registration.LogLines {
  483. logger.Info(logLine)
  484. }
  485. if registration.TunnelID != "" {
  486. config.Metrics.tunnelsHA.AddTunnelID(connectionID, registration.TunnelID)
  487. logger.Infof("Each HA connection's tunnel IDs: %v", config.Metrics.tunnelsHA.String())
  488. }
  489. // Print out the user's trial zone URL in a nice box (if they requested and got one and UI flag is not set)
  490. if config.TunnelEventChan == nil {
  491. if config.IsTrialTunnel() {
  492. if registrationURL, err := url.Parse(registration.Url); err == nil {
  493. for _, line := range asciiBox(trialZoneMsg(registrationURL.String()), 2) {
  494. logger.Info(line)
  495. }
  496. } else {
  497. logger.Error("Failed to connect tunnel, please try again.")
  498. return fmt.Errorf("empty URL in response from Cloudflare edge")
  499. }
  500. }
  501. }
  502. credentialManager.SetConnDigest(connectionID, registration.ConnDigest)
  503. config.Metrics.userHostnamesCounts.WithLabelValues(registration.Url).Inc()
  504. logger.Infof("Route propagating, it may take up to 1 minute for your new route to become functional")
  505. config.Metrics.regSuccess.WithLabelValues(string(name)).Inc()
  506. return nil
  507. }
  508. func processRegisterTunnelError(err tunnelpogs.TunnelRegistrationError, metrics *TunnelMetrics, name registerRPCName) error {
  509. if err.Error() == DuplicateConnectionError {
  510. metrics.regFail.WithLabelValues("dup_edge_conn", string(name)).Inc()
  511. return errDuplicationConnection
  512. }
  513. metrics.regFail.WithLabelValues("server_error", string(name)).Inc()
  514. return serverRegisterTunnelError{
  515. cause: err,
  516. permanent: err.IsPermanent(),
  517. }
  518. }
  519. func UnregisterTunnel(muxer *h2mux.Muxer, gracePeriod time.Duration, logger logger.Service) error {
  520. logger.Debug("initiating RPC stream to unregister")
  521. ctx := context.Background()
  522. tunnelServer, err := connection.NewRPCClient(ctx, muxer, logger, openStreamTimeout)
  523. if err != nil {
  524. // RPC stream open error
  525. return err
  526. }
  527. defer tunnelServer.Close()
  528. // gracePeriod is encoded in int64 using capnproto
  529. return tunnelServer.UnregisterTunnel(ctx, gracePeriod.Nanoseconds())
  530. }
  531. func LogServerInfo(
  532. promise tunnelrpc.ServerInfo_Promise,
  533. connectionID uint8,
  534. metrics *TunnelMetrics,
  535. logger logger.Service,
  536. tunnelEventChan chan<- ui.TunnelEvent,
  537. ) {
  538. serverInfoMessage, err := promise.Struct()
  539. if err != nil {
  540. logger.Errorf("Failed to retrieve server information: %s", err)
  541. return
  542. }
  543. serverInfo, err := tunnelpogs.UnmarshalServerInfo(serverInfoMessage)
  544. if err != nil {
  545. logger.Errorf("Failed to retrieve server information: %s", err)
  546. return
  547. }
  548. // If launch-ui flag is set, send connect msg
  549. if tunnelEventChan != nil {
  550. tunnelEventChan <- ui.TunnelEvent{Index: connectionID, EventType: ui.Connected, Location: serverInfo.LocationName}
  551. }
  552. logger.Infof("Connected to %s", serverInfo.LocationName)
  553. metrics.registerServerLocation(uint8ToString(connectionID), serverInfo.LocationName)
  554. }
  555. type TunnelHandler struct {
  556. originUrl string
  557. httpHostHeader string
  558. muxer *h2mux.Muxer
  559. httpClient http.RoundTripper
  560. tlsConfig *tls.Config
  561. tags []tunnelpogs.Tag
  562. metrics *TunnelMetrics
  563. // connectionID is only used by metrics, and prometheus requires labels to be string
  564. connectionID string
  565. logger logger.Service
  566. noChunkedEncoding bool
  567. bufferPool *buffer.Pool
  568. }
  569. // NewTunnelHandler returns a TunnelHandler, origin LAN IP and error
  570. func NewTunnelHandler(ctx context.Context,
  571. config *TunnelConfig,
  572. addr *net.TCPAddr,
  573. connectionID uint8,
  574. bufferPool *buffer.Pool,
  575. ) (*TunnelHandler, string, error) {
  576. originURL, err := validation.ValidateUrl(config.OriginUrl)
  577. if err != nil {
  578. return nil, "", fmt.Errorf("unable to parse origin URL %#v", originURL)
  579. }
  580. h := &TunnelHandler{
  581. originUrl: originURL,
  582. httpHostHeader: config.HTTPHostHeader,
  583. httpClient: config.HTTPTransport,
  584. tlsConfig: config.ClientTlsConfig,
  585. tags: config.Tags,
  586. metrics: config.Metrics,
  587. connectionID: uint8ToString(connectionID),
  588. logger: config.Logger,
  589. noChunkedEncoding: config.NoChunkedEncoding,
  590. bufferPool: bufferPool,
  591. }
  592. if h.httpClient == nil {
  593. h.httpClient = http.DefaultTransport
  594. }
  595. edgeConn, err := connection.DialEdge(ctx, dialTimeout, config.TlsConfig, addr)
  596. if err != nil {
  597. return nil, "", err
  598. }
  599. // Establish a muxed connection with the edge
  600. // Client mux handshake with agent server
  601. h.muxer, err = h2mux.Handshake(edgeConn, edgeConn, config.muxerConfig(h), h.metrics.activeStreams)
  602. if err != nil {
  603. return nil, "", errors.Wrap(err, "h2mux handshake with edge error")
  604. }
  605. return h, edgeConn.LocalAddr().String(), nil
  606. }
  607. func (h *TunnelHandler) AppendTagHeaders(r *http.Request) {
  608. for _, tag := range h.tags {
  609. r.Header.Add(TagHeaderNamePrefix+tag.Name, tag.Value)
  610. }
  611. }
  612. func (h *TunnelHandler) ServeStream(stream *h2mux.MuxedStream) error {
  613. h.metrics.incrementRequests(h.connectionID)
  614. defer h.metrics.decrementConcurrentRequests(h.connectionID)
  615. req, reqErr := h.createRequest(stream)
  616. if reqErr != nil {
  617. h.writeErrorResponse(stream, reqErr)
  618. return reqErr
  619. }
  620. cfRay := findCfRayHeader(req)
  621. lbProbe := isLBProbeRequest(req)
  622. h.logRequest(req, cfRay, lbProbe)
  623. var resp *http.Response
  624. var respErr error
  625. if websocket.IsWebSocketUpgrade(req) {
  626. resp, respErr = h.serveWebsocket(stream, req)
  627. } else {
  628. resp, respErr = h.serveHTTP(stream, req)
  629. }
  630. if respErr != nil {
  631. h.writeErrorResponse(stream, respErr)
  632. return respErr
  633. }
  634. h.logResponseOk(resp, cfRay, lbProbe)
  635. return nil
  636. }
  637. func (h *TunnelHandler) createRequest(stream *h2mux.MuxedStream) (*http.Request, error) {
  638. req, err := http.NewRequest("GET", h.originUrl, h2mux.MuxedStreamReader{MuxedStream: stream})
  639. if err != nil {
  640. return nil, errors.Wrap(err, "Unexpected error from http.NewRequest")
  641. }
  642. err = h2mux.H2RequestHeadersToH1Request(stream.Headers, req)
  643. if err != nil {
  644. return nil, errors.Wrap(err, "invalid request received")
  645. }
  646. h.AppendTagHeaders(req)
  647. return req, nil
  648. }
  649. func (h *TunnelHandler) serveWebsocket(stream *h2mux.MuxedStream, req *http.Request) (*http.Response, error) {
  650. if h.httpHostHeader != "" {
  651. req.Header.Set("Host", h.httpHostHeader)
  652. req.Host = h.httpHostHeader
  653. }
  654. conn, response, err := websocket.ClientConnect(req, h.tlsConfig)
  655. if err != nil {
  656. return nil, err
  657. }
  658. defer conn.Close()
  659. err = stream.WriteHeaders(h2mux.H1ResponseToH2ResponseHeaders(response))
  660. if err != nil {
  661. return nil, errors.Wrap(err, "Error writing response header")
  662. }
  663. // Copy to/from stream to the undelying connection. Use the underlying
  664. // connection because cloudflared doesn't operate on the message themselves
  665. websocket.Stream(conn.UnderlyingConn(), stream)
  666. return response, nil
  667. }
  668. func (h *TunnelHandler) serveHTTP(stream *h2mux.MuxedStream, req *http.Request) (*http.Response, error) {
  669. // Support for WSGI Servers by switching transfer encoding from chunked to gzip/deflate
  670. if h.noChunkedEncoding {
  671. req.TransferEncoding = []string{"gzip", "deflate"}
  672. cLength, err := strconv.Atoi(req.Header.Get("Content-Length"))
  673. if err == nil {
  674. req.ContentLength = int64(cLength)
  675. }
  676. }
  677. // Request origin to keep connection alive to improve performance
  678. req.Header.Set("Connection", "keep-alive")
  679. if h.httpHostHeader != "" {
  680. req.Header.Set("Host", h.httpHostHeader)
  681. req.Host = h.httpHostHeader
  682. }
  683. response, err := h.httpClient.RoundTrip(req)
  684. if err != nil {
  685. return nil, errors.Wrap(err, "Error proxying request to origin")
  686. }
  687. defer response.Body.Close()
  688. headers := h2mux.H1ResponseToH2ResponseHeaders(response)
  689. headers = append(headers, h2mux.CreateResponseMetaHeader(h2mux.ResponseMetaHeaderField, h2mux.ResponseSourceOrigin))
  690. err = stream.WriteHeaders(headers)
  691. if err != nil {
  692. return nil, errors.Wrap(err, "Error writing response header")
  693. }
  694. if h.isEventStream(response) {
  695. h.writeEventStream(stream, response.Body)
  696. } else {
  697. // Use CopyBuffer, because Copy only allocates a 32KiB buffer, and cross-stream
  698. // compression generates dictionary on first write
  699. buf := h.bufferPool.Get()
  700. defer h.bufferPool.Put(buf)
  701. io.CopyBuffer(stream, response.Body, buf)
  702. }
  703. return response, nil
  704. }
  705. func (h *TunnelHandler) writeEventStream(stream *h2mux.MuxedStream, responseBody io.ReadCloser) {
  706. reader := bufio.NewReader(responseBody)
  707. for {
  708. line, err := reader.ReadBytes('\n')
  709. if err != nil {
  710. break
  711. }
  712. stream.Write(line)
  713. }
  714. }
  715. func (h *TunnelHandler) isEventStream(response *http.Response) bool {
  716. if response.Header.Get("content-type") == "text/event-stream" {
  717. h.logger.Debug("Detected Server-Side Events from Origin")
  718. return true
  719. }
  720. return false
  721. }
  722. func (h *TunnelHandler) writeErrorResponse(stream *h2mux.MuxedStream, err error) {
  723. h.logger.Errorf("HTTP request error: %s", err)
  724. stream.WriteHeaders([]h2mux.Header{
  725. {Name: ":status", Value: "502"},
  726. h2mux.CreateResponseMetaHeader(h2mux.ResponseMetaHeaderField, h2mux.ResponseSourceCloudflared),
  727. })
  728. stream.Write([]byte("502 Bad Gateway"))
  729. h.metrics.incrementResponses(h.connectionID, "502")
  730. }
  731. func (h *TunnelHandler) logRequest(req *http.Request, cfRay string, lbProbe bool) {
  732. logger := h.logger
  733. if cfRay != "" {
  734. logger.Debugf("CF-RAY: %s %s %s %s", cfRay, req.Method, req.URL, req.Proto)
  735. } else if lbProbe {
  736. logger.Debugf("CF-RAY: %s Load Balancer health check %s %s %s", cfRay, req.Method, req.URL, req.Proto)
  737. } else {
  738. logger.Infof("CF-RAY: %s All requests should have a CF-RAY header. Please open a support ticket with Cloudflare. %s %s %s ", cfRay, req.Method, req.URL, req.Proto)
  739. }
  740. logger.Debugf("CF-RAY: %s Request Headers %+v", cfRay, req.Header)
  741. if contentLen := req.ContentLength; contentLen == -1 {
  742. logger.Debugf("CF-RAY: %s Request Content length unknown", cfRay)
  743. } else {
  744. logger.Debugf("CF-RAY: %s Request content length %d", cfRay, contentLen)
  745. }
  746. }
  747. func (h *TunnelHandler) logResponseOk(r *http.Response, cfRay string, lbProbe bool) {
  748. h.metrics.incrementResponses(h.connectionID, "200")
  749. logger := h.logger
  750. if cfRay != "" {
  751. logger.Debugf("CF-RAY: %s %s", cfRay, r.Status)
  752. } else if lbProbe {
  753. logger.Debugf("Response to Load Balancer health check %s", r.Status)
  754. } else {
  755. logger.Infof("%s", r.Status)
  756. }
  757. logger.Debugf("CF-RAY: %s Response Headers %+v", cfRay, r.Header)
  758. if contentLen := r.ContentLength; contentLen == -1 {
  759. logger.Debugf("CF-RAY: %s Response content length unknown", cfRay)
  760. } else {
  761. logger.Debugf("CF-RAY: %s Response content length %d", cfRay, contentLen)
  762. }
  763. }
  764. func (h *TunnelHandler) UpdateMetrics(connectionID string) {
  765. h.metrics.updateMuxerMetrics(connectionID, h.muxer.Metrics())
  766. }
  767. func uint8ToString(input uint8) string {
  768. return strconv.FormatUint(uint64(input), 10)
  769. }
  770. // Print out the given lines in a nice ASCII box.
  771. func asciiBox(lines []string, padding int) (box []string) {
  772. maxLen := maxLen(lines)
  773. spacer := strings.Repeat(" ", padding)
  774. border := "+" + strings.Repeat("-", maxLen+(padding*2)) + "+"
  775. box = append(box, border)
  776. for _, line := range lines {
  777. box = append(box, "|"+spacer+line+strings.Repeat(" ", maxLen-len(line))+spacer+"|")
  778. }
  779. box = append(box, border)
  780. return
  781. }
  782. func maxLen(lines []string) int {
  783. max := 0
  784. for _, line := range lines {
  785. if len(line) > max {
  786. max = len(line)
  787. }
  788. }
  789. return max
  790. }
  791. func trialZoneMsg(url string) []string {
  792. return []string{
  793. "Your free tunnel has started! Visit it:",
  794. " " + url,
  795. }
  796. }
  797. func activeIncidentsMsg(incidents []Incident) string {
  798. preamble := "There is an active Cloudflare incident that may be related:"
  799. if len(incidents) > 1 {
  800. preamble = "There are active Cloudflare incidents that may be related:"
  801. }
  802. incidentStrings := []string{}
  803. for _, incident := range incidents {
  804. incidentString := fmt.Sprintf("%s (%s)", incident.Name, incident.URL())
  805. incidentStrings = append(incidentStrings, incidentString)
  806. }
  807. return preamble + " " + strings.Join(incidentStrings, "; ")
  808. }
  809. func findCfRayHeader(h1 *http.Request) string {
  810. return h1.Header.Get("Cf-Ray")
  811. }
  812. func isLBProbeRequest(req *http.Request) bool {
  813. return strings.HasPrefix(req.UserAgent(), lbProbeUserAgentPrefix)
  814. }