peers.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. package lib
  2. import (
  3. "container/list"
  4. "errors"
  5. "fmt"
  6. "log"
  7. )
  8. // Container which keeps track of multiple WebRTC remote peers.
  9. // Implements |SnowflakeCollector|.
  10. //
  11. // Maintaining a set of pre-connected Peers with fresh but inactive datachannels
  12. // allows allows rapid recovery when the current WebRTC Peer disconnects.
  13. //
  14. // Note: For now, only one remote can be active at any given moment.
  15. // This is a property of Tor circuits & its current multiplexing constraints,
  16. // but could be updated if that changes.
  17. // (Also, this constraint does not necessarily apply to the more generic PT
  18. // version of Snowflake)
  19. type Peers struct {
  20. Tongue
  21. BytesLogger
  22. snowflakeChan chan Snowflake
  23. activePeers *list.List
  24. capacity int
  25. melt chan struct{}
  26. }
  27. // Construct a fresh container of remote peers.
  28. func NewPeers(max int) *Peers {
  29. p := &Peers{capacity: max}
  30. // Use buffered go channel to pass snowflakes onwards to the SOCKS handler.
  31. p.snowflakeChan = make(chan Snowflake, max)
  32. p.activePeers = list.New()
  33. p.melt = make(chan struct{}, 1)
  34. return p
  35. }
  36. // As part of |SnowflakeCollector| interface.
  37. func (p *Peers) Collect() (Snowflake, error) {
  38. cnt := p.Count()
  39. s := fmt.Sprintf("Currently at [%d/%d]", cnt, p.capacity)
  40. if cnt >= p.capacity {
  41. s := fmt.Sprintf("At capacity [%d/%d]", cnt, p.capacity)
  42. return nil, errors.New(s)
  43. }
  44. log.Println("WebRTC: Collecting a new Snowflake.", s)
  45. // Engage the Snowflake Catching interface, which must be available.
  46. if nil == p.Tongue {
  47. return nil, errors.New("Missing Tongue to catch Snowflakes with.")
  48. }
  49. // BUG: some broker conflict here.
  50. connection, err := p.Tongue.Catch()
  51. if nil != err {
  52. return nil, err
  53. }
  54. // Track new valid Snowflake in internal collection and pass along.
  55. p.activePeers.PushBack(connection)
  56. p.snowflakeChan <- connection
  57. return connection, nil
  58. }
  59. // As part of |SnowflakeCollector| interface.
  60. func (p *Peers) Pop() Snowflake {
  61. // Blocks until an available, valid snowflake appears.
  62. var snowflake Snowflake
  63. var ok bool
  64. for nil == snowflake {
  65. snowflake, ok = <-p.snowflakeChan
  66. conn := snowflake.(*WebRTCPeer)
  67. if !ok {
  68. return nil
  69. }
  70. if conn.closed {
  71. snowflake = nil
  72. }
  73. }
  74. // Set to use the same rate-limited traffic logger to keep consistency.
  75. snowflake.(*WebRTCPeer).BytesLogger = p.BytesLogger
  76. return snowflake
  77. }
  78. // As part of |SnowflakeCollector| interface.
  79. func (p *Peers) Melted() <-chan struct{} {
  80. return p.melt
  81. }
  82. // Returns total available Snowflakes (including the active one)
  83. // The count only reduces when connections themselves close, rather than when
  84. // they are popped.
  85. func (p *Peers) Count() int {
  86. p.purgeClosedPeers()
  87. return p.activePeers.Len()
  88. }
  89. func (p *Peers) purgeClosedPeers() {
  90. for e := p.activePeers.Front(); e != nil; {
  91. next := e.Next()
  92. conn := e.Value.(*WebRTCPeer)
  93. // Purge those marked for deletion.
  94. if conn.closed {
  95. p.activePeers.Remove(e)
  96. }
  97. e = next
  98. }
  99. }
  100. // Close all Peers contained here.
  101. func (p *Peers) End() {
  102. close(p.snowflakeChan)
  103. p.melt <- struct{}{}
  104. cnt := p.Count()
  105. for e := p.activePeers.Front(); e != nil; {
  106. next := e.Next()
  107. conn := e.Value.(*WebRTCPeer)
  108. conn.Close()
  109. p.activePeers.Remove(e)
  110. e = next
  111. }
  112. log.Println("WebRTC: melted all", cnt, "snowflakes.")
  113. }