connectionrpc.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. package pogs
  2. import (
  3. "context"
  4. "errors"
  5. "net"
  6. "time"
  7. "github.com/google/uuid"
  8. capnp "zombiezen.com/go/capnproto2"
  9. "zombiezen.com/go/capnproto2/pogs"
  10. "zombiezen.com/go/capnproto2/rpc"
  11. "zombiezen.com/go/capnproto2/server"
  12. "github.com/cloudflare/cloudflared/tunnelrpc"
  13. )
  14. type RegistrationServer interface {
  15. RegisterConnection(ctx context.Context, auth TunnelAuth, tunnelID uuid.UUID, connIndex byte, options *ConnectionOptions) (*ConnectionDetails, error)
  16. UnregisterConnection(ctx context.Context)
  17. }
  18. type RegistrationServer_PogsImpl struct {
  19. impl RegistrationServer
  20. }
  21. func RegistrationServer_ServerToClient(s RegistrationServer) tunnelrpc.RegistrationServer {
  22. return tunnelrpc.RegistrationServer_ServerToClient(RegistrationServer_PogsImpl{s})
  23. }
  24. func (i RegistrationServer_PogsImpl) RegisterConnection(p tunnelrpc.RegistrationServer_registerConnection) error {
  25. server.Ack(p.Options)
  26. auth, err := p.Params.Auth()
  27. if err != nil {
  28. return err
  29. }
  30. var pogsAuth TunnelAuth
  31. err = pogsAuth.UnmarshalCapnproto(auth)
  32. if err != nil {
  33. return err
  34. }
  35. uuidBytes, err := p.Params.TunnelId()
  36. if err != nil {
  37. return err
  38. }
  39. tunnelID, err := uuid.FromBytes(uuidBytes)
  40. if err != nil {
  41. return err
  42. }
  43. connIndex := p.Params.ConnIndex()
  44. options, err := p.Params.Options()
  45. if err != nil {
  46. return err
  47. }
  48. var pogsOptions ConnectionOptions
  49. err = pogsOptions.UnmarshalCapnproto(options)
  50. if err != nil {
  51. return err
  52. }
  53. connDetails, callError := i.impl.RegisterConnection(p.Ctx, pogsAuth, tunnelID, connIndex, &pogsOptions)
  54. resp, err := p.Results.NewResult()
  55. if err != nil {
  56. return err
  57. }
  58. if callError != nil {
  59. if connError, err := resp.Result().NewError(); err != nil {
  60. return err
  61. } else {
  62. return MarshalError(connError, callError)
  63. }
  64. }
  65. if details, err := resp.Result().NewConnectionDetails(); err != nil {
  66. return err
  67. } else {
  68. return connDetails.MarshalCapnproto(details)
  69. }
  70. }
  71. func (i RegistrationServer_PogsImpl) UnregisterConnection(p tunnelrpc.RegistrationServer_unregisterConnection) error {
  72. server.Ack(p.Options)
  73. i.impl.UnregisterConnection(p.Ctx)
  74. return nil
  75. }
  76. type RegistrationServer_PogsClient struct {
  77. Client capnp.Client
  78. Conn *rpc.Conn
  79. }
  80. func (c RegistrationServer_PogsClient) Close() error {
  81. c.Client.Close()
  82. return c.Conn.Close()
  83. }
  84. func (c RegistrationServer_PogsClient) RegisterConnection(ctx context.Context, auth TunnelAuth, tunnelID uuid.UUID, connIndex byte, options *ConnectionOptions) (*ConnectionDetails, error) {
  85. client := tunnelrpc.TunnelServer{Client: c.Client}
  86. promise := client.RegisterConnection(ctx, func(p tunnelrpc.RegistrationServer_registerConnection_Params) error {
  87. tunnelAuth, err := p.NewAuth()
  88. if err != nil {
  89. return err
  90. }
  91. if err = auth.MarshalCapnproto(tunnelAuth); err != nil {
  92. return err
  93. }
  94. err = p.SetAuth(tunnelAuth)
  95. if err != nil {
  96. return err
  97. }
  98. err = p.SetTunnelId(tunnelID[:])
  99. if err != nil {
  100. return err
  101. }
  102. p.SetConnIndex(connIndex)
  103. connectionOptions, err := p.NewOptions()
  104. if err != nil {
  105. return err
  106. }
  107. err = options.MarshalCapnproto(connectionOptions)
  108. if err != nil {
  109. return err
  110. }
  111. return nil
  112. })
  113. response, err := promise.Result().Struct()
  114. if err != nil {
  115. return nil, wrapRPCError(err)
  116. }
  117. result := response.Result()
  118. switch result.Which() {
  119. case tunnelrpc.ConnectionResponse_result_Which_error:
  120. resultError, err := result.Error()
  121. if err != nil {
  122. return nil, wrapRPCError(err)
  123. }
  124. cause, err := resultError.Cause()
  125. if err != nil {
  126. return nil, wrapRPCError(err)
  127. }
  128. err = errors.New(cause)
  129. if resultError.ShouldRetry() {
  130. err = RetryErrorAfter(err, time.Duration(resultError.RetryAfter()))
  131. }
  132. return nil, err
  133. case tunnelrpc.ConnectionResponse_result_Which_connectionDetails:
  134. connDetails, err := result.ConnectionDetails()
  135. if err != nil {
  136. return nil, wrapRPCError(err)
  137. }
  138. details := new(ConnectionDetails)
  139. if err = details.UnmarshalCapnproto(connDetails); err != nil {
  140. return nil, wrapRPCError(err)
  141. }
  142. return details, nil
  143. }
  144. return nil, newRPCError("unknown result which %d", result.Which())
  145. }
  146. func (c RegistrationServer_PogsClient) UnregisterConnection(ctx context.Context) error {
  147. client := tunnelrpc.TunnelServer{Client: c.Client}
  148. promise := client.UnregisterConnection(ctx, func(p tunnelrpc.RegistrationServer_unregisterConnection_Params) error {
  149. return nil
  150. })
  151. _, err := promise.Struct()
  152. if err != nil {
  153. return wrapRPCError(err)
  154. }
  155. return nil
  156. }
  157. type ClientInfo struct {
  158. ClientID []byte `capnp:"clientId"` // must be a slice for capnp compatibility
  159. Features []string
  160. Version string
  161. Arch string
  162. }
  163. type ConnectionOptions struct {
  164. Client ClientInfo
  165. OriginLocalIP net.IP `capnp:"originLocalIp"`
  166. ReplaceExisting bool
  167. CompressionQuality uint8
  168. NumPreviousAttempts uint8
  169. }
  170. type TunnelAuth struct {
  171. AccountTag string
  172. TunnelSecret []byte
  173. }
  174. func (p *ConnectionOptions) MarshalCapnproto(s tunnelrpc.ConnectionOptions) error {
  175. return pogs.Insert(tunnelrpc.ConnectionOptions_TypeID, s.Struct, p)
  176. }
  177. func (p *ConnectionOptions) UnmarshalCapnproto(s tunnelrpc.ConnectionOptions) error {
  178. return pogs.Extract(p, tunnelrpc.ConnectionOptions_TypeID, s.Struct)
  179. }
  180. func (a *TunnelAuth) MarshalCapnproto(s tunnelrpc.TunnelAuth) error {
  181. return pogs.Insert(tunnelrpc.TunnelAuth_TypeID, s.Struct, a)
  182. }
  183. func (a *TunnelAuth) UnmarshalCapnproto(s tunnelrpc.TunnelAuth) error {
  184. return pogs.Extract(a, tunnelrpc.TunnelAuth_TypeID, s.Struct)
  185. }
  186. type ConnectionDetails struct {
  187. UUID uuid.UUID
  188. Location string
  189. }
  190. func (details *ConnectionDetails) MarshalCapnproto(s tunnelrpc.ConnectionDetails) error {
  191. if err := s.SetUuid(details.UUID[:]); err != nil {
  192. return err
  193. }
  194. if err := s.SetLocationName(details.Location); err != nil {
  195. return err
  196. }
  197. return nil
  198. }
  199. func (details *ConnectionDetails) UnmarshalCapnproto(s tunnelrpc.ConnectionDetails) error {
  200. uuidBytes, err := s.Uuid()
  201. if err != nil {
  202. return err
  203. }
  204. details.UUID, err = uuid.FromBytes(uuidBytes)
  205. if err != nil {
  206. return err
  207. }
  208. details.Location, err = s.LocationName()
  209. if err != nil {
  210. return err
  211. }
  212. return err
  213. }
  214. func MarshalError(s tunnelrpc.ConnectionError, err error) error {
  215. if err := s.SetCause(err.Error()); err != nil {
  216. return err
  217. }
  218. if retryableErr, ok := err.(*RetryableError); ok {
  219. s.SetShouldRetry(true)
  220. s.SetRetryAfter(int64(retryableErr.Delay))
  221. }
  222. return nil
  223. }