topic.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package discv5
  17. import (
  18. "container/heap"
  19. "fmt"
  20. "math"
  21. "math/rand"
  22. "time"
  23. "github.com/ethereum/go-ethereum/common/mclock"
  24. "github.com/ethereum/go-ethereum/log"
  25. )
  26. const (
  27. maxEntries = 10000
  28. maxEntriesPerTopic = 50
  29. fallbackRegistrationExpiry = 1 * time.Hour
  30. )
  31. type Topic string
  32. type topicEntry struct {
  33. topic Topic
  34. fifoIdx uint64
  35. node *Node
  36. expire mclock.AbsTime
  37. }
  38. type topicInfo struct {
  39. entries map[uint64]*topicEntry
  40. fifoHead, fifoTail uint64
  41. rqItem *topicRequestQueueItem
  42. wcl waitControlLoop
  43. }
  44. // removes tail element from the fifo
  45. func (t *topicInfo) getFifoTail() *topicEntry {
  46. for t.entries[t.fifoTail] == nil {
  47. t.fifoTail++
  48. }
  49. tail := t.entries[t.fifoTail]
  50. t.fifoTail++
  51. return tail
  52. }
  53. type nodeInfo struct {
  54. entries map[Topic]*topicEntry
  55. lastIssuedTicket, lastUsedTicket uint32
  56. // you can't register a ticket newer than lastUsedTicket before noRegUntil (absolute time)
  57. noRegUntil mclock.AbsTime
  58. }
  59. type topicTable struct {
  60. db *nodeDB
  61. self *Node
  62. nodes map[*Node]*nodeInfo
  63. topics map[Topic]*topicInfo
  64. globalEntries uint64
  65. requested topicRequestQueue
  66. requestCnt uint64
  67. lastGarbageCollection mclock.AbsTime
  68. }
  69. func newTopicTable(db *nodeDB, self *Node) *topicTable {
  70. if printTestImgLogs {
  71. fmt.Printf("*N %016x\n", self.sha[:8])
  72. }
  73. return &topicTable{
  74. db: db,
  75. nodes: make(map[*Node]*nodeInfo),
  76. topics: make(map[Topic]*topicInfo),
  77. self: self,
  78. }
  79. }
  80. func (t *topicTable) getOrNewTopic(topic Topic) *topicInfo {
  81. ti := t.topics[topic]
  82. if ti == nil {
  83. rqItem := &topicRequestQueueItem{
  84. topic: topic,
  85. priority: t.requestCnt,
  86. }
  87. ti = &topicInfo{
  88. entries: make(map[uint64]*topicEntry),
  89. rqItem: rqItem,
  90. }
  91. t.topics[topic] = ti
  92. heap.Push(&t.requested, rqItem)
  93. }
  94. return ti
  95. }
  96. func (t *topicTable) checkDeleteTopic(topic Topic) {
  97. ti := t.topics[topic]
  98. if ti == nil {
  99. return
  100. }
  101. if len(ti.entries) == 0 && ti.wcl.hasMinimumWaitPeriod() {
  102. delete(t.topics, topic)
  103. heap.Remove(&t.requested, ti.rqItem.index)
  104. }
  105. }
  106. func (t *topicTable) getOrNewNode(node *Node) *nodeInfo {
  107. n := t.nodes[node]
  108. if n == nil {
  109. //fmt.Printf("newNode %016x %016x\n", t.self.sha[:8], node.sha[:8])
  110. var issued, used uint32
  111. if t.db != nil {
  112. issued, used = t.db.fetchTopicRegTickets(node.ID)
  113. }
  114. n = &nodeInfo{
  115. entries: make(map[Topic]*topicEntry),
  116. lastIssuedTicket: issued,
  117. lastUsedTicket: used,
  118. }
  119. t.nodes[node] = n
  120. }
  121. return n
  122. }
  123. func (t *topicTable) checkDeleteNode(node *Node) {
  124. if n, ok := t.nodes[node]; ok && len(n.entries) == 0 && n.noRegUntil < mclock.Now() {
  125. //fmt.Printf("deleteNode %016x %016x\n", t.self.sha[:8], node.sha[:8])
  126. delete(t.nodes, node)
  127. }
  128. }
  129. func (t *topicTable) storeTicketCounters(node *Node) {
  130. n := t.getOrNewNode(node)
  131. if t.db != nil {
  132. t.db.updateTopicRegTickets(node.ID, n.lastIssuedTicket, n.lastUsedTicket)
  133. }
  134. }
  135. func (t *topicTable) getEntries(topic Topic) []*Node {
  136. t.collectGarbage()
  137. te := t.topics[topic]
  138. if te == nil {
  139. return nil
  140. }
  141. nodes := make([]*Node, len(te.entries))
  142. i := 0
  143. for _, e := range te.entries {
  144. nodes[i] = e.node
  145. i++
  146. }
  147. t.requestCnt++
  148. t.requested.update(te.rqItem, t.requestCnt)
  149. return nodes
  150. }
  151. func (t *topicTable) addEntry(node *Node, topic Topic) {
  152. n := t.getOrNewNode(node)
  153. // clear previous entries by the same node
  154. for _, e := range n.entries {
  155. t.deleteEntry(e)
  156. }
  157. // ***
  158. n = t.getOrNewNode(node)
  159. tm := mclock.Now()
  160. te := t.getOrNewTopic(topic)
  161. if len(te.entries) == maxEntriesPerTopic {
  162. t.deleteEntry(te.getFifoTail())
  163. }
  164. if t.globalEntries == maxEntries {
  165. t.deleteEntry(t.leastRequested()) // not empty, no need to check for nil
  166. }
  167. fifoIdx := te.fifoHead
  168. te.fifoHead++
  169. entry := &topicEntry{
  170. topic: topic,
  171. fifoIdx: fifoIdx,
  172. node: node,
  173. expire: tm + mclock.AbsTime(fallbackRegistrationExpiry),
  174. }
  175. if printTestImgLogs {
  176. fmt.Printf("*+ %d %v %016x %016x\n", tm/1000000, topic, t.self.sha[:8], node.sha[:8])
  177. }
  178. te.entries[fifoIdx] = entry
  179. n.entries[topic] = entry
  180. t.globalEntries++
  181. te.wcl.registered(tm)
  182. }
  183. // removes least requested element from the fifo
  184. func (t *topicTable) leastRequested() *topicEntry {
  185. for t.requested.Len() > 0 && t.topics[t.requested[0].topic] == nil {
  186. heap.Pop(&t.requested)
  187. }
  188. if t.requested.Len() == 0 {
  189. return nil
  190. }
  191. return t.topics[t.requested[0].topic].getFifoTail()
  192. }
  193. // entry should exist
  194. func (t *topicTable) deleteEntry(e *topicEntry) {
  195. if printTestImgLogs {
  196. fmt.Printf("*- %d %v %016x %016x\n", mclock.Now()/1000000, e.topic, t.self.sha[:8], e.node.sha[:8])
  197. }
  198. ne := t.nodes[e.node].entries
  199. delete(ne, e.topic)
  200. if len(ne) == 0 {
  201. t.checkDeleteNode(e.node)
  202. }
  203. te := t.topics[e.topic]
  204. delete(te.entries, e.fifoIdx)
  205. if len(te.entries) == 0 {
  206. t.checkDeleteTopic(e.topic)
  207. }
  208. t.globalEntries--
  209. }
  210. // It is assumed that topics and waitPeriods have the same length.
  211. func (t *topicTable) useTicket(node *Node, serialNo uint32, topics []Topic, idx int, issueTime uint64, waitPeriods []uint32) (registered bool) {
  212. log.Trace("Using discovery ticket", "serial", serialNo, "topics", topics, "waits", waitPeriods)
  213. //fmt.Println("useTicket", serialNo, topics, waitPeriods)
  214. t.collectGarbage()
  215. n := t.getOrNewNode(node)
  216. if serialNo < n.lastUsedTicket {
  217. return false
  218. }
  219. tm := mclock.Now()
  220. if serialNo > n.lastUsedTicket && tm < n.noRegUntil {
  221. return false
  222. }
  223. if serialNo != n.lastUsedTicket {
  224. n.lastUsedTicket = serialNo
  225. n.noRegUntil = tm + mclock.AbsTime(noRegTimeout())
  226. t.storeTicketCounters(node)
  227. }
  228. currTime := uint64(tm / mclock.AbsTime(time.Second))
  229. regTime := issueTime + uint64(waitPeriods[idx])
  230. relTime := int64(currTime - regTime)
  231. if relTime >= -1 && relTime <= regTimeWindow+1 { // give clients a little security margin on both ends
  232. if e := n.entries[topics[idx]]; e == nil {
  233. t.addEntry(node, topics[idx])
  234. } else {
  235. // if there is an active entry, don't move to the front of the FIFO but prolong expire time
  236. e.expire = tm + mclock.AbsTime(fallbackRegistrationExpiry)
  237. }
  238. return true
  239. }
  240. return false
  241. }
  242. func (t *topicTable) getTicket(node *Node, topics []Topic) *ticket {
  243. t.collectGarbage()
  244. now := mclock.Now()
  245. n := t.getOrNewNode(node)
  246. n.lastIssuedTicket++
  247. t.storeTicketCounters(node)
  248. tic := &ticket{
  249. issueTime: now,
  250. topics: topics,
  251. serial: n.lastIssuedTicket,
  252. regTime: make([]mclock.AbsTime, len(topics)),
  253. }
  254. for i, topic := range topics {
  255. var waitPeriod time.Duration
  256. if topic := t.topics[topic]; topic != nil {
  257. waitPeriod = topic.wcl.waitPeriod
  258. } else {
  259. waitPeriod = minWaitPeriod
  260. }
  261. tic.regTime[i] = now + mclock.AbsTime(waitPeriod)
  262. }
  263. return tic
  264. }
  265. const gcInterval = time.Minute
  266. func (t *topicTable) collectGarbage() {
  267. tm := mclock.Now()
  268. if time.Duration(tm-t.lastGarbageCollection) < gcInterval {
  269. return
  270. }
  271. t.lastGarbageCollection = tm
  272. for node, n := range t.nodes {
  273. for _, e := range n.entries {
  274. if e.expire <= tm {
  275. t.deleteEntry(e)
  276. }
  277. }
  278. t.checkDeleteNode(node)
  279. }
  280. for topic := range t.topics {
  281. t.checkDeleteTopic(topic)
  282. }
  283. }
  284. const (
  285. minWaitPeriod = time.Minute
  286. regTimeWindow = 10 // seconds
  287. avgnoRegTimeout = time.Minute * 10
  288. // target average interval between two incoming ad requests
  289. wcTargetRegInterval = time.Minute * 10 / maxEntriesPerTopic
  290. //
  291. wcTimeConst = time.Minute * 10
  292. )
  293. // initialization is not required, will set to minWaitPeriod at first registration
  294. type waitControlLoop struct {
  295. lastIncoming mclock.AbsTime
  296. waitPeriod time.Duration
  297. }
  298. func (w *waitControlLoop) registered(tm mclock.AbsTime) {
  299. w.waitPeriod = w.nextWaitPeriod(tm)
  300. w.lastIncoming = tm
  301. }
  302. func (w *waitControlLoop) nextWaitPeriod(tm mclock.AbsTime) time.Duration {
  303. period := tm - w.lastIncoming
  304. wp := time.Duration(float64(w.waitPeriod) * math.Exp((float64(wcTargetRegInterval)-float64(period))/float64(wcTimeConst)))
  305. if wp < minWaitPeriod {
  306. wp = minWaitPeriod
  307. }
  308. return wp
  309. }
  310. func (w *waitControlLoop) hasMinimumWaitPeriod() bool {
  311. return w.nextWaitPeriod(mclock.Now()) == minWaitPeriod
  312. }
  313. func noRegTimeout() time.Duration {
  314. e := rand.ExpFloat64()
  315. if e > 100 {
  316. e = 100
  317. }
  318. return time.Duration(float64(avgnoRegTimeout) * e)
  319. }
  320. type topicRequestQueueItem struct {
  321. topic Topic
  322. priority uint64
  323. index int
  324. }
  325. // A topicRequestQueue implements heap.Interface and holds topicRequestQueueItems.
  326. type topicRequestQueue []*topicRequestQueueItem
  327. func (tq topicRequestQueue) Len() int { return len(tq) }
  328. func (tq topicRequestQueue) Less(i, j int) bool {
  329. return tq[i].priority < tq[j].priority
  330. }
  331. func (tq topicRequestQueue) Swap(i, j int) {
  332. tq[i], tq[j] = tq[j], tq[i]
  333. tq[i].index = i
  334. tq[j].index = j
  335. }
  336. func (tq *topicRequestQueue) Push(x interface{}) {
  337. n := len(*tq)
  338. item := x.(*topicRequestQueueItem)
  339. item.index = n
  340. *tq = append(*tq, item)
  341. }
  342. func (tq *topicRequestQueue) Pop() interface{} {
  343. old := *tq
  344. n := len(old)
  345. item := old[n-1]
  346. item.index = -1
  347. *tq = old[0 : n-1]
  348. return item
  349. }
  350. func (tq *topicRequestQueue) update(item *topicRequestQueueItem, priority uint64) {
  351. item.priority = priority
  352. heap.Fix(tq, item.index)
  353. }