123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756 |
- // Copyright 2016 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- // Package les implements the Light Ethereum Subprotocol.
- package les
- import (
- "fmt"
- "io"
- "math"
- "math/rand"
- "net"
- "strconv"
- "sync"
- "time"
- "github.com/ethereum/go-ethereum/common/mclock"
- "github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/p2p"
- "github.com/ethereum/go-ethereum/p2p/discover"
- "github.com/ethereum/go-ethereum/p2p/discv5"
- "github.com/ethereum/go-ethereum/rlp"
- )
- const (
- // After a connection has been ended or timed out, there is a waiting period
- // before it can be selected for connection again.
- // waiting period = base delay * (1 + random(1))
- // base delay = shortRetryDelay for the first shortRetryCnt times after a
- // successful connection, after that longRetryDelay is applied
- shortRetryCnt = 5
- shortRetryDelay = time.Second * 5
- longRetryDelay = time.Minute * 10
- // maxNewEntries is the maximum number of newly discovered (never connected) nodes.
- // If the limit is reached, the least recently discovered one is thrown out.
- maxNewEntries = 1000
- // maxKnownEntries is the maximum number of known (already connected) nodes.
- // If the limit is reached, the least recently connected one is thrown out.
- // (not that unlike new entries, known entries are persistent)
- maxKnownEntries = 1000
- // target for simultaneously connected servers
- targetServerCount = 5
- // target for servers selected from the known table
- // (we leave room for trying new ones if there is any)
- targetKnownSelect = 3
- // after dialTimeout, consider the server unavailable and adjust statistics
- dialTimeout = time.Second * 30
- // targetConnTime is the minimum expected connection duration before a server
- // drops a client without any specific reason
- targetConnTime = time.Minute * 10
- // new entry selection weight calculation based on most recent discovery time:
- // unity until discoverExpireStart, then exponential decay with discoverExpireConst
- discoverExpireStart = time.Minute * 20
- discoverExpireConst = time.Minute * 20
- // known entry selection weight is dropped by a factor of exp(-failDropLn) after
- // each unsuccessful connection (restored after a successful one)
- failDropLn = 0.1
- // known node connection success and quality statistics have a long term average
- // and a short term value which is adjusted exponentially with a factor of
- // pstatRecentAdjust with each dial/connection and also returned exponentially
- // to the average with the time constant pstatReturnToMeanTC
- pstatReturnToMeanTC = time.Hour
- // node address selection weight is dropped by a factor of exp(-addrFailDropLn) after
- // each unsuccessful connection (restored after a successful one)
- addrFailDropLn = math.Ln2
- // responseScoreTC and delayScoreTC are exponential decay time constants for
- // calculating selection chances from response times and block delay times
- responseScoreTC = time.Millisecond * 100
- delayScoreTC = time.Second * 5
- timeoutPow = 10
- // initStatsWeight is used to initialize previously unknown peers with good
- // statistics to give a chance to prove themselves
- initStatsWeight = 1
- )
- // serverPool implements a pool for storing and selecting newly discovered and already
- // known light server nodes. It received discovered nodes, stores statistics about
- // known nodes and takes care of always having enough good quality servers connected.
- type serverPool struct {
- db ethdb.Database
- dbKey []byte
- server *p2p.Server
- quit chan struct{}
- wg *sync.WaitGroup
- connWg sync.WaitGroup
- topic discv5.Topic
- discSetPeriod chan time.Duration
- discNodes chan *discv5.Node
- discLookups chan bool
- entries map[discover.NodeID]*poolEntry
- lock sync.Mutex
- timeout, enableRetry chan *poolEntry
- adjustStats chan poolStatAdjust
- knownQueue, newQueue poolEntryQueue
- knownSelect, newSelect *weightedRandomSelect
- knownSelected, newSelected int
- fastDiscover bool
- }
- // newServerPool creates a new serverPool instance
- func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *serverPool {
- pool := &serverPool{
- db: db,
- quit: quit,
- wg: wg,
- entries: make(map[discover.NodeID]*poolEntry),
- timeout: make(chan *poolEntry, 1),
- adjustStats: make(chan poolStatAdjust, 100),
- enableRetry: make(chan *poolEntry, 1),
- knownSelect: newWeightedRandomSelect(),
- newSelect: newWeightedRandomSelect(),
- fastDiscover: true,
- }
- pool.knownQueue = newPoolEntryQueue(maxKnownEntries, pool.removeEntry)
- pool.newQueue = newPoolEntryQueue(maxNewEntries, pool.removeEntry)
- return pool
- }
- func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) {
- pool.server = server
- pool.topic = topic
- pool.dbKey = append([]byte("serverPool/"), []byte(topic)...)
- pool.wg.Add(1)
- pool.loadNodes()
- if pool.server.DiscV5 != nil {
- pool.discSetPeriod = make(chan time.Duration, 1)
- pool.discNodes = make(chan *discv5.Node, 100)
- pool.discLookups = make(chan bool, 100)
- go pool.server.DiscV5.SearchTopic(pool.topic, pool.discSetPeriod, pool.discNodes, pool.discLookups)
- }
- go pool.eventLoop()
- pool.checkDial()
- }
- // connect should be called upon any incoming connection. If the connection has been
- // dialed by the server pool recently, the appropriate pool entry is returned.
- // Otherwise, the connection should be rejected.
- // Note that whenever a connection has been accepted and a pool entry has been returned,
- // disconnect should also always be called.
- func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry {
- pool.lock.Lock()
- defer pool.lock.Unlock()
- entry := pool.entries[p.ID()]
- if entry == nil {
- entry = pool.findOrNewNode(p.ID(), ip, port)
- }
- p.Log().Debug("Connecting to new peer", "state", entry.state)
- if entry.state == psConnected || entry.state == psRegistered {
- return nil
- }
- pool.connWg.Add(1)
- entry.peer = p
- entry.state = psConnected
- addr := &poolEntryAddress{
- ip: ip,
- port: port,
- lastSeen: mclock.Now(),
- }
- entry.lastConnected = addr
- entry.addr = make(map[string]*poolEntryAddress)
- entry.addr[addr.strKey()] = addr
- entry.addrSelect = *newWeightedRandomSelect()
- entry.addrSelect.update(addr)
- return entry
- }
- // registered should be called after a successful handshake
- func (pool *serverPool) registered(entry *poolEntry) {
- log.Debug("Registered new entry", "enode", entry.id)
- pool.lock.Lock()
- defer pool.lock.Unlock()
- entry.state = psRegistered
- entry.regTime = mclock.Now()
- if !entry.known {
- pool.newQueue.remove(entry)
- entry.known = true
- }
- pool.knownQueue.setLatest(entry)
- entry.shortRetry = shortRetryCnt
- }
- // disconnect should be called when ending a connection. Service quality statistics
- // can be updated optionally (not updated if no registration happened, in this case
- // only connection statistics are updated, just like in case of timeout)
- func (pool *serverPool) disconnect(entry *poolEntry) {
- log.Debug("Disconnected old entry", "enode", entry.id)
- pool.lock.Lock()
- defer pool.lock.Unlock()
- if entry.state == psRegistered {
- connTime := mclock.Now() - entry.regTime
- connAdjust := float64(connTime) / float64(targetConnTime)
- if connAdjust > 1 {
- connAdjust = 1
- }
- stopped := false
- select {
- case <-pool.quit:
- stopped = true
- default:
- }
- if stopped {
- entry.connectStats.add(1, connAdjust)
- } else {
- entry.connectStats.add(connAdjust, 1)
- }
- }
- entry.state = psNotConnected
- if entry.knownSelected {
- pool.knownSelected--
- } else {
- pool.newSelected--
- }
- pool.setRetryDial(entry)
- pool.connWg.Done()
- }
- const (
- pseBlockDelay = iota
- pseResponseTime
- pseResponseTimeout
- )
- // poolStatAdjust records are sent to adjust peer block delay/response time statistics
- type poolStatAdjust struct {
- adjustType int
- entry *poolEntry
- time time.Duration
- }
- // adjustBlockDelay adjusts the block announce delay statistics of a node
- func (pool *serverPool) adjustBlockDelay(entry *poolEntry, time time.Duration) {
- if entry == nil {
- return
- }
- pool.adjustStats <- poolStatAdjust{pseBlockDelay, entry, time}
- }
- // adjustResponseTime adjusts the request response time statistics of a node
- func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, timeout bool) {
- if entry == nil {
- return
- }
- if timeout {
- pool.adjustStats <- poolStatAdjust{pseResponseTimeout, entry, time}
- } else {
- pool.adjustStats <- poolStatAdjust{pseResponseTime, entry, time}
- }
- }
- // eventLoop handles pool events and mutex locking for all internal functions
- func (pool *serverPool) eventLoop() {
- lookupCnt := 0
- var convTime mclock.AbsTime
- if pool.discSetPeriod != nil {
- pool.discSetPeriod <- time.Millisecond * 100
- }
- for {
- select {
- case entry := <-pool.timeout:
- pool.lock.Lock()
- if !entry.removed {
- pool.checkDialTimeout(entry)
- }
- pool.lock.Unlock()
- case entry := <-pool.enableRetry:
- pool.lock.Lock()
- if !entry.removed {
- entry.delayedRetry = false
- pool.updateCheckDial(entry)
- }
- pool.lock.Unlock()
- case adj := <-pool.adjustStats:
- pool.lock.Lock()
- switch adj.adjustType {
- case pseBlockDelay:
- adj.entry.delayStats.add(float64(adj.time), 1)
- case pseResponseTime:
- adj.entry.responseStats.add(float64(adj.time), 1)
- adj.entry.timeoutStats.add(0, 1)
- case pseResponseTimeout:
- adj.entry.timeoutStats.add(1, 1)
- }
- pool.lock.Unlock()
- case node := <-pool.discNodes:
- pool.lock.Lock()
- entry := pool.findOrNewNode(discover.NodeID(node.ID), node.IP, node.TCP)
- pool.updateCheckDial(entry)
- pool.lock.Unlock()
- case conv := <-pool.discLookups:
- if conv {
- if lookupCnt == 0 {
- convTime = mclock.Now()
- }
- lookupCnt++
- if pool.fastDiscover && (lookupCnt == 50 || time.Duration(mclock.Now()-convTime) > time.Minute) {
- pool.fastDiscover = false
- if pool.discSetPeriod != nil {
- pool.discSetPeriod <- time.Minute
- }
- }
- }
- case <-pool.quit:
- if pool.discSetPeriod != nil {
- close(pool.discSetPeriod)
- }
- pool.connWg.Wait()
- pool.saveNodes()
- pool.wg.Done()
- return
- }
- }
- }
- func (pool *serverPool) findOrNewNode(id discover.NodeID, ip net.IP, port uint16) *poolEntry {
- now := mclock.Now()
- entry := pool.entries[id]
- if entry == nil {
- log.Debug("Discovered new entry", "id", id)
- entry = &poolEntry{
- id: id,
- addr: make(map[string]*poolEntryAddress),
- addrSelect: *newWeightedRandomSelect(),
- shortRetry: shortRetryCnt,
- }
- pool.entries[id] = entry
- // initialize previously unknown peers with good statistics to give a chance to prove themselves
- entry.connectStats.add(1, initStatsWeight)
- entry.delayStats.add(0, initStatsWeight)
- entry.responseStats.add(0, initStatsWeight)
- entry.timeoutStats.add(0, initStatsWeight)
- }
- entry.lastDiscovered = now
- addr := &poolEntryAddress{
- ip: ip,
- port: port,
- }
- if a, ok := entry.addr[addr.strKey()]; ok {
- addr = a
- } else {
- entry.addr[addr.strKey()] = addr
- }
- addr.lastSeen = now
- entry.addrSelect.update(addr)
- if !entry.known {
- pool.newQueue.setLatest(entry)
- }
- return entry
- }
- // loadNodes loads known nodes and their statistics from the database
- func (pool *serverPool) loadNodes() {
- enc, err := pool.db.Get(pool.dbKey)
- if err != nil {
- return
- }
- var list []*poolEntry
- err = rlp.DecodeBytes(enc, &list)
- if err != nil {
- log.Debug("Failed to decode node list", "err", err)
- return
- }
- for _, e := range list {
- log.Debug("Loaded server stats", "id", e.id, "fails", e.lastConnected.fails,
- "conn", fmt.Sprintf("%v/%v", e.connectStats.avg, e.connectStats.weight),
- "delay", fmt.Sprintf("%v/%v", time.Duration(e.delayStats.avg), e.delayStats.weight),
- "response", fmt.Sprintf("%v/%v", time.Duration(e.responseStats.avg), e.responseStats.weight),
- "timeout", fmt.Sprintf("%v/%v", e.timeoutStats.avg, e.timeoutStats.weight))
- pool.entries[e.id] = e
- pool.knownQueue.setLatest(e)
- pool.knownSelect.update((*knownEntry)(e))
- }
- }
- // saveNodes saves known nodes and their statistics into the database. Nodes are
- // ordered from least to most recently connected.
- func (pool *serverPool) saveNodes() {
- list := make([]*poolEntry, len(pool.knownQueue.queue))
- for i := range list {
- list[i] = pool.knownQueue.fetchOldest()
- }
- enc, err := rlp.EncodeToBytes(list)
- if err == nil {
- pool.db.Put(pool.dbKey, enc)
- }
- }
- // removeEntry removes a pool entry when the entry count limit is reached.
- // Note that it is called by the new/known queues from which the entry has already
- // been removed so removing it from the queues is not necessary.
- func (pool *serverPool) removeEntry(entry *poolEntry) {
- pool.newSelect.remove((*discoveredEntry)(entry))
- pool.knownSelect.remove((*knownEntry)(entry))
- entry.removed = true
- delete(pool.entries, entry.id)
- }
- // setRetryDial starts the timer which will enable dialing a certain node again
- func (pool *serverPool) setRetryDial(entry *poolEntry) {
- delay := longRetryDelay
- if entry.shortRetry > 0 {
- entry.shortRetry--
- delay = shortRetryDelay
- }
- delay += time.Duration(rand.Int63n(int64(delay) + 1))
- entry.delayedRetry = true
- go func() {
- select {
- case <-pool.quit:
- case <-time.After(delay):
- select {
- case <-pool.quit:
- case pool.enableRetry <- entry:
- }
- }
- }()
- }
- // updateCheckDial is called when an entry can potentially be dialed again. It updates
- // its selection weights and checks if new dials can/should be made.
- func (pool *serverPool) updateCheckDial(entry *poolEntry) {
- pool.newSelect.update((*discoveredEntry)(entry))
- pool.knownSelect.update((*knownEntry)(entry))
- pool.checkDial()
- }
- // checkDial checks if new dials can/should be made. It tries to select servers both
- // based on good statistics and recent discovery.
- func (pool *serverPool) checkDial() {
- fillWithKnownSelects := !pool.fastDiscover
- for pool.knownSelected < targetKnownSelect {
- entry := pool.knownSelect.choose()
- if entry == nil {
- fillWithKnownSelects = false
- break
- }
- pool.dial((*poolEntry)(entry.(*knownEntry)), true)
- }
- for pool.knownSelected+pool.newSelected < targetServerCount {
- entry := pool.newSelect.choose()
- if entry == nil {
- break
- }
- pool.dial((*poolEntry)(entry.(*discoveredEntry)), false)
- }
- if fillWithKnownSelects {
- // no more newly discovered nodes to select and since fast discover period
- // is over, we probably won't find more in the near future so select more
- // known entries if possible
- for pool.knownSelected < targetServerCount {
- entry := pool.knownSelect.choose()
- if entry == nil {
- break
- }
- pool.dial((*poolEntry)(entry.(*knownEntry)), true)
- }
- }
- }
- // dial initiates a new connection
- func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) {
- if pool.server == nil || entry.state != psNotConnected {
- return
- }
- entry.state = psDialed
- entry.knownSelected = knownSelected
- if knownSelected {
- pool.knownSelected++
- } else {
- pool.newSelected++
- }
- addr := entry.addrSelect.choose().(*poolEntryAddress)
- log.Debug("Dialing new peer", "lesaddr", entry.id.String()+"@"+addr.strKey(), "set", len(entry.addr), "known", knownSelected)
- entry.dialed = addr
- go func() {
- pool.server.AddPeer(discover.NewNode(entry.id, addr.ip, addr.port, addr.port))
- select {
- case <-pool.quit:
- case <-time.After(dialTimeout):
- select {
- case <-pool.quit:
- case pool.timeout <- entry:
- }
- }
- }()
- }
- // checkDialTimeout checks if the node is still in dialed state and if so, resets it
- // and adjusts connection statistics accordingly.
- func (pool *serverPool) checkDialTimeout(entry *poolEntry) {
- if entry.state != psDialed {
- return
- }
- log.Debug("Dial timeout", "lesaddr", entry.id.String()+"@"+entry.dialed.strKey())
- entry.state = psNotConnected
- if entry.knownSelected {
- pool.knownSelected--
- } else {
- pool.newSelected--
- }
- entry.connectStats.add(0, 1)
- entry.dialed.fails++
- pool.setRetryDial(entry)
- }
- const (
- psNotConnected = iota
- psDialed
- psConnected
- psRegistered
- )
- // poolEntry represents a server node and stores its current state and statistics.
- type poolEntry struct {
- peer *peer
- id discover.NodeID
- addr map[string]*poolEntryAddress
- lastConnected, dialed *poolEntryAddress
- addrSelect weightedRandomSelect
- lastDiscovered mclock.AbsTime
- known, knownSelected bool
- connectStats, delayStats poolStats
- responseStats, timeoutStats poolStats
- state int
- regTime mclock.AbsTime
- queueIdx int
- removed bool
- delayedRetry bool
- shortRetry int
- }
- func (e *poolEntry) EncodeRLP(w io.Writer) error {
- return rlp.Encode(w, []interface{}{e.id, e.lastConnected.ip, e.lastConnected.port, e.lastConnected.fails, &e.connectStats, &e.delayStats, &e.responseStats, &e.timeoutStats})
- }
- func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
- var entry struct {
- ID discover.NodeID
- IP net.IP
- Port uint16
- Fails uint
- CStat, DStat, RStat, TStat poolStats
- }
- if err := s.Decode(&entry); err != nil {
- return err
- }
- addr := &poolEntryAddress{ip: entry.IP, port: entry.Port, fails: entry.Fails, lastSeen: mclock.Now()}
- e.id = entry.ID
- e.addr = make(map[string]*poolEntryAddress)
- e.addr[addr.strKey()] = addr
- e.addrSelect = *newWeightedRandomSelect()
- e.addrSelect.update(addr)
- e.lastConnected = addr
- e.connectStats = entry.CStat
- e.delayStats = entry.DStat
- e.responseStats = entry.RStat
- e.timeoutStats = entry.TStat
- e.shortRetry = shortRetryCnt
- e.known = true
- return nil
- }
- // discoveredEntry implements wrsItem
- type discoveredEntry poolEntry
- // Weight calculates random selection weight for newly discovered entries
- func (e *discoveredEntry) Weight() int64 {
- if e.state != psNotConnected || e.delayedRetry {
- return 0
- }
- t := time.Duration(mclock.Now() - e.lastDiscovered)
- if t <= discoverExpireStart {
- return 1000000000
- }
- return int64(1000000000 * math.Exp(-float64(t-discoverExpireStart)/float64(discoverExpireConst)))
- }
- // knownEntry implements wrsItem
- type knownEntry poolEntry
- // Weight calculates random selection weight for known entries
- func (e *knownEntry) Weight() int64 {
- if e.state != psNotConnected || !e.known || e.delayedRetry {
- return 0
- }
- return int64(1000000000 * e.connectStats.recentAvg() * math.Exp(-float64(e.lastConnected.fails)*failDropLn-e.responseStats.recentAvg()/float64(responseScoreTC)-e.delayStats.recentAvg()/float64(delayScoreTC)) * math.Pow(1-e.timeoutStats.recentAvg(), timeoutPow))
- }
- // poolEntryAddress is a separate object because currently it is necessary to remember
- // multiple potential network addresses for a pool entry. This will be removed after
- // the final implementation of v5 discovery which will retrieve signed and serial
- // numbered advertisements, making it clear which IP/port is the latest one.
- type poolEntryAddress struct {
- ip net.IP
- port uint16
- lastSeen mclock.AbsTime // last time it was discovered, connected or loaded from db
- fails uint // connection failures since last successful connection (persistent)
- }
- func (a *poolEntryAddress) Weight() int64 {
- t := time.Duration(mclock.Now() - a.lastSeen)
- return int64(1000000*math.Exp(-float64(t)/float64(discoverExpireConst)-float64(a.fails)*addrFailDropLn)) + 1
- }
- func (a *poolEntryAddress) strKey() string {
- return a.ip.String() + ":" + strconv.Itoa(int(a.port))
- }
- // poolStats implement statistics for a certain quantity with a long term average
- // and a short term value which is adjusted exponentially with a factor of
- // pstatRecentAdjust with each update and also returned exponentially to the
- // average with the time constant pstatReturnToMeanTC
- type poolStats struct {
- sum, weight, avg, recent float64
- lastRecalc mclock.AbsTime
- }
- // init initializes stats with a long term sum/update count pair retrieved from the database
- func (s *poolStats) init(sum, weight float64) {
- s.sum = sum
- s.weight = weight
- var avg float64
- if weight > 0 {
- avg = s.sum / weight
- }
- s.avg = avg
- s.recent = avg
- s.lastRecalc = mclock.Now()
- }
- // recalc recalculates recent value return-to-mean and long term average
- func (s *poolStats) recalc() {
- now := mclock.Now()
- s.recent = s.avg + (s.recent-s.avg)*math.Exp(-float64(now-s.lastRecalc)/float64(pstatReturnToMeanTC))
- if s.sum == 0 {
- s.avg = 0
- } else {
- if s.sum > s.weight*1e30 {
- s.avg = 1e30
- } else {
- s.avg = s.sum / s.weight
- }
- }
- s.lastRecalc = now
- }
- // add updates the stats with a new value
- func (s *poolStats) add(value, weight float64) {
- s.weight += weight
- s.sum += value * weight
- s.recalc()
- }
- // recentAvg returns the short-term adjusted average
- func (s *poolStats) recentAvg() float64 {
- s.recalc()
- return s.recent
- }
- func (s *poolStats) EncodeRLP(w io.Writer) error {
- return rlp.Encode(w, []interface{}{math.Float64bits(s.sum), math.Float64bits(s.weight)})
- }
- func (s *poolStats) DecodeRLP(st *rlp.Stream) error {
- var stats struct {
- SumUint, WeightUint uint64
- }
- if err := st.Decode(&stats); err != nil {
- return err
- }
- s.init(math.Float64frombits(stats.SumUint), math.Float64frombits(stats.WeightUint))
- return nil
- }
- // poolEntryQueue keeps track of its least recently accessed entries and removes
- // them when the number of entries reaches the limit
- type poolEntryQueue struct {
- queue map[int]*poolEntry // known nodes indexed by their latest lastConnCnt value
- newPtr, oldPtr, maxCnt int
- removeFromPool func(*poolEntry)
- }
- // newPoolEntryQueue returns a new poolEntryQueue
- func newPoolEntryQueue(maxCnt int, removeFromPool func(*poolEntry)) poolEntryQueue {
- return poolEntryQueue{queue: make(map[int]*poolEntry), maxCnt: maxCnt, removeFromPool: removeFromPool}
- }
- // fetchOldest returns and removes the least recently accessed entry
- func (q *poolEntryQueue) fetchOldest() *poolEntry {
- if len(q.queue) == 0 {
- return nil
- }
- for {
- if e := q.queue[q.oldPtr]; e != nil {
- delete(q.queue, q.oldPtr)
- q.oldPtr++
- return e
- }
- q.oldPtr++
- }
- }
- // remove removes an entry from the queue
- func (q *poolEntryQueue) remove(entry *poolEntry) {
- if q.queue[entry.queueIdx] == entry {
- delete(q.queue, entry.queueIdx)
- }
- }
- // setLatest adds or updates a recently accessed entry. It also checks if an old entry
- // needs to be removed and removes it from the parent pool too with a callback function.
- func (q *poolEntryQueue) setLatest(entry *poolEntry) {
- if q.queue[entry.queueIdx] == entry {
- delete(q.queue, entry.queueIdx)
- } else {
- if len(q.queue) == q.maxCnt {
- e := q.fetchOldest()
- q.remove(e)
- q.removeFromPool(e)
- }
- }
- entry.queueIdx = q.newPtr
- q.queue[entry.queueIdx] = entry
- q.newPtr++
- }
|