webrtc.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. package lib
  2. import (
  3. "bytes"
  4. "errors"
  5. "io"
  6. "log"
  7. "sync"
  8. "time"
  9. "github.com/dchest/uniuri"
  10. "github.com/keroserene/go-webrtc"
  11. )
  12. // Remote WebRTC peer.
  13. // Implements the |Snowflake| interface, which includes
  14. // |io.ReadWriter|, |Resetter|, and |Connector|.
  15. //
  16. // Handles preparation of go-webrtc PeerConnection. Only ever has
  17. // one DataChannel.
  18. type WebRTCPeer struct {
  19. id string
  20. config *webrtc.Configuration
  21. pc *webrtc.PeerConnection
  22. transport SnowflakeDataChannel // Holds the WebRTC DataChannel.
  23. broker *BrokerChannel
  24. offerChannel chan *webrtc.SessionDescription
  25. answerChannel chan *webrtc.SessionDescription
  26. errorChannel chan error
  27. recvPipe *io.PipeReader
  28. writePipe *io.PipeWriter
  29. lastReceive time.Time
  30. buffer bytes.Buffer
  31. reset chan struct{}
  32. closed bool
  33. lock sync.Mutex // Synchronization for DataChannel destruction
  34. once sync.Once // Synchronization for PeerConnection destruction
  35. BytesLogger
  36. }
  37. // Construct a WebRTC PeerConnection.
  38. func NewWebRTCPeer(config *webrtc.Configuration,
  39. broker *BrokerChannel) *WebRTCPeer {
  40. connection := new(WebRTCPeer)
  41. connection.id = "snowflake-" + uniuri.New()
  42. connection.config = config
  43. connection.broker = broker
  44. connection.offerChannel = make(chan *webrtc.SessionDescription, 1)
  45. connection.answerChannel = make(chan *webrtc.SessionDescription, 1)
  46. // Error channel is mostly for reporting during the initial SDP offer
  47. // creation & local description setting, which happens asynchronously.
  48. connection.errorChannel = make(chan error, 1)
  49. connection.reset = make(chan struct{}, 1)
  50. // Override with something that's not NullLogger to have real logging.
  51. connection.BytesLogger = &BytesNullLogger{}
  52. // Pipes remain the same even when DataChannel gets switched.
  53. connection.recvPipe, connection.writePipe = io.Pipe()
  54. return connection
  55. }
  56. // Read bytes from local SOCKS.
  57. // As part of |io.ReadWriter|
  58. func (c *WebRTCPeer) Read(b []byte) (int, error) {
  59. return c.recvPipe.Read(b)
  60. }
  61. // Writes bytes out to remote WebRTC.
  62. // As part of |io.ReadWriter|
  63. func (c *WebRTCPeer) Write(b []byte) (int, error) {
  64. c.lock.Lock()
  65. defer c.lock.Unlock()
  66. c.BytesLogger.AddOutbound(len(b))
  67. // TODO: Buffering could be improved / separated out of WebRTCPeer.
  68. if nil == c.transport {
  69. log.Printf("Buffered %d bytes --> WebRTC", len(b))
  70. c.buffer.Write(b)
  71. } else {
  72. c.transport.Send(b)
  73. }
  74. return len(b), nil
  75. }
  76. // As part of |Snowflake|
  77. func (c *WebRTCPeer) Close() error {
  78. c.once.Do(func() {
  79. c.closed = true
  80. c.cleanup()
  81. c.Reset()
  82. log.Printf("WebRTC: Closing")
  83. })
  84. return nil
  85. }
  86. // As part of |Resetter|
  87. func (c *WebRTCPeer) Reset() {
  88. if nil == c.reset {
  89. return
  90. }
  91. c.reset <- struct{}{}
  92. }
  93. // As part of |Resetter|
  94. func (c *WebRTCPeer) WaitForReset() { <-c.reset }
  95. // Prevent long-lived broken remotes.
  96. // Should also update the DataChannel in underlying go-webrtc's to make Closes
  97. // more immediate / responsive.
  98. func (c *WebRTCPeer) checkForStaleness() {
  99. c.lastReceive = time.Now()
  100. for {
  101. if c.closed {
  102. return
  103. }
  104. if time.Since(c.lastReceive).Seconds() > SnowflakeTimeout {
  105. log.Println("WebRTC: No messages received for", SnowflakeTimeout,
  106. "seconds -- closing stale connection.")
  107. c.Close()
  108. return
  109. }
  110. <-time.After(time.Second)
  111. }
  112. }
  113. // As part of |Connector| interface.
  114. func (c *WebRTCPeer) Connect() error {
  115. log.Println(c.id, " connecting...")
  116. // TODO: When go-webrtc is more stable, it's possible that a new
  117. // PeerConnection won't need to be re-prepared each time.
  118. err := c.preparePeerConnection()
  119. if err != nil {
  120. return err
  121. }
  122. err = c.establishDataChannel()
  123. if err != nil {
  124. return errors.New("WebRTC: Could not establish DataChannel.")
  125. }
  126. err = c.exchangeSDP()
  127. if err != nil {
  128. return err
  129. }
  130. go c.checkForStaleness()
  131. return nil
  132. }
  133. // Create and prepare callbacks on a new WebRTC PeerConnection.
  134. func (c *WebRTCPeer) preparePeerConnection() error {
  135. if nil != c.pc {
  136. c.pc.Destroy()
  137. c.pc = nil
  138. }
  139. pc, err := webrtc.NewPeerConnection(c.config)
  140. if err != nil {
  141. log.Printf("NewPeerConnection ERROR: %s", err)
  142. return err
  143. }
  144. // Prepare PeerConnection callbacks.
  145. pc.OnNegotiationNeeded = func() {
  146. log.Println("WebRTC: OnNegotiationNeeded")
  147. go func() {
  148. offer, err := pc.CreateOffer()
  149. // TODO: Potentially timeout and retry if ICE isn't working.
  150. if err != nil {
  151. c.errorChannel <- err
  152. return
  153. }
  154. err = pc.SetLocalDescription(offer)
  155. if err != nil {
  156. c.errorChannel <- err
  157. return
  158. }
  159. }()
  160. }
  161. // Allow candidates to accumulate until IceGatheringStateComplete.
  162. pc.OnIceCandidate = func(candidate webrtc.IceCandidate) {
  163. log.Printf(candidate.Candidate)
  164. }
  165. pc.OnIceGatheringStateChange = func(state webrtc.IceGatheringState) {
  166. if state == webrtc.IceGatheringStateComplete {
  167. log.Printf("WebRTC: IceGatheringStateComplete")
  168. c.offerChannel <- pc.LocalDescription()
  169. }
  170. }
  171. // This callback is not expected, as the Client initiates the creation
  172. // of the data channel, not the remote peer.
  173. pc.OnDataChannel = func(channel *webrtc.DataChannel) {
  174. log.Println("OnDataChannel")
  175. panic("Unexpected OnDataChannel!")
  176. }
  177. c.pc = pc
  178. log.Println("WebRTC: PeerConnection created.")
  179. return nil
  180. }
  181. // Create a WebRTC DataChannel locally.
  182. func (c *WebRTCPeer) establishDataChannel() error {
  183. c.lock.Lock()
  184. defer c.lock.Unlock()
  185. if c.transport != nil {
  186. panic("Unexpected datachannel already exists!")
  187. }
  188. dc, err := c.pc.CreateDataChannel(c.id)
  189. // Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare
  190. // an SDP offer while other goroutines operating on this struct handle the
  191. // signaling. Eventually fires "OnOpen".
  192. if err != nil {
  193. log.Printf("CreateDataChannel ERROR: %s", err)
  194. return err
  195. }
  196. dc.OnOpen = func() {
  197. c.lock.Lock()
  198. defer c.lock.Unlock()
  199. log.Println("WebRTC: DataChannel.OnOpen")
  200. if nil != c.transport {
  201. panic("WebRTC: transport already exists.")
  202. }
  203. // Flush buffered outgoing SOCKS data if necessary.
  204. if c.buffer.Len() > 0 {
  205. dc.Send(c.buffer.Bytes())
  206. log.Println("Flushed", c.buffer.Len(), "bytes.")
  207. c.buffer.Reset()
  208. }
  209. // Then enable the datachannel.
  210. c.transport = dc
  211. }
  212. dc.OnClose = func() {
  213. c.lock.Lock()
  214. // Future writes will go to the buffer until a new DataChannel is available.
  215. if nil == c.transport {
  216. // Closed locally, as part of a reset.
  217. log.Println("WebRTC: DataChannel.OnClose [locally]")
  218. c.lock.Unlock()
  219. return
  220. }
  221. // Closed remotely, need to reset everything.
  222. // Disable the DataChannel as a write destination.
  223. log.Println("WebRTC: DataChannel.OnClose [remotely]")
  224. c.transport = nil
  225. c.pc.DeleteDataChannel(dc)
  226. // Unlock before Close'ing, since it calls cleanup and asks for the
  227. // lock to check if the transport needs to be be deleted.
  228. c.lock.Unlock()
  229. c.Close()
  230. }
  231. dc.OnMessage = func(msg []byte) {
  232. if len(msg) <= 0 {
  233. log.Println("0 length message---")
  234. }
  235. c.BytesLogger.AddInbound(len(msg))
  236. n, err := c.writePipe.Write(msg)
  237. if err != nil {
  238. // TODO: Maybe shouldn't actually close.
  239. log.Println("Error writing to SOCKS pipe")
  240. c.writePipe.CloseWithError(err)
  241. }
  242. if n != len(msg) {
  243. log.Println("Error: short write")
  244. panic("short write")
  245. }
  246. c.lastReceive = time.Now()
  247. }
  248. log.Println("WebRTC: DataChannel created.")
  249. return nil
  250. }
  251. func (c *WebRTCPeer) sendOfferToBroker() {
  252. if nil == c.broker {
  253. return
  254. }
  255. offer := c.pc.LocalDescription()
  256. answer, err := c.broker.Negotiate(offer)
  257. if nil != err || nil == answer {
  258. log.Printf("BrokerChannel Error: %s", err)
  259. answer = nil
  260. }
  261. c.answerChannel <- answer
  262. }
  263. // Block until an SDP offer is available, send it to either
  264. // the Broker or signal pipe, then await for the SDP answer.
  265. func (c *WebRTCPeer) exchangeSDP() error {
  266. select {
  267. case <-c.offerChannel:
  268. case err := <-c.errorChannel:
  269. log.Println("Failed to prepare offer", err)
  270. c.Close()
  271. return err
  272. }
  273. // Keep trying the same offer until a valid answer arrives.
  274. var ok bool
  275. var answer *webrtc.SessionDescription = nil
  276. for nil == answer {
  277. go c.sendOfferToBroker()
  278. answer, ok = <-c.answerChannel // Blocks...
  279. if !ok || nil == answer {
  280. log.Printf("Failed to retrieve answer. Retrying in %d seconds", ReconnectTimeout)
  281. <-time.After(time.Second * ReconnectTimeout)
  282. answer = nil
  283. }
  284. }
  285. log.Printf("Received Answer.\n")
  286. err := c.pc.SetRemoteDescription(answer)
  287. if nil != err {
  288. log.Println("WebRTC: Unable to SetRemoteDescription:", err)
  289. return err
  290. }
  291. return nil
  292. }
  293. // Close all channels and transports
  294. func (c *WebRTCPeer) cleanup() {
  295. if nil != c.offerChannel {
  296. close(c.offerChannel)
  297. }
  298. if nil != c.answerChannel {
  299. close(c.answerChannel)
  300. }
  301. if nil != c.errorChannel {
  302. close(c.errorChannel)
  303. }
  304. // Close this side of the SOCKS pipe.
  305. if nil != c.writePipe {
  306. c.writePipe.Close()
  307. c.writePipe = nil
  308. }
  309. c.lock.Lock()
  310. if nil != c.transport {
  311. log.Printf("WebRTC: closing DataChannel")
  312. dataChannel := c.transport
  313. // Setting transport to nil *before* dc Close indicates to OnClose that
  314. // this was locally triggered.
  315. c.transport = nil
  316. // Release the lock before calling DeleteDataChannel (which in turn
  317. // calls Close on the dataChannel), but after nil'ing out the transport,
  318. // since otherwise we'll end up in the onClose handler in a deadlock.
  319. c.lock.Unlock()
  320. if c.pc == nil {
  321. panic("DataChannel w/o PeerConnection, not good.")
  322. }
  323. c.pc.DeleteDataChannel(dataChannel.(*webrtc.DataChannel))
  324. } else {
  325. c.lock.Unlock()
  326. }
  327. if nil != c.pc {
  328. log.Printf("WebRTC: closing PeerConnection")
  329. err := c.pc.Destroy()
  330. if nil != err {
  331. log.Printf("Error closing peerconnection...")
  332. }
  333. c.pc = nil
  334. }
  335. }