udp.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package discover
  17. import (
  18. "bytes"
  19. "container/list"
  20. "crypto/ecdsa"
  21. "errors"
  22. "fmt"
  23. "net"
  24. "time"
  25. "github.com/ethereum/go-ethereum/crypto"
  26. "github.com/ethereum/go-ethereum/log"
  27. "github.com/ethereum/go-ethereum/p2p/nat"
  28. "github.com/ethereum/go-ethereum/p2p/netutil"
  29. "github.com/ethereum/go-ethereum/rlp"
  30. )
  // Version is the discovery protocol version. It is placed in the Version
  // field of the ping packets built by (*udp).ping.
  const Version = 4
  // Errors
  var (
  	// errPacketTooSmall is returned by decodePacket when the buffer is too
  	// short to hold the hash, the signature and the packet type byte.
  	errPacketTooSmall = errors.New("too small")
  	// errBadHash is returned by decodePacket when the leading hash does not
  	// match the hash computed over the rest of the packet.
  	errBadHash = errors.New("bad hash")
  	// errExpired is returned by the packet handlers when the Expiration
  	// timestamp of a packet lies in the past.
  	errExpired = errors.New("expired")
  	// errUnsolicitedReply is returned for pong and neighbors packets that
  	// match no pending request.
  	errUnsolicitedReply = errors.New("unsolicited reply")
  	// errUnknownNode is returned for findnode packets sent by a node we
  	// have no bond with.
  	errUnknownNode = errors.New("unknown node")
  	// errTimeout is delivered to a pending request whose deadline passed.
  	errTimeout = errors.New("RPC timeout")
  	// errClockWarp is delivered to a pending request whose deadline is
  	// implausibly far in the future.
  	errClockWarp = errors.New("reply deadline too far in the future")
  	// errClosed is delivered to pending requests when the transport is
  	// shutting down.
  	errClosed = errors.New("socket closed")
  )
  // Timeouts
  const (
  	// respTimeout is how long the reply loop waits for a matching reply
  	// before failing a pending request with errTimeout.
  	respTimeout = 500 * time.Millisecond
  	// expiration is added to the current time to produce the Expiration
  	// timestamp carried by every outgoing packet.
  	expiration = 20 * time.Second

  	ntpFailureThreshold = 32               // Continuous timeouts after which to check NTP
  	ntpWarningCooldown  = 10 * time.Minute // Minimum amount of time to pass before repeating NTP warning
  	driftThreshold      = 10 * time.Second // Allowed clock drift before warning user
  )
  // RPC packet types. The type is carried in the single byte that directly
  // follows the hash and signature at the front of every packet.
  const (
  	pingPacket      = iota + 1 // zero is 'reserved'
  	pongPacket                 // decoded into a pong
  	findnodePacket             // decoded into a findnode
  	neighborsPacket            // decoded into a neighbors
  )
  58. // RPC request structures
  59. type (
  60. ping struct {
  61. Version uint
  62. From, To rpcEndpoint
  63. Expiration uint64
  64. // Ignore additional fields (for forward compatibility).
  65. Rest []rlp.RawValue `rlp:"tail"`
  66. }
  67. // pong is the reply to ping.
  68. pong struct {
  69. // This field should mirror the UDP envelope address
  70. // of the ping packet, which provides a way to discover the
  71. // the external address (after NAT).
  72. To rpcEndpoint
  73. ReplyTok []byte // This contains the hash of the ping packet.
  74. Expiration uint64 // Absolute timestamp at which the packet becomes invalid.
  75. // Ignore additional fields (for forward compatibility).
  76. Rest []rlp.RawValue `rlp:"tail"`
  77. }
  78. // findnode is a query for nodes close to the given target.
  79. findnode struct {
  80. Target NodeID // doesn't need to be an actual public key
  81. Expiration uint64
  82. // Ignore additional fields (for forward compatibility).
  83. Rest []rlp.RawValue `rlp:"tail"`
  84. }
  85. // reply to findnode
  86. neighbors struct {
  87. Nodes []rpcNode
  88. Expiration uint64
  89. // Ignore additional fields (for forward compatibility).
  90. Rest []rlp.RawValue `rlp:"tail"`
  91. }
  92. rpcNode struct {
  93. IP net.IP // len 4 for IPv4 or 16 for IPv6
  94. UDP uint16 // for discovery protocol
  95. TCP uint16 // for RLPx protocol
  96. ID NodeID
  97. }
  98. rpcEndpoint struct {
  99. IP net.IP // len 4 for IPv4 or 16 for IPv6
  100. UDP uint16 // for discovery protocol
  101. TCP uint16 // for RLPx protocol
  102. }
  103. )
  104. func makeEndpoint(addr *net.UDPAddr, tcpPort uint16) rpcEndpoint {
  105. ip := addr.IP.To4()
  106. if ip == nil {
  107. ip = addr.IP.To16()
  108. }
  109. return rpcEndpoint{IP: ip, UDP: uint16(addr.Port), TCP: tcpPort}
  110. }
  111. func (t *udp) nodeFromRPC(sender *net.UDPAddr, rn rpcNode) (*Node, error) {
  112. if rn.UDP <= 1024 {
  113. return nil, errors.New("low port")
  114. }
  115. if err := netutil.CheckRelayIP(sender.IP, rn.IP); err != nil {
  116. return nil, err
  117. }
  118. if t.netrestrict != nil && !t.netrestrict.Contains(rn.IP) {
  119. return nil, errors.New("not contained in netrestrict whitelist")
  120. }
  121. n := NewNode(rn.ID, rn.IP, rn.UDP, rn.TCP)
  122. err := n.validateComplete()
  123. return n, err
  124. }
  125. func nodeToRPC(n *Node) rpcNode {
  126. return rpcNode{ID: n.ID, IP: n.IP, UDP: n.UDP, TCP: n.TCP}
  127. }
  // packet is implemented by every decoded RPC message (ping, pong, findnode
  // and neighbors).
  type packet interface {
  	// handle processes the packet after it has been decoded. from is the
  	// UDP address the packet arrived from, fromID the node ID recovered
  	// from the packet signature and mac the hash at the front of the packet.
  	handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error
  	// name returns the label used for the packet in trace log lines.
  	name() string
  }
  // conn is the set of UDP socket operations the transport relies on.
  type conn interface {
  	// ReadFromUDP is called by readLoop to receive incoming packets.
  	ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error)
  	// WriteToUDP is called by write to send an encoded packet.
  	WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error)
  	// Close is called when the transport shuts down.
  	Close() error
  	// LocalAddr must return a *net.UDPAddr; newUDP type-asserts the result.
  	LocalAddr() net.Addr
  }
  // udp implements the RPC protocol.
  type udp struct {
  	conn        conn             // socket used for reading and writing packets
  	netrestrict *netutil.Netlist // optional whitelist applied to nodes learned from neighbors packets
  	priv        *ecdsa.PrivateKey // key used to sign outgoing packets
  	ourEndpoint rpcEndpoint      // sent as the From field of outgoing pings

  	addpending chan *pending // hands new pending requests to loop
  	gotreply   chan reply    // hands incoming replies to loop
  	closing    chan struct{} // closed by close to stop loop and unblock senders

  	// NOTE(review): nat is not referenced by the code in this part of the
  	// file — confirm where it is set and used.
  	nat nat.Interface

  	*Table
  }
  // pending represents a pending reply.
  //
  // Some implementations of the protocol wish to send more than one
  // reply packet to findnode. In general, any neighbors packet cannot
  // be matched up with a specific findnode packet.
  //
  // Our implementation handles this by storing a callback function for
  // each pending reply. Incoming packets from a node are dispatched
  // to all the callback functions for that node.
  type pending struct {
  	// These fields must match in the reply.
  	from  NodeID
  	ptype byte

  	// Time when the request must complete. It is assigned by loop when
  	// the request is added to the queue.
  	deadline time.Time

  	// callback is called when a matching reply arrives. If it returns
  	// true, the callback is removed from the pending reply queue.
  	// If it returns false, the reply is considered incomplete and
  	// the callback will be invoked again for the next matching reply.
  	callback func(resp interface{}) (done bool)

  	// errc receives nil when the callback indicates completion or an
  	// error if no further reply is received within the timeout.
  	errc chan<- error
  }
  // reply is an incoming packet handed to loop for matching against the
  // pending reply queue. handleReply builds it positionally, so the field
  // order must stay as is.
  type reply struct {
  	from  NodeID      // node ID recovered from the packet signature
  	ptype byte        // packet type, compared with pending.ptype
  	data  interface{} // decoded packet, passed to the pending callback
  	// loop indicates whether there was
  	// a matching request by sending on this channel.
  	matched chan<- bool
  }
  // ReadPacket is sent to the unhandled channel when a received packet could
  // not be processed (handlePacket returned an error for it).
  type ReadPacket struct {
  	Data []byte       // raw bytes of the packet as read from the socket
  	Addr *net.UDPAddr // address the packet was received from
  }
  // Config holds Table-related settings.
  type Config struct {
  	// These settings are required and configure the UDP listener:
  	PrivateKey *ecdsa.PrivateKey // signs outgoing packets; its public key determines the local node ID

  	// These settings are optional:
  	AnnounceAddr *net.UDPAddr     // local address announced in the DHT
  	NodeDBPath   string           // if set, the node database is stored at this filesystem location
  	NetRestrict  *netutil.Netlist // network whitelist
  	Bootnodes    []*Node          // list of bootstrap nodes
  	Unhandled    chan<- ReadPacket // unhandled packets are sent on this channel
  }
  198. // ListenUDP returns a new table that listens for UDP packets on laddr.
  199. func ListenUDP(c conn, cfg Config) (*Table, error) {
  200. tab, _, err := newUDP(c, cfg)
  201. if err != nil {
  202. return nil, err
  203. }
  204. log.Info("UDP listener up", "self", tab.self)
  205. return tab, nil
  206. }
  207. func newUDP(c conn, cfg Config) (*Table, *udp, error) {
  208. udp := &udp{
  209. conn: c,
  210. priv: cfg.PrivateKey,
  211. netrestrict: cfg.NetRestrict,
  212. closing: make(chan struct{}),
  213. gotreply: make(chan reply),
  214. addpending: make(chan *pending),
  215. }
  216. realaddr := c.LocalAddr().(*net.UDPAddr)
  217. if cfg.AnnounceAddr != nil {
  218. realaddr = cfg.AnnounceAddr
  219. }
  220. // TODO: separate TCP port
  221. udp.ourEndpoint = makeEndpoint(realaddr, uint16(realaddr.Port))
  222. tab, err := newTable(udp, PubkeyID(&cfg.PrivateKey.PublicKey), realaddr, cfg.NodeDBPath, cfg.Bootnodes)
  223. if err != nil {
  224. return nil, nil, err
  225. }
  226. udp.Table = tab
  227. go udp.loop()
  228. go udp.readLoop(cfg.Unhandled)
  229. return udp.Table, udp, nil
  230. }
  // close shuts the transport down. Closing t.closing makes loop fail all
  // pending requests with errClosed and return; closing the connection makes
  // the blocked read in readLoop fail so that readLoop returns as well.
  // close does not wait for either goroutine to finish.
  func (t *udp) close() {
  	close(t.closing)
  	t.conn.Close()
  	// TODO: wait for the loops to end.
  }
  236. // ping sends a ping message to the given node and waits for a reply.
  237. func (t *udp) ping(toid NodeID, toaddr *net.UDPAddr) error {
  238. req := &ping{
  239. Version: Version,
  240. From: t.ourEndpoint,
  241. To: makeEndpoint(toaddr, 0), // TODO: maybe use known TCP port from DB
  242. Expiration: uint64(time.Now().Add(expiration).Unix()),
  243. }
  244. packet, hash, err := encodePacket(t.priv, pingPacket, req)
  245. if err != nil {
  246. return err
  247. }
  248. errc := t.pending(toid, pongPacket, func(p interface{}) bool {
  249. return bytes.Equal(p.(*pong).ReplyTok, hash)
  250. })
  251. t.write(toaddr, req.name(), packet)
  252. return <-errc
  253. }
  254. func (t *udp) waitping(from NodeID) error {
  255. return <-t.pending(from, pingPacket, func(interface{}) bool { return true })
  256. }
  257. // findnode sends a findnode request to the given node and waits until
  258. // the node has sent up to k neighbors.
  259. func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node, error) {
  260. nodes := make([]*Node, 0, bucketSize)
  261. nreceived := 0
  262. errc := t.pending(toid, neighborsPacket, func(r interface{}) bool {
  263. reply := r.(*neighbors)
  264. for _, rn := range reply.Nodes {
  265. nreceived++
  266. n, err := t.nodeFromRPC(toaddr, rn)
  267. if err != nil {
  268. log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", toaddr, "err", err)
  269. continue
  270. }
  271. nodes = append(nodes, n)
  272. }
  273. return nreceived >= bucketSize
  274. })
  275. t.send(toaddr, findnodePacket, &findnode{
  276. Target: target,
  277. Expiration: uint64(time.Now().Add(expiration).Unix()),
  278. })
  279. err := <-errc
  280. return nodes, err
  281. }
  282. // pending adds a reply callback to the pending reply queue.
  283. // see the documentation of type pending for a detailed explanation.
  284. func (t *udp) pending(id NodeID, ptype byte, callback func(interface{}) bool) <-chan error {
  285. ch := make(chan error, 1)
  286. p := &pending{from: id, ptype: ptype, callback: callback, errc: ch}
  287. select {
  288. case t.addpending <- p:
  289. // loop will handle it
  290. case <-t.closing:
  291. ch <- errClosed
  292. }
  293. return ch
  294. }
  295. func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool {
  296. matched := make(chan bool, 1)
  297. select {
  298. case t.gotreply <- reply{from, ptype, req, matched}:
  299. // loop will handle it
  300. return <-matched
  301. case <-t.closing:
  302. return false
  303. }
  304. }
  305. // loop runs in its own goroutine. it keeps track of
  306. // the refresh timer and the pending reply queue.
  307. func (t *udp) loop() {
  308. var (
  309. plist = list.New()
  310. timeout = time.NewTimer(0)
  311. nextTimeout *pending // head of plist when timeout was last reset
  312. contTimeouts = 0 // number of continuous timeouts to do NTP checks
  313. ntpWarnTime = time.Unix(0, 0)
  314. )
  315. <-timeout.C // ignore first timeout
  316. defer timeout.Stop()
  317. resetTimeout := func() {
  318. if plist.Front() == nil || nextTimeout == plist.Front().Value {
  319. return
  320. }
  321. // Start the timer so it fires when the next pending reply has expired.
  322. now := time.Now()
  323. for el := plist.Front(); el != nil; el = el.Next() {
  324. nextTimeout = el.Value.(*pending)
  325. if dist := nextTimeout.deadline.Sub(now); dist < 2*respTimeout {
  326. timeout.Reset(dist)
  327. return
  328. }
  329. // Remove pending replies whose deadline is too far in the
  330. // future. These can occur if the system clock jumped
  331. // backwards after the deadline was assigned.
  332. nextTimeout.errc <- errClockWarp
  333. plist.Remove(el)
  334. }
  335. nextTimeout = nil
  336. timeout.Stop()
  337. }
  338. for {
  339. resetTimeout()
  340. select {
  341. case <-t.closing:
  342. for el := plist.Front(); el != nil; el = el.Next() {
  343. el.Value.(*pending).errc <- errClosed
  344. }
  345. return
  346. case p := <-t.addpending:
  347. p.deadline = time.Now().Add(respTimeout)
  348. plist.PushBack(p)
  349. case r := <-t.gotreply:
  350. var matched bool
  351. for el := plist.Front(); el != nil; el = el.Next() {
  352. p := el.Value.(*pending)
  353. if p.from == r.from && p.ptype == r.ptype {
  354. matched = true
  355. // Remove the matcher if its callback indicates
  356. // that all replies have been received. This is
  357. // required for packet types that expect multiple
  358. // reply packets.
  359. if p.callback(r.data) {
  360. p.errc <- nil
  361. plist.Remove(el)
  362. }
  363. // Reset the continuous timeout counter (time drift detection)
  364. contTimeouts = 0
  365. }
  366. }
  367. r.matched <- matched
  368. case now := <-timeout.C:
  369. nextTimeout = nil
  370. // Notify and remove callbacks whose deadline is in the past.
  371. for el := plist.Front(); el != nil; el = el.Next() {
  372. p := el.Value.(*pending)
  373. if now.After(p.deadline) || now.Equal(p.deadline) {
  374. p.errc <- errTimeout
  375. plist.Remove(el)
  376. contTimeouts++
  377. }
  378. }
  379. // If we've accumulated too many timeouts, do an NTP time sync check
  380. if contTimeouts > ntpFailureThreshold {
  381. if time.Since(ntpWarnTime) >= ntpWarningCooldown {
  382. ntpWarnTime = time.Now()
  383. go checkClockDrift()
  384. }
  385. contTimeouts = 0
  386. }
  387. }
  388. }
  389. }
  // Sizes, in bytes, of the frame that precedes the packet type and payload.
  const (
  	macSize  = 256 / 8           // hash stored at the very front of the packet
  	sigSize  = 520 / 8           // signature stored directly after the hash
  	headSize = macSize + sigSize // space of packet frame data
  )
  var (
  	// headSpace is a zeroed placeholder written at the start of every
  	// packet buffer; encodePacket later overwrites it with the hash and
  	// the signature.
  	headSpace = make([]byte, headSize)

  	// Neighbors replies are sent across multiple packets to
  	// stay below the 1280 byte limit. We compute the maximum number
  	// of entries by stuffing a packet until it grows too large.
  	maxNeighbors int
  )
  402. func init() {
  403. p := neighbors{Expiration: ^uint64(0)}
  404. maxSizeNode := rpcNode{IP: make(net.IP, 16), UDP: ^uint16(0), TCP: ^uint16(0)}
  405. for n := 0; ; n++ {
  406. p.Nodes = append(p.Nodes, maxSizeNode)
  407. size, _, err := rlp.EncodeToReader(p)
  408. if err != nil {
  409. // If this ever happens, it will be caught by the unit tests.
  410. panic("cannot encode: " + err.Error())
  411. }
  412. if headSize+size+1 >= 1280 {
  413. maxNeighbors = n
  414. break
  415. }
  416. }
  417. }
  418. func (t *udp) send(toaddr *net.UDPAddr, ptype byte, req packet) ([]byte, error) {
  419. packet, hash, err := encodePacket(t.priv, ptype, req)
  420. if err != nil {
  421. return hash, err
  422. }
  423. return hash, t.write(toaddr, req.name(), packet)
  424. }
  425. func (t *udp) write(toaddr *net.UDPAddr, what string, packet []byte) error {
  426. _, err := t.conn.WriteToUDP(packet, toaddr)
  427. log.Trace(">> "+what, "addr", toaddr, "err", err)
  428. return err
  429. }
  430. func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) (packet, hash []byte, err error) {
  431. b := new(bytes.Buffer)
  432. b.Write(headSpace)
  433. b.WriteByte(ptype)
  434. if err := rlp.Encode(b, req); err != nil {
  435. log.Error("Can't encode discv4 packet", "err", err)
  436. return nil, nil, err
  437. }
  438. packet = b.Bytes()
  439. sig, err := crypto.Sign(crypto.Keccak256(packet[headSize:]), priv)
  440. if err != nil {
  441. log.Error("Can't sign discv4 packet", "err", err)
  442. return nil, nil, err
  443. }
  444. copy(packet[macSize:], sig)
  445. // add the hash to the front. Note: this doesn't protect the
  446. // packet in any way. Our public key will be part of this hash in
  447. // The future.
  448. hash = crypto.Keccak256(packet[macSize:])
  449. copy(packet, hash)
  450. return packet, hash, nil
  451. }
  452. // readLoop runs in its own goroutine. it handles incoming UDP packets.
  453. func (t *udp) readLoop(unhandled chan<- ReadPacket) {
  454. defer t.conn.Close()
  455. if unhandled != nil {
  456. defer close(unhandled)
  457. }
  458. // Discovery packets are defined to be no larger than 1280 bytes.
  459. // Packets larger than this size will be cut at the end and treated
  460. // as invalid because their hash won't match.
  461. buf := make([]byte, 1280)
  462. for {
  463. nbytes, from, err := t.conn.ReadFromUDP(buf)
  464. if netutil.IsTemporaryError(err) {
  465. // Ignore temporary read errors.
  466. log.Debug("Temporary UDP read error", "err", err)
  467. continue
  468. } else if err != nil {
  469. // Shut down the loop for permament errors.
  470. log.Debug("UDP read error", "err", err)
  471. return
  472. }
  473. if t.handlePacket(from, buf[:nbytes]) != nil && unhandled != nil {
  474. select {
  475. case unhandled <- ReadPacket{buf[:nbytes], from}:
  476. default:
  477. }
  478. }
  479. }
  480. }
  481. func (t *udp) handlePacket(from *net.UDPAddr, buf []byte) error {
  482. packet, fromID, hash, err := decodePacket(buf)
  483. if err != nil {
  484. log.Debug("Bad discv4 packet", "addr", from, "err", err)
  485. return err
  486. }
  487. err = packet.handle(t, from, fromID, hash)
  488. log.Trace("<< "+packet.name(), "addr", from, "err", err)
  489. return err
  490. }
  491. func decodePacket(buf []byte) (packet, NodeID, []byte, error) {
  492. if len(buf) < headSize+1 {
  493. return nil, NodeID{}, nil, errPacketTooSmall
  494. }
  495. hash, sig, sigdata := buf[:macSize], buf[macSize:headSize], buf[headSize:]
  496. shouldhash := crypto.Keccak256(buf[macSize:])
  497. if !bytes.Equal(hash, shouldhash) {
  498. return nil, NodeID{}, nil, errBadHash
  499. }
  500. fromID, err := recoverNodeID(crypto.Keccak256(buf[headSize:]), sig)
  501. if err != nil {
  502. return nil, NodeID{}, hash, err
  503. }
  504. var req packet
  505. switch ptype := sigdata[0]; ptype {
  506. case pingPacket:
  507. req = new(ping)
  508. case pongPacket:
  509. req = new(pong)
  510. case findnodePacket:
  511. req = new(findnode)
  512. case neighborsPacket:
  513. req = new(neighbors)
  514. default:
  515. return nil, fromID, hash, fmt.Errorf("unknown type: %d", ptype)
  516. }
  517. s := rlp.NewStream(bytes.NewReader(sigdata[1:]), 0)
  518. err = s.Decode(req)
  519. return req, fromID, hash, err
  520. }
  521. func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
  522. if expired(req.Expiration) {
  523. return errExpired
  524. }
  525. t.send(from, pongPacket, &pong{
  526. To: makeEndpoint(from, req.From.TCP),
  527. ReplyTok: mac,
  528. Expiration: uint64(time.Now().Add(expiration).Unix()),
  529. })
  530. if !t.handleReply(fromID, pingPacket, req) {
  531. // Note: we're ignoring the provided IP address right now
  532. go t.bond(true, fromID, from, req.From.TCP)
  533. }
  534. return nil
  535. }
  536. func (req *ping) name() string { return "PING/v4" }
  537. func (req *pong) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
  538. if expired(req.Expiration) {
  539. return errExpired
  540. }
  541. if !t.handleReply(fromID, pongPacket, req) {
  542. return errUnsolicitedReply
  543. }
  544. return nil
  545. }
  546. func (req *pong) name() string { return "PONG/v4" }
  547. func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
  548. if expired(req.Expiration) {
  549. return errExpired
  550. }
  551. if !t.db.hasBond(fromID) {
  552. // No bond exists, we don't process the packet. This prevents
  553. // an attack vector where the discovery protocol could be used
  554. // to amplify traffic in a DDOS attack. A malicious actor
  555. // would send a findnode request with the IP address and UDP
  556. // port of the target as the source address. The recipient of
  557. // the findnode packet would then send a neighbors packet
  558. // (which is a much bigger packet than findnode) to the victim.
  559. return errUnknownNode
  560. }
  561. target := crypto.Keccak256Hash(req.Target[:])
  562. t.mutex.Lock()
  563. closest := t.closest(target, bucketSize).entries
  564. t.mutex.Unlock()
  565. p := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())}
  566. var sent bool
  567. // Send neighbors in chunks with at most maxNeighbors per packet
  568. // to stay below the 1280 byte limit.
  569. for _, n := range closest {
  570. if netutil.CheckRelayIP(from.IP, n.IP) == nil {
  571. p.Nodes = append(p.Nodes, nodeToRPC(n))
  572. }
  573. if len(p.Nodes) == maxNeighbors {
  574. t.send(from, neighborsPacket, &p)
  575. p.Nodes = p.Nodes[:0]
  576. sent = true
  577. }
  578. }
  579. if len(p.Nodes) > 0 || !sent {
  580. t.send(from, neighborsPacket, &p)
  581. }
  582. return nil
  583. }
  584. func (req *findnode) name() string { return "FINDNODE/v4" }
  585. func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
  586. if expired(req.Expiration) {
  587. return errExpired
  588. }
  589. if !t.handleReply(fromID, neighborsPacket, req) {
  590. return errUnsolicitedReply
  591. }
  592. return nil
  593. }
  594. func (req *neighbors) name() string { return "NEIGHBORS/v4" }
  // expired reports whether the Unix timestamp ts (in seconds) lies strictly
  // before the current time.
  func expired(ts uint64) bool {
  	deadline := time.Unix(int64(ts), 0)
  	return time.Now().After(deadline)
  }