table.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package discover implements the Node Discovery Protocol.
  17. //
  18. // The Node Discovery protocol provides a way to find RLPx nodes that
  19. // can be connected to. It uses a Kademlia-like protocol to maintain a
  20. // distributed database of the IDs and endpoints of all listening
  21. // nodes.
  22. package discover
  23. import (
  24. crand "crypto/rand"
  25. "encoding/binary"
  26. "errors"
  27. "fmt"
  28. mrand "math/rand"
  29. "net"
  30. "sort"
  31. "sync"
  32. "time"
  33. "github.com/ethereum/go-ethereum/common"
  34. "github.com/ethereum/go-ethereum/crypto"
  35. "github.com/ethereum/go-ethereum/log"
  36. "github.com/ethereum/go-ethereum/p2p/netutil"
  37. )
  38. const (
  39. alpha = 3 // Kademlia concurrency factor
  40. bucketSize = 16 // Kademlia bucket size
  41. maxReplacements = 10 // Size of per-bucket replacement list
  42. // We keep buckets for the upper 1/15 of distances because
  43. // it's very unlikely we'll ever encounter a node that's closer.
  44. hashBits = len(common.Hash{}) * 8
  45. nBuckets = hashBits / 15 // Number of buckets
  46. bucketMinDistance = hashBits - nBuckets // Log distance of closest bucket
  47. // IP address limits.
  48. bucketIPLimit, bucketSubnet = 2, 24 // at most 2 addresses from the same /24
  49. tableIPLimit, tableSubnet = 10, 24
  50. maxBondingPingPongs = 16 // Limit on the number of concurrent ping/pong interactions
  51. maxFindnodeFailures = 5 // Nodes exceeding this limit are dropped
  52. refreshInterval = 30 * time.Minute
  53. revalidateInterval = 10 * time.Second
  54. copyNodesInterval = 30 * time.Second
  55. seedMinTableTime = 5 * time.Minute
  56. seedCount = 30
  57. seedMaxAge = 5 * 24 * time.Hour
  58. )
  59. type Table struct {
  60. mutex sync.Mutex // protects buckets, bucket content, nursery, rand
  61. buckets [nBuckets]*bucket // index of known nodes by distance
  62. nursery []*Node // bootstrap nodes
  63. rand *mrand.Rand // source of randomness, periodically reseeded
  64. ips netutil.DistinctNetSet
  65. db *nodeDB // database of known nodes
  66. refreshReq chan chan struct{}
  67. initDone chan struct{}
  68. closeReq chan struct{}
  69. closed chan struct{}
  70. bondmu sync.Mutex
  71. bonding map[NodeID]*bondproc
  72. bondslots chan struct{} // limits total number of active bonding processes
  73. nodeAddedHook func(*Node) // for testing
  74. net transport
  75. self *Node // metadata of the local node
  76. }
  77. type bondproc struct {
  78. err error
  79. n *Node
  80. done chan struct{}
  81. }
  82. // transport is implemented by the UDP transport.
  83. // it is an interface so we can test without opening lots of UDP
  84. // sockets and without generating a private key.
  85. type transport interface {
  86. ping(NodeID, *net.UDPAddr) error
  87. waitping(NodeID) error
  88. findnode(toid NodeID, addr *net.UDPAddr, target NodeID) ([]*Node, error)
  89. close()
  90. }
  91. // bucket contains nodes, ordered by their last activity. the entry
  92. // that was most recently active is the first element in entries.
  93. type bucket struct {
  94. entries []*Node // live entries, sorted by time of last contact
  95. replacements []*Node // recently seen nodes to be used if revalidation fails
  96. ips netutil.DistinctNetSet
  97. }
  98. func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string, bootnodes []*Node) (*Table, error) {
  99. // If no node database was given, use an in-memory one
  100. db, err := newNodeDB(nodeDBPath, Version, ourID)
  101. if err != nil {
  102. return nil, err
  103. }
  104. tab := &Table{
  105. net: t,
  106. db: db,
  107. self: NewNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)),
  108. bonding: make(map[NodeID]*bondproc),
  109. bondslots: make(chan struct{}, maxBondingPingPongs),
  110. refreshReq: make(chan chan struct{}),
  111. initDone: make(chan struct{}),
  112. closeReq: make(chan struct{}),
  113. closed: make(chan struct{}),
  114. rand: mrand.New(mrand.NewSource(0)),
  115. ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
  116. }
  117. if err := tab.setFallbackNodes(bootnodes); err != nil {
  118. return nil, err
  119. }
  120. for i := 0; i < cap(tab.bondslots); i++ {
  121. tab.bondslots <- struct{}{}
  122. }
  123. for i := range tab.buckets {
  124. tab.buckets[i] = &bucket{
  125. ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
  126. }
  127. }
  128. tab.seedRand()
  129. tab.loadSeedNodes(false)
  130. // Start the background expiration goroutine after loading seeds so that the search for
  131. // seed nodes also considers older nodes that would otherwise be removed by the
  132. // expiration.
  133. tab.db.ensureExpirer()
  134. go tab.loop()
  135. return tab, nil
  136. }
  137. func (tab *Table) seedRand() {
  138. var b [8]byte
  139. crand.Read(b[:])
  140. tab.mutex.Lock()
  141. tab.rand.Seed(int64(binary.BigEndian.Uint64(b[:])))
  142. tab.mutex.Unlock()
  143. }
  144. // Self returns the local node.
  145. // The returned node should not be modified by the caller.
  146. func (tab *Table) Self() *Node {
  147. return tab.self
  148. }
  149. // ReadRandomNodes fills the given slice with random nodes from the
  150. // table. It will not write the same node more than once. The nodes in
  151. // the slice are copies and can be modified by the caller.
  152. func (tab *Table) ReadRandomNodes(buf []*Node) (n int) {
  153. if !tab.isInitDone() {
  154. return 0
  155. }
  156. tab.mutex.Lock()
  157. defer tab.mutex.Unlock()
  158. // Find all non-empty buckets and get a fresh slice of their entries.
  159. var buckets [][]*Node
  160. for _, b := range tab.buckets {
  161. if len(b.entries) > 0 {
  162. buckets = append(buckets, b.entries[:])
  163. }
  164. }
  165. if len(buckets) == 0 {
  166. return 0
  167. }
  168. // Shuffle the buckets.
  169. for i := len(buckets) - 1; i > 0; i-- {
  170. j := tab.rand.Intn(len(buckets))
  171. buckets[i], buckets[j] = buckets[j], buckets[i]
  172. }
  173. // Move head of each bucket into buf, removing buckets that become empty.
  174. var i, j int
  175. for ; i < len(buf); i, j = i+1, (j+1)%len(buckets) {
  176. b := buckets[j]
  177. buf[i] = &(*b[0])
  178. buckets[j] = b[1:]
  179. if len(b) == 1 {
  180. buckets = append(buckets[:j], buckets[j+1:]...)
  181. }
  182. if len(buckets) == 0 {
  183. break
  184. }
  185. }
  186. return i + 1
  187. }
  188. // Close terminates the network listener and flushes the node database.
  189. func (tab *Table) Close() {
  190. select {
  191. case <-tab.closed:
  192. // already closed.
  193. case tab.closeReq <- struct{}{}:
  194. <-tab.closed // wait for refreshLoop to end.
  195. }
  196. }
  197. // setFallbackNodes sets the initial points of contact. These nodes
  198. // are used to connect to the network if the table is empty and there
  199. // are no known nodes in the database.
  200. func (tab *Table) setFallbackNodes(nodes []*Node) error {
  201. for _, n := range nodes {
  202. if err := n.validateComplete(); err != nil {
  203. return fmt.Errorf("bad bootstrap/fallback node %q (%v)", n, err)
  204. }
  205. }
  206. tab.nursery = make([]*Node, 0, len(nodes))
  207. for _, n := range nodes {
  208. cpy := *n
  209. // Recompute cpy.sha because the node might not have been
  210. // created by NewNode or ParseNode.
  211. cpy.sha = crypto.Keccak256Hash(n.ID[:])
  212. tab.nursery = append(tab.nursery, &cpy)
  213. }
  214. return nil
  215. }
  216. // isInitDone returns whether the table's initial seeding procedure has completed.
  217. func (tab *Table) isInitDone() bool {
  218. select {
  219. case <-tab.initDone:
  220. return true
  221. default:
  222. return false
  223. }
  224. }
  225. // Resolve searches for a specific node with the given ID.
  226. // It returns nil if the node could not be found.
  227. func (tab *Table) Resolve(targetID NodeID) *Node {
  228. // If the node is present in the local table, no
  229. // network interaction is required.
  230. hash := crypto.Keccak256Hash(targetID[:])
  231. tab.mutex.Lock()
  232. cl := tab.closest(hash, 1)
  233. tab.mutex.Unlock()
  234. if len(cl.entries) > 0 && cl.entries[0].ID == targetID {
  235. return cl.entries[0]
  236. }
  237. // Otherwise, do a network lookup.
  238. result := tab.Lookup(targetID)
  239. for _, n := range result {
  240. if n.ID == targetID {
  241. return n
  242. }
  243. }
  244. return nil
  245. }
  246. // Lookup performs a network search for nodes close
  247. // to the given target. It approaches the target by querying
  248. // nodes that are closer to it on each iteration.
  249. // The given target does not need to be an actual node
  250. // identifier.
  251. func (tab *Table) Lookup(targetID NodeID) []*Node {
  252. return tab.lookup(targetID, true)
  253. }
  254. func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node {
  255. var (
  256. target = crypto.Keccak256Hash(targetID[:])
  257. asked = make(map[NodeID]bool)
  258. seen = make(map[NodeID]bool)
  259. reply = make(chan []*Node, alpha)
  260. pendingQueries = 0
  261. result *nodesByDistance
  262. )
  263. // don't query further if we hit ourself.
  264. // unlikely to happen often in practice.
  265. asked[tab.self.ID] = true
  266. for {
  267. tab.mutex.Lock()
  268. // generate initial result set
  269. result = tab.closest(target, bucketSize)
  270. tab.mutex.Unlock()
  271. if len(result.entries) > 0 || !refreshIfEmpty {
  272. break
  273. }
  274. // The result set is empty, all nodes were dropped, refresh.
  275. // We actually wait for the refresh to complete here. The very
  276. // first query will hit this case and run the bootstrapping
  277. // logic.
  278. <-tab.refresh()
  279. refreshIfEmpty = false
  280. }
  281. for {
  282. // ask the alpha closest nodes that we haven't asked yet
  283. for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
  284. n := result.entries[i]
  285. if !asked[n.ID] {
  286. asked[n.ID] = true
  287. pendingQueries++
  288. go func() {
  289. // Find potential neighbors to bond with
  290. r, err := tab.net.findnode(n.ID, n.addr(), targetID)
  291. if err != nil {
  292. // Bump the failure counter to detect and evacuate non-bonded entries
  293. fails := tab.db.findFails(n.ID) + 1
  294. tab.db.updateFindFails(n.ID, fails)
  295. log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails)
  296. if fails >= maxFindnodeFailures {
  297. log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
  298. tab.delete(n)
  299. }
  300. }
  301. reply <- tab.bondall(r)
  302. }()
  303. }
  304. }
  305. if pendingQueries == 0 {
  306. // we have asked all closest nodes, stop the search
  307. break
  308. }
  309. // wait for the next reply
  310. for _, n := range <-reply {
  311. if n != nil && !seen[n.ID] {
  312. seen[n.ID] = true
  313. result.push(n, bucketSize)
  314. }
  315. }
  316. pendingQueries--
  317. }
  318. return result.entries
  319. }
  320. func (tab *Table) refresh() <-chan struct{} {
  321. done := make(chan struct{})
  322. select {
  323. case tab.refreshReq <- done:
  324. case <-tab.closed:
  325. close(done)
  326. }
  327. return done
  328. }
  329. // loop schedules refresh, revalidate runs and coordinates shutdown.
  330. func (tab *Table) loop() {
  331. var (
  332. revalidate = time.NewTimer(tab.nextRevalidateTime())
  333. refresh = time.NewTicker(refreshInterval)
  334. copyNodes = time.NewTicker(copyNodesInterval)
  335. revalidateDone = make(chan struct{})
  336. refreshDone = make(chan struct{}) // where doRefresh reports completion
  337. waiting = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs
  338. )
  339. defer refresh.Stop()
  340. defer revalidate.Stop()
  341. defer copyNodes.Stop()
  342. // Start initial refresh.
  343. go tab.doRefresh(refreshDone)
  344. loop:
  345. for {
  346. select {
  347. case <-refresh.C:
  348. tab.seedRand()
  349. if refreshDone == nil {
  350. refreshDone = make(chan struct{})
  351. go tab.doRefresh(refreshDone)
  352. }
  353. case req := <-tab.refreshReq:
  354. waiting = append(waiting, req)
  355. if refreshDone == nil {
  356. refreshDone = make(chan struct{})
  357. go tab.doRefresh(refreshDone)
  358. }
  359. case <-refreshDone:
  360. for _, ch := range waiting {
  361. close(ch)
  362. }
  363. waiting, refreshDone = nil, nil
  364. case <-revalidate.C:
  365. go tab.doRevalidate(revalidateDone)
  366. case <-revalidateDone:
  367. revalidate.Reset(tab.nextRevalidateTime())
  368. case <-copyNodes.C:
  369. go tab.copyBondedNodes()
  370. case <-tab.closeReq:
  371. break loop
  372. }
  373. }
  374. if tab.net != nil {
  375. tab.net.close()
  376. }
  377. if refreshDone != nil {
  378. <-refreshDone
  379. }
  380. for _, ch := range waiting {
  381. close(ch)
  382. }
  383. tab.db.close()
  384. close(tab.closed)
  385. }
  386. // doRefresh performs a lookup for a random target to keep buckets
  387. // full. seed nodes are inserted if the table is empty (initial
  388. // bootstrap or discarded faulty peers).
  389. func (tab *Table) doRefresh(done chan struct{}) {
  390. defer close(done)
  391. // Load nodes from the database and insert
  392. // them. This should yield a few previously seen nodes that are
  393. // (hopefully) still alive.
  394. tab.loadSeedNodes(true)
  395. // Run self lookup to discover new neighbor nodes.
  396. tab.lookup(tab.self.ID, false)
  397. // The Kademlia paper specifies that the bucket refresh should
  398. // perform a lookup in the least recently used bucket. We cannot
  399. // adhere to this because the findnode target is a 512bit value
  400. // (not hash-sized) and it is not easily possible to generate a
  401. // sha3 preimage that falls into a chosen bucket.
  402. // We perform a few lookups with a random target instead.
  403. for i := 0; i < 3; i++ {
  404. var target NodeID
  405. crand.Read(target[:])
  406. tab.lookup(target, false)
  407. }
  408. }
  409. func (tab *Table) loadSeedNodes(bond bool) {
  410. seeds := tab.db.querySeeds(seedCount, seedMaxAge)
  411. seeds = append(seeds, tab.nursery...)
  412. if bond {
  413. seeds = tab.bondall(seeds)
  414. }
  415. for i := range seeds {
  416. seed := seeds[i]
  417. age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.bondTime(seed.ID)) }}
  418. log.Debug("Found seed node in database", "id", seed.ID, "addr", seed.addr(), "age", age)
  419. tab.add(seed)
  420. }
  421. }
  422. // doRevalidate checks that the last node in a random bucket is still live
  423. // and replaces or deletes the node if it isn't.
  424. func (tab *Table) doRevalidate(done chan<- struct{}) {
  425. defer func() { done <- struct{}{} }()
  426. last, bi := tab.nodeToRevalidate()
  427. if last == nil {
  428. // No non-empty bucket found.
  429. return
  430. }
  431. // Ping the selected node and wait for a pong.
  432. err := tab.ping(last.ID, last.addr())
  433. tab.mutex.Lock()
  434. defer tab.mutex.Unlock()
  435. b := tab.buckets[bi]
  436. if err == nil {
  437. // The node responded, move it to the front.
  438. log.Debug("Revalidated node", "b", bi, "id", last.ID)
  439. b.bump(last)
  440. return
  441. }
  442. // No reply received, pick a replacement or delete the node if there aren't
  443. // any replacements.
  444. if r := tab.replace(b, last); r != nil {
  445. log.Debug("Replaced dead node", "b", bi, "id", last.ID, "ip", last.IP, "r", r.ID, "rip", r.IP)
  446. } else {
  447. log.Debug("Removed dead node", "b", bi, "id", last.ID, "ip", last.IP)
  448. }
  449. }
  450. // nodeToRevalidate returns the last node in a random, non-empty bucket.
  451. func (tab *Table) nodeToRevalidate() (n *Node, bi int) {
  452. tab.mutex.Lock()
  453. defer tab.mutex.Unlock()
  454. for _, bi = range tab.rand.Perm(len(tab.buckets)) {
  455. b := tab.buckets[bi]
  456. if len(b.entries) > 0 {
  457. last := b.entries[len(b.entries)-1]
  458. return last, bi
  459. }
  460. }
  461. return nil, 0
  462. }
  463. func (tab *Table) nextRevalidateTime() time.Duration {
  464. tab.mutex.Lock()
  465. defer tab.mutex.Unlock()
  466. return time.Duration(tab.rand.Int63n(int64(revalidateInterval)))
  467. }
  468. // copyBondedNodes adds nodes from the table to the database if they have been in the table
  469. // longer then minTableTime.
  470. func (tab *Table) copyBondedNodes() {
  471. tab.mutex.Lock()
  472. defer tab.mutex.Unlock()
  473. now := time.Now()
  474. for _, b := range tab.buckets {
  475. for _, n := range b.entries {
  476. if now.Sub(n.addedAt) >= seedMinTableTime {
  477. tab.db.updateNode(n)
  478. }
  479. }
  480. }
  481. }
  482. // closest returns the n nodes in the table that are closest to the
  483. // given id. The caller must hold tab.mutex.
  484. func (tab *Table) closest(target common.Hash, nresults int) *nodesByDistance {
  485. // This is a very wasteful way to find the closest nodes but
  486. // obviously correct. I believe that tree-based buckets would make
  487. // this easier to implement efficiently.
  488. close := &nodesByDistance{target: target}
  489. for _, b := range tab.buckets {
  490. for _, n := range b.entries {
  491. close.push(n, nresults)
  492. }
  493. }
  494. return close
  495. }
  496. func (tab *Table) len() (n int) {
  497. for _, b := range tab.buckets {
  498. n += len(b.entries)
  499. }
  500. return n
  501. }
  502. // bondall bonds with all given nodes concurrently and returns
  503. // those nodes for which bonding has probably succeeded.
  504. func (tab *Table) bondall(nodes []*Node) (result []*Node) {
  505. rc := make(chan *Node, len(nodes))
  506. for i := range nodes {
  507. go func(n *Node) {
  508. nn, _ := tab.bond(false, n.ID, n.addr(), n.TCP)
  509. rc <- nn
  510. }(nodes[i])
  511. }
  512. for range nodes {
  513. if n := <-rc; n != nil {
  514. result = append(result, n)
  515. }
  516. }
  517. return result
  518. }
  519. // bond ensures the local node has a bond with the given remote node.
  520. // It also attempts to insert the node into the table if bonding succeeds.
  521. // The caller must not hold tab.mutex.
  522. //
  523. // A bond is must be established before sending findnode requests.
  524. // Both sides must have completed a ping/pong exchange for a bond to
  525. // exist. The total number of active bonding processes is limited in
  526. // order to restrain network use.
  527. //
  528. // bond is meant to operate idempotently in that bonding with a remote
  529. // node which still remembers a previously established bond will work.
  530. // The remote node will simply not send a ping back, causing waitping
  531. // to time out.
  532. //
  533. // If pinged is true, the remote node has just pinged us and one half
  534. // of the process can be skipped.
  535. func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) (*Node, error) {
  536. if id == tab.self.ID {
  537. return nil, errors.New("is self")
  538. }
  539. if pinged && !tab.isInitDone() {
  540. return nil, errors.New("still initializing")
  541. }
  542. // Start bonding if we haven't seen this node for a while or if it failed findnode too often.
  543. node, fails := tab.db.node(id), tab.db.findFails(id)
  544. age := time.Since(tab.db.bondTime(id))
  545. var result error
  546. if fails > 0 || age > nodeDBNodeExpiration {
  547. log.Trace("Starting bonding ping/pong", "id", id, "known", node != nil, "failcount", fails, "age", age)
  548. tab.bondmu.Lock()
  549. w := tab.bonding[id]
  550. if w != nil {
  551. // Wait for an existing bonding process to complete.
  552. tab.bondmu.Unlock()
  553. <-w.done
  554. } else {
  555. // Register a new bonding process.
  556. w = &bondproc{done: make(chan struct{})}
  557. tab.bonding[id] = w
  558. tab.bondmu.Unlock()
  559. // Do the ping/pong. The result goes into w.
  560. tab.pingpong(w, pinged, id, addr, tcpPort)
  561. // Unregister the process after it's done.
  562. tab.bondmu.Lock()
  563. delete(tab.bonding, id)
  564. tab.bondmu.Unlock()
  565. }
  566. // Retrieve the bonding results
  567. result = w.err
  568. if result == nil {
  569. node = w.n
  570. }
  571. }
  572. // Add the node to the table even if the bonding ping/pong
  573. // fails. It will be relaced quickly if it continues to be
  574. // unresponsive.
  575. if node != nil {
  576. tab.add(node)
  577. tab.db.updateFindFails(id, 0)
  578. }
  579. return node, result
  580. }
  581. func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) {
  582. // Request a bonding slot to limit network usage
  583. <-tab.bondslots
  584. defer func() { tab.bondslots <- struct{}{} }()
  585. // Ping the remote side and wait for a pong.
  586. if w.err = tab.ping(id, addr); w.err != nil {
  587. close(w.done)
  588. return
  589. }
  590. if !pinged {
  591. // Give the remote node a chance to ping us before we start
  592. // sending findnode requests. If they still remember us,
  593. // waitping will simply time out.
  594. tab.net.waitping(id)
  595. }
  596. // Bonding succeeded, update the node database.
  597. w.n = NewNode(id, addr.IP, uint16(addr.Port), tcpPort)
  598. close(w.done)
  599. }
  600. // ping a remote endpoint and wait for a reply, also updating the node
  601. // database accordingly.
  602. func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
  603. tab.db.updateLastPing(id, time.Now())
  604. if err := tab.net.ping(id, addr); err != nil {
  605. return err
  606. }
  607. tab.db.updateBondTime(id, time.Now())
  608. return nil
  609. }
  610. // bucket returns the bucket for the given node ID hash.
  611. func (tab *Table) bucket(sha common.Hash) *bucket {
  612. d := logdist(tab.self.sha, sha)
  613. if d <= bucketMinDistance {
  614. return tab.buckets[0]
  615. }
  616. return tab.buckets[d-bucketMinDistance-1]
  617. }
  618. // add attempts to add the given node its corresponding bucket. If the
  619. // bucket has space available, adding the node succeeds immediately.
  620. // Otherwise, the node is added if the least recently active node in
  621. // the bucket does not respond to a ping packet.
  622. //
  623. // The caller must not hold tab.mutex.
  624. func (tab *Table) add(new *Node) {
  625. tab.mutex.Lock()
  626. defer tab.mutex.Unlock()
  627. b := tab.bucket(new.sha)
  628. if !tab.bumpOrAdd(b, new) {
  629. // Node is not in table. Add it to the replacement list.
  630. tab.addReplacement(b, new)
  631. }
  632. }
  633. // stuff adds nodes the table to the end of their corresponding bucket
  634. // if the bucket is not full. The caller must not hold tab.mutex.
  635. func (tab *Table) stuff(nodes []*Node) {
  636. tab.mutex.Lock()
  637. defer tab.mutex.Unlock()
  638. for _, n := range nodes {
  639. if n.ID == tab.self.ID {
  640. continue // don't add self
  641. }
  642. b := tab.bucket(n.sha)
  643. if len(b.entries) < bucketSize {
  644. tab.bumpOrAdd(b, n)
  645. }
  646. }
  647. }
  648. // delete removes an entry from the node table (used to evacuate
  649. // failed/non-bonded discovery peers).
  650. func (tab *Table) delete(node *Node) {
  651. tab.mutex.Lock()
  652. defer tab.mutex.Unlock()
  653. tab.deleteInBucket(tab.bucket(node.sha), node)
  654. }
  655. func (tab *Table) addIP(b *bucket, ip net.IP) bool {
  656. if netutil.IsLAN(ip) {
  657. return true
  658. }
  659. if !tab.ips.Add(ip) {
  660. log.Debug("IP exceeds table limit", "ip", ip)
  661. return false
  662. }
  663. if !b.ips.Add(ip) {
  664. log.Debug("IP exceeds bucket limit", "ip", ip)
  665. tab.ips.Remove(ip)
  666. return false
  667. }
  668. return true
  669. }
  670. func (tab *Table) removeIP(b *bucket, ip net.IP) {
  671. if netutil.IsLAN(ip) {
  672. return
  673. }
  674. tab.ips.Remove(ip)
  675. b.ips.Remove(ip)
  676. }
  677. func (tab *Table) addReplacement(b *bucket, n *Node) {
  678. for _, e := range b.replacements {
  679. if e.ID == n.ID {
  680. return // already in list
  681. }
  682. }
  683. if !tab.addIP(b, n.IP) {
  684. return
  685. }
  686. var removed *Node
  687. b.replacements, removed = pushNode(b.replacements, n, maxReplacements)
  688. if removed != nil {
  689. tab.removeIP(b, removed.IP)
  690. }
  691. }
  692. // replace removes n from the replacement list and replaces 'last' with it if it is the
  693. // last entry in the bucket. If 'last' isn't the last entry, it has either been replaced
  694. // with someone else or became active.
  695. func (tab *Table) replace(b *bucket, last *Node) *Node {
  696. if len(b.entries) == 0 || b.entries[len(b.entries)-1].ID != last.ID {
  697. // Entry has moved, don't replace it.
  698. return nil
  699. }
  700. // Still the last entry.
  701. if len(b.replacements) == 0 {
  702. tab.deleteInBucket(b, last)
  703. return nil
  704. }
  705. r := b.replacements[tab.rand.Intn(len(b.replacements))]
  706. b.replacements = deleteNode(b.replacements, r)
  707. b.entries[len(b.entries)-1] = r
  708. tab.removeIP(b, last.IP)
  709. return r
  710. }
  711. // bump moves the given node to the front of the bucket entry list
  712. // if it is contained in that list.
  713. func (b *bucket) bump(n *Node) bool {
  714. for i := range b.entries {
  715. if b.entries[i].ID == n.ID {
  716. // move it to the front
  717. copy(b.entries[1:], b.entries[:i])
  718. b.entries[0] = n
  719. return true
  720. }
  721. }
  722. return false
  723. }
  724. // bumpOrAdd moves n to the front of the bucket entry list or adds it if the list isn't
  725. // full. The return value is true if n is in the bucket.
  726. func (tab *Table) bumpOrAdd(b *bucket, n *Node) bool {
  727. if b.bump(n) {
  728. return true
  729. }
  730. if len(b.entries) >= bucketSize || !tab.addIP(b, n.IP) {
  731. return false
  732. }
  733. b.entries, _ = pushNode(b.entries, n, bucketSize)
  734. b.replacements = deleteNode(b.replacements, n)
  735. n.addedAt = time.Now()
  736. if tab.nodeAddedHook != nil {
  737. tab.nodeAddedHook(n)
  738. }
  739. return true
  740. }
  741. func (tab *Table) deleteInBucket(b *bucket, n *Node) {
  742. b.entries = deleteNode(b.entries, n)
  743. tab.removeIP(b, n.IP)
  744. }
  745. // pushNode adds n to the front of list, keeping at most max items.
  746. func pushNode(list []*Node, n *Node, max int) ([]*Node, *Node) {
  747. if len(list) < max {
  748. list = append(list, nil)
  749. }
  750. removed := list[len(list)-1]
  751. copy(list[1:], list)
  752. list[0] = n
  753. return list, removed
  754. }
  755. // deleteNode removes n from list.
  756. func deleteNode(list []*Node, n *Node) []*Node {
  757. for i := range list {
  758. if list[i].ID == n.ID {
  759. return append(list[:i], list[i+1:]...)
  760. }
  761. }
  762. return list
  763. }
  764. // nodesByDistance is a list of nodes, ordered by
  765. // distance to target.
  766. type nodesByDistance struct {
  767. entries []*Node
  768. target common.Hash
  769. }
  770. // push adds the given node to the list, keeping the total size below maxElems.
  771. func (h *nodesByDistance) push(n *Node, maxElems int) {
  772. ix := sort.Search(len(h.entries), func(i int) bool {
  773. return distcmp(h.target, h.entries[i].sha, n.sha) > 0
  774. })
  775. if len(h.entries) < maxElems {
  776. h.entries = append(h.entries, n)
  777. }
  778. if ix == len(h.entries) {
  779. // farther away than all nodes we already have.
  780. // if there was room for it, the node is now the last element.
  781. } else {
  782. // slide existing entries down to make room
  783. // this will overwrite the entry we just appended.
  784. copy(h.entries[ix+1:], h.entries[ix:])
  785. h.entries[ix] = n
  786. }
  787. }