ticket.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package discv5
  17. import (
  18. "bytes"
  19. "encoding/binary"
  20. "fmt"
  21. "math"
  22. "math/rand"
  23. "sort"
  24. "time"
  25. "github.com/ethereum/go-ethereum/common"
  26. "github.com/ethereum/go-ethereum/common/mclock"
  27. "github.com/ethereum/go-ethereum/crypto"
  28. "github.com/ethereum/go-ethereum/log"
  29. )
  30. const (
  31. ticketTimeBucketLen = time.Minute
  32. timeWindow = 10 // * ticketTimeBucketLen
  33. wantTicketsInWindow = 10
  34. collectFrequency = time.Second * 30
  35. registerFrequency = time.Second * 60
  36. maxCollectDebt = 10
  37. maxRegisterDebt = 5
  38. keepTicketConst = time.Minute * 10
  39. keepTicketExp = time.Minute * 5
  40. targetWaitTime = time.Minute * 10
  41. topicQueryTimeout = time.Second * 5
  42. topicQueryResend = time.Minute
  43. // topic radius detection
  44. maxRadius = 0xffffffffffffffff
  45. radiusTC = time.Minute * 20
  46. radiusBucketsPerBit = 8
  47. minSlope = 1
  48. minPeakSize = 40
  49. maxNoAdjust = 20
  50. lookupWidth = 8
  51. minRightSum = 20
  52. searchForceQuery = 4
  53. )
  54. // timeBucket represents absolute monotonic time in minutes.
  55. // It is used as the index into the per-topic ticket buckets.
  56. type timeBucket int
  57. type ticket struct {
  58. topics []Topic
  59. regTime []mclock.AbsTime // Per-topic local absolute time when the ticket can be used.
  60. // The serial number that was issued by the server.
  61. serial uint32
  62. // Used by registrar, tracks absolute time when the ticket was created.
  63. issueTime mclock.AbsTime
  64. // Fields used only by registrants
  65. node *Node // the registrar node that signed this ticket
  66. refCnt int // tracks number of topics that will be registered using this ticket
  67. pong []byte // encoded pong packet signed by the registrar
  68. }
  69. // ticketRef refers to a single topic in a ticket.
  70. type ticketRef struct {
  71. t *ticket
  72. idx int // index of the topic in t.topics and t.regTime
  73. }
  74. func (ref ticketRef) topic() Topic {
  75. return ref.t.topics[ref.idx]
  76. }
  77. func (ref ticketRef) topicRegTime() mclock.AbsTime {
  78. return ref.t.regTime[ref.idx]
  79. }
  80. func pongToTicket(localTime mclock.AbsTime, topics []Topic, node *Node, p *ingressPacket) (*ticket, error) {
  81. wps := p.data.(*pong).WaitPeriods
  82. if len(topics) != len(wps) {
  83. return nil, fmt.Errorf("bad wait period list: got %d values, want %d", len(topics), len(wps))
  84. }
  85. if rlpHash(topics) != p.data.(*pong).TopicHash {
  86. return nil, fmt.Errorf("bad topic hash")
  87. }
  88. t := &ticket{
  89. issueTime: localTime,
  90. node: node,
  91. topics: topics,
  92. pong: p.rawData,
  93. regTime: make([]mclock.AbsTime, len(wps)),
  94. }
  95. // Convert wait periods to local absolute time.
  96. for i, wp := range wps {
  97. t.regTime[i] = localTime + mclock.AbsTime(time.Second*time.Duration(wp))
  98. }
  99. return t, nil
  100. }
  101. func ticketToPong(t *ticket, pong *pong) {
  102. pong.Expiration = uint64(t.issueTime / mclock.AbsTime(time.Second))
  103. pong.TopicHash = rlpHash(t.topics)
  104. pong.TicketSerial = t.serial
  105. pong.WaitPeriods = make([]uint32, len(t.regTime))
  106. for i, regTime := range t.regTime {
  107. pong.WaitPeriods[i] = uint32(time.Duration(regTime-t.issueTime) / time.Second)
  108. }
  109. }
  110. type ticketStore struct {
  111. // radius detector and target address generator
  112. // exists for both searched and registered topics
  113. radius map[Topic]*topicRadius
  114. // Contains buckets (for each absolute minute) of tickets
  115. // that can be used in that minute.
  116. // This is only set if the topic is being registered.
  117. tickets map[Topic]*topicTickets
  118. regQueue []Topic // Topic registration queue for round robin attempts
  119. regSet map[Topic]struct{} // Topic registration queue contents for fast filling
  120. nodes map[*Node]*ticket
  121. nodeLastReq map[*Node]reqInfo
  122. lastBucketFetched timeBucket
  123. nextTicketCached *ticketRef
  124. nextTicketReg mclock.AbsTime
  125. searchTopicMap map[Topic]searchTopic
  126. nextTopicQueryCleanup mclock.AbsTime
  127. queriesSent map[*Node]map[common.Hash]sentQuery
  128. }
  129. type searchTopic struct {
  130. foundChn chan<- *Node
  131. }
  132. type sentQuery struct {
  133. sent mclock.AbsTime
  134. lookup lookupInfo
  135. }
  136. type topicTickets struct {
  137. buckets map[timeBucket][]ticketRef
  138. nextLookup mclock.AbsTime
  139. nextReg mclock.AbsTime
  140. }
  141. func newTicketStore() *ticketStore {
  142. return &ticketStore{
  143. radius: make(map[Topic]*topicRadius),
  144. tickets: make(map[Topic]*topicTickets),
  145. regSet: make(map[Topic]struct{}),
  146. nodes: make(map[*Node]*ticket),
  147. nodeLastReq: make(map[*Node]reqInfo),
  148. searchTopicMap: make(map[Topic]searchTopic),
  149. queriesSent: make(map[*Node]map[common.Hash]sentQuery),
  150. }
  151. }
  152. // addTopic starts tracking a topic. If register is true,
  153. // the local node will register the topic and tickets will be collected.
  154. func (s *ticketStore) addTopic(topic Topic, register bool) {
  155. log.Trace("Adding discovery topic", "topic", topic, "register", register)
  156. if s.radius[topic] == nil {
  157. s.radius[topic] = newTopicRadius(topic)
  158. }
  159. if register && s.tickets[topic] == nil {
  160. s.tickets[topic] = &topicTickets{buckets: make(map[timeBucket][]ticketRef)}
  161. }
  162. }
  163. func (s *ticketStore) addSearchTopic(t Topic, foundChn chan<- *Node) {
  164. s.addTopic(t, false)
  165. if s.searchTopicMap[t].foundChn == nil {
  166. s.searchTopicMap[t] = searchTopic{foundChn: foundChn}
  167. }
  168. }
  169. func (s *ticketStore) removeSearchTopic(t Topic) {
  170. if st := s.searchTopicMap[t]; st.foundChn != nil {
  171. delete(s.searchTopicMap, t)
  172. }
  173. }
  174. // removeRegisterTopic deletes all tickets for the given topic.
  175. func (s *ticketStore) removeRegisterTopic(topic Topic) {
  176. log.Trace("Removing discovery topic", "topic", topic)
  177. if s.tickets[topic] == nil {
  178. log.Warn("Removing non-existent discovery topic", "topic", topic)
  179. return
  180. }
  181. for _, list := range s.tickets[topic].buckets {
  182. for _, ref := range list {
  183. ref.t.refCnt--
  184. if ref.t.refCnt == 0 {
  185. delete(s.nodes, ref.t.node)
  186. delete(s.nodeLastReq, ref.t.node)
  187. }
  188. }
  189. }
  190. delete(s.tickets, topic)
  191. }
  192. func (s *ticketStore) regTopicSet() []Topic {
  193. topics := make([]Topic, 0, len(s.tickets))
  194. for topic := range s.tickets {
  195. topics = append(topics, topic)
  196. }
  197. return topics
  198. }
  199. // nextRegisterLookup returns the target of the next lookup for ticket collection.
  200. func (s *ticketStore) nextRegisterLookup() (lookupInfo, time.Duration) {
  201. // Queue up any new topics (or discarded ones), preserving iteration order
  202. for topic := range s.tickets {
  203. if _, ok := s.regSet[topic]; !ok {
  204. s.regQueue = append(s.regQueue, topic)
  205. s.regSet[topic] = struct{}{}
  206. }
  207. }
  208. // Iterate over the set of all topics and look up the next suitable one
  209. for len(s.regQueue) > 0 {
  210. // Fetch the next topic from the queue, and ensure it still exists
  211. topic := s.regQueue[0]
  212. s.regQueue = s.regQueue[1:]
  213. delete(s.regSet, topic)
  214. if s.tickets[topic] == nil {
  215. continue
  216. }
  217. // If the topic needs more tickets, return it
  218. if s.tickets[topic].nextLookup < mclock.Now() {
  219. next, delay := s.radius[topic].nextTarget(false), 100*time.Millisecond
  220. log.Trace("Found discovery topic to register", "topic", topic, "target", next.target, "delay", delay)
  221. return next, delay
  222. }
  223. }
  224. // No registration topics found or all exhausted, sleep
  225. delay := 40 * time.Second
  226. log.Trace("No topic found to register", "delay", delay)
  227. return lookupInfo{}, delay
  228. }
  229. func (s *ticketStore) nextSearchLookup(topic Topic) lookupInfo {
  230. tr := s.radius[topic]
  231. target := tr.nextTarget(tr.radiusLookupCnt >= searchForceQuery)
  232. if target.radiusLookup {
  233. tr.radiusLookupCnt++
  234. } else {
  235. tr.radiusLookupCnt = 0
  236. }
  237. return target
  238. }
  239. // ticketsInWindow returns the tickets of a given topic in the registration window.
  240. func (s *ticketStore) ticketsInWindow(topic Topic) []ticketRef {
  241. // Sanity check that the topic still exists before operating on it
  242. if s.tickets[topic] == nil {
  243. log.Warn("Listing non-existing discovery tickets", "topic", topic)
  244. return nil
  245. }
  246. // Gather all the tickers in the next time window
  247. var tickets []ticketRef
  248. buckets := s.tickets[topic].buckets
  249. for idx := timeBucket(0); idx < timeWindow; idx++ {
  250. tickets = append(tickets, buckets[s.lastBucketFetched+idx]...)
  251. }
  252. log.Trace("Retrieved discovery registration tickets", "topic", topic, "from", s.lastBucketFetched, "tickets", len(tickets))
  253. return tickets
  254. }
  255. func (s *ticketStore) removeExcessTickets(t Topic) {
  256. tickets := s.ticketsInWindow(t)
  257. if len(tickets) <= wantTicketsInWindow {
  258. return
  259. }
  260. sort.Sort(ticketRefByWaitTime(tickets))
  261. for _, r := range tickets[wantTicketsInWindow:] {
  262. s.removeTicketRef(r)
  263. }
  264. }
  265. type ticketRefByWaitTime []ticketRef
  266. // Len is the number of elements in the collection.
  267. func (s ticketRefByWaitTime) Len() int {
  268. return len(s)
  269. }
  270. func (ref ticketRef) waitTime() mclock.AbsTime {
  271. return ref.t.regTime[ref.idx] - ref.t.issueTime
  272. }
  273. // Less reports whether the element with
  274. // index i should sort before the element with index j.
  275. func (s ticketRefByWaitTime) Less(i, j int) bool {
  276. return s[i].waitTime() < s[j].waitTime()
  277. }
  278. // Swap swaps the elements with indexes i and j.
  279. func (s ticketRefByWaitTime) Swap(i, j int) {
  280. s[i], s[j] = s[j], s[i]
  281. }
  282. func (s *ticketStore) addTicketRef(r ticketRef) {
  283. topic := r.t.topics[r.idx]
  284. tickets := s.tickets[topic]
  285. if tickets == nil {
  286. log.Warn("Adding ticket to non-existent topic", "topic", topic)
  287. return
  288. }
  289. bucket := timeBucket(r.t.regTime[r.idx] / mclock.AbsTime(ticketTimeBucketLen))
  290. tickets.buckets[bucket] = append(tickets.buckets[bucket], r)
  291. r.t.refCnt++
  292. min := mclock.Now() - mclock.AbsTime(collectFrequency)*maxCollectDebt
  293. if tickets.nextLookup < min {
  294. tickets.nextLookup = min
  295. }
  296. tickets.nextLookup += mclock.AbsTime(collectFrequency)
  297. //s.removeExcessTickets(topic)
  298. }
  299. func (s *ticketStore) nextFilteredTicket() (*ticketRef, time.Duration) {
  300. now := mclock.Now()
  301. for {
  302. ticket, wait := s.nextRegisterableTicket()
  303. if ticket == nil {
  304. return ticket, wait
  305. }
  306. log.Trace("Found discovery ticket to register", "node", ticket.t.node, "serial", ticket.t.serial, "wait", wait)
  307. regTime := now + mclock.AbsTime(wait)
  308. topic := ticket.t.topics[ticket.idx]
  309. if s.tickets[topic] != nil && regTime >= s.tickets[topic].nextReg {
  310. return ticket, wait
  311. }
  312. s.removeTicketRef(*ticket)
  313. }
  314. }
  315. func (s *ticketStore) ticketRegistered(ref ticketRef) {
  316. now := mclock.Now()
  317. topic := ref.t.topics[ref.idx]
  318. tickets := s.tickets[topic]
  319. min := now - mclock.AbsTime(registerFrequency)*maxRegisterDebt
  320. if min > tickets.nextReg {
  321. tickets.nextReg = min
  322. }
  323. tickets.nextReg += mclock.AbsTime(registerFrequency)
  324. s.tickets[topic] = tickets
  325. s.removeTicketRef(ref)
  326. }
  327. // nextRegisterableTicket returns the next ticket that can be used
  328. // to register.
  329. //
  330. // If the returned wait time <= zero the ticket can be used. For a positive
  331. // wait time, the caller should requery the next ticket later.
  332. //
  333. // A ticket can be returned more than once with <= zero wait time in case
  334. // the ticket contains multiple topics.
  335. func (s *ticketStore) nextRegisterableTicket() (*ticketRef, time.Duration) {
  336. now := mclock.Now()
  337. if s.nextTicketCached != nil {
  338. return s.nextTicketCached, time.Duration(s.nextTicketCached.topicRegTime() - now)
  339. }
  340. for bucket := s.lastBucketFetched; ; bucket++ {
  341. var (
  342. empty = true // true if there are no tickets
  343. nextTicket ticketRef // uninitialized if this bucket is empty
  344. )
  345. for _, tickets := range s.tickets {
  346. //s.removeExcessTickets(topic)
  347. if len(tickets.buckets) != 0 {
  348. empty = false
  349. list := tickets.buckets[bucket]
  350. for _, ref := range list {
  351. //debugLog(fmt.Sprintf(" nrt bucket = %d node = %x sn = %v wait = %v", bucket, ref.t.node.ID[:8], ref.t.serial, time.Duration(ref.topicRegTime()-now)))
  352. if nextTicket.t == nil || ref.topicRegTime() < nextTicket.topicRegTime() {
  353. nextTicket = ref
  354. }
  355. }
  356. }
  357. }
  358. if empty {
  359. return nil, 0
  360. }
  361. if nextTicket.t != nil {
  362. s.nextTicketCached = &nextTicket
  363. return &nextTicket, time.Duration(nextTicket.topicRegTime() - now)
  364. }
  365. s.lastBucketFetched = bucket
  366. }
  367. }
  368. // removeTicket removes a ticket from the ticket store
  369. func (s *ticketStore) removeTicketRef(ref ticketRef) {
  370. log.Trace("Removing discovery ticket reference", "node", ref.t.node.ID, "serial", ref.t.serial)
  371. // Make nextRegisterableTicket return the next available ticket.
  372. s.nextTicketCached = nil
  373. topic := ref.topic()
  374. tickets := s.tickets[topic]
  375. if tickets == nil {
  376. log.Trace("Removing tickets from unknown topic", "topic", topic)
  377. return
  378. }
  379. bucket := timeBucket(ref.t.regTime[ref.idx] / mclock.AbsTime(ticketTimeBucketLen))
  380. list := tickets.buckets[bucket]
  381. idx := -1
  382. for i, bt := range list {
  383. if bt.t == ref.t {
  384. idx = i
  385. break
  386. }
  387. }
  388. if idx == -1 {
  389. panic(nil)
  390. }
  391. list = append(list[:idx], list[idx+1:]...)
  392. if len(list) != 0 {
  393. tickets.buckets[bucket] = list
  394. } else {
  395. delete(tickets.buckets, bucket)
  396. }
  397. ref.t.refCnt--
  398. if ref.t.refCnt == 0 {
  399. delete(s.nodes, ref.t.node)
  400. delete(s.nodeLastReq, ref.t.node)
  401. }
  402. }
  403. type lookupInfo struct {
  404. target common.Hash
  405. topic Topic
  406. radiusLookup bool
  407. }
  408. type reqInfo struct {
  409. pingHash []byte
  410. lookup lookupInfo
  411. time mclock.AbsTime
  412. }
  413. // returns -1 if not found
  414. func (t *ticket) findIdx(topic Topic) int {
  415. for i, tt := range t.topics {
  416. if tt == topic {
  417. return i
  418. }
  419. }
  420. return -1
  421. }
  422. func (s *ticketStore) registerLookupDone(lookup lookupInfo, nodes []*Node, ping func(n *Node) []byte) {
  423. now := mclock.Now()
  424. for i, n := range nodes {
  425. if i == 0 || (binary.BigEndian.Uint64(n.sha[:8])^binary.BigEndian.Uint64(lookup.target[:8])) < s.radius[lookup.topic].minRadius {
  426. if lookup.radiusLookup {
  427. if lastReq, ok := s.nodeLastReq[n]; !ok || time.Duration(now-lastReq.time) > radiusTC {
  428. s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now}
  429. }
  430. } else {
  431. if s.nodes[n] == nil {
  432. s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now}
  433. }
  434. }
  435. }
  436. }
  437. }
  438. func (s *ticketStore) searchLookupDone(lookup lookupInfo, nodes []*Node, query func(n *Node, topic Topic) []byte) {
  439. now := mclock.Now()
  440. for i, n := range nodes {
  441. if i == 0 || (binary.BigEndian.Uint64(n.sha[:8])^binary.BigEndian.Uint64(lookup.target[:8])) < s.radius[lookup.topic].minRadius {
  442. if lookup.radiusLookup {
  443. if lastReq, ok := s.nodeLastReq[n]; !ok || time.Duration(now-lastReq.time) > radiusTC {
  444. s.nodeLastReq[n] = reqInfo{pingHash: nil, lookup: lookup, time: now}
  445. }
  446. } // else {
  447. if s.canQueryTopic(n, lookup.topic) {
  448. hash := query(n, lookup.topic)
  449. if hash != nil {
  450. s.addTopicQuery(common.BytesToHash(hash), n, lookup)
  451. }
  452. }
  453. //}
  454. }
  455. }
  456. }
  457. func (s *ticketStore) adjustWithTicket(now mclock.AbsTime, targetHash common.Hash, t *ticket) {
  458. for i, topic := range t.topics {
  459. if tt, ok := s.radius[topic]; ok {
  460. tt.adjustWithTicket(now, targetHash, ticketRef{t, i})
  461. }
  462. }
  463. }
  464. func (s *ticketStore) addTicket(localTime mclock.AbsTime, pingHash []byte, ticket *ticket) {
  465. log.Trace("Adding discovery ticket", "node", ticket.node.ID, "serial", ticket.serial)
  466. lastReq, ok := s.nodeLastReq[ticket.node]
  467. if !(ok && bytes.Equal(pingHash, lastReq.pingHash)) {
  468. return
  469. }
  470. s.adjustWithTicket(localTime, lastReq.lookup.target, ticket)
  471. if lastReq.lookup.radiusLookup || s.nodes[ticket.node] != nil {
  472. return
  473. }
  474. topic := lastReq.lookup.topic
  475. topicIdx := ticket.findIdx(topic)
  476. if topicIdx == -1 {
  477. return
  478. }
  479. bucket := timeBucket(localTime / mclock.AbsTime(ticketTimeBucketLen))
  480. if s.lastBucketFetched == 0 || bucket < s.lastBucketFetched {
  481. s.lastBucketFetched = bucket
  482. }
  483. if _, ok := s.tickets[topic]; ok {
  484. wait := ticket.regTime[topicIdx] - localTime
  485. rnd := rand.ExpFloat64()
  486. if rnd > 10 {
  487. rnd = 10
  488. }
  489. if float64(wait) < float64(keepTicketConst)+float64(keepTicketExp)*rnd {
  490. // use the ticket to register this topic
  491. //fmt.Println("addTicket", ticket.node.ID[:8], ticket.node.addr().String(), ticket.serial, ticket.pong)
  492. s.addTicketRef(ticketRef{ticket, topicIdx})
  493. }
  494. }
  495. if ticket.refCnt > 0 {
  496. s.nextTicketCached = nil
  497. s.nodes[ticket.node] = ticket
  498. }
  499. }
  500. func (s *ticketStore) getNodeTicket(node *Node) *ticket {
  501. if s.nodes[node] == nil {
  502. log.Trace("Retrieving node ticket", "node", node.ID, "serial", nil)
  503. } else {
  504. log.Trace("Retrieving node ticket", "node", node.ID, "serial", s.nodes[node].serial)
  505. }
  506. return s.nodes[node]
  507. }
  508. func (s *ticketStore) canQueryTopic(node *Node, topic Topic) bool {
  509. qq := s.queriesSent[node]
  510. if qq != nil {
  511. now := mclock.Now()
  512. for _, sq := range qq {
  513. if sq.lookup.topic == topic && sq.sent > now-mclock.AbsTime(topicQueryResend) {
  514. return false
  515. }
  516. }
  517. }
  518. return true
  519. }
  520. func (s *ticketStore) addTopicQuery(hash common.Hash, node *Node, lookup lookupInfo) {
  521. now := mclock.Now()
  522. qq := s.queriesSent[node]
  523. if qq == nil {
  524. qq = make(map[common.Hash]sentQuery)
  525. s.queriesSent[node] = qq
  526. }
  527. qq[hash] = sentQuery{sent: now, lookup: lookup}
  528. s.cleanupTopicQueries(now)
  529. }
  530. func (s *ticketStore) cleanupTopicQueries(now mclock.AbsTime) {
  531. if s.nextTopicQueryCleanup > now {
  532. return
  533. }
  534. exp := now - mclock.AbsTime(topicQueryResend)
  535. for n, qq := range s.queriesSent {
  536. for h, q := range qq {
  537. if q.sent < exp {
  538. delete(qq, h)
  539. }
  540. }
  541. if len(qq) == 0 {
  542. delete(s.queriesSent, n)
  543. }
  544. }
  545. s.nextTopicQueryCleanup = now + mclock.AbsTime(topicQueryTimeout)
  546. }
  547. func (s *ticketStore) gotTopicNodes(from *Node, hash common.Hash, nodes []rpcNode) (timeout bool) {
  548. now := mclock.Now()
  549. //fmt.Println("got", from.addr().String(), hash, len(nodes))
  550. qq := s.queriesSent[from]
  551. if qq == nil {
  552. return true
  553. }
  554. q, ok := qq[hash]
  555. if !ok || now > q.sent+mclock.AbsTime(topicQueryTimeout) {
  556. return true
  557. }
  558. inside := float64(0)
  559. if len(nodes) > 0 {
  560. inside = 1
  561. }
  562. s.radius[q.lookup.topic].adjust(now, q.lookup.target, from.sha, inside)
  563. chn := s.searchTopicMap[q.lookup.topic].foundChn
  564. if chn == nil {
  565. //fmt.Println("no channel")
  566. return false
  567. }
  568. for _, node := range nodes {
  569. ip := node.IP
  570. if ip.IsUnspecified() || ip.IsLoopback() {
  571. ip = from.IP
  572. }
  573. n := NewNode(node.ID, ip, node.UDP, node.TCP)
  574. select {
  575. case chn <- n:
  576. default:
  577. return false
  578. }
  579. }
  580. return false
  581. }
  582. type topicRadius struct {
  583. topic Topic
  584. topicHashPrefix uint64
  585. radius, minRadius uint64
  586. buckets []topicRadiusBucket
  587. converged bool
  588. radiusLookupCnt int
  589. }
  590. type topicRadiusEvent int
  591. const (
  592. trOutside topicRadiusEvent = iota
  593. trInside
  594. trNoAdjust
  595. trCount
  596. )
  597. type topicRadiusBucket struct {
  598. weights [trCount]float64
  599. lastTime mclock.AbsTime
  600. value float64
  601. lookupSent map[common.Hash]mclock.AbsTime
  602. }
  603. func (b *topicRadiusBucket) update(now mclock.AbsTime) {
  604. if now == b.lastTime {
  605. return
  606. }
  607. exp := math.Exp(-float64(now-b.lastTime) / float64(radiusTC))
  608. for i, w := range b.weights {
  609. b.weights[i] = w * exp
  610. }
  611. b.lastTime = now
  612. for target, tm := range b.lookupSent {
  613. if now-tm > mclock.AbsTime(respTimeout) {
  614. b.weights[trNoAdjust] += 1
  615. delete(b.lookupSent, target)
  616. }
  617. }
  618. }
  619. func (b *topicRadiusBucket) adjust(now mclock.AbsTime, inside float64) {
  620. b.update(now)
  621. if inside <= 0 {
  622. b.weights[trOutside] += 1
  623. } else {
  624. if inside >= 1 {
  625. b.weights[trInside] += 1
  626. } else {
  627. b.weights[trInside] += inside
  628. b.weights[trOutside] += 1 - inside
  629. }
  630. }
  631. }
  632. func newTopicRadius(t Topic) *topicRadius {
  633. topicHash := crypto.Keccak256Hash([]byte(t))
  634. topicHashPrefix := binary.BigEndian.Uint64(topicHash[0:8])
  635. return &topicRadius{
  636. topic: t,
  637. topicHashPrefix: topicHashPrefix,
  638. radius: maxRadius,
  639. minRadius: maxRadius,
  640. }
  641. }
  642. func (r *topicRadius) getBucketIdx(addrHash common.Hash) int {
  643. prefix := binary.BigEndian.Uint64(addrHash[0:8])
  644. var log2 float64
  645. if prefix != r.topicHashPrefix {
  646. log2 = math.Log2(float64(prefix ^ r.topicHashPrefix))
  647. }
  648. bucket := int((64 - log2) * radiusBucketsPerBit)
  649. max := 64*radiusBucketsPerBit - 1
  650. if bucket > max {
  651. return max
  652. }
  653. if bucket < 0 {
  654. return 0
  655. }
  656. return bucket
  657. }
  658. func (r *topicRadius) targetForBucket(bucket int) common.Hash {
  659. min := math.Pow(2, 64-float64(bucket+1)/radiusBucketsPerBit)
  660. max := math.Pow(2, 64-float64(bucket)/radiusBucketsPerBit)
  661. a := uint64(min)
  662. b := randUint64n(uint64(max - min))
  663. xor := a + b
  664. if xor < a {
  665. xor = ^uint64(0)
  666. }
  667. prefix := r.topicHashPrefix ^ xor
  668. var target common.Hash
  669. binary.BigEndian.PutUint64(target[0:8], prefix)
  670. globalRandRead(target[8:])
  671. return target
  672. }
  673. // package rand provides a Read function in Go 1.6 and later, but
  674. // we can't use it yet because we still support Go 1.5.
  675. func globalRandRead(b []byte) {
  676. pos := 0
  677. val := 0
  678. for n := 0; n < len(b); n++ {
  679. if pos == 0 {
  680. val = rand.Int()
  681. pos = 7
  682. }
  683. b[n] = byte(val)
  684. val >>= 8
  685. pos--
  686. }
  687. }
  688. func (r *topicRadius) isInRadius(addrHash common.Hash) bool {
  689. nodePrefix := binary.BigEndian.Uint64(addrHash[0:8])
  690. dist := nodePrefix ^ r.topicHashPrefix
  691. return dist < r.radius
  692. }
  693. func (r *topicRadius) chooseLookupBucket(a, b int) int {
  694. if a < 0 {
  695. a = 0
  696. }
  697. if a > b {
  698. return -1
  699. }
  700. c := 0
  701. for i := a; i <= b; i++ {
  702. if i >= len(r.buckets) || r.buckets[i].weights[trNoAdjust] < maxNoAdjust {
  703. c++
  704. }
  705. }
  706. if c == 0 {
  707. return -1
  708. }
  709. rnd := randUint(uint32(c))
  710. for i := a; i <= b; i++ {
  711. if i >= len(r.buckets) || r.buckets[i].weights[trNoAdjust] < maxNoAdjust {
  712. if rnd == 0 {
  713. return i
  714. }
  715. rnd--
  716. }
  717. }
  718. panic(nil) // should never happen
  719. }
  720. func (r *topicRadius) needMoreLookups(a, b int, maxValue float64) bool {
  721. var max float64
  722. if a < 0 {
  723. a = 0
  724. }
  725. if b >= len(r.buckets) {
  726. b = len(r.buckets) - 1
  727. if r.buckets[b].value > max {
  728. max = r.buckets[b].value
  729. }
  730. }
  731. if b >= a {
  732. for i := a; i <= b; i++ {
  733. if r.buckets[i].value > max {
  734. max = r.buckets[i].value
  735. }
  736. }
  737. }
  738. return maxValue-max < minPeakSize
  739. }
  740. func (r *topicRadius) recalcRadius() (radius uint64, radiusLookup int) {
  741. maxBucket := 0
  742. maxValue := float64(0)
  743. now := mclock.Now()
  744. v := float64(0)
  745. for i := range r.buckets {
  746. r.buckets[i].update(now)
  747. v += r.buckets[i].weights[trOutside] - r.buckets[i].weights[trInside]
  748. r.buckets[i].value = v
  749. //fmt.Printf("%v %v | ", v, r.buckets[i].weights[trNoAdjust])
  750. }
  751. //fmt.Println()
  752. slopeCross := -1
  753. for i, b := range r.buckets {
  754. v := b.value
  755. if v < float64(i)*minSlope {
  756. slopeCross = i
  757. break
  758. }
  759. if v > maxValue {
  760. maxValue = v
  761. maxBucket = i + 1
  762. }
  763. }
  764. minRadBucket := len(r.buckets)
  765. sum := float64(0)
  766. for minRadBucket > 0 && sum < minRightSum {
  767. minRadBucket--
  768. b := r.buckets[minRadBucket]
  769. sum += b.weights[trInside] + b.weights[trOutside]
  770. }
  771. r.minRadius = uint64(math.Pow(2, 64-float64(minRadBucket)/radiusBucketsPerBit))
  772. lookupLeft := -1
  773. if r.needMoreLookups(0, maxBucket-lookupWidth-1, maxValue) {
  774. lookupLeft = r.chooseLookupBucket(maxBucket-lookupWidth, maxBucket-1)
  775. }
  776. lookupRight := -1
  777. if slopeCross != maxBucket && (minRadBucket <= maxBucket || r.needMoreLookups(maxBucket+lookupWidth, len(r.buckets)-1, maxValue)) {
  778. for len(r.buckets) <= maxBucket+lookupWidth {
  779. r.buckets = append(r.buckets, topicRadiusBucket{lookupSent: make(map[common.Hash]mclock.AbsTime)})
  780. }
  781. lookupRight = r.chooseLookupBucket(maxBucket, maxBucket+lookupWidth-1)
  782. }
  783. if lookupLeft == -1 {
  784. radiusLookup = lookupRight
  785. } else {
  786. if lookupRight == -1 {
  787. radiusLookup = lookupLeft
  788. } else {
  789. if randUint(2) == 0 {
  790. radiusLookup = lookupLeft
  791. } else {
  792. radiusLookup = lookupRight
  793. }
  794. }
  795. }
  796. //fmt.Println("mb", maxBucket, "sc", slopeCross, "mrb", minRadBucket, "ll", lookupLeft, "lr", lookupRight, "mv", maxValue)
  797. if radiusLookup == -1 {
  798. // no more radius lookups needed at the moment, return a radius
  799. r.converged = true
  800. rad := maxBucket
  801. if minRadBucket < rad {
  802. rad = minRadBucket
  803. }
  804. radius = ^uint64(0)
  805. if rad > 0 {
  806. radius = uint64(math.Pow(2, 64-float64(rad)/radiusBucketsPerBit))
  807. }
  808. r.radius = radius
  809. }
  810. return
  811. }
  812. func (r *topicRadius) nextTarget(forceRegular bool) lookupInfo {
  813. if !forceRegular {
  814. _, radiusLookup := r.recalcRadius()
  815. if radiusLookup != -1 {
  816. target := r.targetForBucket(radiusLookup)
  817. r.buckets[radiusLookup].lookupSent[target] = mclock.Now()
  818. return lookupInfo{target: target, topic: r.topic, radiusLookup: true}
  819. }
  820. }
  821. radExt := r.radius / 2
  822. if radExt > maxRadius-r.radius {
  823. radExt = maxRadius - r.radius
  824. }
  825. rnd := randUint64n(r.radius) + randUint64n(2*radExt)
  826. if rnd > radExt {
  827. rnd -= radExt
  828. } else {
  829. rnd = radExt - rnd
  830. }
  831. prefix := r.topicHashPrefix ^ rnd
  832. var target common.Hash
  833. binary.BigEndian.PutUint64(target[0:8], prefix)
  834. globalRandRead(target[8:])
  835. return lookupInfo{target: target, topic: r.topic, radiusLookup: false}
  836. }
  837. func (r *topicRadius) adjustWithTicket(now mclock.AbsTime, targetHash common.Hash, t ticketRef) {
  838. wait := t.t.regTime[t.idx] - t.t.issueTime
  839. inside := float64(wait)/float64(targetWaitTime) - 0.5
  840. if inside > 1 {
  841. inside = 1
  842. }
  843. if inside < 0 {
  844. inside = 0
  845. }
  846. r.adjust(now, targetHash, t.t.node.sha, inside)
  847. }
  848. func (r *topicRadius) adjust(now mclock.AbsTime, targetHash, addrHash common.Hash, inside float64) {
  849. bucket := r.getBucketIdx(addrHash)
  850. //fmt.Println("adjust", bucket, len(r.buckets), inside)
  851. if bucket >= len(r.buckets) {
  852. return
  853. }
  854. r.buckets[bucket].adjust(now, inside)
  855. delete(r.buckets[bucket].lookupSent, targetHash)
  856. }