server.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "crypto/ecdsa"
  20. "encoding/binary"
  21. "math"
  22. "sync"
  23. "github.com/ethereum/go-ethereum/common"
  24. "github.com/ethereum/go-ethereum/core"
  25. "github.com/ethereum/go-ethereum/core/rawdb"
  26. "github.com/ethereum/go-ethereum/core/types"
  27. "github.com/ethereum/go-ethereum/eth"
  28. "github.com/ethereum/go-ethereum/ethdb"
  29. "github.com/ethereum/go-ethereum/les/flowcontrol"
  30. "github.com/ethereum/go-ethereum/light"
  31. "github.com/ethereum/go-ethereum/log"
  32. "github.com/ethereum/go-ethereum/p2p"
  33. "github.com/ethereum/go-ethereum/p2p/discv5"
  34. "github.com/ethereum/go-ethereum/rlp"
  35. )
  36. type LesServer struct {
  37. config *eth.Config
  38. protocolManager *ProtocolManager
  39. fcManager *flowcontrol.ClientManager // nil if our node is client only
  40. fcCostStats *requestCostStats
  41. defParams *flowcontrol.ServerParams
  42. lesTopics []discv5.Topic
  43. privateKey *ecdsa.PrivateKey
  44. quitSync chan struct{}
  45. chtIndexer, bloomTrieIndexer *core.ChainIndexer
  46. }
  47. func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) {
  48. quitSync := make(chan struct{})
  49. pm, err := NewProtocolManager(eth.BlockChain().Config(), false, ServerProtocolVersions, config.NetworkId, eth.EventMux(), eth.Engine(), newPeerSet(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil, quitSync, new(sync.WaitGroup))
  50. if err != nil {
  51. return nil, err
  52. }
  53. lesTopics := make([]discv5.Topic, len(AdvertiseProtocolVersions))
  54. for i, pv := range AdvertiseProtocolVersions {
  55. lesTopics[i] = lesTopic(eth.BlockChain().Genesis().Hash(), pv)
  56. }
  57. srv := &LesServer{
  58. config: config,
  59. protocolManager: pm,
  60. quitSync: quitSync,
  61. lesTopics: lesTopics,
  62. chtIndexer: light.NewChtIndexer(eth.ChainDb(), false),
  63. bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), false),
  64. }
  65. logger := log.New()
  66. chtV1SectionCount, _, _ := srv.chtIndexer.Sections() // indexer still uses LES/1 4k section size for backwards server compatibility
  67. chtV2SectionCount := chtV1SectionCount / (light.CHTFrequencyClient / light.CHTFrequencyServer)
  68. if chtV2SectionCount != 0 {
  69. // convert to LES/2 section
  70. chtLastSection := chtV2SectionCount - 1
  71. // convert last LES/2 section index back to LES/1 index for chtIndexer.SectionHead
  72. chtLastSectionV1 := (chtLastSection+1)*(light.CHTFrequencyClient/light.CHTFrequencyServer) - 1
  73. chtSectionHead := srv.chtIndexer.SectionHead(chtLastSectionV1)
  74. chtRoot := light.GetChtV2Root(pm.chainDb, chtLastSection, chtSectionHead)
  75. logger.Info("Loaded CHT", "section", chtLastSection, "head", chtSectionHead, "root", chtRoot)
  76. }
  77. bloomTrieSectionCount, _, _ := srv.bloomTrieIndexer.Sections()
  78. if bloomTrieSectionCount != 0 {
  79. bloomTrieLastSection := bloomTrieSectionCount - 1
  80. bloomTrieSectionHead := srv.bloomTrieIndexer.SectionHead(bloomTrieLastSection)
  81. bloomTrieRoot := light.GetBloomTrieRoot(pm.chainDb, bloomTrieLastSection, bloomTrieSectionHead)
  82. logger.Info("Loaded bloom trie", "section", bloomTrieLastSection, "head", bloomTrieSectionHead, "root", bloomTrieRoot)
  83. }
  84. srv.chtIndexer.Start(eth.BlockChain())
  85. pm.server = srv
  86. srv.defParams = &flowcontrol.ServerParams{
  87. BufLimit: 300000000,
  88. MinRecharge: 50000,
  89. }
  90. srv.fcManager = flowcontrol.NewClientManager(uint64(config.LightServ), 10, 1000000000)
  91. srv.fcCostStats = newCostStats(eth.ChainDb())
  92. return srv, nil
  93. }
  94. func (s *LesServer) Protocols() []p2p.Protocol {
  95. return s.protocolManager.SubProtocols
  96. }
  97. // Start starts the LES server
  98. func (s *LesServer) Start(srvr *p2p.Server) {
  99. s.protocolManager.Start(s.config.LightPeers)
  100. if srvr.DiscV5 != nil {
  101. for _, topic := range s.lesTopics {
  102. topic := topic
  103. go func() {
  104. logger := log.New("topic", topic)
  105. logger.Info("Starting topic registration")
  106. defer logger.Info("Terminated topic registration")
  107. srvr.DiscV5.RegisterTopic(topic, s.quitSync)
  108. }()
  109. }
  110. }
  111. s.privateKey = srvr.PrivateKey
  112. s.protocolManager.blockLoop()
  113. }
  114. func (s *LesServer) SetBloomBitsIndexer(bloomIndexer *core.ChainIndexer) {
  115. bloomIndexer.AddChildIndexer(s.bloomTrieIndexer)
  116. }
  117. // Stop stops the LES service
  118. func (s *LesServer) Stop() {
  119. s.chtIndexer.Close()
  120. // bloom trie indexer is closed by parent bloombits indexer
  121. s.fcCostStats.store()
  122. s.fcManager.Stop()
  123. go func() {
  124. <-s.protocolManager.noMorePeers
  125. }()
  126. s.protocolManager.Stop()
  127. }
  128. type requestCosts struct {
  129. baseCost, reqCost uint64
  130. }
  131. type requestCostTable map[uint64]*requestCosts
  132. type RequestCostList []struct {
  133. MsgCode, BaseCost, ReqCost uint64
  134. }
  135. func (list RequestCostList) decode() requestCostTable {
  136. table := make(requestCostTable)
  137. for _, e := range list {
  138. table[e.MsgCode] = &requestCosts{
  139. baseCost: e.BaseCost,
  140. reqCost: e.ReqCost,
  141. }
  142. }
  143. return table
  144. }
  145. type linReg struct {
  146. sumX, sumY, sumXX, sumXY float64
  147. cnt uint64
  148. }
  149. const linRegMaxCnt = 100000
  150. func (l *linReg) add(x, y float64) {
  151. if l.cnt >= linRegMaxCnt {
  152. sub := float64(l.cnt+1-linRegMaxCnt) / linRegMaxCnt
  153. l.sumX -= l.sumX * sub
  154. l.sumY -= l.sumY * sub
  155. l.sumXX -= l.sumXX * sub
  156. l.sumXY -= l.sumXY * sub
  157. l.cnt = linRegMaxCnt - 1
  158. }
  159. l.cnt++
  160. l.sumX += x
  161. l.sumY += y
  162. l.sumXX += x * x
  163. l.sumXY += x * y
  164. }
  165. func (l *linReg) calc() (b, m float64) {
  166. if l.cnt == 0 {
  167. return 0, 0
  168. }
  169. cnt := float64(l.cnt)
  170. d := cnt*l.sumXX - l.sumX*l.sumX
  171. if d < 0.001 {
  172. return l.sumY / cnt, 0
  173. }
  174. m = (cnt*l.sumXY - l.sumX*l.sumY) / d
  175. b = (l.sumY / cnt) - (m * l.sumX / cnt)
  176. return b, m
  177. }
  178. func (l *linReg) toBytes() []byte {
  179. var arr [40]byte
  180. binary.BigEndian.PutUint64(arr[0:8], math.Float64bits(l.sumX))
  181. binary.BigEndian.PutUint64(arr[8:16], math.Float64bits(l.sumY))
  182. binary.BigEndian.PutUint64(arr[16:24], math.Float64bits(l.sumXX))
  183. binary.BigEndian.PutUint64(arr[24:32], math.Float64bits(l.sumXY))
  184. binary.BigEndian.PutUint64(arr[32:40], l.cnt)
  185. return arr[:]
  186. }
  187. func linRegFromBytes(data []byte) *linReg {
  188. if len(data) != 40 {
  189. return nil
  190. }
  191. l := &linReg{}
  192. l.sumX = math.Float64frombits(binary.BigEndian.Uint64(data[0:8]))
  193. l.sumY = math.Float64frombits(binary.BigEndian.Uint64(data[8:16]))
  194. l.sumXX = math.Float64frombits(binary.BigEndian.Uint64(data[16:24]))
  195. l.sumXY = math.Float64frombits(binary.BigEndian.Uint64(data[24:32]))
  196. l.cnt = binary.BigEndian.Uint64(data[32:40])
  197. return l
  198. }
  199. type requestCostStats struct {
  200. lock sync.RWMutex
  201. db ethdb.Database
  202. stats map[uint64]*linReg
  203. }
  204. type requestCostStatsRlp []struct {
  205. MsgCode uint64
  206. Data []byte
  207. }
  208. var rcStatsKey = []byte("_requestCostStats")
  209. func newCostStats(db ethdb.Database) *requestCostStats {
  210. stats := make(map[uint64]*linReg)
  211. for _, code := range reqList {
  212. stats[code] = &linReg{cnt: 100}
  213. }
  214. if db != nil {
  215. data, err := db.Get(rcStatsKey)
  216. var statsRlp requestCostStatsRlp
  217. if err == nil {
  218. err = rlp.DecodeBytes(data, &statsRlp)
  219. }
  220. if err == nil {
  221. for _, r := range statsRlp {
  222. if stats[r.MsgCode] != nil {
  223. if l := linRegFromBytes(r.Data); l != nil {
  224. stats[r.MsgCode] = l
  225. }
  226. }
  227. }
  228. }
  229. }
  230. return &requestCostStats{
  231. db: db,
  232. stats: stats,
  233. }
  234. }
  235. func (s *requestCostStats) store() {
  236. s.lock.Lock()
  237. defer s.lock.Unlock()
  238. statsRlp := make(requestCostStatsRlp, len(reqList))
  239. for i, code := range reqList {
  240. statsRlp[i].MsgCode = code
  241. statsRlp[i].Data = s.stats[code].toBytes()
  242. }
  243. if data, err := rlp.EncodeToBytes(statsRlp); err == nil {
  244. s.db.Put(rcStatsKey, data)
  245. }
  246. }
  247. func (s *requestCostStats) getCurrentList() RequestCostList {
  248. s.lock.Lock()
  249. defer s.lock.Unlock()
  250. list := make(RequestCostList, len(reqList))
  251. //fmt.Println("RequestCostList")
  252. for idx, code := range reqList {
  253. b, m := s.stats[code].calc()
  254. //fmt.Println(code, s.stats[code].cnt, b/1000000, m/1000000)
  255. if m < 0 {
  256. b += m
  257. m = 0
  258. }
  259. if b < 0 {
  260. b = 0
  261. }
  262. list[idx].MsgCode = code
  263. list[idx].BaseCost = uint64(b * 2)
  264. list[idx].ReqCost = uint64(m * 2)
  265. }
  266. return list
  267. }
  268. func (s *requestCostStats) update(msgCode, reqCnt, cost uint64) {
  269. s.lock.Lock()
  270. defer s.lock.Unlock()
  271. c, ok := s.stats[msgCode]
  272. if !ok || reqCnt == 0 {
  273. return
  274. }
  275. c.add(float64(reqCnt), float64(cost))
  276. }
  277. func (pm *ProtocolManager) blockLoop() {
  278. pm.wg.Add(1)
  279. headCh := make(chan core.ChainHeadEvent, 10)
  280. headSub := pm.blockchain.SubscribeChainHeadEvent(headCh)
  281. go func() {
  282. var lastHead *types.Header
  283. lastBroadcastTd := common.Big0
  284. for {
  285. select {
  286. case ev := <-headCh:
  287. peers := pm.peers.AllPeers()
  288. if len(peers) > 0 {
  289. header := ev.Block.Header()
  290. hash := header.Hash()
  291. number := header.Number.Uint64()
  292. td := rawdb.ReadTd(pm.chainDb, hash, number)
  293. if td != nil && td.Cmp(lastBroadcastTd) > 0 {
  294. var reorg uint64
  295. if lastHead != nil {
  296. reorg = lastHead.Number.Uint64() - rawdb.FindCommonAncestor(pm.chainDb, header, lastHead).Number.Uint64()
  297. }
  298. lastHead = header
  299. lastBroadcastTd = td
  300. log.Debug("Announcing block to peers", "number", number, "hash", hash, "td", td, "reorg", reorg)
  301. announce := announceData{Hash: hash, Number: number, Td: td, ReorgDepth: reorg}
  302. var (
  303. signed bool
  304. signedAnnounce announceData
  305. )
  306. for _, p := range peers {
  307. switch p.announceType {
  308. case announceTypeSimple:
  309. select {
  310. case p.announceChn <- announce:
  311. default:
  312. pm.removePeer(p.id)
  313. }
  314. case announceTypeSigned:
  315. if !signed {
  316. signedAnnounce = announce
  317. signedAnnounce.sign(pm.server.privateKey)
  318. signed = true
  319. }
  320. select {
  321. case p.announceChn <- signedAnnounce:
  322. default:
  323. pm.removePeer(p.id)
  324. }
  325. }
  326. }
  327. }
  328. }
  329. case <-pm.quitSync:
  330. headSub.Unsubscribe()
  331. pm.wg.Done()
  332. return
  333. }
  334. }
  335. }()
  336. }