distributor.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package light implements on-demand retrieval capable state and chain objects
  17. // for the Ethereum Light Client.
  18. package les
  19. import (
  20. "container/list"
  21. "errors"
  22. "sync"
  23. "time"
  24. )
  25. // ErrNoPeers is returned if no peers capable of serving a queued request are available
  26. var ErrNoPeers = errors.New("no suitable peers available")
  27. // requestDistributor implements a mechanism that distributes requests to
  28. // suitable peers, obeying flow control rules and prioritizing them in creation
  29. // order (even when a resend is necessary).
  30. type requestDistributor struct {
  31. reqQueue *list.List
  32. lastReqOrder uint64
  33. peers map[distPeer]struct{}
  34. peerLock sync.RWMutex
  35. stopChn, loopChn chan struct{}
  36. loopNextSent bool
  37. lock sync.Mutex
  38. }
  39. // distPeer is an LES server peer interface for the request distributor.
  40. // waitBefore returns either the necessary waiting time before sending a request
  41. // with the given upper estimated cost or the estimated remaining relative buffer
  42. // value after sending such a request (in which case the request can be sent
  43. // immediately). At least one of these values is always zero.
  44. type distPeer interface {
  45. waitBefore(uint64) (time.Duration, float64)
  46. canQueue() bool
  47. queueSend(f func())
  48. }
  49. // distReq is the request abstraction used by the distributor. It is based on
  50. // three callback functions:
  51. // - getCost returns the upper estimate of the cost of sending the request to a given peer
  52. // - canSend tells if the server peer is suitable to serve the request
  53. // - request prepares sending the request to the given peer and returns a function that
  54. // does the actual sending. Request order should be preserved but the callback itself should not
  55. // block until it is sent because other peers might still be able to receive requests while
  56. // one of them is blocking. Instead, the returned function is put in the peer's send queue.
  57. type distReq struct {
  58. getCost func(distPeer) uint64
  59. canSend func(distPeer) bool
  60. request func(distPeer) func()
  61. reqOrder uint64
  62. sentChn chan distPeer
  63. element *list.Element
  64. }
  65. // newRequestDistributor creates a new request distributor
  66. func newRequestDistributor(peers *peerSet, stopChn chan struct{}) *requestDistributor {
  67. d := &requestDistributor{
  68. reqQueue: list.New(),
  69. loopChn: make(chan struct{}, 2),
  70. stopChn: stopChn,
  71. peers: make(map[distPeer]struct{}),
  72. }
  73. if peers != nil {
  74. peers.notify(d)
  75. }
  76. go d.loop()
  77. return d
  78. }
  79. // registerPeer implements peerSetNotify
  80. func (d *requestDistributor) registerPeer(p *peer) {
  81. d.peerLock.Lock()
  82. d.peers[p] = struct{}{}
  83. d.peerLock.Unlock()
  84. }
  85. // unregisterPeer implements peerSetNotify
  86. func (d *requestDistributor) unregisterPeer(p *peer) {
  87. d.peerLock.Lock()
  88. delete(d.peers, p)
  89. d.peerLock.Unlock()
  90. }
  91. // registerTestPeer adds a new test peer
  92. func (d *requestDistributor) registerTestPeer(p distPeer) {
  93. d.peerLock.Lock()
  94. d.peers[p] = struct{}{}
  95. d.peerLock.Unlock()
  96. }
  97. // distMaxWait is the maximum waiting time after which further necessary waiting
  98. // times are recalculated based on new feedback from the servers
  99. const distMaxWait = time.Millisecond * 10
  100. // main event loop
  101. func (d *requestDistributor) loop() {
  102. for {
  103. select {
  104. case <-d.stopChn:
  105. d.lock.Lock()
  106. elem := d.reqQueue.Front()
  107. for elem != nil {
  108. close(elem.Value.(*distReq).sentChn)
  109. elem = elem.Next()
  110. }
  111. d.lock.Unlock()
  112. return
  113. case <-d.loopChn:
  114. d.lock.Lock()
  115. d.loopNextSent = false
  116. loop:
  117. for {
  118. peer, req, wait := d.nextRequest()
  119. if req != nil && wait == 0 {
  120. chn := req.sentChn // save sentChn because remove sets it to nil
  121. d.remove(req)
  122. send := req.request(peer)
  123. if send != nil {
  124. peer.queueSend(send)
  125. }
  126. chn <- peer
  127. close(chn)
  128. } else {
  129. if wait == 0 {
  130. // no request to send and nothing to wait for; the next
  131. // queued request will wake up the loop
  132. break loop
  133. }
  134. d.loopNextSent = true // a "next" signal has been sent, do not send another one until this one has been received
  135. if wait > distMaxWait {
  136. // waiting times may be reduced by incoming request replies, if it is too long, recalculate it periodically
  137. wait = distMaxWait
  138. }
  139. go func() {
  140. time.Sleep(wait)
  141. d.loopChn <- struct{}{}
  142. }()
  143. break loop
  144. }
  145. }
  146. d.lock.Unlock()
  147. }
  148. }
  149. }
  150. // selectPeerItem represents a peer to be selected for a request by weightedRandomSelect
  151. type selectPeerItem struct {
  152. peer distPeer
  153. req *distReq
  154. weight int64
  155. }
  156. // Weight implements wrsItem interface
  157. func (sp selectPeerItem) Weight() int64 {
  158. return sp.weight
  159. }
  160. // nextRequest returns the next possible request from any peer, along with the
  161. // associated peer and necessary waiting time
  162. func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
  163. checkedPeers := make(map[distPeer]struct{})
  164. elem := d.reqQueue.Front()
  165. var (
  166. bestPeer distPeer
  167. bestReq *distReq
  168. bestWait time.Duration
  169. sel *weightedRandomSelect
  170. )
  171. d.peerLock.RLock()
  172. defer d.peerLock.RUnlock()
  173. for (len(d.peers) > 0 || elem == d.reqQueue.Front()) && elem != nil {
  174. req := elem.Value.(*distReq)
  175. canSend := false
  176. for peer := range d.peers {
  177. if _, ok := checkedPeers[peer]; !ok && peer.canQueue() && req.canSend(peer) {
  178. canSend = true
  179. cost := req.getCost(peer)
  180. wait, bufRemain := peer.waitBefore(cost)
  181. if wait == 0 {
  182. if sel == nil {
  183. sel = newWeightedRandomSelect()
  184. }
  185. sel.update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1})
  186. } else {
  187. if bestReq == nil || wait < bestWait {
  188. bestPeer = peer
  189. bestReq = req
  190. bestWait = wait
  191. }
  192. }
  193. checkedPeers[peer] = struct{}{}
  194. }
  195. }
  196. next := elem.Next()
  197. if !canSend && elem == d.reqQueue.Front() {
  198. close(req.sentChn)
  199. d.remove(req)
  200. }
  201. elem = next
  202. }
  203. if sel != nil {
  204. c := sel.choose().(selectPeerItem)
  205. return c.peer, c.req, 0
  206. }
  207. return bestPeer, bestReq, bestWait
  208. }
  209. // queue adds a request to the distribution queue, returns a channel where the
  210. // receiving peer is sent once the request has been sent (request callback returned).
  211. // If the request is cancelled or timed out without suitable peers, the channel is
  212. // closed without sending any peer references to it.
  213. func (d *requestDistributor) queue(r *distReq) chan distPeer {
  214. d.lock.Lock()
  215. defer d.lock.Unlock()
  216. if r.reqOrder == 0 {
  217. d.lastReqOrder++
  218. r.reqOrder = d.lastReqOrder
  219. }
  220. back := d.reqQueue.Back()
  221. if back == nil || r.reqOrder > back.Value.(*distReq).reqOrder {
  222. r.element = d.reqQueue.PushBack(r)
  223. } else {
  224. before := d.reqQueue.Front()
  225. for before.Value.(*distReq).reqOrder < r.reqOrder {
  226. before = before.Next()
  227. }
  228. r.element = d.reqQueue.InsertBefore(r, before)
  229. }
  230. if !d.loopNextSent {
  231. d.loopNextSent = true
  232. d.loopChn <- struct{}{}
  233. }
  234. r.sentChn = make(chan distPeer, 1)
  235. return r.sentChn
  236. }
  237. // cancel removes a request from the queue if it has not been sent yet (returns
  238. // false if it has been sent already). It is guaranteed that the callback functions
  239. // will not be called after cancel returns.
  240. func (d *requestDistributor) cancel(r *distReq) bool {
  241. d.lock.Lock()
  242. defer d.lock.Unlock()
  243. if r.sentChn == nil {
  244. return false
  245. }
  246. close(r.sentChn)
  247. d.remove(r)
  248. return true
  249. }
  250. // remove removes a request from the queue
  251. func (d *requestDistributor) remove(r *distReq) {
  252. r.sentChn = nil
  253. if r.element != nil {
  254. d.reqQueue.Remove(r.element)
  255. r.element = nil
  256. }
  257. }