sim_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package discv5
  17. import (
  18. "crypto/ecdsa"
  19. "encoding/binary"
  20. "fmt"
  21. "math/rand"
  22. "net"
  23. "strconv"
  24. "sync"
  25. "sync/atomic"
  26. "testing"
  27. "time"
  28. "github.com/ethereum/go-ethereum/common"
  29. )
  30. // In this test, nodes try to randomly resolve each other.
  31. func TestSimRandomResolve(t *testing.T) {
  32. t.Skip("boring")
  33. if runWithPlaygroundTime(t) {
  34. return
  35. }
  36. sim := newSimulation()
  37. bootnode := sim.launchNode(false)
  38. // A new node joins every 10s.
  39. launcher := time.NewTicker(10 * time.Second)
  40. go func() {
  41. for range launcher.C {
  42. net := sim.launchNode(false)
  43. go randomResolves(t, sim, net)
  44. if err := net.SetFallbackNodes([]*Node{bootnode.Self()}); err != nil {
  45. panic(err)
  46. }
  47. fmt.Printf("launched @ %v: %x\n", time.Now(), net.Self().ID[:16])
  48. }
  49. }()
  50. time.Sleep(3 * time.Hour)
  51. launcher.Stop()
  52. sim.shutdown()
  53. sim.printStats()
  54. }
  55. func TestSimTopics(t *testing.T) {
  56. t.Skip("NaCl test")
  57. if runWithPlaygroundTime(t) {
  58. return
  59. }
  60. sim := newSimulation()
  61. bootnode := sim.launchNode(false)
  62. go func() {
  63. nets := make([]*Network, 1024)
  64. for i := range nets {
  65. net := sim.launchNode(false)
  66. nets[i] = net
  67. if err := net.SetFallbackNodes([]*Node{bootnode.Self()}); err != nil {
  68. panic(err)
  69. }
  70. time.Sleep(time.Second * 5)
  71. }
  72. for i, net := range nets {
  73. if i < 256 {
  74. stop := make(chan struct{})
  75. go net.RegisterTopic(testTopic, stop)
  76. go func() {
  77. //time.Sleep(time.Second * 36000)
  78. time.Sleep(time.Second * 40000)
  79. close(stop)
  80. }()
  81. time.Sleep(time.Millisecond * 100)
  82. }
  83. // time.Sleep(time.Second * 10)
  84. //time.Sleep(time.Second)
  85. /*if i%500 == 499 {
  86. time.Sleep(time.Second * 9501)
  87. } else {
  88. time.Sleep(time.Second)
  89. }*/
  90. }
  91. }()
  92. // A new node joins every 10s.
  93. /* launcher := time.NewTicker(5 * time.Second)
  94. cnt := 0
  95. var printNet *Network
  96. go func() {
  97. for range launcher.C {
  98. cnt++
  99. if cnt <= 1000 {
  100. log := false //(cnt == 500)
  101. net := sim.launchNode(log)
  102. if log {
  103. printNet = net
  104. }
  105. if cnt > 500 {
  106. go net.RegisterTopic(testTopic, nil)
  107. }
  108. if err := net.SetFallbackNodes([]*Node{bootnode.Self()}); err != nil {
  109. panic(err)
  110. }
  111. }
  112. //fmt.Printf("launched @ %v: %x\n", time.Now(), net.Self().ID[:16])
  113. }
  114. }()
  115. */
  116. time.Sleep(55000 * time.Second)
  117. //launcher.Stop()
  118. sim.shutdown()
  119. //sim.printStats()
  120. //printNet.log.printLogs()
  121. }
  122. /*func testHierarchicalTopics(i int) []Topic {
  123. digits := strconv.FormatInt(int64(256+i/4), 4)
  124. res := make([]Topic, 5)
  125. for i, _ := range res {
  126. res[i] = Topic("foo" + digits[1:i+1])
  127. }
  128. return res
  129. }*/
  130. func testHierarchicalTopics(i int) []Topic {
  131. digits := strconv.FormatInt(int64(128+i/8), 2)
  132. res := make([]Topic, 8)
  133. for i := range res {
  134. res[i] = Topic("foo" + digits[1:i+1])
  135. }
  136. return res
  137. }
  138. func TestSimTopicHierarchy(t *testing.T) {
  139. t.Skip("NaCl test")
  140. if runWithPlaygroundTime(t) {
  141. return
  142. }
  143. sim := newSimulation()
  144. bootnode := sim.launchNode(false)
  145. go func() {
  146. nets := make([]*Network, 1024)
  147. for i := range nets {
  148. net := sim.launchNode(false)
  149. nets[i] = net
  150. if err := net.SetFallbackNodes([]*Node{bootnode.Self()}); err != nil {
  151. panic(err)
  152. }
  153. time.Sleep(time.Second * 5)
  154. }
  155. stop := make(chan struct{})
  156. for i, net := range nets {
  157. //if i < 256 {
  158. for _, topic := range testHierarchicalTopics(i)[:5] {
  159. //fmt.Println("reg", topic)
  160. go net.RegisterTopic(topic, stop)
  161. }
  162. time.Sleep(time.Millisecond * 100)
  163. //}
  164. }
  165. time.Sleep(time.Second * 90000)
  166. close(stop)
  167. }()
  168. time.Sleep(100000 * time.Second)
  169. sim.shutdown()
  170. }
  171. func randomResolves(t *testing.T, s *simulation, net *Network) {
  172. randtime := func() time.Duration {
  173. return time.Duration(rand.Intn(50)+20) * time.Second
  174. }
  175. lookup := func(target NodeID) bool {
  176. result := net.Resolve(target)
  177. return result != nil && result.ID == target
  178. }
  179. timer := time.NewTimer(randtime())
  180. for {
  181. select {
  182. case <-timer.C:
  183. target := s.randomNode().Self().ID
  184. if !lookup(target) {
  185. t.Errorf("node %x: target %x not found", net.Self().ID[:8], target[:8])
  186. }
  187. timer.Reset(randtime())
  188. case <-net.closed:
  189. return
  190. }
  191. }
  192. }
  193. type simulation struct {
  194. mu sync.RWMutex
  195. nodes map[NodeID]*Network
  196. nodectr uint32
  197. }
  198. func newSimulation() *simulation {
  199. return &simulation{nodes: make(map[NodeID]*Network)}
  200. }
  201. func (s *simulation) shutdown() {
  202. s.mu.RLock()
  203. alive := make([]*Network, 0, len(s.nodes))
  204. for _, n := range s.nodes {
  205. alive = append(alive, n)
  206. }
  207. defer s.mu.RUnlock()
  208. for _, n := range alive {
  209. n.Close()
  210. }
  211. }
  212. func (s *simulation) printStats() {
  213. s.mu.Lock()
  214. defer s.mu.Unlock()
  215. fmt.Println("node counter:", s.nodectr)
  216. fmt.Println("alive nodes:", len(s.nodes))
  217. // for _, n := range s.nodes {
  218. // fmt.Printf("%x\n", n.tab.self.ID[:8])
  219. // transport := n.conn.(*simTransport)
  220. // fmt.Println(" joined:", transport.joinTime)
  221. // fmt.Println(" sends:", transport.hashctr)
  222. // fmt.Println(" table size:", n.tab.count)
  223. // }
  224. /*for _, n := range s.nodes {
  225. fmt.Println()
  226. fmt.Printf("*** Node %x\n", n.tab.self.ID[:8])
  227. n.log.printLogs()
  228. }*/
  229. }
  230. func (s *simulation) randomNode() *Network {
  231. s.mu.Lock()
  232. defer s.mu.Unlock()
  233. n := rand.Intn(len(s.nodes))
  234. for _, net := range s.nodes {
  235. if n == 0 {
  236. return net
  237. }
  238. n--
  239. }
  240. return nil
  241. }
  242. func (s *simulation) launchNode(log bool) *Network {
  243. var (
  244. num = s.nodectr
  245. key = newkey()
  246. id = PubkeyID(&key.PublicKey)
  247. ip = make(net.IP, 4)
  248. )
  249. s.nodectr++
  250. binary.BigEndian.PutUint32(ip, num)
  251. ip[0] = 10
  252. addr := &net.UDPAddr{IP: ip, Port: 30303}
  253. transport := &simTransport{joinTime: time.Now(), sender: id, senderAddr: addr, sim: s, priv: key}
  254. net, err := newNetwork(transport, key.PublicKey, "<no database>", nil)
  255. if err != nil {
  256. panic("cannot launch new node: " + err.Error())
  257. }
  258. s.mu.Lock()
  259. s.nodes[id] = net
  260. s.mu.Unlock()
  261. return net
  262. }
  263. func (s *simulation) dropNode(id NodeID) {
  264. s.mu.Lock()
  265. n := s.nodes[id]
  266. delete(s.nodes, id)
  267. s.mu.Unlock()
  268. n.Close()
  269. }
  270. type simTransport struct {
  271. joinTime time.Time
  272. sender NodeID
  273. senderAddr *net.UDPAddr
  274. sim *simulation
  275. hashctr uint64
  276. priv *ecdsa.PrivateKey
  277. }
  278. func (st *simTransport) localAddr() *net.UDPAddr {
  279. return st.senderAddr
  280. }
  281. func (st *simTransport) Close() {}
  282. func (st *simTransport) send(remote *Node, ptype nodeEvent, data interface{}) (hash []byte) {
  283. hash = st.nextHash()
  284. var raw []byte
  285. if ptype == pongPacket {
  286. var err error
  287. raw, _, err = encodePacket(st.priv, byte(ptype), data)
  288. if err != nil {
  289. panic(err)
  290. }
  291. }
  292. st.sendPacket(remote.ID, ingressPacket{
  293. remoteID: st.sender,
  294. remoteAddr: st.senderAddr,
  295. hash: hash,
  296. ev: ptype,
  297. data: data,
  298. rawData: raw,
  299. })
  300. return hash
  301. }
  302. func (st *simTransport) sendPing(remote *Node, remoteAddr *net.UDPAddr, topics []Topic) []byte {
  303. hash := st.nextHash()
  304. st.sendPacket(remote.ID, ingressPacket{
  305. remoteID: st.sender,
  306. remoteAddr: st.senderAddr,
  307. hash: hash,
  308. ev: pingPacket,
  309. data: &ping{
  310. Version: 4,
  311. From: rpcEndpoint{IP: st.senderAddr.IP, UDP: uint16(st.senderAddr.Port), TCP: 30303},
  312. To: rpcEndpoint{IP: remoteAddr.IP, UDP: uint16(remoteAddr.Port), TCP: 30303},
  313. Expiration: uint64(time.Now().Unix() + int64(expiration)),
  314. Topics: topics,
  315. },
  316. })
  317. return hash
  318. }
  319. func (st *simTransport) sendPong(remote *Node, pingHash []byte) {
  320. raddr := remote.addr()
  321. st.sendPacket(remote.ID, ingressPacket{
  322. remoteID: st.sender,
  323. remoteAddr: st.senderAddr,
  324. hash: st.nextHash(),
  325. ev: pongPacket,
  326. data: &pong{
  327. To: rpcEndpoint{IP: raddr.IP, UDP: uint16(raddr.Port), TCP: 30303},
  328. ReplyTok: pingHash,
  329. Expiration: uint64(time.Now().Unix() + int64(expiration)),
  330. },
  331. })
  332. }
  333. func (st *simTransport) sendFindnodeHash(remote *Node, target common.Hash) {
  334. st.sendPacket(remote.ID, ingressPacket{
  335. remoteID: st.sender,
  336. remoteAddr: st.senderAddr,
  337. hash: st.nextHash(),
  338. ev: findnodeHashPacket,
  339. data: &findnodeHash{
  340. Target: target,
  341. Expiration: uint64(time.Now().Unix() + int64(expiration)),
  342. },
  343. })
  344. }
  345. func (st *simTransport) sendTopicRegister(remote *Node, topics []Topic, idx int, pong []byte) {
  346. //fmt.Println("send", topics, pong)
  347. st.sendPacket(remote.ID, ingressPacket{
  348. remoteID: st.sender,
  349. remoteAddr: st.senderAddr,
  350. hash: st.nextHash(),
  351. ev: topicRegisterPacket,
  352. data: &topicRegister{
  353. Topics: topics,
  354. Idx: uint(idx),
  355. Pong: pong,
  356. },
  357. })
  358. }
  359. func (st *simTransport) sendTopicNodes(remote *Node, queryHash common.Hash, nodes []*Node) {
  360. rnodes := make([]rpcNode, len(nodes))
  361. for i := range nodes {
  362. rnodes[i] = nodeToRPC(nodes[i])
  363. }
  364. st.sendPacket(remote.ID, ingressPacket{
  365. remoteID: st.sender,
  366. remoteAddr: st.senderAddr,
  367. hash: st.nextHash(),
  368. ev: topicNodesPacket,
  369. data: &topicNodes{Echo: queryHash, Nodes: rnodes},
  370. })
  371. }
  372. func (st *simTransport) sendNeighbours(remote *Node, nodes []*Node) {
  373. // TODO: send multiple packets
  374. rnodes := make([]rpcNode, len(nodes))
  375. for i := range nodes {
  376. rnodes[i] = nodeToRPC(nodes[i])
  377. }
  378. st.sendPacket(remote.ID, ingressPacket{
  379. remoteID: st.sender,
  380. remoteAddr: st.senderAddr,
  381. hash: st.nextHash(),
  382. ev: neighborsPacket,
  383. data: &neighbors{
  384. Nodes: rnodes,
  385. Expiration: uint64(time.Now().Unix() + int64(expiration)),
  386. },
  387. })
  388. }
  389. func (st *simTransport) nextHash() []byte {
  390. v := atomic.AddUint64(&st.hashctr, 1)
  391. var hash common.Hash
  392. binary.BigEndian.PutUint64(hash[:], v)
  393. return hash[:]
  394. }
  395. const packetLoss = 0 // 1/1000
  396. func (st *simTransport) sendPacket(remote NodeID, p ingressPacket) {
  397. if rand.Int31n(1000) >= packetLoss {
  398. st.sim.mu.RLock()
  399. recipient := st.sim.nodes[remote]
  400. st.sim.mu.RUnlock()
  401. time.AfterFunc(200*time.Millisecond, func() {
  402. recipient.reqReadPacket(p)
  403. })
  404. }
  405. }