peer.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "crypto/ecdsa"
  20. "encoding/binary"
  21. "errors"
  22. "fmt"
  23. "math/big"
  24. "sync"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/core/types"
  28. "github.com/ethereum/go-ethereum/eth"
  29. "github.com/ethereum/go-ethereum/les/flowcontrol"
  30. "github.com/ethereum/go-ethereum/light"
  31. "github.com/ethereum/go-ethereum/p2p"
  32. "github.com/ethereum/go-ethereum/rlp"
  33. )
  // Sentinel errors reported by the peer set bookkeeping routines.
  var (
  	// errClosed is returned by Register once the peer set has been closed.
  	errClosed = errors.New("peer set is closed")

  	// errAlreadyRegistered is returned by Register when the peer id is
  	// already present in the set.
  	errAlreadyRegistered = errors.New("peer is already registered")

  	// errNotRegistered is returned by Unregister when the peer id is unknown.
  	errNotRegistered = errors.New("peer is not registered")
  )
  // Announcement modes exchanged through the "announceType" handshake field.
  const (
  	announceTypeNone   = iota // 0
  	announceTypeSimple        // 1
  	announceTypeSigned        // 2
  )

  // maxResponseErrors is the number of invalid responses tolerated (makes the
  // protocol less brittle but still avoids spam).
  const maxResponseErrors = 50
  45. type peer struct {
  46. *p2p.Peer
  47. pubKey *ecdsa.PublicKey
  48. rw p2p.MsgReadWriter
  49. version int // Protocol version negotiated
  50. network uint64 // Network ID being on
  51. announceType, requestAnnounceType uint64
  52. id string
  53. headInfo *announceData
  54. lock sync.RWMutex
  55. announceChn chan announceData
  56. sendQueue *execQueue
  57. poolEntry *poolEntry
  58. hasBlock func(common.Hash, uint64) bool
  59. responseErrors int
  60. fcClient *flowcontrol.ClientNode // nil if the peer is server only
  61. fcServer *flowcontrol.ServerNode // nil if the peer is client only
  62. fcServerParams *flowcontrol.ServerParams
  63. fcCosts requestCostTable
  64. }
  65. func newPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
  66. id := p.ID()
  67. pubKey, _ := id.Pubkey()
  68. return &peer{
  69. Peer: p,
  70. pubKey: pubKey,
  71. rw: rw,
  72. version: version,
  73. network: network,
  74. id: fmt.Sprintf("%x", id[:8]),
  75. announceChn: make(chan announceData, 20),
  76. }
  77. }
  78. func (p *peer) canQueue() bool {
  79. return p.sendQueue.canQueue()
  80. }
  81. func (p *peer) queueSend(f func()) {
  82. p.sendQueue.queue(f)
  83. }
  84. // Info gathers and returns a collection of metadata known about a peer.
  85. func (p *peer) Info() *eth.PeerInfo {
  86. return &eth.PeerInfo{
  87. Version: p.version,
  88. Difficulty: p.Td(),
  89. Head: fmt.Sprintf("%x", p.Head()),
  90. }
  91. }
  92. // Head retrieves a copy of the current head (most recent) hash of the peer.
  93. func (p *peer) Head() (hash common.Hash) {
  94. p.lock.RLock()
  95. defer p.lock.RUnlock()
  96. copy(hash[:], p.headInfo.Hash[:])
  97. return hash
  98. }
  99. func (p *peer) HeadAndTd() (hash common.Hash, td *big.Int) {
  100. p.lock.RLock()
  101. defer p.lock.RUnlock()
  102. copy(hash[:], p.headInfo.Hash[:])
  103. return hash, p.headInfo.Td
  104. }
  105. func (p *peer) headBlockInfo() blockInfo {
  106. p.lock.RLock()
  107. defer p.lock.RUnlock()
  108. return blockInfo{Hash: p.headInfo.Hash, Number: p.headInfo.Number, Td: p.headInfo.Td}
  109. }
  110. // Td retrieves the current total difficulty of a peer.
  111. func (p *peer) Td() *big.Int {
  112. p.lock.RLock()
  113. defer p.lock.RUnlock()
  114. return new(big.Int).Set(p.headInfo.Td)
  115. }
  116. // waitBefore implements distPeer interface
  117. func (p *peer) waitBefore(maxCost uint64) (time.Duration, float64) {
  118. return p.fcServer.CanSend(maxCost)
  119. }
  120. func sendRequest(w p2p.MsgWriter, msgcode, reqID, cost uint64, data interface{}) error {
  121. type req struct {
  122. ReqID uint64
  123. Data interface{}
  124. }
  125. return p2p.Send(w, msgcode, req{reqID, data})
  126. }
  127. func sendResponse(w p2p.MsgWriter, msgcode, reqID, bv uint64, data interface{}) error {
  128. type resp struct {
  129. ReqID, BV uint64
  130. Data interface{}
  131. }
  132. return p2p.Send(w, msgcode, resp{reqID, bv, data})
  133. }
  134. func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 {
  135. p.lock.RLock()
  136. defer p.lock.RUnlock()
  137. cost := p.fcCosts[msgcode].baseCost + p.fcCosts[msgcode].reqCost*uint64(amount)
  138. if cost > p.fcServerParams.BufLimit {
  139. cost = p.fcServerParams.BufLimit
  140. }
  141. return cost
  142. }
  143. // HasBlock checks if the peer has a given block
  144. func (p *peer) HasBlock(hash common.Hash, number uint64) bool {
  145. p.lock.RLock()
  146. hasBlock := p.hasBlock
  147. p.lock.RUnlock()
  148. return hasBlock != nil && hasBlock(hash, number)
  149. }
  150. // SendAnnounce announces the availability of a number of blocks through
  151. // a hash notification.
  152. func (p *peer) SendAnnounce(request announceData) error {
  153. return p2p.Send(p.rw, AnnounceMsg, request)
  154. }
  155. // SendBlockHeaders sends a batch of block headers to the remote peer.
  156. func (p *peer) SendBlockHeaders(reqID, bv uint64, headers []*types.Header) error {
  157. return sendResponse(p.rw, BlockHeadersMsg, reqID, bv, headers)
  158. }
  159. // SendBlockBodiesRLP sends a batch of block contents to the remote peer from
  160. // an already RLP encoded format.
  161. func (p *peer) SendBlockBodiesRLP(reqID, bv uint64, bodies []rlp.RawValue) error {
  162. return sendResponse(p.rw, BlockBodiesMsg, reqID, bv, bodies)
  163. }
  164. // SendCodeRLP sends a batch of arbitrary internal data, corresponding to the
  165. // hashes requested.
  166. func (p *peer) SendCode(reqID, bv uint64, data [][]byte) error {
  167. return sendResponse(p.rw, CodeMsg, reqID, bv, data)
  168. }
  169. // SendReceiptsRLP sends a batch of transaction receipts, corresponding to the
  170. // ones requested from an already RLP encoded format.
  171. func (p *peer) SendReceiptsRLP(reqID, bv uint64, receipts []rlp.RawValue) error {
  172. return sendResponse(p.rw, ReceiptsMsg, reqID, bv, receipts)
  173. }
  174. // SendProofs sends a batch of legacy LES/1 merkle proofs, corresponding to the ones requested.
  175. func (p *peer) SendProofs(reqID, bv uint64, proofs proofsData) error {
  176. return sendResponse(p.rw, ProofsV1Msg, reqID, bv, proofs)
  177. }
  178. // SendProofsV2 sends a batch of merkle proofs, corresponding to the ones requested.
  179. func (p *peer) SendProofsV2(reqID, bv uint64, proofs light.NodeList) error {
  180. return sendResponse(p.rw, ProofsV2Msg, reqID, bv, proofs)
  181. }
  182. // SendHeaderProofs sends a batch of legacy LES/1 header proofs, corresponding to the ones requested.
  183. func (p *peer) SendHeaderProofs(reqID, bv uint64, proofs []ChtResp) error {
  184. return sendResponse(p.rw, HeaderProofsMsg, reqID, bv, proofs)
  185. }
  186. // SendHelperTrieProofs sends a batch of HelperTrie proofs, corresponding to the ones requested.
  187. func (p *peer) SendHelperTrieProofs(reqID, bv uint64, resp HelperTrieResps) error {
  188. return sendResponse(p.rw, HelperTrieProofsMsg, reqID, bv, resp)
  189. }
  190. // SendTxStatus sends a batch of transaction status records, corresponding to the ones requested.
  191. func (p *peer) SendTxStatus(reqID, bv uint64, stats []txStatus) error {
  192. return sendResponse(p.rw, TxStatusMsg, reqID, bv, stats)
  193. }
  194. // RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
  195. // specified header query, based on the hash of an origin block.
  196. func (p *peer) RequestHeadersByHash(reqID, cost uint64, origin common.Hash, amount int, skip int, reverse bool) error {
  197. p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse)
  198. return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
  199. }
  200. // RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the
  201. // specified header query, based on the number of an origin block.
  202. func (p *peer) RequestHeadersByNumber(reqID, cost, origin uint64, amount int, skip int, reverse bool) error {
  203. p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse)
  204. return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
  205. }
  206. // RequestBodies fetches a batch of blocks' bodies corresponding to the hashes
  207. // specified.
  208. func (p *peer) RequestBodies(reqID, cost uint64, hashes []common.Hash) error {
  209. p.Log().Debug("Fetching batch of block bodies", "count", len(hashes))
  210. return sendRequest(p.rw, GetBlockBodiesMsg, reqID, cost, hashes)
  211. }
  212. // RequestCode fetches a batch of arbitrary data from a node's known state
  213. // data, corresponding to the specified hashes.
  214. func (p *peer) RequestCode(reqID, cost uint64, reqs []CodeReq) error {
  215. p.Log().Debug("Fetching batch of codes", "count", len(reqs))
  216. return sendRequest(p.rw, GetCodeMsg, reqID, cost, reqs)
  217. }
  218. // RequestReceipts fetches a batch of transaction receipts from a remote node.
  219. func (p *peer) RequestReceipts(reqID, cost uint64, hashes []common.Hash) error {
  220. p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
  221. return sendRequest(p.rw, GetReceiptsMsg, reqID, cost, hashes)
  222. }
  223. // RequestProofs fetches a batch of merkle proofs from a remote node.
  224. func (p *peer) RequestProofs(reqID, cost uint64, reqs []ProofReq) error {
  225. p.Log().Debug("Fetching batch of proofs", "count", len(reqs))
  226. switch p.version {
  227. case lpv1:
  228. return sendRequest(p.rw, GetProofsV1Msg, reqID, cost, reqs)
  229. case lpv2:
  230. return sendRequest(p.rw, GetProofsV2Msg, reqID, cost, reqs)
  231. default:
  232. panic(nil)
  233. }
  234. }
  235. // RequestHelperTrieProofs fetches a batch of HelperTrie merkle proofs from a remote node.
  236. func (p *peer) RequestHelperTrieProofs(reqID, cost uint64, reqs []HelperTrieReq) error {
  237. p.Log().Debug("Fetching batch of HelperTrie proofs", "count", len(reqs))
  238. switch p.version {
  239. case lpv1:
  240. reqsV1 := make([]ChtReq, len(reqs))
  241. for i, req := range reqs {
  242. if req.Type != htCanonical || req.AuxReq != auxHeader || len(req.Key) != 8 {
  243. return fmt.Errorf("Request invalid in LES/1 mode")
  244. }
  245. blockNum := binary.BigEndian.Uint64(req.Key)
  246. // convert HelperTrie request to old CHT request
  247. reqsV1[i] = ChtReq{ChtNum: (req.TrieIdx + 1) * (light.CHTFrequencyClient / light.CHTFrequencyServer), BlockNum: blockNum, FromLevel: req.FromLevel}
  248. }
  249. return sendRequest(p.rw, GetHeaderProofsMsg, reqID, cost, reqsV1)
  250. case lpv2:
  251. return sendRequest(p.rw, GetHelperTrieProofsMsg, reqID, cost, reqs)
  252. default:
  253. panic(nil)
  254. }
  255. }
  256. // RequestTxStatus fetches a batch of transaction status records from a remote node.
  257. func (p *peer) RequestTxStatus(reqID, cost uint64, txHashes []common.Hash) error {
  258. p.Log().Debug("Requesting transaction status", "count", len(txHashes))
  259. return sendRequest(p.rw, GetTxStatusMsg, reqID, cost, txHashes)
  260. }
  261. // SendTxStatus sends a batch of transactions to be added to the remote transaction pool.
  262. func (p *peer) SendTxs(reqID, cost uint64, txs types.Transactions) error {
  263. p.Log().Debug("Fetching batch of transactions", "count", len(txs))
  264. switch p.version {
  265. case lpv1:
  266. return p2p.Send(p.rw, SendTxMsg, txs) // old message format does not include reqID
  267. case lpv2:
  268. return sendRequest(p.rw, SendTxV2Msg, reqID, cost, txs)
  269. default:
  270. panic(nil)
  271. }
  272. }
  273. type keyValueEntry struct {
  274. Key string
  275. Value rlp.RawValue
  276. }
  277. type keyValueList []keyValueEntry
  278. type keyValueMap map[string]rlp.RawValue
  279. func (l keyValueList) add(key string, val interface{}) keyValueList {
  280. var entry keyValueEntry
  281. entry.Key = key
  282. if val == nil {
  283. val = uint64(0)
  284. }
  285. enc, err := rlp.EncodeToBytes(val)
  286. if err == nil {
  287. entry.Value = enc
  288. }
  289. return append(l, entry)
  290. }
  291. func (l keyValueList) decode() keyValueMap {
  292. m := make(keyValueMap)
  293. for _, entry := range l {
  294. m[entry.Key] = entry.Value
  295. }
  296. return m
  297. }
  298. func (m keyValueMap) get(key string, val interface{}) error {
  299. enc, ok := m[key]
  300. if !ok {
  301. return errResp(ErrMissingKey, "%s", key)
  302. }
  303. if val == nil {
  304. return nil
  305. }
  306. return rlp.DecodeBytes(enc, val)
  307. }
  308. func (p *peer) sendReceiveHandshake(sendList keyValueList) (keyValueList, error) {
  309. // Send out own handshake in a new thread
  310. errc := make(chan error, 1)
  311. go func() {
  312. errc <- p2p.Send(p.rw, StatusMsg, sendList)
  313. }()
  314. // In the mean time retrieve the remote status message
  315. msg, err := p.rw.ReadMsg()
  316. if err != nil {
  317. return nil, err
  318. }
  319. if msg.Code != StatusMsg {
  320. return nil, errResp(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg)
  321. }
  322. if msg.Size > ProtocolMaxMsgSize {
  323. return nil, errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  324. }
  325. // Decode the handshake
  326. var recvList keyValueList
  327. if err := msg.Decode(&recvList); err != nil {
  328. return nil, errResp(ErrDecode, "msg %v: %v", msg, err)
  329. }
  330. if err := <-errc; err != nil {
  331. return nil, err
  332. }
  333. return recvList, nil
  334. }
  335. // Handshake executes the les protocol handshake, negotiating version number,
  336. // network IDs, difficulties, head and genesis blocks.
  337. func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis common.Hash, server *LesServer) error {
  338. p.lock.Lock()
  339. defer p.lock.Unlock()
  340. var send keyValueList
  341. send = send.add("protocolVersion", uint64(p.version))
  342. send = send.add("networkId", p.network)
  343. send = send.add("headTd", td)
  344. send = send.add("headHash", head)
  345. send = send.add("headNum", headNum)
  346. send = send.add("genesisHash", genesis)
  347. if server != nil {
  348. send = send.add("serveHeaders", nil)
  349. send = send.add("serveChainSince", uint64(0))
  350. send = send.add("serveStateSince", uint64(0))
  351. send = send.add("txRelay", nil)
  352. send = send.add("flowControl/BL", server.defParams.BufLimit)
  353. send = send.add("flowControl/MRR", server.defParams.MinRecharge)
  354. list := server.fcCostStats.getCurrentList()
  355. send = send.add("flowControl/MRC", list)
  356. p.fcCosts = list.decode()
  357. } else {
  358. p.requestAnnounceType = announceTypeSimple // set to default until "very light" client mode is implemented
  359. send = send.add("announceType", p.requestAnnounceType)
  360. }
  361. recvList, err := p.sendReceiveHandshake(send)
  362. if err != nil {
  363. return err
  364. }
  365. recv := recvList.decode()
  366. var rGenesis, rHash common.Hash
  367. var rVersion, rNetwork, rNum uint64
  368. var rTd *big.Int
  369. if err := recv.get("protocolVersion", &rVersion); err != nil {
  370. return err
  371. }
  372. if err := recv.get("networkId", &rNetwork); err != nil {
  373. return err
  374. }
  375. if err := recv.get("headTd", &rTd); err != nil {
  376. return err
  377. }
  378. if err := recv.get("headHash", &rHash); err != nil {
  379. return err
  380. }
  381. if err := recv.get("headNum", &rNum); err != nil {
  382. return err
  383. }
  384. if err := recv.get("genesisHash", &rGenesis); err != nil {
  385. return err
  386. }
  387. if rGenesis != genesis {
  388. return errResp(ErrGenesisBlockMismatch, "%x (!= %x)", rGenesis[:8], genesis[:8])
  389. }
  390. if rNetwork != p.network {
  391. return errResp(ErrNetworkIdMismatch, "%d (!= %d)", rNetwork, p.network)
  392. }
  393. if int(rVersion) != p.version {
  394. return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", rVersion, p.version)
  395. }
  396. if server != nil {
  397. // until we have a proper peer connectivity API, allow LES connection to other servers
  398. /*if recv.get("serveStateSince", nil) == nil {
  399. return errResp(ErrUselessPeer, "wanted client, got server")
  400. }*/
  401. if recv.get("announceType", &p.announceType) != nil {
  402. p.announceType = announceTypeSimple
  403. }
  404. p.fcClient = flowcontrol.NewClientNode(server.fcManager, server.defParams)
  405. } else {
  406. if recv.get("serveChainSince", nil) != nil {
  407. return errResp(ErrUselessPeer, "peer cannot serve chain")
  408. }
  409. if recv.get("serveStateSince", nil) != nil {
  410. return errResp(ErrUselessPeer, "peer cannot serve state")
  411. }
  412. if recv.get("txRelay", nil) != nil {
  413. return errResp(ErrUselessPeer, "peer cannot relay transactions")
  414. }
  415. params := &flowcontrol.ServerParams{}
  416. if err := recv.get("flowControl/BL", &params.BufLimit); err != nil {
  417. return err
  418. }
  419. if err := recv.get("flowControl/MRR", &params.MinRecharge); err != nil {
  420. return err
  421. }
  422. var MRC RequestCostList
  423. if err := recv.get("flowControl/MRC", &MRC); err != nil {
  424. return err
  425. }
  426. p.fcServerParams = params
  427. p.fcServer = flowcontrol.NewServerNode(params)
  428. p.fcCosts = MRC.decode()
  429. }
  430. p.headInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum}
  431. return nil
  432. }
  433. // String implements fmt.Stringer.
  434. func (p *peer) String() string {
  435. return fmt.Sprintf("Peer %s [%s]", p.id,
  436. fmt.Sprintf("les/%d", p.version),
  437. )
  438. }
  439. // peerSetNotify is a callback interface to notify services about added or
  440. // removed peers
  441. type peerSetNotify interface {
  442. registerPeer(*peer)
  443. unregisterPeer(*peer)
  444. }
  445. // peerSet represents the collection of active peers currently participating in
  446. // the Light Ethereum sub-protocol.
  447. type peerSet struct {
  448. peers map[string]*peer
  449. lock sync.RWMutex
  450. notifyList []peerSetNotify
  451. closed bool
  452. }
  453. // newPeerSet creates a new peer set to track the active participants.
  454. func newPeerSet() *peerSet {
  455. return &peerSet{
  456. peers: make(map[string]*peer),
  457. }
  458. }
  459. // notify adds a service to be notified about added or removed peers
  460. func (ps *peerSet) notify(n peerSetNotify) {
  461. ps.lock.Lock()
  462. ps.notifyList = append(ps.notifyList, n)
  463. peers := make([]*peer, 0, len(ps.peers))
  464. for _, p := range ps.peers {
  465. peers = append(peers, p)
  466. }
  467. ps.lock.Unlock()
  468. for _, p := range peers {
  469. n.registerPeer(p)
  470. }
  471. }
  472. // Register injects a new peer into the working set, or returns an error if the
  473. // peer is already known.
  474. func (ps *peerSet) Register(p *peer) error {
  475. ps.lock.Lock()
  476. if ps.closed {
  477. ps.lock.Unlock()
  478. return errClosed
  479. }
  480. if _, ok := ps.peers[p.id]; ok {
  481. ps.lock.Unlock()
  482. return errAlreadyRegistered
  483. }
  484. ps.peers[p.id] = p
  485. p.sendQueue = newExecQueue(100)
  486. peers := make([]peerSetNotify, len(ps.notifyList))
  487. copy(peers, ps.notifyList)
  488. ps.lock.Unlock()
  489. for _, n := range peers {
  490. n.registerPeer(p)
  491. }
  492. return nil
  493. }
  494. // Unregister removes a remote peer from the active set, disabling any further
  495. // actions to/from that particular entity. It also initiates disconnection at the networking layer.
  496. func (ps *peerSet) Unregister(id string) error {
  497. ps.lock.Lock()
  498. if p, ok := ps.peers[id]; !ok {
  499. ps.lock.Unlock()
  500. return errNotRegistered
  501. } else {
  502. delete(ps.peers, id)
  503. peers := make([]peerSetNotify, len(ps.notifyList))
  504. copy(peers, ps.notifyList)
  505. ps.lock.Unlock()
  506. for _, n := range peers {
  507. n.unregisterPeer(p)
  508. }
  509. p.sendQueue.quit()
  510. p.Peer.Disconnect(p2p.DiscUselessPeer)
  511. return nil
  512. }
  513. }
  514. // AllPeerIDs returns a list of all registered peer IDs
  515. func (ps *peerSet) AllPeerIDs() []string {
  516. ps.lock.RLock()
  517. defer ps.lock.RUnlock()
  518. res := make([]string, len(ps.peers))
  519. idx := 0
  520. for id := range ps.peers {
  521. res[idx] = id
  522. idx++
  523. }
  524. return res
  525. }
  526. // Peer retrieves the registered peer with the given id.
  527. func (ps *peerSet) Peer(id string) *peer {
  528. ps.lock.RLock()
  529. defer ps.lock.RUnlock()
  530. return ps.peers[id]
  531. }
  532. // Len returns if the current number of peers in the set.
  533. func (ps *peerSet) Len() int {
  534. ps.lock.RLock()
  535. defer ps.lock.RUnlock()
  536. return len(ps.peers)
  537. }
  538. // BestPeer retrieves the known peer with the currently highest total difficulty.
  539. func (ps *peerSet) BestPeer() *peer {
  540. ps.lock.RLock()
  541. defer ps.lock.RUnlock()
  542. var (
  543. bestPeer *peer
  544. bestTd *big.Int
  545. )
  546. for _, p := range ps.peers {
  547. if td := p.Td(); bestPeer == nil || td.Cmp(bestTd) > 0 {
  548. bestPeer, bestTd = p, td
  549. }
  550. }
  551. return bestPeer
  552. }
  553. // AllPeers returns all peers in a list
  554. func (ps *peerSet) AllPeers() []*peer {
  555. ps.lock.RLock()
  556. defer ps.lock.RUnlock()
  557. list := make([]*peer, len(ps.peers))
  558. i := 0
  559. for _, peer := range ps.peers {
  560. list[i] = peer
  561. i++
  562. }
  563. return list
  564. }
  565. // Close disconnects all peers.
  566. // No new peers can be registered after Close has returned.
  567. func (ps *peerSet) Close() {
  568. ps.lock.Lock()
  569. defer ps.lock.Unlock()
  570. for _, p := range ps.peers {
  571. p.Disconnect(p2p.DiscQuitting)
  572. }
  573. ps.closed = true
  574. }