serverpool.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "fmt"
  20. "io"
  21. "math"
  22. "math/rand"
  23. "net"
  24. "strconv"
  25. "sync"
  26. "time"
  27. "github.com/ethereum/go-ethereum/common/mclock"
  28. "github.com/ethereum/go-ethereum/ethdb"
  29. "github.com/ethereum/go-ethereum/log"
  30. "github.com/ethereum/go-ethereum/p2p"
  31. "github.com/ethereum/go-ethereum/p2p/discover"
  32. "github.com/ethereum/go-ethereum/p2p/discv5"
  33. "github.com/ethereum/go-ethereum/rlp"
  34. )
  35. const (
  36. // After a connection has been ended or timed out, there is a waiting period
  37. // before it can be selected for connection again.
  38. // waiting period = base delay * (1 + random(1))
  39. // base delay = shortRetryDelay for the first shortRetryCnt times after a
  40. // successful connection, after that longRetryDelay is applied
  41. shortRetryCnt = 5
  42. shortRetryDelay = time.Second * 5
  43. longRetryDelay = time.Minute * 10
  44. // maxNewEntries is the maximum number of newly discovered (never connected) nodes.
  45. // If the limit is reached, the least recently discovered one is thrown out.
  46. maxNewEntries = 1000
  47. // maxKnownEntries is the maximum number of known (already connected) nodes.
  48. // If the limit is reached, the least recently connected one is thrown out.
  49. // (not that unlike new entries, known entries are persistent)
  50. maxKnownEntries = 1000
  51. // target for simultaneously connected servers
  52. targetServerCount = 5
  53. // target for servers selected from the known table
  54. // (we leave room for trying new ones if there is any)
  55. targetKnownSelect = 3
  56. // after dialTimeout, consider the server unavailable and adjust statistics
  57. dialTimeout = time.Second * 30
  58. // targetConnTime is the minimum expected connection duration before a server
  59. // drops a client without any specific reason
  60. targetConnTime = time.Minute * 10
  61. // new entry selection weight calculation based on most recent discovery time:
  62. // unity until discoverExpireStart, then exponential decay with discoverExpireConst
  63. discoverExpireStart = time.Minute * 20
  64. discoverExpireConst = time.Minute * 20
  65. // known entry selection weight is dropped by a factor of exp(-failDropLn) after
  66. // each unsuccessful connection (restored after a successful one)
  67. failDropLn = 0.1
  68. // known node connection success and quality statistics have a long term average
  69. // and a short term value which is adjusted exponentially with a factor of
  70. // pstatRecentAdjust with each dial/connection and also returned exponentially
  71. // to the average with the time constant pstatReturnToMeanTC
  72. pstatReturnToMeanTC = time.Hour
  73. // node address selection weight is dropped by a factor of exp(-addrFailDropLn) after
  74. // each unsuccessful connection (restored after a successful one)
  75. addrFailDropLn = math.Ln2
  76. // responseScoreTC and delayScoreTC are exponential decay time constants for
  77. // calculating selection chances from response times and block delay times
  78. responseScoreTC = time.Millisecond * 100
  79. delayScoreTC = time.Second * 5
  80. timeoutPow = 10
  81. // initStatsWeight is used to initialize previously unknown peers with good
  82. // statistics to give a chance to prove themselves
  83. initStatsWeight = 1
  84. )
  85. // serverPool implements a pool for storing and selecting newly discovered and already
  86. // known light server nodes. It received discovered nodes, stores statistics about
  87. // known nodes and takes care of always having enough good quality servers connected.
  88. type serverPool struct {
  89. db ethdb.Database
  90. dbKey []byte
  91. server *p2p.Server
  92. quit chan struct{}
  93. wg *sync.WaitGroup
  94. connWg sync.WaitGroup
  95. topic discv5.Topic
  96. discSetPeriod chan time.Duration
  97. discNodes chan *discv5.Node
  98. discLookups chan bool
  99. entries map[discover.NodeID]*poolEntry
  100. lock sync.Mutex
  101. timeout, enableRetry chan *poolEntry
  102. adjustStats chan poolStatAdjust
  103. knownQueue, newQueue poolEntryQueue
  104. knownSelect, newSelect *weightedRandomSelect
  105. knownSelected, newSelected int
  106. fastDiscover bool
  107. }
  108. // newServerPool creates a new serverPool instance
  109. func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *serverPool {
  110. pool := &serverPool{
  111. db: db,
  112. quit: quit,
  113. wg: wg,
  114. entries: make(map[discover.NodeID]*poolEntry),
  115. timeout: make(chan *poolEntry, 1),
  116. adjustStats: make(chan poolStatAdjust, 100),
  117. enableRetry: make(chan *poolEntry, 1),
  118. knownSelect: newWeightedRandomSelect(),
  119. newSelect: newWeightedRandomSelect(),
  120. fastDiscover: true,
  121. }
  122. pool.knownQueue = newPoolEntryQueue(maxKnownEntries, pool.removeEntry)
  123. pool.newQueue = newPoolEntryQueue(maxNewEntries, pool.removeEntry)
  124. return pool
  125. }
  126. func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) {
  127. pool.server = server
  128. pool.topic = topic
  129. pool.dbKey = append([]byte("serverPool/"), []byte(topic)...)
  130. pool.wg.Add(1)
  131. pool.loadNodes()
  132. if pool.server.DiscV5 != nil {
  133. pool.discSetPeriod = make(chan time.Duration, 1)
  134. pool.discNodes = make(chan *discv5.Node, 100)
  135. pool.discLookups = make(chan bool, 100)
  136. go pool.server.DiscV5.SearchTopic(pool.topic, pool.discSetPeriod, pool.discNodes, pool.discLookups)
  137. }
  138. go pool.eventLoop()
  139. pool.checkDial()
  140. }
  141. // connect should be called upon any incoming connection. If the connection has been
  142. // dialed by the server pool recently, the appropriate pool entry is returned.
  143. // Otherwise, the connection should be rejected.
  144. // Note that whenever a connection has been accepted and a pool entry has been returned,
  145. // disconnect should also always be called.
  146. func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry {
  147. pool.lock.Lock()
  148. defer pool.lock.Unlock()
  149. entry := pool.entries[p.ID()]
  150. if entry == nil {
  151. entry = pool.findOrNewNode(p.ID(), ip, port)
  152. }
  153. p.Log().Debug("Connecting to new peer", "state", entry.state)
  154. if entry.state == psConnected || entry.state == psRegistered {
  155. return nil
  156. }
  157. pool.connWg.Add(1)
  158. entry.peer = p
  159. entry.state = psConnected
  160. addr := &poolEntryAddress{
  161. ip: ip,
  162. port: port,
  163. lastSeen: mclock.Now(),
  164. }
  165. entry.lastConnected = addr
  166. entry.addr = make(map[string]*poolEntryAddress)
  167. entry.addr[addr.strKey()] = addr
  168. entry.addrSelect = *newWeightedRandomSelect()
  169. entry.addrSelect.update(addr)
  170. return entry
  171. }
  172. // registered should be called after a successful handshake
  173. func (pool *serverPool) registered(entry *poolEntry) {
  174. log.Debug("Registered new entry", "enode", entry.id)
  175. pool.lock.Lock()
  176. defer pool.lock.Unlock()
  177. entry.state = psRegistered
  178. entry.regTime = mclock.Now()
  179. if !entry.known {
  180. pool.newQueue.remove(entry)
  181. entry.known = true
  182. }
  183. pool.knownQueue.setLatest(entry)
  184. entry.shortRetry = shortRetryCnt
  185. }
  186. // disconnect should be called when ending a connection. Service quality statistics
  187. // can be updated optionally (not updated if no registration happened, in this case
  188. // only connection statistics are updated, just like in case of timeout)
  189. func (pool *serverPool) disconnect(entry *poolEntry) {
  190. log.Debug("Disconnected old entry", "enode", entry.id)
  191. pool.lock.Lock()
  192. defer pool.lock.Unlock()
  193. if entry.state == psRegistered {
  194. connTime := mclock.Now() - entry.regTime
  195. connAdjust := float64(connTime) / float64(targetConnTime)
  196. if connAdjust > 1 {
  197. connAdjust = 1
  198. }
  199. stopped := false
  200. select {
  201. case <-pool.quit:
  202. stopped = true
  203. default:
  204. }
  205. if stopped {
  206. entry.connectStats.add(1, connAdjust)
  207. } else {
  208. entry.connectStats.add(connAdjust, 1)
  209. }
  210. }
  211. entry.state = psNotConnected
  212. if entry.knownSelected {
  213. pool.knownSelected--
  214. } else {
  215. pool.newSelected--
  216. }
  217. pool.setRetryDial(entry)
  218. pool.connWg.Done()
  219. }
  220. const (
  221. pseBlockDelay = iota
  222. pseResponseTime
  223. pseResponseTimeout
  224. )
  225. // poolStatAdjust records are sent to adjust peer block delay/response time statistics
  226. type poolStatAdjust struct {
  227. adjustType int
  228. entry *poolEntry
  229. time time.Duration
  230. }
  231. // adjustBlockDelay adjusts the block announce delay statistics of a node
  232. func (pool *serverPool) adjustBlockDelay(entry *poolEntry, time time.Duration) {
  233. if entry == nil {
  234. return
  235. }
  236. pool.adjustStats <- poolStatAdjust{pseBlockDelay, entry, time}
  237. }
  238. // adjustResponseTime adjusts the request response time statistics of a node
  239. func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, timeout bool) {
  240. if entry == nil {
  241. return
  242. }
  243. if timeout {
  244. pool.adjustStats <- poolStatAdjust{pseResponseTimeout, entry, time}
  245. } else {
  246. pool.adjustStats <- poolStatAdjust{pseResponseTime, entry, time}
  247. }
  248. }
  249. // eventLoop handles pool events and mutex locking for all internal functions
  250. func (pool *serverPool) eventLoop() {
  251. lookupCnt := 0
  252. var convTime mclock.AbsTime
  253. if pool.discSetPeriod != nil {
  254. pool.discSetPeriod <- time.Millisecond * 100
  255. }
  256. for {
  257. select {
  258. case entry := <-pool.timeout:
  259. pool.lock.Lock()
  260. if !entry.removed {
  261. pool.checkDialTimeout(entry)
  262. }
  263. pool.lock.Unlock()
  264. case entry := <-pool.enableRetry:
  265. pool.lock.Lock()
  266. if !entry.removed {
  267. entry.delayedRetry = false
  268. pool.updateCheckDial(entry)
  269. }
  270. pool.lock.Unlock()
  271. case adj := <-pool.adjustStats:
  272. pool.lock.Lock()
  273. switch adj.adjustType {
  274. case pseBlockDelay:
  275. adj.entry.delayStats.add(float64(adj.time), 1)
  276. case pseResponseTime:
  277. adj.entry.responseStats.add(float64(adj.time), 1)
  278. adj.entry.timeoutStats.add(0, 1)
  279. case pseResponseTimeout:
  280. adj.entry.timeoutStats.add(1, 1)
  281. }
  282. pool.lock.Unlock()
  283. case node := <-pool.discNodes:
  284. pool.lock.Lock()
  285. entry := pool.findOrNewNode(discover.NodeID(node.ID), node.IP, node.TCP)
  286. pool.updateCheckDial(entry)
  287. pool.lock.Unlock()
  288. case conv := <-pool.discLookups:
  289. if conv {
  290. if lookupCnt == 0 {
  291. convTime = mclock.Now()
  292. }
  293. lookupCnt++
  294. if pool.fastDiscover && (lookupCnt == 50 || time.Duration(mclock.Now()-convTime) > time.Minute) {
  295. pool.fastDiscover = false
  296. if pool.discSetPeriod != nil {
  297. pool.discSetPeriod <- time.Minute
  298. }
  299. }
  300. }
  301. case <-pool.quit:
  302. if pool.discSetPeriod != nil {
  303. close(pool.discSetPeriod)
  304. }
  305. pool.connWg.Wait()
  306. pool.saveNodes()
  307. pool.wg.Done()
  308. return
  309. }
  310. }
  311. }
  312. func (pool *serverPool) findOrNewNode(id discover.NodeID, ip net.IP, port uint16) *poolEntry {
  313. now := mclock.Now()
  314. entry := pool.entries[id]
  315. if entry == nil {
  316. log.Debug("Discovered new entry", "id", id)
  317. entry = &poolEntry{
  318. id: id,
  319. addr: make(map[string]*poolEntryAddress),
  320. addrSelect: *newWeightedRandomSelect(),
  321. shortRetry: shortRetryCnt,
  322. }
  323. pool.entries[id] = entry
  324. // initialize previously unknown peers with good statistics to give a chance to prove themselves
  325. entry.connectStats.add(1, initStatsWeight)
  326. entry.delayStats.add(0, initStatsWeight)
  327. entry.responseStats.add(0, initStatsWeight)
  328. entry.timeoutStats.add(0, initStatsWeight)
  329. }
  330. entry.lastDiscovered = now
  331. addr := &poolEntryAddress{
  332. ip: ip,
  333. port: port,
  334. }
  335. if a, ok := entry.addr[addr.strKey()]; ok {
  336. addr = a
  337. } else {
  338. entry.addr[addr.strKey()] = addr
  339. }
  340. addr.lastSeen = now
  341. entry.addrSelect.update(addr)
  342. if !entry.known {
  343. pool.newQueue.setLatest(entry)
  344. }
  345. return entry
  346. }
  347. // loadNodes loads known nodes and their statistics from the database
  348. func (pool *serverPool) loadNodes() {
  349. enc, err := pool.db.Get(pool.dbKey)
  350. if err != nil {
  351. return
  352. }
  353. var list []*poolEntry
  354. err = rlp.DecodeBytes(enc, &list)
  355. if err != nil {
  356. log.Debug("Failed to decode node list", "err", err)
  357. return
  358. }
  359. for _, e := range list {
  360. log.Debug("Loaded server stats", "id", e.id, "fails", e.lastConnected.fails,
  361. "conn", fmt.Sprintf("%v/%v", e.connectStats.avg, e.connectStats.weight),
  362. "delay", fmt.Sprintf("%v/%v", time.Duration(e.delayStats.avg), e.delayStats.weight),
  363. "response", fmt.Sprintf("%v/%v", time.Duration(e.responseStats.avg), e.responseStats.weight),
  364. "timeout", fmt.Sprintf("%v/%v", e.timeoutStats.avg, e.timeoutStats.weight))
  365. pool.entries[e.id] = e
  366. pool.knownQueue.setLatest(e)
  367. pool.knownSelect.update((*knownEntry)(e))
  368. }
  369. }
  370. // saveNodes saves known nodes and their statistics into the database. Nodes are
  371. // ordered from least to most recently connected.
  372. func (pool *serverPool) saveNodes() {
  373. list := make([]*poolEntry, len(pool.knownQueue.queue))
  374. for i := range list {
  375. list[i] = pool.knownQueue.fetchOldest()
  376. }
  377. enc, err := rlp.EncodeToBytes(list)
  378. if err == nil {
  379. pool.db.Put(pool.dbKey, enc)
  380. }
  381. }
  382. // removeEntry removes a pool entry when the entry count limit is reached.
  383. // Note that it is called by the new/known queues from which the entry has already
  384. // been removed so removing it from the queues is not necessary.
  385. func (pool *serverPool) removeEntry(entry *poolEntry) {
  386. pool.newSelect.remove((*discoveredEntry)(entry))
  387. pool.knownSelect.remove((*knownEntry)(entry))
  388. entry.removed = true
  389. delete(pool.entries, entry.id)
  390. }
  391. // setRetryDial starts the timer which will enable dialing a certain node again
  392. func (pool *serverPool) setRetryDial(entry *poolEntry) {
  393. delay := longRetryDelay
  394. if entry.shortRetry > 0 {
  395. entry.shortRetry--
  396. delay = shortRetryDelay
  397. }
  398. delay += time.Duration(rand.Int63n(int64(delay) + 1))
  399. entry.delayedRetry = true
  400. go func() {
  401. select {
  402. case <-pool.quit:
  403. case <-time.After(delay):
  404. select {
  405. case <-pool.quit:
  406. case pool.enableRetry <- entry:
  407. }
  408. }
  409. }()
  410. }
  411. // updateCheckDial is called when an entry can potentially be dialed again. It updates
  412. // its selection weights and checks if new dials can/should be made.
  413. func (pool *serverPool) updateCheckDial(entry *poolEntry) {
  414. pool.newSelect.update((*discoveredEntry)(entry))
  415. pool.knownSelect.update((*knownEntry)(entry))
  416. pool.checkDial()
  417. }
  418. // checkDial checks if new dials can/should be made. It tries to select servers both
  419. // based on good statistics and recent discovery.
  420. func (pool *serverPool) checkDial() {
  421. fillWithKnownSelects := !pool.fastDiscover
  422. for pool.knownSelected < targetKnownSelect {
  423. entry := pool.knownSelect.choose()
  424. if entry == nil {
  425. fillWithKnownSelects = false
  426. break
  427. }
  428. pool.dial((*poolEntry)(entry.(*knownEntry)), true)
  429. }
  430. for pool.knownSelected+pool.newSelected < targetServerCount {
  431. entry := pool.newSelect.choose()
  432. if entry == nil {
  433. break
  434. }
  435. pool.dial((*poolEntry)(entry.(*discoveredEntry)), false)
  436. }
  437. if fillWithKnownSelects {
  438. // no more newly discovered nodes to select and since fast discover period
  439. // is over, we probably won't find more in the near future so select more
  440. // known entries if possible
  441. for pool.knownSelected < targetServerCount {
  442. entry := pool.knownSelect.choose()
  443. if entry == nil {
  444. break
  445. }
  446. pool.dial((*poolEntry)(entry.(*knownEntry)), true)
  447. }
  448. }
  449. }
  450. // dial initiates a new connection
  451. func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) {
  452. if pool.server == nil || entry.state != psNotConnected {
  453. return
  454. }
  455. entry.state = psDialed
  456. entry.knownSelected = knownSelected
  457. if knownSelected {
  458. pool.knownSelected++
  459. } else {
  460. pool.newSelected++
  461. }
  462. addr := entry.addrSelect.choose().(*poolEntryAddress)
  463. log.Debug("Dialing new peer", "lesaddr", entry.id.String()+"@"+addr.strKey(), "set", len(entry.addr), "known", knownSelected)
  464. entry.dialed = addr
  465. go func() {
  466. pool.server.AddPeer(discover.NewNode(entry.id, addr.ip, addr.port, addr.port))
  467. select {
  468. case <-pool.quit:
  469. case <-time.After(dialTimeout):
  470. select {
  471. case <-pool.quit:
  472. case pool.timeout <- entry:
  473. }
  474. }
  475. }()
  476. }
  477. // checkDialTimeout checks if the node is still in dialed state and if so, resets it
  478. // and adjusts connection statistics accordingly.
  479. func (pool *serverPool) checkDialTimeout(entry *poolEntry) {
  480. if entry.state != psDialed {
  481. return
  482. }
  483. log.Debug("Dial timeout", "lesaddr", entry.id.String()+"@"+entry.dialed.strKey())
  484. entry.state = psNotConnected
  485. if entry.knownSelected {
  486. pool.knownSelected--
  487. } else {
  488. pool.newSelected--
  489. }
  490. entry.connectStats.add(0, 1)
  491. entry.dialed.fails++
  492. pool.setRetryDial(entry)
  493. }
  494. const (
  495. psNotConnected = iota
  496. psDialed
  497. psConnected
  498. psRegistered
  499. )
  500. // poolEntry represents a server node and stores its current state and statistics.
  501. type poolEntry struct {
  502. peer *peer
  503. id discover.NodeID
  504. addr map[string]*poolEntryAddress
  505. lastConnected, dialed *poolEntryAddress
  506. addrSelect weightedRandomSelect
  507. lastDiscovered mclock.AbsTime
  508. known, knownSelected bool
  509. connectStats, delayStats poolStats
  510. responseStats, timeoutStats poolStats
  511. state int
  512. regTime mclock.AbsTime
  513. queueIdx int
  514. removed bool
  515. delayedRetry bool
  516. shortRetry int
  517. }
  518. func (e *poolEntry) EncodeRLP(w io.Writer) error {
  519. return rlp.Encode(w, []interface{}{e.id, e.lastConnected.ip, e.lastConnected.port, e.lastConnected.fails, &e.connectStats, &e.delayStats, &e.responseStats, &e.timeoutStats})
  520. }
  521. func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
  522. var entry struct {
  523. ID discover.NodeID
  524. IP net.IP
  525. Port uint16
  526. Fails uint
  527. CStat, DStat, RStat, TStat poolStats
  528. }
  529. if err := s.Decode(&entry); err != nil {
  530. return err
  531. }
  532. addr := &poolEntryAddress{ip: entry.IP, port: entry.Port, fails: entry.Fails, lastSeen: mclock.Now()}
  533. e.id = entry.ID
  534. e.addr = make(map[string]*poolEntryAddress)
  535. e.addr[addr.strKey()] = addr
  536. e.addrSelect = *newWeightedRandomSelect()
  537. e.addrSelect.update(addr)
  538. e.lastConnected = addr
  539. e.connectStats = entry.CStat
  540. e.delayStats = entry.DStat
  541. e.responseStats = entry.RStat
  542. e.timeoutStats = entry.TStat
  543. e.shortRetry = shortRetryCnt
  544. e.known = true
  545. return nil
  546. }
  547. // discoveredEntry implements wrsItem
  548. type discoveredEntry poolEntry
  549. // Weight calculates random selection weight for newly discovered entries
  550. func (e *discoveredEntry) Weight() int64 {
  551. if e.state != psNotConnected || e.delayedRetry {
  552. return 0
  553. }
  554. t := time.Duration(mclock.Now() - e.lastDiscovered)
  555. if t <= discoverExpireStart {
  556. return 1000000000
  557. }
  558. return int64(1000000000 * math.Exp(-float64(t-discoverExpireStart)/float64(discoverExpireConst)))
  559. }
  560. // knownEntry implements wrsItem
  561. type knownEntry poolEntry
  562. // Weight calculates random selection weight for known entries
  563. func (e *knownEntry) Weight() int64 {
  564. if e.state != psNotConnected || !e.known || e.delayedRetry {
  565. return 0
  566. }
  567. return int64(1000000000 * e.connectStats.recentAvg() * math.Exp(-float64(e.lastConnected.fails)*failDropLn-e.responseStats.recentAvg()/float64(responseScoreTC)-e.delayStats.recentAvg()/float64(delayScoreTC)) * math.Pow(1-e.timeoutStats.recentAvg(), timeoutPow))
  568. }
  569. // poolEntryAddress is a separate object because currently it is necessary to remember
  570. // multiple potential network addresses for a pool entry. This will be removed after
  571. // the final implementation of v5 discovery which will retrieve signed and serial
  572. // numbered advertisements, making it clear which IP/port is the latest one.
  573. type poolEntryAddress struct {
  574. ip net.IP
  575. port uint16
  576. lastSeen mclock.AbsTime // last time it was discovered, connected or loaded from db
  577. fails uint // connection failures since last successful connection (persistent)
  578. }
  579. func (a *poolEntryAddress) Weight() int64 {
  580. t := time.Duration(mclock.Now() - a.lastSeen)
  581. return int64(1000000*math.Exp(-float64(t)/float64(discoverExpireConst)-float64(a.fails)*addrFailDropLn)) + 1
  582. }
  583. func (a *poolEntryAddress) strKey() string {
  584. return a.ip.String() + ":" + strconv.Itoa(int(a.port))
  585. }
  586. // poolStats implement statistics for a certain quantity with a long term average
  587. // and a short term value which is adjusted exponentially with a factor of
  588. // pstatRecentAdjust with each update and also returned exponentially to the
  589. // average with the time constant pstatReturnToMeanTC
  590. type poolStats struct {
  591. sum, weight, avg, recent float64
  592. lastRecalc mclock.AbsTime
  593. }
  594. // init initializes stats with a long term sum/update count pair retrieved from the database
  595. func (s *poolStats) init(sum, weight float64) {
  596. s.sum = sum
  597. s.weight = weight
  598. var avg float64
  599. if weight > 0 {
  600. avg = s.sum / weight
  601. }
  602. s.avg = avg
  603. s.recent = avg
  604. s.lastRecalc = mclock.Now()
  605. }
  606. // recalc recalculates recent value return-to-mean and long term average
  607. func (s *poolStats) recalc() {
  608. now := mclock.Now()
  609. s.recent = s.avg + (s.recent-s.avg)*math.Exp(-float64(now-s.lastRecalc)/float64(pstatReturnToMeanTC))
  610. if s.sum == 0 {
  611. s.avg = 0
  612. } else {
  613. if s.sum > s.weight*1e30 {
  614. s.avg = 1e30
  615. } else {
  616. s.avg = s.sum / s.weight
  617. }
  618. }
  619. s.lastRecalc = now
  620. }
  621. // add updates the stats with a new value
  622. func (s *poolStats) add(value, weight float64) {
  623. s.weight += weight
  624. s.sum += value * weight
  625. s.recalc()
  626. }
  627. // recentAvg returns the short-term adjusted average
  628. func (s *poolStats) recentAvg() float64 {
  629. s.recalc()
  630. return s.recent
  631. }
  632. func (s *poolStats) EncodeRLP(w io.Writer) error {
  633. return rlp.Encode(w, []interface{}{math.Float64bits(s.sum), math.Float64bits(s.weight)})
  634. }
  635. func (s *poolStats) DecodeRLP(st *rlp.Stream) error {
  636. var stats struct {
  637. SumUint, WeightUint uint64
  638. }
  639. if err := st.Decode(&stats); err != nil {
  640. return err
  641. }
  642. s.init(math.Float64frombits(stats.SumUint), math.Float64frombits(stats.WeightUint))
  643. return nil
  644. }
  645. // poolEntryQueue keeps track of its least recently accessed entries and removes
  646. // them when the number of entries reaches the limit
  647. type poolEntryQueue struct {
  648. queue map[int]*poolEntry // known nodes indexed by their latest lastConnCnt value
  649. newPtr, oldPtr, maxCnt int
  650. removeFromPool func(*poolEntry)
  651. }
  652. // newPoolEntryQueue returns a new poolEntryQueue
  653. func newPoolEntryQueue(maxCnt int, removeFromPool func(*poolEntry)) poolEntryQueue {
  654. return poolEntryQueue{queue: make(map[int]*poolEntry), maxCnt: maxCnt, removeFromPool: removeFromPool}
  655. }
  656. // fetchOldest returns and removes the least recently accessed entry
  657. func (q *poolEntryQueue) fetchOldest() *poolEntry {
  658. if len(q.queue) == 0 {
  659. return nil
  660. }
  661. for {
  662. if e := q.queue[q.oldPtr]; e != nil {
  663. delete(q.queue, q.oldPtr)
  664. q.oldPtr++
  665. return e
  666. }
  667. q.oldPtr++
  668. }
  669. }
  670. // remove removes an entry from the queue
  671. func (q *poolEntryQueue) remove(entry *poolEntry) {
  672. if q.queue[entry.queueIdx] == entry {
  673. delete(q.queue, entry.queueIdx)
  674. }
  675. }
  676. // setLatest adds or updates a recently accessed entry. It also checks if an old entry
  677. // needs to be removed and removes it from the parent pool too with a callback function.
  678. func (q *poolEntryQueue) setLatest(entry *poolEntry) {
  679. if q.queue[entry.queueIdx] == entry {
  680. delete(q.queue, entry.queueIdx)
  681. } else {
  682. if len(q.queue) == q.maxCnt {
  683. e := q.fetchOldest()
  684. q.remove(e)
  685. q.removeFromPool(e)
  686. }
  687. }
  688. entry.queueIdx = q.newPtr
  689. q.queue[entry.queueIdx] = entry
  690. q.newPtr++
  691. }