123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457 |
- // Copyright 2016 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package discv5
- import (
- "crypto/ecdsa"
- "encoding/binary"
- "fmt"
- "math/rand"
- "net"
- "strconv"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "github.com/ethereum/go-ethereum/common"
- )
- // In this test, nodes try to randomly resolve each other.
- func TestSimRandomResolve(t *testing.T) {
- t.Skip("boring")
- if runWithPlaygroundTime(t) {
- return
- }
- sim := newSimulation()
- bootnode := sim.launchNode(false)
- // A new node joins every 10s.
- launcher := time.NewTicker(10 * time.Second)
- go func() {
- for range launcher.C {
- net := sim.launchNode(false)
- go randomResolves(t, sim, net)
- if err := net.SetFallbackNodes([]*Node{bootnode.Self()}); err != nil {
- panic(err)
- }
- fmt.Printf("launched @ %v: %x\n", time.Now(), net.Self().ID[:16])
- }
- }()
- time.Sleep(3 * time.Hour)
- launcher.Stop()
- sim.shutdown()
- sim.printStats()
- }
- func TestSimTopics(t *testing.T) {
- t.Skip("NaCl test")
- if runWithPlaygroundTime(t) {
- return
- }
- sim := newSimulation()
- bootnode := sim.launchNode(false)
- go func() {
- nets := make([]*Network, 1024)
- for i := range nets {
- net := sim.launchNode(false)
- nets[i] = net
- if err := net.SetFallbackNodes([]*Node{bootnode.Self()}); err != nil {
- panic(err)
- }
- time.Sleep(time.Second * 5)
- }
- for i, net := range nets {
- if i < 256 {
- stop := make(chan struct{})
- go net.RegisterTopic(testTopic, stop)
- go func() {
- //time.Sleep(time.Second * 36000)
- time.Sleep(time.Second * 40000)
- close(stop)
- }()
- time.Sleep(time.Millisecond * 100)
- }
- // time.Sleep(time.Second * 10)
- //time.Sleep(time.Second)
- /*if i%500 == 499 {
- time.Sleep(time.Second * 9501)
- } else {
- time.Sleep(time.Second)
- }*/
- }
- }()
- // A new node joins every 10s.
- /* launcher := time.NewTicker(5 * time.Second)
- cnt := 0
- var printNet *Network
- go func() {
- for range launcher.C {
- cnt++
- if cnt <= 1000 {
- log := false //(cnt == 500)
- net := sim.launchNode(log)
- if log {
- printNet = net
- }
- if cnt > 500 {
- go net.RegisterTopic(testTopic, nil)
- }
- if err := net.SetFallbackNodes([]*Node{bootnode.Self()}); err != nil {
- panic(err)
- }
- }
- //fmt.Printf("launched @ %v: %x\n", time.Now(), net.Self().ID[:16])
- }
- }()
- */
- time.Sleep(55000 * time.Second)
- //launcher.Stop()
- sim.shutdown()
- //sim.printStats()
- //printNet.log.printLogs()
- }
- /*func testHierarchicalTopics(i int) []Topic {
- digits := strconv.FormatInt(int64(256+i/4), 4)
- res := make([]Topic, 5)
- for i, _ := range res {
- res[i] = Topic("foo" + digits[1:i+1])
- }
- return res
- }*/
- func testHierarchicalTopics(i int) []Topic {
- digits := strconv.FormatInt(int64(128+i/8), 2)
- res := make([]Topic, 8)
- for i := range res {
- res[i] = Topic("foo" + digits[1:i+1])
- }
- return res
- }
- func TestSimTopicHierarchy(t *testing.T) {
- t.Skip("NaCl test")
- if runWithPlaygroundTime(t) {
- return
- }
- sim := newSimulation()
- bootnode := sim.launchNode(false)
- go func() {
- nets := make([]*Network, 1024)
- for i := range nets {
- net := sim.launchNode(false)
- nets[i] = net
- if err := net.SetFallbackNodes([]*Node{bootnode.Self()}); err != nil {
- panic(err)
- }
- time.Sleep(time.Second * 5)
- }
- stop := make(chan struct{})
- for i, net := range nets {
- //if i < 256 {
- for _, topic := range testHierarchicalTopics(i)[:5] {
- //fmt.Println("reg", topic)
- go net.RegisterTopic(topic, stop)
- }
- time.Sleep(time.Millisecond * 100)
- //}
- }
- time.Sleep(time.Second * 90000)
- close(stop)
- }()
- time.Sleep(100000 * time.Second)
- sim.shutdown()
- }
- func randomResolves(t *testing.T, s *simulation, net *Network) {
- randtime := func() time.Duration {
- return time.Duration(rand.Intn(50)+20) * time.Second
- }
- lookup := func(target NodeID) bool {
- result := net.Resolve(target)
- return result != nil && result.ID == target
- }
- timer := time.NewTimer(randtime())
- for {
- select {
- case <-timer.C:
- target := s.randomNode().Self().ID
- if !lookup(target) {
- t.Errorf("node %x: target %x not found", net.Self().ID[:8], target[:8])
- }
- timer.Reset(randtime())
- case <-net.closed:
- return
- }
- }
- }
- type simulation struct {
- mu sync.RWMutex
- nodes map[NodeID]*Network
- nodectr uint32
- }
- func newSimulation() *simulation {
- return &simulation{nodes: make(map[NodeID]*Network)}
- }
- func (s *simulation) shutdown() {
- s.mu.RLock()
- alive := make([]*Network, 0, len(s.nodes))
- for _, n := range s.nodes {
- alive = append(alive, n)
- }
- defer s.mu.RUnlock()
- for _, n := range alive {
- n.Close()
- }
- }
- func (s *simulation) printStats() {
- s.mu.Lock()
- defer s.mu.Unlock()
- fmt.Println("node counter:", s.nodectr)
- fmt.Println("alive nodes:", len(s.nodes))
- // for _, n := range s.nodes {
- // fmt.Printf("%x\n", n.tab.self.ID[:8])
- // transport := n.conn.(*simTransport)
- // fmt.Println(" joined:", transport.joinTime)
- // fmt.Println(" sends:", transport.hashctr)
- // fmt.Println(" table size:", n.tab.count)
- // }
- /*for _, n := range s.nodes {
- fmt.Println()
- fmt.Printf("*** Node %x\n", n.tab.self.ID[:8])
- n.log.printLogs()
- }*/
- }
- func (s *simulation) randomNode() *Network {
- s.mu.Lock()
- defer s.mu.Unlock()
- n := rand.Intn(len(s.nodes))
- for _, net := range s.nodes {
- if n == 0 {
- return net
- }
- n--
- }
- return nil
- }
- func (s *simulation) launchNode(log bool) *Network {
- var (
- num = s.nodectr
- key = newkey()
- id = PubkeyID(&key.PublicKey)
- ip = make(net.IP, 4)
- )
- s.nodectr++
- binary.BigEndian.PutUint32(ip, num)
- ip[0] = 10
- addr := &net.UDPAddr{IP: ip, Port: 30303}
- transport := &simTransport{joinTime: time.Now(), sender: id, senderAddr: addr, sim: s, priv: key}
- net, err := newNetwork(transport, key.PublicKey, "<no database>", nil)
- if err != nil {
- panic("cannot launch new node: " + err.Error())
- }
- s.mu.Lock()
- s.nodes[id] = net
- s.mu.Unlock()
- return net
- }
- func (s *simulation) dropNode(id NodeID) {
- s.mu.Lock()
- n := s.nodes[id]
- delete(s.nodes, id)
- s.mu.Unlock()
- n.Close()
- }
- type simTransport struct {
- joinTime time.Time
- sender NodeID
- senderAddr *net.UDPAddr
- sim *simulation
- hashctr uint64
- priv *ecdsa.PrivateKey
- }
- func (st *simTransport) localAddr() *net.UDPAddr {
- return st.senderAddr
- }
- func (st *simTransport) Close() {}
- func (st *simTransport) send(remote *Node, ptype nodeEvent, data interface{}) (hash []byte) {
- hash = st.nextHash()
- var raw []byte
- if ptype == pongPacket {
- var err error
- raw, _, err = encodePacket(st.priv, byte(ptype), data)
- if err != nil {
- panic(err)
- }
- }
- st.sendPacket(remote.ID, ingressPacket{
- remoteID: st.sender,
- remoteAddr: st.senderAddr,
- hash: hash,
- ev: ptype,
- data: data,
- rawData: raw,
- })
- return hash
- }
- func (st *simTransport) sendPing(remote *Node, remoteAddr *net.UDPAddr, topics []Topic) []byte {
- hash := st.nextHash()
- st.sendPacket(remote.ID, ingressPacket{
- remoteID: st.sender,
- remoteAddr: st.senderAddr,
- hash: hash,
- ev: pingPacket,
- data: &ping{
- Version: 4,
- From: rpcEndpoint{IP: st.senderAddr.IP, UDP: uint16(st.senderAddr.Port), TCP: 30303},
- To: rpcEndpoint{IP: remoteAddr.IP, UDP: uint16(remoteAddr.Port), TCP: 30303},
- Expiration: uint64(time.Now().Unix() + int64(expiration)),
- Topics: topics,
- },
- })
- return hash
- }
- func (st *simTransport) sendPong(remote *Node, pingHash []byte) {
- raddr := remote.addr()
- st.sendPacket(remote.ID, ingressPacket{
- remoteID: st.sender,
- remoteAddr: st.senderAddr,
- hash: st.nextHash(),
- ev: pongPacket,
- data: &pong{
- To: rpcEndpoint{IP: raddr.IP, UDP: uint16(raddr.Port), TCP: 30303},
- ReplyTok: pingHash,
- Expiration: uint64(time.Now().Unix() + int64(expiration)),
- },
- })
- }
- func (st *simTransport) sendFindnodeHash(remote *Node, target common.Hash) {
- st.sendPacket(remote.ID, ingressPacket{
- remoteID: st.sender,
- remoteAddr: st.senderAddr,
- hash: st.nextHash(),
- ev: findnodeHashPacket,
- data: &findnodeHash{
- Target: target,
- Expiration: uint64(time.Now().Unix() + int64(expiration)),
- },
- })
- }
- func (st *simTransport) sendTopicRegister(remote *Node, topics []Topic, idx int, pong []byte) {
- //fmt.Println("send", topics, pong)
- st.sendPacket(remote.ID, ingressPacket{
- remoteID: st.sender,
- remoteAddr: st.senderAddr,
- hash: st.nextHash(),
- ev: topicRegisterPacket,
- data: &topicRegister{
- Topics: topics,
- Idx: uint(idx),
- Pong: pong,
- },
- })
- }
- func (st *simTransport) sendTopicNodes(remote *Node, queryHash common.Hash, nodes []*Node) {
- rnodes := make([]rpcNode, len(nodes))
- for i := range nodes {
- rnodes[i] = nodeToRPC(nodes[i])
- }
- st.sendPacket(remote.ID, ingressPacket{
- remoteID: st.sender,
- remoteAddr: st.senderAddr,
- hash: st.nextHash(),
- ev: topicNodesPacket,
- data: &topicNodes{Echo: queryHash, Nodes: rnodes},
- })
- }
- func (st *simTransport) sendNeighbours(remote *Node, nodes []*Node) {
- // TODO: send multiple packets
- rnodes := make([]rpcNode, len(nodes))
- for i := range nodes {
- rnodes[i] = nodeToRPC(nodes[i])
- }
- st.sendPacket(remote.ID, ingressPacket{
- remoteID: st.sender,
- remoteAddr: st.senderAddr,
- hash: st.nextHash(),
- ev: neighborsPacket,
- data: &neighbors{
- Nodes: rnodes,
- Expiration: uint64(time.Now().Unix() + int64(expiration)),
- },
- })
- }
- func (st *simTransport) nextHash() []byte {
- v := atomic.AddUint64(&st.hashctr, 1)
- var hash common.Hash
- binary.BigEndian.PutUint64(hash[:], v)
- return hash[:]
- }
- const packetLoss = 0 // 1/1000
- func (st *simTransport) sendPacket(remote NodeID, p ingressPacket) {
- if rand.Int31n(1000) >= packetLoss {
- st.sim.mu.RLock()
- recipient := st.sim.nodes[remote]
- st.sim.mu.RUnlock()
- time.AfterFunc(200*time.Millisecond, func() {
- recipient.reqReadPacket(p)
- })
- }
- }
|