123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- // Copyright 2017 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- // Package light implements on-demand retrieval capable state and chain objects
- // for the Ethereum Light Client.
- package les
- import (
- "container/list"
- "errors"
- "sync"
- "time"
- )
- // ErrNoPeers is returned if no peers capable of serving a queued request are available
- var ErrNoPeers = errors.New("no suitable peers available")
- // requestDistributor implements a mechanism that distributes requests to
- // suitable peers, obeying flow control rules and prioritizing them in creation
- // order (even when a resend is necessary).
- type requestDistributor struct {
- reqQueue *list.List
- lastReqOrder uint64
- peers map[distPeer]struct{}
- peerLock sync.RWMutex
- stopChn, loopChn chan struct{}
- loopNextSent bool
- lock sync.Mutex
- }
- // distPeer is an LES server peer interface for the request distributor.
- // waitBefore returns either the necessary waiting time before sending a request
- // with the given upper estimated cost or the estimated remaining relative buffer
- // value after sending such a request (in which case the request can be sent
- // immediately). At least one of these values is always zero.
- type distPeer interface {
- waitBefore(uint64) (time.Duration, float64)
- canQueue() bool
- queueSend(f func())
- }
- // distReq is the request abstraction used by the distributor. It is based on
- // three callback functions:
- // - getCost returns the upper estimate of the cost of sending the request to a given peer
- // - canSend tells if the server peer is suitable to serve the request
- // - request prepares sending the request to the given peer and returns a function that
- // does the actual sending. Request order should be preserved but the callback itself should not
- // block until it is sent because other peers might still be able to receive requests while
- // one of them is blocking. Instead, the returned function is put in the peer's send queue.
- type distReq struct {
- getCost func(distPeer) uint64
- canSend func(distPeer) bool
- request func(distPeer) func()
- reqOrder uint64
- sentChn chan distPeer
- element *list.Element
- }
- // newRequestDistributor creates a new request distributor
- func newRequestDistributor(peers *peerSet, stopChn chan struct{}) *requestDistributor {
- d := &requestDistributor{
- reqQueue: list.New(),
- loopChn: make(chan struct{}, 2),
- stopChn: stopChn,
- peers: make(map[distPeer]struct{}),
- }
- if peers != nil {
- peers.notify(d)
- }
- go d.loop()
- return d
- }
- // registerPeer implements peerSetNotify
- func (d *requestDistributor) registerPeer(p *peer) {
- d.peerLock.Lock()
- d.peers[p] = struct{}{}
- d.peerLock.Unlock()
- }
- // unregisterPeer implements peerSetNotify
- func (d *requestDistributor) unregisterPeer(p *peer) {
- d.peerLock.Lock()
- delete(d.peers, p)
- d.peerLock.Unlock()
- }
- // registerTestPeer adds a new test peer
- func (d *requestDistributor) registerTestPeer(p distPeer) {
- d.peerLock.Lock()
- d.peers[p] = struct{}{}
- d.peerLock.Unlock()
- }
- // distMaxWait is the maximum waiting time after which further necessary waiting
- // times are recalculated based on new feedback from the servers
- const distMaxWait = time.Millisecond * 10
- // main event loop
- func (d *requestDistributor) loop() {
- for {
- select {
- case <-d.stopChn:
- d.lock.Lock()
- elem := d.reqQueue.Front()
- for elem != nil {
- close(elem.Value.(*distReq).sentChn)
- elem = elem.Next()
- }
- d.lock.Unlock()
- return
- case <-d.loopChn:
- d.lock.Lock()
- d.loopNextSent = false
- loop:
- for {
- peer, req, wait := d.nextRequest()
- if req != nil && wait == 0 {
- chn := req.sentChn // save sentChn because remove sets it to nil
- d.remove(req)
- send := req.request(peer)
- if send != nil {
- peer.queueSend(send)
- }
- chn <- peer
- close(chn)
- } else {
- if wait == 0 {
- // no request to send and nothing to wait for; the next
- // queued request will wake up the loop
- break loop
- }
- d.loopNextSent = true // a "next" signal has been sent, do not send another one until this one has been received
- if wait > distMaxWait {
- // waiting times may be reduced by incoming request replies, if it is too long, recalculate it periodically
- wait = distMaxWait
- }
- go func() {
- time.Sleep(wait)
- d.loopChn <- struct{}{}
- }()
- break loop
- }
- }
- d.lock.Unlock()
- }
- }
- }
- // selectPeerItem represents a peer to be selected for a request by weightedRandomSelect
- type selectPeerItem struct {
- peer distPeer
- req *distReq
- weight int64
- }
- // Weight implements wrsItem interface
- func (sp selectPeerItem) Weight() int64 {
- return sp.weight
- }
- // nextRequest returns the next possible request from any peer, along with the
- // associated peer and necessary waiting time
- func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
- checkedPeers := make(map[distPeer]struct{})
- elem := d.reqQueue.Front()
- var (
- bestPeer distPeer
- bestReq *distReq
- bestWait time.Duration
- sel *weightedRandomSelect
- )
- d.peerLock.RLock()
- defer d.peerLock.RUnlock()
- for (len(d.peers) > 0 || elem == d.reqQueue.Front()) && elem != nil {
- req := elem.Value.(*distReq)
- canSend := false
- for peer := range d.peers {
- if _, ok := checkedPeers[peer]; !ok && peer.canQueue() && req.canSend(peer) {
- canSend = true
- cost := req.getCost(peer)
- wait, bufRemain := peer.waitBefore(cost)
- if wait == 0 {
- if sel == nil {
- sel = newWeightedRandomSelect()
- }
- sel.update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1})
- } else {
- if bestReq == nil || wait < bestWait {
- bestPeer = peer
- bestReq = req
- bestWait = wait
- }
- }
- checkedPeers[peer] = struct{}{}
- }
- }
- next := elem.Next()
- if !canSend && elem == d.reqQueue.Front() {
- close(req.sentChn)
- d.remove(req)
- }
- elem = next
- }
- if sel != nil {
- c := sel.choose().(selectPeerItem)
- return c.peer, c.req, 0
- }
- return bestPeer, bestReq, bestWait
- }
- // queue adds a request to the distribution queue, returns a channel where the
- // receiving peer is sent once the request has been sent (request callback returned).
- // If the request is cancelled or timed out without suitable peers, the channel is
- // closed without sending any peer references to it.
- func (d *requestDistributor) queue(r *distReq) chan distPeer {
- d.lock.Lock()
- defer d.lock.Unlock()
- if r.reqOrder == 0 {
- d.lastReqOrder++
- r.reqOrder = d.lastReqOrder
- }
- back := d.reqQueue.Back()
- if back == nil || r.reqOrder > back.Value.(*distReq).reqOrder {
- r.element = d.reqQueue.PushBack(r)
- } else {
- before := d.reqQueue.Front()
- for before.Value.(*distReq).reqOrder < r.reqOrder {
- before = before.Next()
- }
- r.element = d.reqQueue.InsertBefore(r, before)
- }
- if !d.loopNextSent {
- d.loopNextSent = true
- d.loopChn <- struct{}{}
- }
- r.sentChn = make(chan distPeer, 1)
- return r.sentChn
- }
- // cancel removes a request from the queue if it has not been sent yet (returns
- // false if it has been sent already). It is guaranteed that the callback functions
- // will not be called after cancel returns.
- func (d *requestDistributor) cancel(r *distReq) bool {
- d.lock.Lock()
- defer d.lock.Unlock()
- if r.sentChn == nil {
- return false
- }
- close(r.sentChn)
- d.remove(r)
- return true
- }
- // remove removes a request from the queue
- func (d *requestDistributor) remove(r *distReq) {
- r.sentChn = nil
- if r.element != nil {
- d.reqQueue.Remove(r.element)
- r.element = nil
- }
- }
|