retrieve.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package les implements on-demand retrieval capable state and chain objects
// for the Ethereum Light Client.
  18. package les
  19. import (
  20. "context"
  21. "crypto/rand"
  22. "encoding/binary"
  23. "fmt"
  24. "sync"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common/mclock"
  27. )
  28. var (
  29. retryQueue = time.Millisecond * 100
  30. softRequestTimeout = time.Millisecond * 500
  31. hardRequestTimeout = time.Second * 10
  32. )
  33. // retrieveManager is a layer on top of requestDistributor which takes care of
  34. // matching replies by request ID and handles timeouts and resends if necessary.
  35. type retrieveManager struct {
  36. dist *requestDistributor
  37. peers *peerSet
  38. serverPool peerSelector
  39. lock sync.RWMutex
  40. sentReqs map[uint64]*sentReq
  41. }
  42. // validatorFunc is a function that processes a reply message
  43. type validatorFunc func(distPeer, *Msg) error
  44. // peerSelector receives feedback info about response times and timeouts
  45. type peerSelector interface {
  46. adjustResponseTime(*poolEntry, time.Duration, bool)
  47. }
  48. // sentReq represents a request sent and tracked by retrieveManager
  49. type sentReq struct {
  50. rm *retrieveManager
  51. req *distReq
  52. id uint64
  53. validate validatorFunc
  54. eventsCh chan reqPeerEvent
  55. stopCh chan struct{}
  56. stopped bool
  57. err error
  58. lock sync.RWMutex // protect access to sentTo map
  59. sentTo map[distPeer]sentReqToPeer
  60. reqQueued bool // a request has been queued but not sent
  61. reqSent bool // a request has been sent but not timed out
  62. reqSrtoCount int // number of requests that reached soft (but not hard) timeout
  63. }
  64. // sentReqToPeer notifies the request-from-peer goroutine (tryRequest) about a response
  65. // delivered by the given peer. Only one delivery is allowed per request per peer,
  66. // after which delivered is set to true, the validity of the response is sent on the
  67. // valid channel and no more responses are accepted.
  68. type sentReqToPeer struct {
  69. delivered bool
  70. valid chan bool
  71. }
  72. // reqPeerEvent is sent by the request-from-peer goroutine (tryRequest) to the
  73. // request state machine (retrieveLoop) through the eventsCh channel.
  74. type reqPeerEvent struct {
  75. event int
  76. peer distPeer
  77. }
  78. const (
  79. rpSent = iota // if peer == nil, not sent (no suitable peers)
  80. rpSoftTimeout
  81. rpHardTimeout
  82. rpDeliveredValid
  83. rpDeliveredInvalid
  84. )
  85. // newRetrieveManager creates the retrieve manager
  86. func newRetrieveManager(peers *peerSet, dist *requestDistributor, serverPool peerSelector) *retrieveManager {
  87. return &retrieveManager{
  88. peers: peers,
  89. dist: dist,
  90. serverPool: serverPool,
  91. sentReqs: make(map[uint64]*sentReq),
  92. }
  93. }
  94. // retrieve sends a request (to multiple peers if necessary) and waits for an answer
  95. // that is delivered through the deliver function and successfully validated by the
  96. // validator callback. It returns when a valid answer is delivered or the context is
  97. // cancelled.
  98. func (rm *retrieveManager) retrieve(ctx context.Context, reqID uint64, req *distReq, val validatorFunc, shutdown chan struct{}) error {
  99. sentReq := rm.sendReq(reqID, req, val)
  100. select {
  101. case <-sentReq.stopCh:
  102. case <-ctx.Done():
  103. sentReq.stop(ctx.Err())
  104. case <-shutdown:
  105. sentReq.stop(fmt.Errorf("Client is shutting down"))
  106. }
  107. return sentReq.getError()
  108. }
  109. // sendReq starts a process that keeps trying to retrieve a valid answer for a
  110. // request from any suitable peers until stopped or succeeded.
  111. func (rm *retrieveManager) sendReq(reqID uint64, req *distReq, val validatorFunc) *sentReq {
  112. r := &sentReq{
  113. rm: rm,
  114. req: req,
  115. id: reqID,
  116. sentTo: make(map[distPeer]sentReqToPeer),
  117. stopCh: make(chan struct{}),
  118. eventsCh: make(chan reqPeerEvent, 10),
  119. validate: val,
  120. }
  121. canSend := req.canSend
  122. req.canSend = func(p distPeer) bool {
  123. // add an extra check to canSend: the request has not been sent to the same peer before
  124. r.lock.RLock()
  125. _, sent := r.sentTo[p]
  126. r.lock.RUnlock()
  127. return !sent && canSend(p)
  128. }
  129. request := req.request
  130. req.request = func(p distPeer) func() {
  131. // before actually sending the request, put an entry into the sentTo map
  132. r.lock.Lock()
  133. r.sentTo[p] = sentReqToPeer{false, make(chan bool, 1)}
  134. r.lock.Unlock()
  135. return request(p)
  136. }
  137. rm.lock.Lock()
  138. rm.sentReqs[reqID] = r
  139. rm.lock.Unlock()
  140. go r.retrieveLoop()
  141. return r
  142. }
  143. // deliver is called by the LES protocol manager to deliver reply messages to waiting requests
  144. func (rm *retrieveManager) deliver(peer distPeer, msg *Msg) error {
  145. rm.lock.RLock()
  146. req, ok := rm.sentReqs[msg.ReqID]
  147. rm.lock.RUnlock()
  148. if ok {
  149. return req.deliver(peer, msg)
  150. }
  151. return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
  152. }
  153. // reqStateFn represents a state of the retrieve loop state machine
  154. type reqStateFn func() reqStateFn
  155. // retrieveLoop is the retrieval state machine event loop
  156. func (r *sentReq) retrieveLoop() {
  157. go r.tryRequest()
  158. r.reqQueued = true
  159. state := r.stateRequesting
  160. for state != nil {
  161. state = state()
  162. }
  163. r.rm.lock.Lock()
  164. delete(r.rm.sentReqs, r.id)
  165. r.rm.lock.Unlock()
  166. }
  167. // stateRequesting: a request has been queued or sent recently; when it reaches soft timeout,
  168. // a new request is sent to a new peer
  169. func (r *sentReq) stateRequesting() reqStateFn {
  170. select {
  171. case ev := <-r.eventsCh:
  172. r.update(ev)
  173. switch ev.event {
  174. case rpSent:
  175. if ev.peer == nil {
  176. // request send failed, no more suitable peers
  177. if r.waiting() {
  178. // we are already waiting for sent requests which may succeed so keep waiting
  179. return r.stateNoMorePeers
  180. }
  181. // nothing to wait for, no more peers to ask, return with error
  182. r.stop(ErrNoPeers)
  183. // no need to go to stopped state because waiting() already returned false
  184. return nil
  185. }
  186. case rpSoftTimeout:
  187. // last request timed out, try asking a new peer
  188. go r.tryRequest()
  189. r.reqQueued = true
  190. return r.stateRequesting
  191. case rpDeliveredValid:
  192. r.stop(nil)
  193. return r.stateStopped
  194. }
  195. return r.stateRequesting
  196. case <-r.stopCh:
  197. return r.stateStopped
  198. }
  199. }
  200. // stateNoMorePeers: could not send more requests because no suitable peers are available.
  201. // Peers may become suitable for a certain request later or new peers may appear so we
  202. // keep trying.
  203. func (r *sentReq) stateNoMorePeers() reqStateFn {
  204. select {
  205. case <-time.After(retryQueue):
  206. go r.tryRequest()
  207. r.reqQueued = true
  208. return r.stateRequesting
  209. case ev := <-r.eventsCh:
  210. r.update(ev)
  211. if ev.event == rpDeliveredValid {
  212. r.stop(nil)
  213. return r.stateStopped
  214. }
  215. return r.stateNoMorePeers
  216. case <-r.stopCh:
  217. return r.stateStopped
  218. }
  219. }
  220. // stateStopped: request succeeded or cancelled, just waiting for some peers
  221. // to either answer or time out hard
  222. func (r *sentReq) stateStopped() reqStateFn {
  223. for r.waiting() {
  224. r.update(<-r.eventsCh)
  225. }
  226. return nil
  227. }
  228. // update updates the queued/sent flags and timed out peers counter according to the event
  229. func (r *sentReq) update(ev reqPeerEvent) {
  230. switch ev.event {
  231. case rpSent:
  232. r.reqQueued = false
  233. if ev.peer != nil {
  234. r.reqSent = true
  235. }
  236. case rpSoftTimeout:
  237. r.reqSent = false
  238. r.reqSrtoCount++
  239. case rpHardTimeout, rpDeliveredValid, rpDeliveredInvalid:
  240. r.reqSrtoCount--
  241. }
  242. }
  243. // waiting returns true if the retrieval mechanism is waiting for an answer from
  244. // any peer
  245. func (r *sentReq) waiting() bool {
  246. return r.reqQueued || r.reqSent || r.reqSrtoCount > 0
  247. }
  248. // tryRequest tries to send the request to a new peer and waits for it to either
  249. // succeed or time out if it has been sent. It also sends the appropriate reqPeerEvent
  250. // messages to the request's event channel.
  251. func (r *sentReq) tryRequest() {
  252. sent := r.rm.dist.queue(r.req)
  253. var p distPeer
  254. select {
  255. case p = <-sent:
  256. case <-r.stopCh:
  257. if r.rm.dist.cancel(r.req) {
  258. p = nil
  259. } else {
  260. p = <-sent
  261. }
  262. }
  263. r.eventsCh <- reqPeerEvent{rpSent, p}
  264. if p == nil {
  265. return
  266. }
  267. reqSent := mclock.Now()
  268. srto, hrto := false, false
  269. r.lock.RLock()
  270. s, ok := r.sentTo[p]
  271. r.lock.RUnlock()
  272. if !ok {
  273. panic(nil)
  274. }
  275. defer func() {
  276. // send feedback to server pool and remove peer if hard timeout happened
  277. pp, ok := p.(*peer)
  278. if ok && r.rm.serverPool != nil {
  279. respTime := time.Duration(mclock.Now() - reqSent)
  280. r.rm.serverPool.adjustResponseTime(pp.poolEntry, respTime, srto)
  281. }
  282. if hrto {
  283. pp.Log().Debug("Request timed out hard")
  284. if r.rm.peers != nil {
  285. r.rm.peers.Unregister(pp.id)
  286. }
  287. }
  288. r.lock.Lock()
  289. delete(r.sentTo, p)
  290. r.lock.Unlock()
  291. }()
  292. select {
  293. case ok := <-s.valid:
  294. if ok {
  295. r.eventsCh <- reqPeerEvent{rpDeliveredValid, p}
  296. } else {
  297. r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p}
  298. }
  299. return
  300. case <-time.After(softRequestTimeout):
  301. srto = true
  302. r.eventsCh <- reqPeerEvent{rpSoftTimeout, p}
  303. }
  304. select {
  305. case ok := <-s.valid:
  306. if ok {
  307. r.eventsCh <- reqPeerEvent{rpDeliveredValid, p}
  308. } else {
  309. r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p}
  310. }
  311. case <-time.After(hardRequestTimeout):
  312. hrto = true
  313. r.eventsCh <- reqPeerEvent{rpHardTimeout, p}
  314. }
  315. }
  316. // deliver a reply belonging to this request
  317. func (r *sentReq) deliver(peer distPeer, msg *Msg) error {
  318. r.lock.Lock()
  319. defer r.lock.Unlock()
  320. s, ok := r.sentTo[peer]
  321. if !ok || s.delivered {
  322. return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
  323. }
  324. valid := r.validate(peer, msg) == nil
  325. r.sentTo[peer] = sentReqToPeer{true, s.valid}
  326. s.valid <- valid
  327. if !valid {
  328. return errResp(ErrInvalidResponse, "reqID = %v", msg.ReqID)
  329. }
  330. return nil
  331. }
  332. // stop stops the retrieval process and sets an error code that will be returned
  333. // by getError
  334. func (r *sentReq) stop(err error) {
  335. r.lock.Lock()
  336. if !r.stopped {
  337. r.stopped = true
  338. r.err = err
  339. close(r.stopCh)
  340. }
  341. r.lock.Unlock()
  342. }
  343. // getError returns any retrieval error (either internally generated or set by the
  344. // stop function) after stopCh has been closed
  345. func (r *sentReq) getError() error {
  346. return r.err
  347. }
  348. // genReqID generates a new random request ID
  349. func genReqID() uint64 {
  350. var rnd [8]byte
  351. rand.Read(rnd[:])
  352. return binary.BigEndian.Uint64(rnd[:])
  353. }