activestreammap.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package h2mux
  2. import (
  3. "sync"
  4. "github.com/prometheus/client_golang/prometheus"
  5. "golang.org/x/net/http2"
  6. )
  7. // activeStreamMap is used to moderate access to active streams between the read and write
  8. // threads, and deny access to new peer streams while shutting down.
  9. type activeStreamMap struct {
  10. sync.RWMutex
  11. // streams tracks open streams.
  12. streams map[uint32]*MuxedStream
  13. // nextStreamID is the next ID to use on our side of the connection.
  14. // This is odd for clients, even for servers.
  15. nextStreamID uint32
  16. // maxPeerStreamID is the ID of the most recent stream opened by the peer.
  17. maxPeerStreamID uint32
  18. // activeStreams is a gauge shared by all muxers of this process to expose the total number of active streams
  19. activeStreams prometheus.Gauge
  20. // ignoreNewStreams is true when the connection is being shut down. New streams
  21. // cannot be registered.
  22. ignoreNewStreams bool
  23. // streamsEmpty is a chan that will be closed when no more streams are open.
  24. streamsEmptyChan chan struct{}
  25. closeOnce sync.Once
  26. }
  27. func newActiveStreamMap(useClientStreamNumbers bool, activeStreams prometheus.Gauge) *activeStreamMap {
  28. m := &activeStreamMap{
  29. streams: make(map[uint32]*MuxedStream),
  30. streamsEmptyChan: make(chan struct{}),
  31. nextStreamID: 1,
  32. activeStreams: activeStreams,
  33. }
  34. // Client initiated stream uses odd stream ID, server initiated stream uses even stream ID
  35. if !useClientStreamNumbers {
  36. m.nextStreamID = 2
  37. }
  38. return m
  39. }
  40. // This function should be called while `m` is locked.
  41. func (m *activeStreamMap) notifyStreamsEmpty() {
  42. m.closeOnce.Do(func() {
  43. close(m.streamsEmptyChan)
  44. })
  45. }
  46. // Len returns the number of active streams.
  47. func (m *activeStreamMap) Len() int {
  48. m.RLock()
  49. defer m.RUnlock()
  50. return len(m.streams)
  51. }
  52. func (m *activeStreamMap) Get(streamID uint32) (*MuxedStream, bool) {
  53. m.RLock()
  54. defer m.RUnlock()
  55. stream, ok := m.streams[streamID]
  56. return stream, ok
  57. }
  58. // Set returns true if the stream was assigned successfully. If a stream
  59. // already existed with that ID or we are shutting down, return false.
  60. func (m *activeStreamMap) Set(newStream *MuxedStream) bool {
  61. m.Lock()
  62. defer m.Unlock()
  63. if _, ok := m.streams[newStream.streamID]; ok {
  64. return false
  65. }
  66. if m.ignoreNewStreams {
  67. return false
  68. }
  69. m.streams[newStream.streamID] = newStream
  70. m.activeStreams.Inc()
  71. return true
  72. }
  73. // Delete stops tracking the stream. It should be called only after it is closed and resetted.
  74. func (m *activeStreamMap) Delete(streamID uint32) {
  75. m.Lock()
  76. defer m.Unlock()
  77. if _, ok := m.streams[streamID]; ok {
  78. delete(m.streams, streamID)
  79. m.activeStreams.Dec()
  80. }
  81. // shutting down, and now the map is empty
  82. if m.ignoreNewStreams && len(m.streams) == 0 {
  83. m.notifyStreamsEmpty()
  84. }
  85. }
  86. // Shutdown blocks new streams from being created.
  87. // It returns `done`, a channel that is closed once the last stream has closed
  88. // and `progress`, whether a shutdown was already in progress
  89. func (m *activeStreamMap) Shutdown() (done <-chan struct{}, alreadyInProgress bool) {
  90. m.Lock()
  91. defer m.Unlock()
  92. if m.ignoreNewStreams {
  93. // already shutting down
  94. return m.streamsEmptyChan, true
  95. }
  96. m.ignoreNewStreams = true
  97. if len(m.streams) == 0 {
  98. // there are no streams to wait for
  99. m.notifyStreamsEmpty()
  100. }
  101. return m.streamsEmptyChan, false
  102. }
  103. // AcquireLocalID acquires a new stream ID for a stream you're opening.
  104. func (m *activeStreamMap) AcquireLocalID() uint32 {
  105. m.Lock()
  106. defer m.Unlock()
  107. x := m.nextStreamID
  108. m.nextStreamID += 2
  109. return x
  110. }
  111. // ObservePeerID observes the ID of a stream opened by the peer. It returns true if we should accept
  112. // the new stream, or false to reject it. The ErrCode gives the reason why.
  113. func (m *activeStreamMap) AcquirePeerID(streamID uint32) (bool, http2.ErrCode) {
  114. m.Lock()
  115. defer m.Unlock()
  116. switch {
  117. case m.ignoreNewStreams:
  118. return false, http2.ErrCodeStreamClosed
  119. case streamID > m.maxPeerStreamID:
  120. m.maxPeerStreamID = streamID
  121. return true, http2.ErrCodeNo
  122. default:
  123. return false, http2.ErrCodeStreamClosed
  124. }
  125. }
  126. // IsPeerStreamID is true if the stream ID belongs to the peer.
  127. func (m *activeStreamMap) IsPeerStreamID(streamID uint32) bool {
  128. m.RLock()
  129. defer m.RUnlock()
  130. return (streamID % 2) != (m.nextStreamID % 2)
  131. }
  132. // IsLocalStreamID is true if it is a stream we have opened, even if it is now closed.
  133. func (m *activeStreamMap) IsLocalStreamID(streamID uint32) bool {
  134. m.RLock()
  135. defer m.RUnlock()
  136. return (streamID%2) == (m.nextStreamID%2) && streamID < m.nextStreamID
  137. }
  138. // LastPeerStreamID returns the most recently opened peer stream ID.
  139. func (m *activeStreamMap) LastPeerStreamID() uint32 {
  140. m.RLock()
  141. defer m.RUnlock()
  142. return m.maxPeerStreamID
  143. }
  144. // LastLocalStreamID returns the most recently opened local stream ID.
  145. func (m *activeStreamMap) LastLocalStreamID() uint32 {
  146. m.RLock()
  147. defer m.RUnlock()
  148. if m.nextStreamID > 1 {
  149. return m.nextStreamID - 2
  150. }
  151. return 0
  152. }
  153. // Abort closes every active stream and prevents new ones being created. This should be used to
  154. // return errors in pending read/writes when the underlying connection goes away.
  155. func (m *activeStreamMap) Abort() {
  156. m.Lock()
  157. defer m.Unlock()
  158. for _, stream := range m.streams {
  159. stream.Close()
  160. }
  161. m.ignoreNewStreams = true
  162. m.notifyStreamsEmpty()
  163. }