server.go 98 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174
  1. // Copyright (c) 2013-2017 The btcsuite developers
  2. // Copyright (c) 2015-2018 The Decred developers
  3. // Use of this source code is governed by an ISC
  4. // license that can be found in the LICENSE file.
  5. package main
  6. import (
  7. "bytes"
  8. "crypto/rand"
  9. "crypto/tls"
  10. "encoding/binary"
  11. "fmt"
  12. "math"
  13. mathrand "math/rand"
  14. "net"
  15. "runtime"
  16. "sort"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "sync/atomic"
  21. "time"
  22. "github.com/pkt-cash/pktd/addrmgr"
  23. "github.com/pkt-cash/pktd/blockchain"
  24. "github.com/pkt-cash/pktd/blockchain/indexers"
  25. "github.com/pkt-cash/pktd/btcutil"
  26. "github.com/pkt-cash/pktd/btcutil/bloom"
  27. "github.com/pkt-cash/pktd/btcutil/er"
  28. "github.com/pkt-cash/pktd/chaincfg"
  29. "github.com/pkt-cash/pktd/chaincfg/chainhash"
  30. "github.com/pkt-cash/pktd/connmgr"
  31. "github.com/pkt-cash/pktd/database"
  32. "github.com/pkt-cash/pktd/mempool"
  33. "github.com/pkt-cash/pktd/mining"
  34. "github.com/pkt-cash/pktd/mining/cpuminer"
  35. "github.com/pkt-cash/pktd/netsync"
  36. "github.com/pkt-cash/pktd/peer"
  37. "github.com/pkt-cash/pktd/pktconfig/version"
  38. "github.com/pkt-cash/pktd/pktlog/log"
  39. "github.com/pkt-cash/pktd/txscript"
  40. "github.com/pkt-cash/pktd/wire"
  41. "github.com/pkt-cash/pktd/wire/protocol"
  42. )
  43. const (
  44. // defaultServices describes the default services that are supported by
  45. // the server.
  46. defaultServices = protocol.SFNodeNetwork | protocol.SFNodeBloom |
  47. protocol.SFNodeWitness | protocol.SFNodeCF
  48. // defaultRequiredServices describes the default services that are
  49. // required to be supported by outbound peers.
  50. defaultRequiredServices = protocol.SFNodeNetwork
  51. // defaultTargetOutbound is the default number of outbound peers to
  52. // target. We are normalizing the Bitcoin Core in allowing 16 here,
  53. // For Bitcoin Core latest Bitcoin Core, 14 connections are used for
  54. // full relaying and 2 are used for "block only" "fast" connections,
  55. // although we don't yet make such a distinction.
  56. defaultTargetOutbound = 14
  57. // connectionRetryInterval is the base amount of time to wait in between
  58. // retries when connecting to persistent peers. It is adjusted by the
  59. // number of retries such that there is a retry backoff.
  60. connectionRetryInterval = time.Second * 5
  61. )
  62. // simpleAddr implements the net.Addr interface with two struct fields
  63. type simpleAddr struct {
  64. net, addr string
  65. }
  66. // String returns the address.
  67. //
  68. // This is part of the net.Addr interface.
  69. func (a simpleAddr) String() string {
  70. return a.addr
  71. }
  72. // Network returns the network.
  73. //
  74. // This is part of the net.Addr interface.
  75. func (a simpleAddr) Network() string {
  76. return a.net
  77. }
  78. // Ensure simpleAddr implements the net.Addr interface.
  79. var _ net.Addr = simpleAddr{}
  80. // broadcastMsg provides the ability to house a bitcoin message to be broadcast
  81. // to all connected peers except specified excluded peers.
  82. type broadcastMsg struct {
  83. message wire.Message
  84. excludePeers []*serverPeer
  85. }
  86. // broadcastInventoryAdd is a type used to declare that the InvVect it contains
  87. // needs to be added to the rebroadcast map
  88. type broadcastInventoryAdd relayMsg
  89. // broadcastInventoryDel is a type used to declare that the InvVect it contains
  90. // needs to be removed from the rebroadcast map
  91. type broadcastInventoryDel *wire.InvVect
  92. // relayMsg packages an inventory vector along with the newly discovered
  93. // inventory so the relay has access to that information.
  94. type relayMsg struct {
  95. invVect *wire.InvVect
  96. data interface{}
  97. }
  98. // updatePeerHeightsMsg is a message sent from the blockmanager to the server
  99. // after a new block has been accepted. The purpose of the message is to update
  100. // the heights of peers that were known to announce the block before we
  101. // connected it to the main chain or recognized it as an orphan. With these
  102. // updates, peer heights will be kept up to date, allowing for fresh data when
  103. // selecting sync peer candidacy.
  104. type updatePeerHeightsMsg struct {
  105. newHash *chainhash.Hash
  106. newHeight int32
  107. originPeer *peer.Peer
  108. }
  109. // peerState maintains state of inbound, persistent, outbound peers as well
  110. // as banned peers and outbound groups.
  111. type peerState struct {
  112. inboundPeers map[int32]*serverPeer
  113. outboundPeers map[int32]*serverPeer
  114. persistentPeers map[int32]*serverPeer
  115. banned map[string]time.Time
  116. outboundGroups map[string]int
  117. }
  118. // Count returns the count of all known peers.
  119. func (ps *peerState) Count() int {
  120. return len(ps.inboundPeers) + len(ps.outboundPeers) +
  121. len(ps.persistentPeers)
  122. }
  123. // forAllOutboundPeers is a helper function that runs closure on all outbound
  124. // peers known to peerState.
  125. func (ps *peerState) forAllOutboundPeers(closure func(sp *serverPeer)) {
  126. for _, e := range ps.outboundPeers {
  127. closure(e)
  128. }
  129. for _, e := range ps.persistentPeers {
  130. closure(e)
  131. }
  132. }
  133. // forAllPeers is a helper function that runs closure on all peers known to
  134. // peerState.
  135. func (ps *peerState) forAllPeers(closure func(sp *serverPeer)) {
  136. for _, e := range ps.inboundPeers {
  137. closure(e)
  138. }
  139. ps.forAllOutboundPeers(closure)
  140. }
  141. // cfHeaderKV is a tuple of a filter header and its associated block hash. The
  142. // struct is used to cache cfcheckpt responses.
  143. type cfHeaderKV struct {
  144. blockHash chainhash.Hash
  145. filterHeader chainhash.Hash
  146. }
  147. // server provides a bitcoin server for handling communications to and from
  148. // bitcoin peers.
  149. type server struct {
  150. // The following variables must only be used atomically.
  151. // Putting the uint64s first makes them 64-bit aligned for 32-bit systems.
  152. bytesReceived uint64 // Total bytes received from all peers since start.
  153. bytesSent uint64 // Total bytes sent by all peers since start.
  154. started int32
  155. shutdown int32
  156. startupTime int64
  157. chainParams *chaincfg.Params
  158. addrManager *addrmgr.AddrManager
  159. connManager *connmgr.ConnManager
  160. sigCache *txscript.SigCache
  161. hashCache *txscript.HashCache
  162. rpcServer *rpcServer
  163. syncManager *netsync.SyncManager
  164. chain *blockchain.BlockChain
  165. txMemPool *mempool.TxPool
  166. cpuMiner *cpuminer.CPUMiner
  167. modifyRebroadcastInv chan interface{}
  168. newPeers chan *serverPeer
  169. donePeers chan *serverPeer
  170. banPeers chan *serverPeer
  171. query chan interface{}
  172. relayInv chan relayMsg
  173. broadcast chan broadcastMsg
  174. peerHeightsUpdate chan updatePeerHeightsMsg
  175. wg sync.WaitGroup
  176. quit chan struct{}
  177. nat NAT
  178. db database.DB
  179. timeSource blockchain.MedianTimeSource
  180. services protocol.ServiceFlag
  181. // The following fields are used for optional indexes. They will be nil
  182. // if the associated index is not enabled. These fields are set during
  183. // initial creation of the server and never changed afterwards, so they
  184. // do not need to be protected for concurrent access.
  185. txIndex *indexers.TxIndex
  186. addrIndex *indexers.AddrIndex
  187. cfIndex *indexers.CfIndex
  188. // The fee estimator keeps track of how long transactions are left in
  189. // the mempool before they are mined into blocks.
  190. feeEstimator *mempool.FeeEstimator
  191. // cfCheckptCaches stores a cached slice of filter headers for cfcheckpt
  192. // messages for each filter type.
  193. cfCheckptCaches map[wire.FilterType][]cfHeaderKV
  194. cfCheckptCachesMtx sync.RWMutex
  195. // agentBlacklist is a list of blacklisted substrings by which to filter
  196. // user agents.
  197. agentBlacklist []string
  198. // agentWhitelist is a list of whitelisted user agent substrings, no
  199. // whitelisting will be applied if the list is empty or nil.
  200. agentWhitelist []string
  201. }
  202. // serverPeer extends the peer to maintain state shared by the server and
  203. // the blockmanager.
  204. type serverPeer struct {
  205. // The following variables must only be used atomically
  206. feeFilter int64
  207. *peer.Peer
  208. connReq *connmgr.ConnReq
  209. server *server
  210. persistent bool
  211. continueHash *chainhash.Hash
  212. relayMtx sync.Mutex
  213. disableRelayTx bool
  214. sentAddrs bool
  215. isWhitelisted bool
  216. filter *bloom.Filter
  217. addressesMtx sync.RWMutex
  218. knownAddresses map[string]struct{}
  219. banScore connmgr.DynamicBanScore
  220. quit chan struct{}
  221. // The following chans are used to sync blockmanager and server.
  222. txProcessed chan struct{}
  223. blockProcessed chan struct{}
  224. }
  225. // newServerPeer returns a new serverPeer instance. The peer needs to be set by
  226. // the caller.
  227. func newServerPeer(s *server, isPersistent bool) *serverPeer {
  228. return &serverPeer{
  229. server: s,
  230. persistent: isPersistent,
  231. filter: bloom.LoadFilter(nil),
  232. knownAddresses: make(map[string]struct{}),
  233. quit: make(chan struct{}),
  234. txProcessed: make(chan struct{}, 1),
  235. blockProcessed: make(chan struct{}, 1),
  236. }
  237. }
  238. // newestBlock returns the current best block hash and height using the format
  239. // required by the configuration for the peer package.
  240. func (sp *serverPeer) newestBlock() (*chainhash.Hash, int32, er.R) {
  241. best := sp.server.chain.BestSnapshot()
  242. return &best.Hash, best.Height, nil
  243. }
  244. // addKnownAddresses adds the given addresses to the set of known addresses to
  245. // the peer to prevent sending duplicate addresses.
  246. func (sp *serverPeer) addKnownAddresses(addresses []*wire.NetAddress) {
  247. sp.addressesMtx.Lock()
  248. for _, na := range addresses {
  249. sp.knownAddresses[addrmgr.NetAddressKey(na)] = struct{}{}
  250. }
  251. sp.addressesMtx.Unlock()
  252. }
  253. // addressKnown true if the given address is already known to the peer.
  254. // XXX trn safe for concurrent callers?
  255. func (sp *serverPeer) addressKnown(na *wire.NetAddress) bool {
  256. sp.addressesMtx.Lock()
  257. defer sp.addressesMtx.Unlock()
  258. _, exists := sp.knownAddresses[addrmgr.NetAddressKey(na)]
  259. return exists
  260. }
  261. // setDisableRelayTx toggles relaying of transactions for the given peer.
  262. // It is safe for concurrent access.
  263. func (sp *serverPeer) setDisableRelayTx(disable bool) {
  264. sp.relayMtx.Lock()
  265. sp.disableRelayTx = disable
  266. sp.relayMtx.Unlock()
  267. }
  268. // relayTxDisabled returns whether or not relaying of transactions for the given
  269. // peer is disabled.
  270. // It is safe for concurrent access.
  271. func (sp *serverPeer) relayTxDisabled() bool {
  272. sp.relayMtx.Lock()
  273. isDisabled := sp.disableRelayTx
  274. sp.relayMtx.Unlock()
  275. return isDisabled
  276. }
  277. // pushAddrMsg sends an addr message to the connected peer using the provided
  278. // addresses.
  279. func (sp *serverPeer) pushAddrMsg(addresses []*wire.NetAddress) {
  280. // Filter addresses already known to the peer.
  281. addrs := make([]*wire.NetAddress, 0, len(addresses))
  282. for _, addr := range addresses {
  283. if !sp.addressKnown(addr) {
  284. addrs = append(addrs, addr)
  285. }
  286. }
  287. known, err := sp.PushAddrMsg(addrs)
  288. if err != nil {
  289. log.Errorf("Can't push address message to %s: %v", sp.Peer, err)
  290. sp.Disconnect()
  291. return
  292. }
  293. sp.addKnownAddresses(known)
  294. }
  295. // addBanScore increases the persistent and decaying ban score fields by the
  296. // values passed as parameters. If the resulting score exceeds half of the ban
  297. // threshold, a warning is logged including the reason provided. Further, if
  298. // the score is above the ban threshold, the peer will be banned and
  299. // disconnected.
  300. func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) {
  301. // No warning is logged and no score is calculated if banning is disabled.
  302. if cfg.DisableBanning {
  303. return
  304. }
  305. if sp.isWhitelisted {
  306. log.Debugf("Misbehaving whitelisted peer %s: %s", sp, reason)
  307. return
  308. }
  309. warnThreshold := cfg.BanThreshold >> 1
  310. if transient == 0 && persistent == 0 {
  311. // The score is not being increased, but a warning message is still
  312. // logged if the score is above the warn threshold.
  313. score := sp.banScore.Int()
  314. if score > warnThreshold {
  315. log.Warnf("Misbehaving peer %s: %s -- ban score is %d, "+
  316. "it was not increased this time", sp, reason, score)
  317. }
  318. return
  319. }
  320. score := sp.banScore.Increase(persistent, transient)
  321. if score > warnThreshold {
  322. log.Warnf("Misbehaving peer %s: %s -- ban score increased to %d",
  323. sp, reason, score)
  324. if score > cfg.BanThreshold {
  325. log.Warnf("Misbehaving peer %s -- banning and disconnecting",
  326. sp)
  327. sp.server.BanPeer(sp)
  328. sp.Disconnect()
  329. }
  330. }
  331. }
  332. // hasServices returns whether or not the provided advertised service flags have
  333. // all of the provided desired service flags set.
  334. func hasServices(advertised, desired protocol.ServiceFlag) bool {
  335. return advertised&desired == desired
  336. }
  337. // OnVersion is invoked when a peer receives a version bitcoin message
  338. // and is used to negotiate the protocol version details as well as kick start
  339. // the communications.
  340. func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) *wire.MsgReject {
  341. // Update the address manager with the advertised services for outbound
  342. // connections in case they have changed. This is not done for inbound
  343. // connections to help prevent malicious behavior and is skipped when
  344. // running on the simulation test network since it is only intended to
  345. // connect to specified peers and actively avoids advertising and
  346. // connecting to discovered peers.
  347. //
  348. // NOTE: This is done before rejecting peers that are too old to ensure
  349. // it is updated regardless in the case a new minimum protocol version is
  350. // enforced and the remote node has not upgraded yet.
  351. isInbound := sp.Inbound()
  352. remoteAddr := sp.NA()
  353. addrManager := sp.server.addrManager
  354. if !cfg.SimNet && !isInbound {
  355. addrManager.SetServices(remoteAddr, msg.Services)
  356. }
  357. // Ignore peers that have a protcol version that is too old. The peer
  358. // negotiation logic will disconnect it after this callback returns.
  359. if msg.ProtocolVersion < int32(peer.MinAcceptableProtocolVersion) {
  360. return nil
  361. }
  362. // Reject outbound peers that are not full nodes.
  363. wantServices := protocol.SFNodeNetwork
  364. if !isInbound && !hasServices(msg.Services, wantServices) {
  365. missingServices := wantServices & ^msg.Services
  366. log.Debugf("Rejecting peer %s with services %v due to not "+
  367. "providing desired services %v", sp.Peer, msg.Services,
  368. missingServices)
  369. reason := fmt.Sprintf("required services %#x not offered",
  370. uint64(missingServices))
  371. return wire.NewMsgReject(msg.Command(), wire.RejectNonstandard, reason)
  372. }
  373. if !cfg.SimNet && !isInbound {
  374. // After soft-fork activation, only make outbound
  375. // connection to peers if they flag that they're segwit
  376. // enabled.
  377. chain := sp.server.chain
  378. segwitActive, err := chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
  379. if err != nil {
  380. log.Errorf("Unable to query for segwit soft-fork state: %v",
  381. err)
  382. return nil
  383. }
  384. if segwitActive && !sp.IsWitnessEnabled() {
  385. log.Infof("Disconnecting non-segwit peer %v, isn't segwit "+
  386. "enabled and we need more segwit enabled peers", sp)
  387. sp.Disconnect()
  388. return nil
  389. }
  390. }
  391. // Add the remote peer time as a sample for creating an offset against
  392. // the local clock to keep the network time in sync.
  393. sp.server.timeSource.AddTimeSample(sp.Addr(), msg.Timestamp)
  394. // Choose whether or not to relay transactions before a filter command
  395. // is received.
  396. sp.setDisableRelayTx(msg.DisableRelayTx)
  397. return nil
  398. }
  399. // OnVerAck is invoked when a peer receives a verack bitcoin message and is used
  400. // to kick start communication with them.
  401. func (sp *serverPeer) OnVerAck(_ *peer.Peer, _ *wire.MsgVerAck) {
  402. sp.server.AddPeer(sp)
  403. }
  404. // OnMemPool is invoked when a peer receives a mempool bitcoin message.
  405. // It creates and sends an inventory message with the contents of the memory
  406. // pool up to the maximum inventory allowed per message. When the peer has a
  407. // bloom filter loaded, the contents are filtered accordingly.
  408. func (sp *serverPeer) OnMemPool(_ *peer.Peer, msg *wire.MsgMemPool) {
  409. // Only allow mempool requests if the server has bloom filtering
  410. // enabled.
  411. if sp.server.services&protocol.SFNodeBloom != protocol.SFNodeBloom {
  412. log.Debugf("peer %v sent mempool request with bloom "+
  413. "filtering disabled -- disconnecting", sp)
  414. sp.Disconnect()
  415. return
  416. }
  417. // A decaying ban score increase is applied to prevent flooding.
  418. // The ban score accumulates and passes the ban threshold if a burst of
  419. // mempool messages comes from a peer. The score decays each minute to
  420. // half of its value.
  421. sp.addBanScore(0, 33, "mempool")
  422. // Generate inventory message with the available transactions in the
  423. // transaction memory pool. Limit it to the max allowed inventory
  424. // per message. The NewMsgInvSizeHint function automatically limits
  425. // the passed hint to the maximum allowed, so it's safe to pass it
  426. // without double checking it here.
  427. txMemPool := sp.server.txMemPool
  428. txDescs := txMemPool.TxDescs()
  429. invMsg := wire.NewMsgInvSizeHint(uint(len(txDescs)))
  430. for _, txDesc := range txDescs {
  431. // Either add all transactions when there is no bloom filter,
  432. // or only the transactions that match the filter when there is
  433. // one.
  434. if !sp.filter.IsLoaded() || sp.filter.MatchTxAndUpdate(txDesc.Tx) {
  435. iv := wire.NewInvVect(wire.InvTypeTx, txDesc.Tx.Hash())
  436. invMsg.AddInvVect(iv)
  437. if len(invMsg.InvList)+1 > wire.MaxInvPerMsg {
  438. break
  439. }
  440. }
  441. }
  442. // Send the inventory message if there is anything to send.
  443. if len(invMsg.InvList) > 0 {
  444. sp.QueueMessage(invMsg, nil)
  445. }
  446. }
  447. // OnTx is invoked when a peer receives a tx bitcoin message. It blocks
  448. // until the bitcoin transaction has been fully processed. Unlock the block
  449. // handler this does not serialize all transactions through a single thread
  450. // transactions don't rely on the previous one in a linear fashion like blocks.
  451. func (sp *serverPeer) OnTx(_ *peer.Peer, msg *wire.MsgTx) {
  452. if cfg.BlocksOnly {
  453. log.Tracef("Ignoring tx %v from %v - blocksonly enabled",
  454. msg.TxHash(), sp)
  455. return
  456. }
  457. // Add the transaction to the known inventory for the peer.
  458. // Convert the raw MsgTx to a btcutil.Tx which provides some convenience
  459. // methods and things such as hash caching.
  460. tx := btcutil.NewTx(msg)
  461. iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
  462. sp.AddKnownInventory(iv)
  463. // Queue the transaction up to be handled by the sync manager and
  464. // intentionally block further receives until the transaction is fully
  465. // processed and known good or bad. This helps prevent a malicious peer
  466. // from queuing up a bunch of bad transactions before disconnecting (or
  467. // being disconnected) and wasting memory.
  468. sp.server.syncManager.QueueTx(tx, sp.Peer, sp.txProcessed)
  469. <-sp.txProcessed
  470. }
  471. // OnBlock is invoked when a peer receives a block bitcoin message. It
  472. // blocks until the bitcoin block has been fully processed.
  473. func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) {
  474. // Convert the raw MsgBlock to a btcutil.Block which provides some
  475. // convenience methods and things such as hash caching.
  476. block := btcutil.NewBlockFromBlockAndBytes(msg, buf)
  477. // Add the block to the known inventory for the peer.
  478. iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
  479. sp.AddKnownInventory(iv)
  480. // Queue the block up to be handled by the block
  481. // manager and intentionally block further receives
  482. // until the bitcoin block is fully processed and known
  483. // good or bad. This helps prevent a malicious peer
  484. // from queuing up a bunch of bad blocks before
  485. // disconnecting (or being disconnected) and wasting
  486. // memory. Additionally, this behavior is depended on
  487. // by at least the block acceptance test tool as the
  488. // reference implementation processes blocks in the same
  489. // thread and therefore blocks further messages until
  490. // the bitcoin block has been fully processed.
  491. sp.server.syncManager.QueueBlock(block, sp.Peer, sp.blockProcessed)
  492. <-sp.blockProcessed
  493. }
  494. // OnInv is invoked when a peer receives an inv bitcoin message and is
  495. // used to examine the inventory being advertised by the remote peer and react
  496. // accordingly. We pass the message down to blockmanager which will call
  497. // QueueMessage with any appropriate responses.
  498. func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) {
  499. if !cfg.BlocksOnly {
  500. if len(msg.InvList) > 0 {
  501. sp.server.syncManager.QueueInv(msg, sp.Peer)
  502. }
  503. return
  504. }
  505. newInv := wire.NewMsgInvSizeHint(uint(len(msg.InvList)))
  506. for _, invVect := range msg.InvList {
  507. if invVect.Type == wire.InvTypeTx {
  508. log.Tracef("Ignoring tx %v in inv from %v -- "+
  509. "blocksonly enabled", invVect.Hash, sp)
  510. if sp.ProtocolVersion() >= protocol.BIP0037Version {
  511. log.Infof("Peer %v is announcing "+
  512. "transactions -- disconnecting", sp)
  513. sp.Disconnect()
  514. return
  515. }
  516. continue
  517. }
  518. err := newInv.AddInvVect(invVect)
  519. if err != nil {
  520. log.Errorf("Failed to add inventory vector: %v", err)
  521. break
  522. }
  523. }
  524. if len(newInv.InvList) > 0 {
  525. sp.server.syncManager.QueueInv(newInv, sp.Peer)
  526. }
  527. }
  528. // OnHeaders is invoked when a peer receives a headers bitcoin
  529. // message. The message is passed down to the sync manager.
  530. func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) {
  531. sp.server.syncManager.QueueHeaders(msg, sp.Peer)
  532. }
  533. // handleGetData is invoked when a peer receives a getdata bitcoin message and
  534. // is used to deliver block and transaction information.
  535. func (sp *serverPeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) {
  536. numAdded := 0
  537. notFound := wire.NewMsgNotFound()
  538. length := len(msg.InvList)
  539. // A decaying ban score increase is applied to prevent exhausting resources
  540. // with unusually large inventory queries.
  541. // Requesting more than the maximum inventory vector length within a short
  542. // period of time yields a score above the default ban threshold. Sustained
  543. // bursts of small requests are not penalized as that would potentially ban
  544. // peers performing IBD.
  545. // This incremental score decays each minute to half of its value.
  546. sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata")
  547. // We wait on this wait channel periodically to prevent queuing
  548. // far more data than we can send in a reasonable time, wasting memory.
  549. // The waiting occurs after the database fetch for the next one to
  550. // provide a little pipelining.
  551. var waitChan chan struct{}
  552. doneChan := make(chan struct{}, 1)
  553. for i, iv := range msg.InvList {
  554. var c chan struct{}
  555. // If this will be the last message we send.
  556. if i == length-1 && len(notFound.InvList) == 0 {
  557. c = doneChan
  558. } else if (i+1)%3 == 0 {
  559. // Buffered so as to not make the send goroutine block.
  560. c = make(chan struct{}, 1)
  561. }
  562. var err er.R
  563. switch iv.Type {
  564. case wire.InvTypeWitnessTx:
  565. err = sp.server.pushTxMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding)
  566. case wire.InvTypeTx:
  567. err = sp.server.pushTxMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding)
  568. case wire.InvTypeWitnessBlock:
  569. err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding)
  570. case wire.InvTypeBlock:
  571. err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding)
  572. case wire.InvTypeFilteredWitnessBlock:
  573. err = sp.server.pushMerkleBlockMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding)
  574. case wire.InvTypeFilteredBlock:
  575. err = sp.server.pushMerkleBlockMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding)
  576. default:
  577. log.Warnf("Unknown type in inventory request %d",
  578. iv.Type)
  579. continue
  580. }
  581. if err != nil {
  582. notFound.AddInvVect(iv)
  583. // When there is a failure fetching the final entry
  584. // and the done channel was sent in due to there
  585. // being no outstanding not found inventory, consume
  586. // it here because there is now not found inventory
  587. // that will use the channel momentarily.
  588. if i == len(msg.InvList)-1 && c != nil {
  589. <-c
  590. }
  591. }
  592. numAdded++
  593. waitChan = c
  594. }
  595. if len(notFound.InvList) != 0 {
  596. sp.QueueMessage(notFound, doneChan)
  597. }
  598. // Wait for messages to be sent. We can send quite a lot of data at this
  599. // point and this will keep the peer busy for a decent amount of time.
  600. // We don't process anything else by them in this time so that we
  601. // have an idea of when we should hear back from them - else the idle
  602. // timeout could fire when we were only half done sending the blocks.
  603. if numAdded > 0 {
  604. <-doneChan
  605. }
  606. }
  607. // OnGetBlocks is invoked when a peer receives a getblocks bitcoin
  608. // message.
  609. func (sp *serverPeer) OnGetBlocks(_ *peer.Peer, msg *wire.MsgGetBlocks) {
  610. // Find the most recent known block in the best chain based on the block
  611. // locator and fetch all of the block hashes after it until either
  612. // wire.MaxBlocksPerMsg have been fetched or the provided stop hash is
  613. // encountered.
  614. //
  615. // Use the block after the genesis block if no other blocks in the
  616. // provided locator are known. This does mean the client will start
  617. // over with the genesis block if unknown block locators are provided.
  618. //
  619. // This mirrors the behavior in the reference implementation.
  620. chain := sp.server.chain
  621. hashList := chain.LocateBlocks(msg.BlockLocatorHashes, &msg.HashStop,
  622. wire.MaxBlocksPerMsg)
  623. // Generate inventory message.
  624. invMsg := wire.NewMsgInv()
  625. for i := range hashList {
  626. iv := wire.NewInvVect(wire.InvTypeBlock, &hashList[i])
  627. invMsg.AddInvVect(iv)
  628. }
  629. // Send the inventory message if there is anything to send.
  630. if len(invMsg.InvList) > 0 {
  631. invListLen := len(invMsg.InvList)
  632. if invListLen == wire.MaxBlocksPerMsg {
  633. // Intentionally use a copy of the final hash so there
  634. // is not a reference into the inventory slice which
  635. // would prevent the entire slice from being eligible
  636. // for GC as soon as it's sent.
  637. continueHash := invMsg.InvList[invListLen-1].Hash
  638. sp.continueHash = &continueHash
  639. }
  640. sp.QueueMessage(invMsg, nil)
  641. }
  642. }
  643. // OnGetHeaders is invoked when a peer receives a getheaders bitcoin
  644. // message.
  645. func (sp *serverPeer) OnGetHeaders(_ *peer.Peer, msg *wire.MsgGetHeaders) {
  646. // Ignore getheaders requests if not in sync.
  647. if !sp.server.syncManager.IsCurrent() {
  648. return
  649. }
  650. // Find the most recent known block in the best chain based on the block
  651. // locator and fetch all of the headers after it until either
  652. // wire.MaxBlockHeadersPerMsg have been fetched or the provided stop
  653. // hash is encountered.
  654. //
  655. // Use the block after the genesis block if no other blocks in the
  656. // provided locator are known. This does mean the client will start
  657. // over with the genesis block if unknown block locators are provided.
  658. //
  659. // This mirrors the behavior in the reference implementation.
  660. chain := sp.server.chain
  661. headers := chain.LocateHeaders(msg.BlockLocatorHashes, &msg.HashStop)
  662. // Send found headers to the requesting peer.
  663. blockHeaders := make([]*wire.BlockHeader, len(headers))
  664. for i := range headers {
  665. blockHeaders[i] = &headers[i]
  666. }
  667. sp.QueueMessage(&wire.MsgHeaders{Headers: blockHeaders}, nil)
  668. }
  669. // OnGetCFilters is invoked when a peer receives a getcfilters bitcoin message.
  670. func (sp *serverPeer) OnGetCFilters(_ *peer.Peer, msg *wire.MsgGetCFilters) {
  671. // Ignore getcfilters requests if not in sync.
  672. if !sp.server.syncManager.IsCurrent() {
  673. return
  674. }
  675. // We'll also ensure that the remote party is requesting a set of
  676. // filters that we actually currently maintain.
  677. switch msg.FilterType {
  678. case wire.GCSFilterRegular:
  679. break
  680. default:
  681. log.Debugf("Filter request for unknown filter: %v",
  682. msg.FilterType)
  683. return
  684. }
  685. hashes, err := sp.server.chain.HeightToHashRange(
  686. int32(msg.StartHeight), &msg.StopHash, wire.MaxGetCFiltersReqRange,
  687. )
  688. if err != nil {
  689. log.Debugf("Invalid getcfilters request: %v", err)
  690. return
  691. }
  692. // Create []*chainhash.Hash from []chainhash.Hash to pass to
  693. // FiltersByBlockHashes.
  694. hashPtrs := make([]*chainhash.Hash, len(hashes))
  695. for i := range hashes {
  696. hashPtrs[i] = &hashes[i]
  697. }
  698. filters, err := sp.server.cfIndex.FiltersByBlockHashes(
  699. hashPtrs, msg.FilterType,
  700. )
  701. if err != nil {
  702. log.Errorf("Error retrieving cfilters: %v", err)
  703. return
  704. }
  705. for i, filterBytes := range filters {
  706. if len(filterBytes) == 0 {
  707. log.Warnf("Could not obtain cfilter for %v",
  708. hashes[i])
  709. return
  710. }
  711. filterMsg := wire.NewMsgCFilter(
  712. msg.FilterType, &hashes[i], filterBytes,
  713. )
  714. sp.QueueMessage(filterMsg, nil)
  715. }
  716. }
  717. // OnGetCFHeaders is invoked when a peer receives a getcfheader bitcoin message.
  718. func (sp *serverPeer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) {
  719. // Ignore getcfilterheader requests if not in sync.
  720. if !sp.server.syncManager.IsCurrent() {
  721. return
  722. }
  723. // We'll also ensure that the remote party is requesting a set of
  724. // headers for filters that we actually currently maintain.
  725. switch msg.FilterType {
  726. case wire.GCSFilterRegular:
  727. break
  728. default:
  729. log.Debug("Filter request for unknown headers for "+
  730. "filter: %v", msg.FilterType)
  731. return
  732. }
  733. startHeight := int32(msg.StartHeight)
  734. maxResults := wire.MaxCFHeadersPerMsg
  735. // If StartHeight is positive, fetch the predecessor block hash so we
  736. // can populate the PrevFilterHeader field.
  737. if msg.StartHeight > 0 {
  738. startHeight--
  739. maxResults++
  740. }
  741. // Fetch the hashes from the block index.
  742. hashList, err := sp.server.chain.HeightToHashRange(
  743. startHeight, &msg.StopHash, maxResults,
  744. )
  745. if err != nil {
  746. log.Debugf("Invalid getcfheaders request: %v", err)
  747. }
  748. // This is possible if StartHeight is one greater that the height of
  749. // StopHash, and we pull a valid range of hashes including the previous
  750. // filter header.
  751. if len(hashList) == 0 || (msg.StartHeight > 0 && len(hashList) == 1) {
  752. log.Debug("No results for getcfheaders request")
  753. return
  754. }
  755. // Create []*chainhash.Hash from []chainhash.Hash to pass to
  756. // FilterHeadersByBlockHashes.
  757. hashPtrs := make([]*chainhash.Hash, len(hashList))
  758. for i := range hashList {
  759. hashPtrs[i] = &hashList[i]
  760. }
  761. // Fetch the raw filter hash bytes from the database for all blocks.
  762. filterHashes, err := sp.server.cfIndex.FilterHashesByBlockHashes(
  763. hashPtrs, msg.FilterType,
  764. )
  765. if err != nil {
  766. log.Errorf("Error retrieving cfilter hashes: %v", err)
  767. return
  768. }
  769. // Generate cfheaders message and send it.
  770. headersMsg := wire.NewMsgCFHeaders()
  771. // Populate the PrevFilterHeader field.
  772. if msg.StartHeight > 0 {
  773. prevBlockHash := &hashList[0]
  774. // Fetch the raw committed filter header bytes from the
  775. // database.
  776. headerBytes, err := sp.server.cfIndex.FilterHeaderByBlockHash(
  777. prevBlockHash, msg.FilterType)
  778. if err != nil {
  779. log.Errorf("Error retrieving CF header: %v", err)
  780. return
  781. }
  782. if len(headerBytes) == 0 {
  783. log.Warnf("Could not obtain CF header for %v", prevBlockHash)
  784. return
  785. }
  786. // Deserialize the hash into PrevFilterHeader.
  787. err = headersMsg.PrevFilterHeader.SetBytes(headerBytes)
  788. if err != nil {
  789. log.Warnf("Committed filter header deserialize "+
  790. "failed: %v", err)
  791. return
  792. }
  793. hashList = hashList[1:]
  794. filterHashes = filterHashes[1:]
  795. }
  796. // Populate HeaderHashes.
  797. for i, hashBytes := range filterHashes {
  798. if len(hashBytes) == 0 {
  799. log.Warnf("Could not obtain CF hash for %v", hashList[i])
  800. return
  801. }
  802. // Deserialize the hash.
  803. filterHash, err := chainhash.NewHash(hashBytes)
  804. if err != nil {
  805. log.Warnf("Committed filter hash deserialize "+
  806. "failed: %v", err)
  807. return
  808. }
  809. headersMsg.AddCFHash(filterHash)
  810. }
  811. headersMsg.FilterType = msg.FilterType
  812. headersMsg.StopHash = msg.StopHash
  813. sp.QueueMessage(headersMsg, nil)
  814. }
  815. // OnGetCFCheckpt is invoked when a peer receives a getcfcheckpt bitcoin message.
  816. func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
  817. // Ignore getcfcheckpt requests if not in sync.
  818. if !sp.server.syncManager.IsCurrent() {
  819. return
  820. }
  821. // We'll also ensure that the remote party is requesting a set of
  822. // checkpoints for filters that we actually currently maintain.
  823. switch msg.FilterType {
  824. case wire.GCSFilterRegular:
  825. break
  826. default:
  827. log.Debug("Filter request for unknown checkpoints for "+
  828. "filter: %v", msg.FilterType)
  829. return
  830. }
  831. // Now that we know the client is fetching a filter that we know of,
  832. // we'll fetch the block hashes et each check point interval so we can
  833. // compare against our cache, and create new check points if necessary.
  834. blockHashes, err := sp.server.chain.IntervalBlockHashes(
  835. &msg.StopHash, wire.CFCheckptInterval,
  836. )
  837. if err != nil {
  838. log.Debugf("Invalid getcfilters request: %v", err)
  839. return
  840. }
  841. checkptMsg := wire.NewMsgCFCheckpt(
  842. msg.FilterType, &msg.StopHash, len(blockHashes),
  843. )
  844. // Fetch the current existing cache so we can decide if we need to
  845. // extend it or if its adequate as is.
  846. sp.server.cfCheckptCachesMtx.RLock()
  847. checkptCache := sp.server.cfCheckptCaches[msg.FilterType]
  848. // If the set of block hashes is beyond the current size of the cache,
  849. // then we'll expand the size of the cache and also retain the write
  850. // lock.
  851. var updateCache bool
  852. if len(blockHashes) > len(checkptCache) {
  853. // Now that we know we'll need to modify the size of the cache,
  854. // we'll release the read lock and grab the write lock to
  855. // possibly expand the cache size.
  856. sp.server.cfCheckptCachesMtx.RUnlock()
  857. sp.server.cfCheckptCachesMtx.Lock()
  858. defer sp.server.cfCheckptCachesMtx.Unlock()
  859. // Now that we have the write lock, we'll check again as it's
  860. // possible that the cache has already been expanded.
  861. checkptCache = sp.server.cfCheckptCaches[msg.FilterType]
  862. // If we still need to expand the cache, then We'll mark that
  863. // we need to update the cache for below and also expand the
  864. // size of the cache in place.
  865. if len(blockHashes) > len(checkptCache) {
  866. updateCache = true
  867. additionalLength := len(blockHashes) - len(checkptCache)
  868. newEntries := make([]cfHeaderKV, additionalLength)
  869. log.Infof("Growing size of checkpoint cache from %v to %v "+
  870. "block hashes", len(checkptCache), len(blockHashes))
  871. checkptCache = append(
  872. sp.server.cfCheckptCaches[msg.FilterType],
  873. newEntries...,
  874. )
  875. }
  876. } else {
  877. // Otherwise, we'll hold onto the read lock for the remainder
  878. // of this method.
  879. defer sp.server.cfCheckptCachesMtx.RUnlock()
  880. log.Tracef("Serving stale cache of size %v",
  881. len(checkptCache))
  882. }
  883. // Now that we know the cache is of an appropriate size, we'll iterate
  884. // backwards until the find the block hash. We do this as it's possible
  885. // a re-org has occurred so items in the db are now in the main china
  886. // while the cache has been partially invalidated.
  887. var forkIdx int
  888. for forkIdx = len(blockHashes); forkIdx > 0; forkIdx-- {
  889. if checkptCache[forkIdx-1].blockHash == blockHashes[forkIdx-1] {
  890. break
  891. }
  892. }
  893. // Now that we know the how much of the cache is relevant for this
  894. // query, we'll populate our check point message with the cache as is.
  895. // Shortly below, we'll populate the new elements of the cache.
  896. for i := 0; i < forkIdx; i++ {
  897. checkptMsg.AddCFHeader(&checkptCache[i].filterHeader)
  898. }
  899. // We'll now collect the set of hashes that are beyond our cache so we
  900. // can look up the filter headers to populate the final cache.
  901. blockHashPtrs := make([]*chainhash.Hash, 0, len(blockHashes)-forkIdx)
  902. for i := forkIdx; i < len(blockHashes); i++ {
  903. blockHashPtrs = append(blockHashPtrs, &blockHashes[i])
  904. }
  905. filterHeaders, err := sp.server.cfIndex.FilterHeadersByBlockHashes(
  906. blockHashPtrs, msg.FilterType,
  907. )
  908. if err != nil {
  909. log.Errorf("Error retrieving cfilter headers: %v", err)
  910. return
  911. }
  912. // Now that we have the full set of filter headers, we'll add them to
  913. // the checkpoint message, and also update our cache in line.
  914. for i, filterHeaderBytes := range filterHeaders {
  915. if len(filterHeaderBytes) == 0 {
  916. log.Warnf("Could not obtain CF header for %v",
  917. blockHashPtrs[i])
  918. return
  919. }
  920. filterHeader, err := chainhash.NewHash(filterHeaderBytes)
  921. if err != nil {
  922. log.Warnf("Committed filter header deserialize "+
  923. "failed: %v", err)
  924. return
  925. }
  926. checkptMsg.AddCFHeader(filterHeader)
  927. // If the new main chain is longer than what's in the cache,
  928. // then we'll override it beyond the fork point.
  929. if updateCache {
  930. checkptCache[forkIdx+i] = cfHeaderKV{
  931. blockHash: blockHashes[forkIdx+i],
  932. filterHeader: *filterHeader,
  933. }
  934. }
  935. }
  936. // Finally, we'll update the cache if we need to, and send the final
  937. // message back to the requesting peer.
  938. if updateCache {
  939. sp.server.cfCheckptCaches[msg.FilterType] = checkptCache
  940. }
  941. sp.QueueMessage(checkptMsg, nil)
  942. }
  943. // enforceNodeBloomFlag disconnects the peer if the server is not configured to
  944. // allow bloom filters. Additionally, if the peer has negotiated to a protocol
  945. // version that is high enough to observe the bloom filter service support bit,
  946. // it will be banned since it is intentionally violating the protocol.
  947. func (sp *serverPeer) enforceNodeBloomFlag(cmd string) bool {
  948. if sp.server.services&protocol.SFNodeBloom != protocol.SFNodeBloom {
  949. // Ban the peer if the protocol version is high enough that the
  950. // peer is knowingly violating the protocol and banning is
  951. // enabled.
  952. //
  953. // NOTE: Even though the addBanScore function already examines
  954. // whether or not banning is enabled, it is checked here as well
  955. // to ensure the violation is logged and the peer is
  956. // disconnected regardless.
  957. if sp.ProtocolVersion() >= protocol.BIP0111Version &&
  958. !cfg.DisableBanning {
  959. // Disconnect the peer regardless of whether it was
  960. // banned.
  961. sp.addBanScore(100, 0, cmd)
  962. sp.Disconnect()
  963. return false
  964. }
  965. // Disconnect the peer regardless of protocol version or banning
  966. // state.
  967. log.Debugf("%s sent an unsupported %s request -- "+
  968. "disconnecting", sp, cmd)
  969. sp.Disconnect()
  970. return false
  971. }
  972. return true
  973. }
  974. // OnFeeFilter is invoked when a peer receives a feefilter bitcoin message and
  975. // is used by remote peers to request that no transactions which have a fee rate
  976. // lower than provided value are inventoried to them. The peer will be
  977. // disconnected if an invalid fee filter value is provided.
  978. func (sp *serverPeer) OnFeeFilter(_ *peer.Peer, msg *wire.MsgFeeFilter) {
  979. // Check that the passed minimum fee is a valid amount.
  980. if msg.MinFee < 0 || msg.MinFee > int64(btcutil.MaxUnits()) {
  981. log.Debugf("Peer %v sent an invalid feefilter '%v' -- "+
  982. "disconnecting", sp, btcutil.Amount(msg.MinFee))
  983. sp.Disconnect()
  984. return
  985. }
  986. atomic.StoreInt64(&sp.feeFilter, msg.MinFee)
  987. }
  988. // OnFilterAdd is invoked when a peer receives a filteradd bitcoin
  989. // message and is used by remote peers to add data to an already loaded bloom
  990. // filter. The peer will be disconnected if a filter is not loaded when this
  991. // message is received or the server is not configured to allow bloom filters.
  992. func (sp *serverPeer) OnFilterAdd(_ *peer.Peer, msg *wire.MsgFilterAdd) {
  993. // Disconnect and/or ban depending on the node bloom services flag and
  994. // negotiated protocol version.
  995. if !sp.enforceNodeBloomFlag(msg.Command()) {
  996. return
  997. }
  998. if !sp.filter.IsLoaded() {
  999. log.Debugf("%s sent a filteradd request with no filter "+
  1000. "loaded -- disconnecting", sp)
  1001. sp.Disconnect()
  1002. return
  1003. }
  1004. sp.filter.Add(msg.Data)
  1005. }
  1006. // OnFilterClear is invoked when a peer receives a filterclear bitcoin
  1007. // message and is used by remote peers to clear an already loaded bloom filter.
  1008. // The peer will be disconnected if a filter is not loaded when this message is
  1009. // received or the server is not configured to allow bloom filters.
  1010. func (sp *serverPeer) OnFilterClear(_ *peer.Peer, msg *wire.MsgFilterClear) {
  1011. // Disconnect and/or ban depending on the node bloom services flag and
  1012. // negotiated protocol version.
  1013. if !sp.enforceNodeBloomFlag(msg.Command()) {
  1014. return
  1015. }
  1016. if !sp.filter.IsLoaded() {
  1017. log.Debugf("%s sent a filterclear request with no "+
  1018. "filter loaded -- disconnecting", sp)
  1019. sp.Disconnect()
  1020. return
  1021. }
  1022. sp.filter.Unload()
  1023. }
  1024. // OnFilterLoad is invoked when a peer receives a filterload bitcoin
  1025. // message and it used to load a bloom filter that should be used for
  1026. // delivering merkle blocks and associated transactions that match the filter.
  1027. // The peer will be disconnected if the server is not configured to allow bloom
  1028. // filters.
  1029. func (sp *serverPeer) OnFilterLoad(_ *peer.Peer, msg *wire.MsgFilterLoad) {
  1030. // Disconnect and/or ban depending on the node bloom services flag and
  1031. // negotiated protocol version.
  1032. if !sp.enforceNodeBloomFlag(msg.Command()) {
  1033. return
  1034. }
  1035. sp.setDisableRelayTx(false)
  1036. sp.filter.Reload(msg)
  1037. }
  1038. // OnGetAddr is invoked when a peer receives a getaddr bitcoin message
  1039. // and is used to provide the peer with known addresses from the address
  1040. // manager.
  1041. func (sp *serverPeer) OnGetAddr(_ *peer.Peer, msg *wire.MsgGetAddr) {
  1042. // Don't return any addresses when running on the simulation test
  1043. // network. This helps prevent the network from becoming another
  1044. // public test network since it will not be able to learn about other
  1045. // peers that have not specifically been provided.
  1046. if cfg.SimNet {
  1047. return
  1048. }
  1049. // Do not accept getaddr requests from outbound peers. This reduces
  1050. // fingerprinting attacks.
  1051. if !sp.Inbound() {
  1052. log.Debugf("Ignoring getaddr request from outbound peer [%v]", sp)
  1053. return
  1054. }
  1055. // Only allow one getaddr request per connection to discourage
  1056. // address stamping of inv announcements.
  1057. if sp.sentAddrs {
  1058. log.Debugf("Ignoring repeated getaddr request from peer [%v]", sp)
  1059. return
  1060. }
  1061. sp.sentAddrs = true
  1062. // Get the current known addresses from the address manager.
  1063. addrCache := sp.server.addrManager.AddressCache()
  1064. // Add the best addresses we have for peer discovery here - if
  1065. // we have a port of 0 then that means nothing good was found,
  1066. // so don't rebroracast that. At this point, we trim the cache
  1067. // size by one entry if we add a record so we don't flood past
  1068. // the maximum allowed size and trigger bans.
  1069. bestAddress := sp.server.addrManager.GetBestLocalAddress(sp.NA())
  1070. if bestAddress.Port != 0 {
  1071. if len(addrCache) > 0 {
  1072. addrCache = addrCache[1:]
  1073. }
  1074. addrCache = append(addrCache, bestAddress)
  1075. }
  1076. // Now, push the addresses we got.
  1077. sp.pushAddrMsg(addrCache)
  1078. }
  1079. // OnAddr is invoked when a peer receives an addr bitcoin message and is
  1080. // used to notify the server about advertised addresses.
  1081. func (sp *serverPeer) OnAddr(_ *peer.Peer, msg *wire.MsgAddr) {
  1082. // Ignore addresses when running on the simulation test network. This
  1083. // helps prevent the network from becoming another public test network
  1084. // since it will not be able to learn about other peers that have not
  1085. // specifically been provided.
  1086. if cfg.SimNet {
  1087. return
  1088. }
  1089. // Ignore old style addresses which don't include a timestamp.
  1090. if sp.ProtocolVersion() < protocol.NetAddressTimeVersion {
  1091. return
  1092. }
  1093. // A message that has no addresses produces a warning.
  1094. if len(msg.AddrList) == 0 {
  1095. log.Warnf("Command [%s] from %s does not contain any addresses",
  1096. msg.Command(), sp.Peer)
  1097. }
  1098. for _, na := range msg.AddrList {
  1099. // Don't add more address if we're disconnecting.
  1100. if !sp.Connected() {
  1101. return
  1102. }
  1103. // Set the timestamp to 5 days ago if it's more than 24 hours
  1104. // in the future so this address is one of the first to be
  1105. // removed when space is needed.
  1106. now := time.Now()
  1107. if na.Timestamp.After(now.Add(time.Minute * 10)) {
  1108. na.Timestamp = now.Add(-1 * time.Hour * 24 * 5)
  1109. }
  1110. // Add address to known addresses for this peer.
  1111. sp.addKnownAddresses([]*wire.NetAddress{na})
  1112. }
  1113. // Add addresses to server address manager. The address manager handles
  1114. // the details of things such as preventing duplicate addresses, max
  1115. // addresses, and last seen updates.
  1116. // XXX bitcoind gives a 2 hour time penalty here, do we want to do the
  1117. // same?
  1118. sp.server.addrManager.AddAddresses(msg.AddrList, sp.NA())
  1119. }
  1120. // OnRead is invoked when a peer receives a message and it is used to update
  1121. // the bytes received by the server.
  1122. func (sp *serverPeer) OnRead(_ *peer.Peer, bytesRead int, msg wire.Message, err er.R) {
  1123. sp.server.AddBytesReceived(uint64(bytesRead))
  1124. }
  1125. // OnWrite is invoked when a peer sends a message and it is used to update
  1126. // the bytes sent by the server.
  1127. func (sp *serverPeer) OnWrite(_ *peer.Peer, bytesWritten int, msg wire.Message, err er.R) {
  1128. sp.server.AddBytesSent(uint64(bytesWritten))
  1129. }
  1130. // randomUint16Number returns a random uint16 in a specified input range. Note
  1131. // that the range is in zeroth ordering; if you pass it 1800, you will get
  1132. // values from 0 to 1800.
  1133. func randomUint16Number(max uint16) uint16 {
  1134. // In order to avoid modulo bias and ensure every possible outcome in
  1135. // [0, max) has equal probability, the random number must be sampled
  1136. // from a random source that has a range limited to a multiple of the
  1137. // modulus.
  1138. var randomNumber uint16
  1139. limitRange := (math.MaxUint16 / max) * max
  1140. for {
  1141. errr := binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
  1142. if errr != nil {
  1143. panic("randomUint16Number: binary.Read failed.")
  1144. }
  1145. if randomNumber < limitRange {
  1146. return (randomNumber % max)
  1147. }
  1148. }
  1149. }
  1150. // AddRebroadcastInventory adds 'iv' to the list of inventories to be
  1151. // rebroadcasted at random intervals until they show up in a block.
  1152. func (s *server) AddRebroadcastInventory(iv *wire.InvVect, data interface{}) {
  1153. // Ignore if shutting down.
  1154. if atomic.LoadInt32(&s.shutdown) != 0 {
  1155. return
  1156. }
  1157. s.modifyRebroadcastInv <- broadcastInventoryAdd{invVect: iv, data: data}
  1158. }
  1159. // RemoveRebroadcastInventory removes 'iv' from the list of items to be
  1160. // rebroadcasted if present.
  1161. func (s *server) RemoveRebroadcastInventory(iv *wire.InvVect) {
  1162. // Ignore if shutting down.
  1163. if atomic.LoadInt32(&s.shutdown) != 0 {
  1164. return
  1165. }
  1166. s.modifyRebroadcastInv <- broadcastInventoryDel(iv)
  1167. }
  1168. // relayTransactions generates and relays inventory vectors for all of the
  1169. // passed transactions to all connected peers.
  1170. func (s *server) relayTransactions(txns []*mempool.TxDesc) {
  1171. for _, txD := range txns {
  1172. iv := wire.NewInvVect(wire.InvTypeTx, txD.Tx.Hash())
  1173. s.RelayInventory(iv, txD)
  1174. }
  1175. }
  1176. // AnnounceNewTransactions generates and relays inventory vectors and notifies
  1177. // both websocket and getblocktemplate long poll clients of the passed
  1178. // transactions. This function should be called whenever new transactions
  1179. // are added to the mempool.
  1180. func (s *server) AnnounceNewTransactions(txns []*mempool.TxDesc) {
  1181. // Generate and relay inventory vectors for all newly accepted
  1182. // transactions.
  1183. s.relayTransactions(txns)
  1184. // Notify both websocket and getblocktemplate long poll clients of all
  1185. // newly accepted transactions.
  1186. if s.rpcServer != nil {
  1187. s.rpcServer.NotifyNewTransactions(txns)
  1188. }
  1189. }
  1190. // Transaction has one confirmation on the main chain. Now we can mark it as no
  1191. // longer needing rebroadcasting.
  1192. func (s *server) TransactionConfirmed(tx *btcutil.Tx) {
  1193. // Rebroadcasting is only necessary when the RPC server is active.
  1194. if s.rpcServer == nil {
  1195. return
  1196. }
  1197. iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
  1198. s.RemoveRebroadcastInventory(iv)
  1199. }
  1200. // pushTxMsg sends a tx message for the provided transaction hash to the
  1201. // connected peer. An error is returned if the transaction hash is not known.
  1202. func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{},
  1203. waitChan <-chan struct{}, encoding wire.MessageEncoding) er.R {
  1204. // Attempt to fetch the requested transaction from the pool. A
  1205. // call could be made to check for existence first, but simply trying
  1206. // to fetch a missing transaction results in the same behavior.
  1207. tx, err := s.txMemPool.FetchTransaction(hash)
  1208. if err != nil {
  1209. log.Tracef("Unable to fetch tx %v from transaction "+
  1210. "pool: %v", hash, err)
  1211. if doneChan != nil {
  1212. doneChan <- struct{}{}
  1213. }
  1214. return err
  1215. }
  1216. // Once we have fetched data wait for any previous operation to finish.
  1217. if waitChan != nil {
  1218. <-waitChan
  1219. }
  1220. sp.QueueMessageWithEncoding(tx.MsgTx(), doneChan, encoding)
  1221. return nil
  1222. }
  1223. // pushBlockMsg sends a block message for the provided block hash to the
  1224. // connected peer. An error is returned if the block hash is not known.
  1225. func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{},
  1226. waitChan <-chan struct{}, encoding wire.MessageEncoding) er.R {
  1227. // Fetch the raw block bytes from the database.
  1228. var blockBytes []byte
  1229. err := sp.server.db.View(func(dbTx database.Tx) er.R {
  1230. var err er.R
  1231. blockBytes, err = dbTx.FetchBlock(hash)
  1232. return err
  1233. })
  1234. if err != nil {
  1235. log.Tracef("Unable to fetch requested block hash %v: %v",
  1236. hash, err)
  1237. if doneChan != nil {
  1238. doneChan <- struct{}{}
  1239. }
  1240. return err
  1241. }
  1242. // Deserialize the block.
  1243. var msgBlock wire.MsgBlock
  1244. err = msgBlock.Deserialize(bytes.NewReader(blockBytes))
  1245. if err != nil {
  1246. log.Tracef("Unable to deserialize requested block hash "+
  1247. "%v: %v", hash, err)
  1248. if doneChan != nil {
  1249. doneChan <- struct{}{}
  1250. }
  1251. return err
  1252. }
  1253. // Once we have fetched data wait for any previous operation to finish.
  1254. if waitChan != nil {
  1255. <-waitChan
  1256. }
  1257. // We only send the channel for this message if we aren't sending
  1258. // an inv straight after.
  1259. var dc chan<- struct{}
  1260. continueHash := sp.continueHash
  1261. sendInv := continueHash != nil && continueHash.IsEqual(hash)
  1262. if !sendInv {
  1263. dc = doneChan
  1264. }
  1265. sp.QueueMessageWithEncoding(&msgBlock, dc, encoding)
  1266. // When the peer requests the final block that was advertised in
  1267. // response to a getblocks message which requested more blocks than
  1268. // would fit into a single message, send it a new inventory message
  1269. // to trigger it to issue another getblocks message for the next
  1270. // batch of inventory.
  1271. if sendInv {
  1272. best := sp.server.chain.BestSnapshot()
  1273. invMsg := wire.NewMsgInvSizeHint(1)
  1274. iv := wire.NewInvVect(wire.InvTypeBlock, &best.Hash)
  1275. invMsg.AddInvVect(iv)
  1276. sp.QueueMessage(invMsg, doneChan)
  1277. sp.continueHash = nil
  1278. }
  1279. return nil
  1280. }
  1281. // pushMerkleBlockMsg sends a merkleblock message for the provided block hash to
  1282. // the connected peer. Since a merkle block requires the peer to have a filter
  1283. // loaded, this call will simply be ignored if there is no filter loaded. An
  1284. // error is returned if the block hash is not known.
  1285. func (s *server) pushMerkleBlockMsg(sp *serverPeer, hash *chainhash.Hash,
  1286. doneChan chan<- struct{}, waitChan <-chan struct{}, encoding wire.MessageEncoding) er.R {
  1287. // Do not send a response if the peer doesn't have a filter loaded.
  1288. if !sp.filter.IsLoaded() {
  1289. if doneChan != nil {
  1290. doneChan <- struct{}{}
  1291. }
  1292. return nil
  1293. }
  1294. // Fetch the raw block bytes from the database.
  1295. blk, err := sp.server.chain.BlockByHash(hash)
  1296. if err != nil {
  1297. log.Tracef("Unable to fetch requested block hash %v: %v",
  1298. hash, err)
  1299. if doneChan != nil {
  1300. doneChan <- struct{}{}
  1301. }
  1302. return err
  1303. }
  1304. // Generate a merkle block by filtering the requested block according
  1305. // to the filter for the peer.
  1306. merkle, matchedTxIndices := bloom.NewMerkleBlock(blk, sp.filter)
  1307. // Once we have fetched data wait for any previous operation to finish.
  1308. if waitChan != nil {
  1309. <-waitChan
  1310. }
  1311. // Send the merkleblock. Only send the done channel with this message
  1312. // if no transactions will be sent afterwards.
  1313. var dc chan<- struct{}
  1314. if len(matchedTxIndices) == 0 {
  1315. dc = doneChan
  1316. }
  1317. sp.QueueMessage(merkle, dc)
  1318. // Finally, send any matched transactions.
  1319. blkTransactions := blk.MsgBlock().Transactions
  1320. for i, txIndex := range matchedTxIndices {
  1321. // Only send the done channel on the final transaction.
  1322. var dc chan<- struct{}
  1323. if i == len(matchedTxIndices)-1 {
  1324. dc = doneChan
  1325. }
  1326. if txIndex < uint32(len(blkTransactions)) {
  1327. sp.QueueMessageWithEncoding(blkTransactions[txIndex], dc,
  1328. encoding)
  1329. }
  1330. }
  1331. return nil
  1332. }
  1333. // handleUpdatePeerHeight updates the heights of all peers who were known to
  1334. // announce a block we recently accepted.
  1335. func (s *server) handleUpdatePeerHeights(state *peerState, umsg updatePeerHeightsMsg) {
  1336. state.forAllPeers(func(sp *serverPeer) {
  1337. // The origin peer should already have the updated height.
  1338. if sp.Peer == umsg.originPeer {
  1339. return
  1340. }
  1341. // This is a pointer to the underlying memory which doesn't
  1342. // change.
  1343. latestBlkHash := sp.LastAnnouncedBlock()
  1344. // Skip this peer if it hasn't recently announced any new blocks.
  1345. if latestBlkHash == nil {
  1346. return
  1347. }
  1348. // If the peer has recently announced a block, and this block
  1349. // matches our newly accepted block, then update their block
  1350. // height.
  1351. if *latestBlkHash == *umsg.newHash {
  1352. sp.UpdateLastBlockHeight(umsg.newHeight)
  1353. sp.UpdateLastAnnouncedBlock(nil)
  1354. }
  1355. })
  1356. }
  1357. // handleAddPeerMsg deals with adding new peers. It is invoked from the
  1358. // peerHandler goroutine.
  1359. func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool {
  1360. if sp == nil || !sp.Connected() {
  1361. return false
  1362. }
  1363. // Disconnect peers with unwanted user agents.
  1364. if sp.HasUndesiredUserAgent(s.agentBlacklist, s.agentWhitelist) {
  1365. sp.Disconnect()
  1366. return false
  1367. }
  1368. // Ignore new peers if we're shutting down.
  1369. if atomic.LoadInt32(&s.shutdown) != 0 {
  1370. log.Infof("New peer %s ignored - server is shutting down", sp)
  1371. sp.Disconnect()
  1372. return false
  1373. }
  1374. // Disconnect banned peers.
  1375. host, _, err := net.SplitHostPort(sp.Addr())
  1376. if err != nil {
  1377. log.Debugf("can't split hostport %v", err)
  1378. sp.Disconnect()
  1379. return false
  1380. }
  1381. if banEnd, ok := state.banned[host]; ok {
  1382. if time.Now().Before(banEnd) {
  1383. log.Debugf("Peer %s is banned for another %v - disconnecting",
  1384. host, time.Until(banEnd))
  1385. sp.Disconnect()
  1386. return false
  1387. }
  1388. log.Infof("Peer %s is no longer banned", host)
  1389. delete(state.banned, host)
  1390. }
  1391. // TODO: Check for max peers from a single IP.
  1392. // Limit max number of total peers.
  1393. if state.Count() >= cfg.MaxPeers {
  1394. log.Infof("Max peers reached [%d] - disconnecting peer %s",
  1395. cfg.MaxPeers, sp)
  1396. sp.Disconnect()
  1397. // TODO: how to handle permanent peers here?
  1398. // they should be rescheduled.
  1399. return false
  1400. }
  1401. // Add the new peer and start it.
  1402. log.Debugf("New peer %s", sp)
  1403. if sp.Inbound() {
  1404. state.inboundPeers[sp.ID()] = sp
  1405. } else {
  1406. state.outboundGroups[addrmgr.GroupKey(sp.NA())]++
  1407. if sp.persistent {
  1408. state.persistentPeers[sp.ID()] = sp
  1409. } else {
  1410. state.outboundPeers[sp.ID()] = sp
  1411. }
  1412. }
  1413. // Update the address' last seen time if the peer has acknowledged
  1414. // our version and has sent us its version as well.
  1415. if sp.VerAckReceived() && sp.VersionKnown() && sp.NA() != nil {
  1416. s.addrManager.Connected(sp.NA())
  1417. }
  1418. // Signal the sync manager this peer is a new sync candidate.
  1419. s.syncManager.NewPeer(sp.Peer)
  1420. // Update the address manager and request known addresses from the
  1421. // remote peer for outbound connections. This is skipped when running on
  1422. // the simulation test network since it is only intended to connect to
  1423. // specified peers and actively avoids advertising and connecting to
  1424. // discovered peers.
  1425. if !cfg.SimNet && !sp.Inbound() {
  1426. // Advertise the local address when the server accepts incoming
  1427. // connections and it believes itself to be close to the best
  1428. // known tip.
  1429. if !cfg.DisableListen && s.syncManager.IsCurrent() {
  1430. // Get address that best matches.
  1431. lna := s.addrManager.GetBestLocalAddress(sp.NA())
  1432. if addrmgr.IsRoutable(lna) {
  1433. // Filter addresses the peer already knows about.
  1434. addresses := []*wire.NetAddress{lna}
  1435. sp.pushAddrMsg(addresses)
  1436. }
  1437. }
  1438. // Request known addresses if the server address manager needs
  1439. // more and the peer has a protocol version new enough to
  1440. // include a timestamp with addresses.
  1441. hasTimestamp := sp.ProtocolVersion() >= protocol.NetAddressTimeVersion
  1442. if s.addrManager.NeedMoreAddresses() && hasTimestamp {
  1443. sp.QueueMessage(wire.NewMsgGetAddr(), nil)
  1444. }
  1445. // Mark the address as a known good address.
  1446. s.addrManager.Good(sp.NA())
  1447. }
  1448. // Notify the connection manager of finality
  1449. s.connManager.NotifyConnectionRequestActuallyCompleted()
  1450. return true
  1451. }
  1452. // handleDonePeerMsg deals with peers that have signaled they are done. It is
  1453. // invoked from the peerHandler goroutine.
  1454. func (s *server) handleDonePeerMsg(state *peerState, sp *serverPeer) {
  1455. var list map[int32]*serverPeer
  1456. if sp.persistent {
  1457. list = state.persistentPeers
  1458. } else if sp.Inbound() {
  1459. list = state.inboundPeers
  1460. } else {
  1461. list = state.outboundPeers
  1462. }
  1463. // Regardless of whether the peer was found in our list, we'll inform
  1464. // our connection manager about the disconnection. This can happen if we
  1465. // process a peer's `done` message before its `add`.
  1466. if !sp.Inbound() {
  1467. if sp.persistent {
  1468. s.connManager.Disconnect(sp.connReq.ID())
  1469. } else {
  1470. s.connManager.Remove(sp.connReq.ID())
  1471. go s.connManager.NewConnReq()
  1472. }
  1473. }
  1474. if _, ok := list[sp.ID()]; ok {
  1475. if !sp.Inbound() && sp.VersionKnown() {
  1476. state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
  1477. }
  1478. delete(list, sp.ID())
  1479. log.Debugf("Removed peer %s", sp)
  1480. return
  1481. }
  1482. }
  1483. // handleBanPeerMsg deals with banning peers. It is invoked from the
  1484. // peerHandler goroutine.
  1485. func (s *server) handleBanPeerMsg(state *peerState, sp *serverPeer) {
  1486. host, _, err := net.SplitHostPort(sp.Addr())
  1487. if err != nil {
  1488. log.Debugf("can't split ban peer %s %v", sp.Addr(), err)
  1489. return
  1490. }
  1491. direction := directionString(sp.Inbound())
  1492. log.Infof("Banned peer %s (%s) for %v", host, direction,
  1493. cfg.BanDuration)
  1494. state.banned[host] = time.Now().Add(cfg.BanDuration)
  1495. }
  1496. func (s *server) sendInvMsgToPeer(sp *serverPeer, msg relayMsg) bool {
  1497. if !sp.Connected() {
  1498. return false
  1499. }
  1500. // If the inventory is a block and the peer prefers headers,
  1501. // generate and send a headers message instead of an inventory
  1502. // message.
  1503. if msg.invVect.Type == wire.InvTypeBlock && sp.WantsHeaders() {
  1504. blockHeader, ok := msg.data.(wire.BlockHeader)
  1505. if !ok {
  1506. log.Warnf("Underlying data for headers" +
  1507. " is not a block header")
  1508. return false
  1509. }
  1510. msgHeaders := wire.NewMsgHeaders()
  1511. if err := msgHeaders.AddBlockHeader(&blockHeader); err != nil {
  1512. log.Errorf("Failed to add block"+
  1513. " header: %v", err)
  1514. return false
  1515. }
  1516. sp.QueueMessage(msgHeaders, nil)
  1517. return false
  1518. }
  1519. if msg.invVect.Type == wire.InvTypeTx {
  1520. // Don't relay the transaction to the peer when it has
  1521. // transaction relaying disabled.
  1522. if sp.relayTxDisabled() {
  1523. return false
  1524. }
  1525. txD, ok := msg.data.(*mempool.TxDesc)
  1526. if !ok {
  1527. log.Warnf("Underlying data for tx inv "+
  1528. "relay is not a *mempool.TxDesc: %T",
  1529. msg.data)
  1530. return false
  1531. }
  1532. // Don't relay the transaction if the transaction fee-per-kb
  1533. // is less than the peer's feefilter.
  1534. feeFilter := atomic.LoadInt64(&sp.feeFilter)
  1535. if feeFilter > 0 && txD.FeePerKB < feeFilter {
  1536. return false
  1537. }
  1538. // Don't relay the transaction if there is a bloom
  1539. // filter loaded and the transaction doesn't match it.
  1540. if sp.filter.IsLoaded() {
  1541. if !sp.filter.MatchTxAndUpdate(txD.Tx) {
  1542. return false
  1543. }
  1544. }
  1545. }
  1546. // Queue the inventory to be relayed with the next batch.
  1547. // It will be ignored if the peer is already known to
  1548. // have the inventory.
  1549. sp.QueueInventory(msg.invVect)
  1550. return true
  1551. }
  1552. // handleRelayInvMsg deals with relaying inventory to peers that are not already
  1553. // known to have it. It is invoked from the peerHandler goroutine.
  1554. func (s *server) handleRelayInvMsg(state *peerState, msg relayMsg) {
  1555. state.forAllPeers(func(sp *serverPeer) {
  1556. s.sendInvMsgToPeer(sp, msg)
  1557. })
  1558. }
  1559. // handleBroadcastMsg deals with broadcasting messages to peers. It is invoked
  1560. // from the peerHandler goroutine.
  1561. func (s *server) handleBroadcastMsg(state *peerState, bmsg *broadcastMsg) {
  1562. state.forAllPeers(func(sp *serverPeer) {
  1563. if !sp.Connected() {
  1564. return
  1565. }
  1566. for _, ep := range bmsg.excludePeers {
  1567. if sp == ep {
  1568. return
  1569. }
  1570. }
  1571. sp.QueueMessage(bmsg.message, nil)
  1572. })
  1573. }
  1574. type getConnCountMsg struct {
  1575. reply chan int32
  1576. }
  1577. type getPeersMsg struct {
  1578. reply chan []*serverPeer
  1579. }
  1580. type getOutboundGroup struct {
  1581. key string
  1582. reply chan int
  1583. }
  1584. type getAddedNodesMsg struct {
  1585. reply chan []*serverPeer
  1586. }
  1587. type disconnectNodeMsg struct {
  1588. cmp func(*serverPeer) bool
  1589. reply chan er.R
  1590. }
  1591. type connectNodeMsg struct {
  1592. addr string
  1593. permanent bool
  1594. reply chan er.R
  1595. }
  1596. type removeNodeMsg struct {
  1597. cmp func(*serverPeer) bool
  1598. reply chan er.R
  1599. }
  1600. // handleQuery is the central handler for all queries and commands from other
  1601. // goroutines related to peer state.
  1602. func (s *server) handleQuery(state *peerState, querymsg interface{}) {
  1603. switch msg := querymsg.(type) {
  1604. case getConnCountMsg:
  1605. nconnected := int32(0)
  1606. state.forAllPeers(func(sp *serverPeer) {
  1607. if sp.Connected() {
  1608. nconnected++
  1609. }
  1610. })
  1611. msg.reply <- nconnected
  1612. case getPeersMsg:
  1613. peers := make([]*serverPeer, 0, state.Count())
  1614. state.forAllPeers(func(sp *serverPeer) {
  1615. if !sp.Connected() {
  1616. return
  1617. }
  1618. peers = append(peers, sp)
  1619. })
  1620. msg.reply <- peers
  1621. case connectNodeMsg:
  1622. // TODO: duplicate oneshots?
  1623. // Limit max number of total peers.
  1624. if state.Count() >= cfg.MaxPeers {
  1625. msg.reply <- er.New("max peers reached")
  1626. return
  1627. }
  1628. for _, peer := range state.persistentPeers {
  1629. if peer.Addr() == msg.addr {
  1630. if msg.permanent {
  1631. msg.reply <- er.New("peer already connected")
  1632. } else {
  1633. msg.reply <- er.New("peer exists as a permanent peer")
  1634. }
  1635. return
  1636. }
  1637. }
  1638. netAddr, err := addrStringToNetAddr(msg.addr)
  1639. if err != nil {
  1640. msg.reply <- err
  1641. return
  1642. }
  1643. // TODO: if too many, nuke a non-perm peer.
  1644. go s.connManager.Connect(&connmgr.ConnReq{
  1645. Addr: netAddr,
  1646. Permanent: msg.permanent,
  1647. })
  1648. msg.reply <- nil
  1649. case removeNodeMsg:
  1650. found := disconnectPeer(state.persistentPeers, msg.cmp, func(sp *serverPeer) {
  1651. // Keep group counts ok since we remove from
  1652. // the list now.
  1653. state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
  1654. })
  1655. if found {
  1656. msg.reply <- nil
  1657. } else {
  1658. msg.reply <- er.New("peer not found")
  1659. }
  1660. case getOutboundGroup:
  1661. count, ok := state.outboundGroups[msg.key]
  1662. if ok {
  1663. msg.reply <- count
  1664. } else {
  1665. msg.reply <- 0
  1666. }
  1667. // Request a list of the persistent (added) peers.
  1668. case getAddedNodesMsg:
  1669. // Respond with a slice of the relevant peers.
  1670. peers := make([]*serverPeer, 0, len(state.persistentPeers))
  1671. for _, sp := range state.persistentPeers {
  1672. peers = append(peers, sp)
  1673. }
  1674. msg.reply <- peers
  1675. case disconnectNodeMsg:
  1676. // Check inbound peers. We pass a nil callback since we don't
  1677. // require any additional actions on disconnect for inbound peers.
  1678. found := disconnectPeer(state.inboundPeers, msg.cmp, nil)
  1679. if found {
  1680. msg.reply <- nil
  1681. return
  1682. }
  1683. // Check outbound peers.
  1684. found = disconnectPeer(state.outboundPeers, msg.cmp, func(sp *serverPeer) {
  1685. // Keep group counts ok since we remove from
  1686. // the list now.
  1687. state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
  1688. })
  1689. if found {
  1690. // If there are multiple outbound connections to the same
  1691. // ip:port, continue disconnecting them all until no such
  1692. // peers are found.
  1693. for found {
  1694. found = disconnectPeer(state.outboundPeers, msg.cmp, func(sp *serverPeer) {
  1695. state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
  1696. })
  1697. }
  1698. msg.reply <- nil
  1699. return
  1700. }
  1701. msg.reply <- er.New("peer not found")
  1702. }
  1703. }
  1704. // disconnectPeer attempts to drop the connection of a targeted peer in the
  1705. // passed peer list. Targets are identified via usage of the passed
  1706. // `compareFunc`, which should return `true` if the passed peer is the target
  1707. // peer. This function returns true on success and false if the peer is unable
  1708. // to be located. If the peer is found, and the passed callback: `whenFound'
  1709. // isn't nil, we call it with the peer as the argument before it is removed
  1710. // from the peerList, and is disconnected from the server.
  1711. func disconnectPeer(peerList map[int32]*serverPeer, compareFunc func(*serverPeer) bool, whenFound func(*serverPeer)) bool {
  1712. for addr, peer := range peerList {
  1713. if compareFunc(peer) {
  1714. if whenFound != nil {
  1715. whenFound(peer)
  1716. }
  1717. // This is ok because we are not continuing
  1718. // to iterate so won't corrupt the loop.
  1719. delete(peerList, addr)
  1720. peer.Disconnect()
  1721. return true
  1722. }
  1723. }
  1724. return false
  1725. }
  1726. // newPeerConfig returns the configuration for the given serverPeer.
  1727. func newPeerConfig(sp *serverPeer) *peer.Config {
  1728. return &peer.Config{
  1729. Listeners: peer.MessageListeners{
  1730. OnVersion: sp.OnVersion,
  1731. OnVerAck: sp.OnVerAck,
  1732. OnMemPool: sp.OnMemPool,
  1733. OnTx: sp.OnTx,
  1734. OnBlock: sp.OnBlock,
  1735. OnInv: sp.OnInv,
  1736. OnHeaders: sp.OnHeaders,
  1737. OnGetData: sp.OnGetData,
  1738. OnGetBlocks: sp.OnGetBlocks,
  1739. OnGetHeaders: sp.OnGetHeaders,
  1740. OnGetCFilters: sp.OnGetCFilters,
  1741. OnGetCFHeaders: sp.OnGetCFHeaders,
  1742. OnGetCFCheckpt: sp.OnGetCFCheckpt,
  1743. OnFeeFilter: sp.OnFeeFilter,
  1744. OnFilterAdd: sp.OnFilterAdd,
  1745. OnFilterClear: sp.OnFilterClear,
  1746. OnFilterLoad: sp.OnFilterLoad,
  1747. OnGetAddr: sp.OnGetAddr,
  1748. OnAddr: sp.OnAddr,
  1749. OnRead: sp.OnRead,
  1750. OnWrite: sp.OnWrite,
  1751. },
  1752. NewestBlock: sp.newestBlock,
  1753. HostToNetAddress: sp.server.addrManager.HostToNetAddress,
  1754. UserAgentName: version.UserAgentName(),
  1755. UserAgentVersion: version.UserAgentVersion(),
  1756. UserAgentComments: cfg.UserAgentComments,
  1757. ChainParams: sp.server.chainParams,
  1758. Services: sp.server.services,
  1759. DisableRelayTx: cfg.BlocksOnly,
  1760. ProtocolVersion: peer.MaxProtocolVersion,
  1761. TrickleInterval: cfg.TrickleInterval,
  1762. }
  1763. }
  1764. // inboundPeerConnected is invoked by the connection manager when a new inbound
  1765. // connection is established. It initializes a new inbound server peer
  1766. // instance, associates it with the connection, and starts a goroutine to wait
  1767. // for disconnection.
  1768. func (s *server) inboundPeerConnected(conn net.Conn) {
  1769. sp := newServerPeer(s, false)
  1770. sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
  1771. sp.Peer = peer.NewInboundPeer(newPeerConfig(sp))
  1772. sp.AssociateConnection(conn)
  1773. go s.peerDoneHandler(sp)
  1774. }
  1775. // outboundPeerConnected is invoked by the connection manager when a new
  1776. // outbound connection is established. It initializes a new outbound server
  1777. // peer instance, associates it with the relevant state such as the connection
  1778. // request instance and the connection itself, and finally notifies the address
  1779. // manager of the attempt.
  1780. func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) {
  1781. sp := newServerPeer(s, c.Permanent)
  1782. p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String())
  1783. if err != nil {
  1784. log.Debugf("Cannot create outbound peer %s: %v", c.Addr, err)
  1785. if c.Permanent {
  1786. s.connManager.Disconnect(c.ID())
  1787. } else {
  1788. s.connManager.Remove(c.ID())
  1789. go s.connManager.NewConnReq()
  1790. }
  1791. return
  1792. }
  1793. sp.Peer = p
  1794. sp.connReq = c
  1795. sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
  1796. sp.AssociateConnection(conn)
  1797. go s.peerDoneHandler(sp)
  1798. }
  1799. // peerDoneHandler handles peer disconnects by notifiying the server that it's
  1800. // done along with other performing other desirable cleanup.
  1801. func (s *server) peerDoneHandler(sp *serverPeer) {
  1802. sp.WaitForDisconnect()
  1803. s.donePeers <- sp
  1804. if sp.VerAckReceived() {
  1805. s.syncManager.DonePeer(sp.Peer)
  1806. // Evict any remaining orphans that were sent by the peer.
  1807. numEvicted := s.txMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID()))
  1808. if numEvicted > 0 {
  1809. log.Debugf("Evicted %d orphan(s) from peer %v (id %d)",
  1810. numEvicted, sp, sp.ID())
  1811. }
  1812. }
  1813. close(sp.quit)
  1814. }
  1815. func (s *server) sendRandomInv(state *peerState) {
  1816. descs := s.txMemPool.TxDescs()
  1817. if len(descs) == 0 {
  1818. return
  1819. }
  1820. winningTx := descs[mathrand.Intn(len(descs))]
  1821. candidates := make([]*serverPeer, 0, state.Count())
  1822. state.forAllPeers(func(sp *serverPeer) {
  1823. if !sp.Connected() {
  1824. return
  1825. }
  1826. candidates = append(candidates, sp)
  1827. })
  1828. iv := wire.NewInvVect(wire.InvTypeTx, winningTx.Tx.Hash())
  1829. msg := relayMsg{invVect: iv, data: winningTx}
  1830. for {
  1831. if len(candidates) == 0 {
  1832. return
  1833. }
  1834. i := mathrand.Intn(len(candidates))
  1835. winningPeer := candidates[i]
  1836. if !s.sendInvMsgToPeer(winningPeer, msg) {
  1837. candidates = append(candidates[:i], candidates[i+1:]...)
  1838. continue
  1839. }
  1840. log.Debugf("Sending random tx [%s] to random peer [%s]",
  1841. winningTx.Tx.Hash().String(), winningPeer.String())
  1842. break
  1843. }
  1844. }
  1845. // peerHandler is used to handle peer operations such as adding and removing
  1846. // peers to and from the server, banning peers, and broadcasting messages to
  1847. // peers. It must be run in a goroutine.
  1848. func (s *server) peerHandler() {
  1849. // Start the address manager and sync manager, both of which are needed
  1850. // by peers. This is done here since their lifecycle is closely tied
  1851. // to this handler and rather than adding more channels to sychronize
  1852. // things, it's easier and slightly faster to simply start and stop them
  1853. // in this handler.
  1854. s.addrManager.Start()
  1855. s.syncManager.Start()
  1856. log.Tracef("Starting peer handler")
  1857. randomInvTicker := time.NewTicker(time.Second * 10)
  1858. state := &peerState{
  1859. inboundPeers: make(map[int32]*serverPeer),
  1860. persistentPeers: make(map[int32]*serverPeer),
  1861. outboundPeers: make(map[int32]*serverPeer),
  1862. banned: make(map[string]time.Time),
  1863. outboundGroups: make(map[string]int),
  1864. }
  1865. if !cfg.DisableDNSSeed {
  1866. // Add peers discovered through DNS to the address manager.
  1867. connmgr.SeedFromDNS(activeNetParams.Params, defaultRequiredServices,
  1868. pktdLookup, func(addrs []*wire.NetAddress) {
  1869. // Bitcoind uses a lookup of the dns seeder here. This
  1870. // is rather strange since the values looked up by the
  1871. // DNS seed lookups will vary quite a lot.
  1872. // to replicate this behavior we put all addresses as
  1873. // having come from the first one.
  1874. s.addrManager.AddAddresses(addrs, addrs[0])
  1875. })
  1876. }
  1877. go s.connManager.Start()
  1878. out:
  1879. for {
  1880. select {
  1881. // New peers connected to the server.
  1882. case p := <-s.newPeers:
  1883. s.handleAddPeerMsg(state, p)
  1884. // Disconnected peers.
  1885. case p := <-s.donePeers:
  1886. s.handleDonePeerMsg(state, p)
  1887. // Block accepted in mainchain or orphan, update peer height.
  1888. case umsg := <-s.peerHeightsUpdate:
  1889. s.handleUpdatePeerHeights(state, umsg)
  1890. // Peer to ban.
  1891. case p := <-s.banPeers:
  1892. s.handleBanPeerMsg(state, p)
  1893. // New inventory to potentially be relayed to other peers.
  1894. case invMsg := <-s.relayInv:
  1895. s.handleRelayInvMsg(state, invMsg)
  1896. // Message to broadcast to all connected peers except those
  1897. // which are excluded by the message.
  1898. case bmsg := <-s.broadcast:
  1899. s.handleBroadcastMsg(state, &bmsg)
  1900. case qmsg := <-s.query:
  1901. s.handleQuery(state, qmsg)
  1902. case <-s.quit:
  1903. // Disconnect all peers on server shutdown.
  1904. state.forAllPeers(func(sp *serverPeer) {
  1905. log.Tracef("Shutdown peer %s", sp)
  1906. sp.Disconnect()
  1907. })
  1908. break out
  1909. case <-randomInvTicker.C:
  1910. s.sendRandomInv(state)
  1911. }
  1912. }
  1913. s.connManager.Stop()
  1914. s.syncManager.Stop()
  1915. s.addrManager.Stop()
  1916. // Drain channels before exiting so nothing is left waiting around
  1917. // to send.
  1918. cleanup:
  1919. for {
  1920. select {
  1921. case <-s.newPeers:
  1922. case <-s.donePeers:
  1923. case <-s.peerHeightsUpdate:
  1924. case <-s.relayInv:
  1925. case <-s.broadcast:
  1926. case <-s.query:
  1927. default:
  1928. break cleanup
  1929. }
  1930. }
  1931. s.wg.Done()
  1932. log.Tracef("Peer handler done")
  1933. }
  1934. // AddPeer adds a new peer that has already been connected to the server.
  1935. func (s *server) AddPeer(sp *serverPeer) {
  1936. s.newPeers <- sp
  1937. }
  1938. // BanPeer bans a peer that has already been connected to the server by ip.
  1939. func (s *server) BanPeer(sp *serverPeer) {
  1940. s.banPeers <- sp
  1941. }
  1942. // RelayInventory relays the passed inventory vector to all connected peers
  1943. // that are not already known to have it.
  1944. func (s *server) RelayInventory(invVect *wire.InvVect, data interface{}) {
  1945. s.relayInv <- relayMsg{invVect: invVect, data: data}
  1946. }
  1947. // BroadcastMessage sends msg to all peers currently connected to the server
  1948. // except those in the passed peers to exclude.
  1949. func (s *server) BroadcastMessage(msg wire.Message, exclPeers ...*serverPeer) {
  1950. bmsg := broadcastMsg{message: msg, excludePeers: exclPeers}
  1951. s.broadcast <- bmsg
  1952. }
  1953. // ConnectedCount returns the number of currently connected peers.
  1954. func (s *server) ConnectedCount() int32 {
  1955. replyChan := make(chan int32)
  1956. s.query <- getConnCountMsg{reply: replyChan}
  1957. return <-replyChan
  1958. }
  1959. // OutboundGroupCount returns the number of peers connected to the given
  1960. // outbound group key.
  1961. func (s *server) OutboundGroupCount(key string) int {
  1962. replyChan := make(chan int)
  1963. s.query <- getOutboundGroup{key: key, reply: replyChan}
  1964. return <-replyChan
  1965. }
  1966. // AddBytesSent adds the passed number of bytes to the total bytes sent counter
  1967. // for the server. It is safe for concurrent access.
  1968. func (s *server) AddBytesSent(bytesSent uint64) {
  1969. atomic.AddUint64(&s.bytesSent, bytesSent)
  1970. }
  1971. // AddBytesReceived adds the passed number of bytes to the total bytes received
  1972. // counter for the server. It is safe for concurrent access.
  1973. func (s *server) AddBytesReceived(bytesReceived uint64) {
  1974. atomic.AddUint64(&s.bytesReceived, bytesReceived)
  1975. }
  1976. // NetTotals returns the sum of all bytes received and sent across the network
  1977. // for all peers. It is safe for concurrent access.
  1978. func (s *server) NetTotals() (uint64, uint64) {
  1979. return atomic.LoadUint64(&s.bytesReceived),
  1980. atomic.LoadUint64(&s.bytesSent)
  1981. }
  1982. // UpdatePeerHeights updates the heights of all peers who have have announced
  1983. // the latest connected main chain block, or a recognized orphan. These height
  1984. // updates allow us to dynamically refresh peer heights, ensuring sync peer
  1985. // selection has access to the latest block heights for each peer.
  1986. func (s *server) UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int32, updateSource *peer.Peer) {
  1987. s.peerHeightsUpdate <- updatePeerHeightsMsg{
  1988. newHash: latestBlkHash,
  1989. newHeight: latestHeight,
  1990. originPeer: updateSource,
  1991. }
  1992. }
  1993. // rebroadcastHandler keeps track of user submitted inventories that we have
  1994. // sent out but have not yet made it into a block. We periodically rebroadcast
  1995. // them in case our peers restarted or otherwise lost track of them.
  1996. func (s *server) rebroadcastHandler() {
  1997. // Wait 5 min before first tx rebroadcast.
  1998. timer := time.NewTimer(5 * time.Minute)
  1999. pendingInvs := make(map[wire.InvVect]interface{})
  2000. out:
  2001. for {
  2002. select {
  2003. case riv := <-s.modifyRebroadcastInv:
  2004. switch msg := riv.(type) {
  2005. // Incoming InvVects are added to our map of RPC txs.
  2006. case broadcastInventoryAdd:
  2007. pendingInvs[*msg.invVect] = msg.data
  2008. // When an InvVect has been added to a block, we can
  2009. // now remove it, if it was present.
  2010. case broadcastInventoryDel:
  2011. delete(pendingInvs, *msg)
  2012. }
  2013. case <-timer.C:
  2014. // Any inventory we have has not made it into a block
  2015. // yet. We periodically resubmit them until they have.
  2016. for iv, data := range pendingInvs {
  2017. ivCopy := iv
  2018. s.RelayInventory(&ivCopy, data)
  2019. }
  2020. // Process at a random time up to 30mins (in seconds)
  2021. // in the future.
  2022. timer.Reset(time.Second *
  2023. time.Duration(randomUint16Number(1800)))
  2024. case <-s.quit:
  2025. break out
  2026. }
  2027. }
  2028. timer.Stop()
  2029. // Drain channels before exiting so nothing is left waiting around
  2030. // to send.
  2031. cleanup:
  2032. for {
  2033. select {
  2034. case <-s.modifyRebroadcastInv:
  2035. default:
  2036. break cleanup
  2037. }
  2038. }
  2039. s.wg.Done()
  2040. }
  2041. // Start begins accepting connections from peers.
  2042. func (s *server) Start() {
  2043. // Already started?
  2044. if atomic.AddInt32(&s.started, 1) != 1 {
  2045. return
  2046. }
  2047. log.Trace("Starting server")
  2048. // Start the peer handler which in turn starts the address and block
  2049. // managers.
  2050. s.wg.Add(1)
  2051. go s.peerHandler()
  2052. if s.nat != nil {
  2053. s.wg.Add(1)
  2054. go s.upnpUpdateThread()
  2055. }
  2056. if !cfg.DisableRPC {
  2057. s.wg.Add(1)
  2058. // Start the rebroadcastHandler, which ensures user tx received by
  2059. // the RPC server are rebroadcast until being included in a block.
  2060. go s.rebroadcastHandler()
  2061. s.rpcServer.Start()
  2062. }
  2063. // Start the CPU miner if generation is enabled.
  2064. if cfg.Generate {
  2065. s.cpuMiner.Start()
  2066. }
  2067. }
  2068. // Stop gracefully shuts down the server by stopping and disconnecting all
  2069. // peers and the main listener.
  2070. func (s *server) Stop() er.R {
  2071. // Make sure this only happens once.
  2072. if atomic.AddInt32(&s.shutdown, 1) != 1 {
  2073. log.Infof("Server is already in the process of shutting down")
  2074. return nil
  2075. }
  2076. log.Warnf("Server shutting down")
  2077. // Stop the CPU miner if needed
  2078. s.cpuMiner.Stop()
  2079. // Shutdown the RPC server if it's not disabled.
  2080. if !cfg.DisableRPC {
  2081. s.rpcServer.Stop()
  2082. }
  2083. // Save fee estimator state in the database.
  2084. s.db.Update(func(tx database.Tx) er.R {
  2085. metadata := tx.Metadata()
  2086. metadata.Put(mempool.EstimateFeeDatabaseKey, s.feeEstimator.Save())
  2087. return nil
  2088. })
  2089. // Signal the remaining goroutines to quit.
  2090. close(s.quit)
  2091. return nil
  2092. }
  2093. // WaitForShutdown blocks until the main listener and peer handlers are stopped.
  2094. func (s *server) WaitForShutdown() {
  2095. s.wg.Wait()
  2096. }
  2097. // parseListeners determines whether each listen address is IPv4 and IPv6 and
  2098. // returns a slice of appropriate net.Addrs to listen on with TCP. It also
  2099. // properly detects addresses which apply to "all interfaces" and adds the
  2100. // address as both IPv4 and IPv6.
  2101. func parseListeners(addrs []string) ([]net.Addr, er.R) {
  2102. netAddrs := make([]net.Addr, 0, len(addrs)*2)
  2103. for _, addr := range addrs {
  2104. host, _, errr := net.SplitHostPort(addr)
  2105. if errr != nil {
  2106. // Shouldn't happen due to already being normalized.
  2107. return nil, er.E(errr)
  2108. }
  2109. // Empty host or host of * on plan9 is both IPv4 and IPv6.
  2110. if host == "" || (host == "*" && runtime.GOOS == "plan9") {
  2111. netAddrs = append(netAddrs, simpleAddr{net: "tcp4", addr: addr}, simpleAddr{net: "tcp6", addr: addr})
  2112. continue
  2113. }
  2114. // Strip IPv6 zone id if present since net.ParseIP does not
  2115. // handle it.
  2116. zoneIndex := strings.LastIndex(host, "%")
  2117. if zoneIndex > 0 {
  2118. host = host[:zoneIndex]
  2119. }
  2120. // Parse the IP.
  2121. ip := net.ParseIP(host)
  2122. if ip == nil {
  2123. return nil, er.Errorf("'%s' is not a valid IP address", host)
  2124. }
  2125. // To4 returns nil when the IP is not an IPv4 address, so use
  2126. // this determine the address type.
  2127. if ip.To4() == nil {
  2128. netAddrs = append(netAddrs, simpleAddr{net: "tcp6", addr: addr})
  2129. } else {
  2130. netAddrs = append(netAddrs, simpleAddr{net: "tcp4", addr: addr})
  2131. }
  2132. }
  2133. return netAddrs, nil
  2134. }
  2135. func (s *server) upnpUpdateThread() {
  2136. // Go off immediately to prevent code duplication, thereafter we renew
  2137. // lease every 15 minutes.
  2138. timer := time.NewTimer(0 * time.Second)
  2139. lport, errr := strconv.ParseInt(activeNetParams.DefaultPort, 10, 16)
  2140. if errr == nil {
  2141. panic("upnpUpdateThread: lport == nil")
  2142. }
  2143. first := true
  2144. out:
  2145. for {
  2146. select {
  2147. case <-timer.C:
  2148. // TODO: pick external port more cleverly
  2149. // TODO: know which ports we are listening to on an external net.
  2150. // TODO: if specific listen port doesn't work then ask for wildcard
  2151. // listen port?
  2152. // XXX this assumes timeout is in seconds.
  2153. listenPort, err := s.nat.AddPortMapping("tcp", int(lport), int(lport),
  2154. "pktd listen port", 20*60)
  2155. if err != nil {
  2156. log.Warnf("can't add UPnP port mapping: %v", err)
  2157. }
  2158. if first && err == nil {
  2159. // TODO: look this up periodically to see if upnp domain changed
  2160. // and so did ip.
  2161. externalip, err := s.nat.GetExternalAddress()
  2162. if err != nil {
  2163. log.Warnf("UPnP can't get external address: %v", err)
  2164. continue out
  2165. }
  2166. na := wire.NewNetAddressIPPort(externalip, uint16(listenPort),
  2167. s.services)
  2168. err = s.addrManager.AddLocalAddress(na, addrmgr.UpnpPrio)
  2169. if err != nil {
  2170. log.Warnf("UPnP AddLocalAddress() failed %v", err)
  2171. err = s.nat.DeletePortMapping("tcp", int(lport), int(lport))
  2172. if err != nil {
  2173. log.Warnf("UPnP DeletePortMapping() failed %v", err)
  2174. }
  2175. continue
  2176. }
  2177. log.Warnf("Successfully bound via UPnP to %s", addrmgr.NetAddressKey(na))
  2178. first = false
  2179. }
  2180. timer.Reset(time.Minute * 15)
  2181. case <-s.quit:
  2182. break out
  2183. }
  2184. }
  2185. timer.Stop()
  2186. if err := s.nat.DeletePortMapping("tcp", int(lport), int(lport)); err != nil {
  2187. log.Warnf("unable to remove UPnP port mapping: %v", err)
  2188. } else {
  2189. log.Debugf("successfully disestablished UPnP port mapping")
  2190. }
  2191. s.wg.Done()
  2192. }
  2193. // setupRPCListeners returns a slice of listeners that are configured for use
  2194. // with the RPC server depending on the configuration settings for listen
  2195. // addresses and TLS.
  2196. func setupRPCListeners() ([]net.Listener, er.R) {
  2197. // Setup TLS if not disabled.
  2198. listenFunc := net.Listen
  2199. if cfg.EnableTLS {
  2200. // Generate the TLS cert and key file if both don't already
  2201. // exist.
  2202. if !fileExists(cfg.RPCKey) && !fileExists(cfg.RPCCert) {
  2203. err := genCertPair(cfg.RPCCert, cfg.RPCKey)
  2204. if err != nil {
  2205. return nil, err
  2206. }
  2207. }
  2208. keypair, errr := tls.LoadX509KeyPair(cfg.RPCCert, cfg.RPCKey)
  2209. if errr != nil {
  2210. return nil, er.E(errr)
  2211. }
  2212. tlsConfig := tls.Config{
  2213. Certificates: []tls.Certificate{keypair},
  2214. MinVersion: tls.VersionTLS12,
  2215. }
  2216. // Change the standard net.Listen function to the tls one.
  2217. listenFunc = func(net, laddr string) (net.Listener, error) {
  2218. return tls.Listen(net, laddr, &tlsConfig)
  2219. }
  2220. }
  2221. netAddrs, err := parseListeners(cfg.RPCListeners)
  2222. if err != nil {
  2223. return nil, err
  2224. }
  2225. listeners := make([]net.Listener, 0, len(netAddrs))
  2226. for _, addr := range netAddrs {
  2227. listener, err := listenFunc(addr.Network(), addr.String())
  2228. if err != nil {
  2229. log.Warnf("Can't listen on %s: %v", addr, err)
  2230. continue
  2231. }
  2232. listeners = append(listeners, listener)
  2233. }
  2234. return listeners, nil
  2235. }
  2236. // newServer returns a new pktd server configured to listen on addr for the
  2237. // bitcoin network type specified by chainParams. Use start to begin accepting
  2238. // connections from peers.
  2239. func newServer(listenAddrs, agentBlacklist, agentWhitelist []string,
  2240. db database.DB, chainParams *chaincfg.Params,
  2241. interrupt <-chan struct{}) (*server, er.R) {
  2242. services := defaultServices
  2243. if cfg.NoPeerBloomFilters {
  2244. services &^= protocol.SFNodeBloom
  2245. }
  2246. if cfg.NoCFilters {
  2247. services &^= protocol.SFNodeCF
  2248. }
  2249. amgr := addrmgr.New(cfg.DataDir, pktdLookup)
  2250. var listeners []net.Listener
  2251. var nat NAT
  2252. if !cfg.DisableListen {
  2253. var err er.R
  2254. listeners, nat, err = initListeners(amgr, listenAddrs, services)
  2255. if err != nil {
  2256. return nil, err
  2257. }
  2258. if len(listeners) == 0 {
  2259. return nil, er.New("no valid listen address")
  2260. }
  2261. }
  2262. if len(agentBlacklist) > 0 {
  2263. log.Infof("User-agent blacklist %s", agentBlacklist)
  2264. }
  2265. if len(agentWhitelist) > 0 {
  2266. log.Infof("User-agent whitelist %s", agentWhitelist)
  2267. }
  2268. s := server{
  2269. startupTime: time.Now().Unix(),
  2270. chainParams: chainParams,
  2271. addrManager: amgr,
  2272. newPeers: make(chan *serverPeer, cfg.MaxPeers),
  2273. donePeers: make(chan *serverPeer, cfg.MaxPeers),
  2274. banPeers: make(chan *serverPeer, cfg.MaxPeers),
  2275. query: make(chan interface{}),
  2276. relayInv: make(chan relayMsg, cfg.MaxPeers),
  2277. broadcast: make(chan broadcastMsg, cfg.MaxPeers),
  2278. quit: make(chan struct{}),
  2279. modifyRebroadcastInv: make(chan interface{}),
  2280. peerHeightsUpdate: make(chan updatePeerHeightsMsg),
  2281. nat: nat,
  2282. db: db,
  2283. timeSource: blockchain.NewMedianTime(),
  2284. services: services,
  2285. sigCache: txscript.NewSigCache(cfg.SigCacheMaxSize),
  2286. hashCache: txscript.NewHashCache(cfg.SigCacheMaxSize),
  2287. cfCheckptCaches: make(map[wire.FilterType][]cfHeaderKV),
  2288. agentBlacklist: agentBlacklist,
  2289. agentWhitelist: agentWhitelist,
  2290. }
  2291. // Create the transaction and address indexes if needed.
  2292. //
  2293. // CAUTION: the txindex needs to be first in the indexes array because
  2294. // the addrindex uses data from the txindex during catchup. If the
  2295. // addrindex is run first, it may not have the transactions from the
  2296. // current block indexed.
  2297. var indexes []indexers.Indexer
  2298. if cfg.TxIndex || cfg.AddrIndex {
  2299. // Enable transaction index if address index is enabled since it
  2300. // requires it.
  2301. if !cfg.TxIndex {
  2302. log.Infof("Transaction index enabled because it " +
  2303. "is required by the address index")
  2304. cfg.TxIndex = true
  2305. } else {
  2306. log.Info("Transaction index is enabled")
  2307. }
  2308. s.txIndex = indexers.NewTxIndex(db)
  2309. indexes = append(indexes, s.txIndex)
  2310. }
  2311. if cfg.AddrIndex {
  2312. log.Info("Address index is enabled")
  2313. s.addrIndex = indexers.NewAddrIndex(db, chainParams)
  2314. indexes = append(indexes, s.addrIndex)
  2315. }
  2316. if !cfg.NoCFilters {
  2317. log.Info("Committed filter index is enabled")
  2318. s.cfIndex = indexers.NewCfIndex(db, chainParams)
  2319. indexes = append(indexes, s.cfIndex)
  2320. }
  2321. // Create an index manager if any of the optional indexes are enabled.
  2322. var indexManager blockchain.IndexManager
  2323. if len(indexes) > 0 {
  2324. indexManager = indexers.NewManager(db, indexes)
  2325. }
  2326. // Merge given checkpoints with the default ones unless they are disabled.
  2327. var checkpoints []chaincfg.Checkpoint
  2328. if !cfg.DisableCheckpoints {
  2329. checkpoints = mergeCheckpoints(s.chainParams.Checkpoints, cfg.addCheckpoints)
  2330. }
  2331. // Create a new block chain instance with the appropriate configuration.
  2332. var err er.R
  2333. s.chain, err = blockchain.New(&blockchain.Config{
  2334. DB: s.db,
  2335. Interrupt: interrupt,
  2336. ChainParams: s.chainParams,
  2337. Checkpoints: checkpoints,
  2338. TimeSource: s.timeSource,
  2339. SigCache: s.sigCache,
  2340. IndexManager: indexManager,
  2341. HashCache: s.hashCache,
  2342. })
  2343. if err != nil {
  2344. return nil, err
  2345. }
  2346. // Search for a FeeEstimator state in the database. If none can be found
  2347. // or if it cannot be loaded, create a new one.
  2348. db.Update(func(tx database.Tx) er.R {
  2349. metadata := tx.Metadata()
  2350. feeEstimationData := metadata.Get(mempool.EstimateFeeDatabaseKey)
  2351. if feeEstimationData != nil {
  2352. // delete it from the database so that we don't try to restore the
  2353. // same thing again somehow.
  2354. metadata.Delete(mempool.EstimateFeeDatabaseKey)
  2355. // If there is an error, log it and make a new fee estimator.
  2356. var err er.R
  2357. s.feeEstimator, err = mempool.RestoreFeeEstimator(feeEstimationData)
  2358. if err != nil {
  2359. log.Errorf("Failed to restore fee estimator %v", err)
  2360. }
  2361. }
  2362. return nil
  2363. })
  2364. // If no feeEstimator has been found, or if the one that has been found
  2365. // is behind somehow, create a new one and start over.
  2366. if s.feeEstimator == nil || s.feeEstimator.LastKnownHeight() != s.chain.BestSnapshot().Height {
  2367. s.feeEstimator = mempool.NewFeeEstimator(
  2368. mempool.DefaultEstimateFeeMaxRollback,
  2369. mempool.DefaultEstimateFeeMinRegisteredBlocks)
  2370. }
  2371. txC := mempool.Config{
  2372. Policy: mempool.Policy{
  2373. DisableRelayPriority: cfg.NoRelayPriority,
  2374. AcceptNonStd: cfg.RelayNonStd,
  2375. FreeTxRelayLimit: cfg.FreeTxRelayLimit,
  2376. MaxOrphanTxs: cfg.MaxOrphanTxs,
  2377. MaxOrphanTxSize: defaultMaxOrphanTxSize,
  2378. MaxSigOpCostPerTx: blockchain.MaxBlockSigOpsCost / 4,
  2379. MinRelayTxFee: cfg.minRelayTxFee,
  2380. MaxTxVersion: 2,
  2381. RejectReplacement: cfg.RejectReplacement,
  2382. },
  2383. ChainParams: chainParams,
  2384. FetchUtxoView: s.chain.FetchUtxoView,
  2385. BestHeight: func() int32 { return s.chain.BestSnapshot().Height },
  2386. MedianTimePast: func() time.Time { return s.chain.BestSnapshot().MedianTime },
  2387. CalcSequenceLock: func(tx *btcutil.Tx, view *blockchain.UtxoViewpoint) (*blockchain.SequenceLock, er.R) {
  2388. return s.chain.CalcSequenceLock(tx, view, true)
  2389. },
  2390. IsDeploymentActive: s.chain.IsDeploymentActive,
  2391. SigCache: s.sigCache,
  2392. HashCache: s.hashCache,
  2393. AddrIndex: s.addrIndex,
  2394. FeeEstimator: s.feeEstimator,
  2395. }
  2396. s.txMemPool = mempool.New(&txC)
  2397. s.syncManager, err = netsync.New(&netsync.Config{
  2398. PeerNotifier: &s,
  2399. Chain: s.chain,
  2400. TxMemPool: s.txMemPool,
  2401. ChainParams: s.chainParams,
  2402. DisableCheckpoints: cfg.DisableCheckpoints,
  2403. MaxPeers: cfg.MaxPeers,
  2404. FeeEstimator: s.feeEstimator,
  2405. })
  2406. if err != nil {
  2407. return nil, err
  2408. }
  2409. msc := 0
  2410. switch cfg.MiningSkipChecks {
  2411. case "txns":
  2412. msc = mining.CheckTxns
  2413. case "template":
  2414. msc = mining.CheckBlkTemplate
  2415. case "both":
  2416. msc = mining.CheckBoth
  2417. }
  2418. // Create the mining policy and block template generator based on the
  2419. // configuration options.
  2420. //
  2421. // NOTE: The CPU miner relies on the mempool, so the mempool has to be
  2422. // created before calling the function to create the CPU miner.
  2423. policy := mining.Policy{
  2424. SkipChecks: msc,
  2425. BlockMinWeight: cfg.BlockMinWeight,
  2426. BlockMaxWeight: cfg.BlockMaxWeight,
  2427. BlockMinSize: cfg.BlockMinSize,
  2428. BlockMaxSize: cfg.BlockMaxSize,
  2429. BlockPrioritySize: cfg.BlockPrioritySize,
  2430. TxMinFreeFee: cfg.minRelayTxFee,
  2431. }
  2432. blockTemplateGenerator := mining.NewBlkTmplGenerator(&policy,
  2433. s.chainParams, s.txMemPool, s.chain, s.timeSource,
  2434. s.sigCache, s.hashCache)
  2435. s.cpuMiner = cpuminer.New(&cpuminer.Config{
  2436. ChainParams: chainParams,
  2437. BlockTemplateGenerator: blockTemplateGenerator,
  2438. MiningAddrs: cfg.miningAddrs,
  2439. ProcessBlock: s.syncManager.ProcessBlock,
  2440. ConnectedCount: s.ConnectedCount,
  2441. IsCurrent: s.syncManager.IsCurrent,
  2442. })
  2443. // Only setup a function to return new addresses to connect to when
  2444. // not running in connect-only mode. The simulation network is always
  2445. // in connect-only mode since it is only intended to connect to
  2446. // specified peers and actively avoid advertising and connecting to
  2447. // discovered peers in order to prevent it from becoming a public test
  2448. // network.
  2449. var newAddressFunc func() (net.Addr, er.R)
  2450. if !cfg.SimNet && !cfg.RegressionTest && len(cfg.ConnectPeers) == 0 {
  2451. newAddressFunc = func() (net.Addr, er.R) {
  2452. for tries := 0; tries < 100; tries++ {
  2453. addr := s.addrManager.GetAddress()
  2454. if addr == nil {
  2455. break
  2456. }
  2457. // Address will not be invalid, local or unroutable
  2458. // because addrmanager rejects those on addition.
  2459. // Just check that we don't already have an address
  2460. // in the same group so that we are not connecting
  2461. // to the same network segment at the expense of
  2462. // others.
  2463. key := addrmgr.GroupKey(addr.NetAddress())
  2464. if s.OutboundGroupCount(key) != 0 {
  2465. continue
  2466. }
  2467. // only allow recent nodes (10mins) after we failed 10
  2468. // times
  2469. lastTime := s.addrManager.GetLastAttempt(addr.NetAddress())
  2470. if tries < 10 && time.Since(lastTime) < 10*time.Minute {
  2471. continue
  2472. }
  2473. // allow nondefault ports after 20 failed tries.
  2474. if tries < 20 && fmt.Sprintf("%d", addr.NetAddress().Port) !=
  2475. activeNetParams.DefaultPort {
  2476. continue
  2477. }
  2478. // Mark an attempt for the valid address.
  2479. s.addrManager.Attempt(addr.NetAddress())
  2480. addrString := addrmgr.NetAddressKey(addr.NetAddress())
  2481. return addrStringToNetAddr(addrString)
  2482. }
  2483. return nil, er.New("no valid connect address")
  2484. }
  2485. }
  2486. // Create a connection manager.
  2487. targetOutbound := defaultTargetOutbound
  2488. if cfg.MaxPeers < targetOutbound {
  2489. targetOutbound = cfg.MaxPeers
  2490. }
  2491. cmgr, err := connmgr.New(&connmgr.Config{
  2492. Listeners: listeners,
  2493. OnAccept: s.inboundPeerConnected,
  2494. RetryDuration: connectionRetryInterval,
  2495. TargetOutbound: uint32(targetOutbound),
  2496. Dial: pktdDial,
  2497. OnConnection: s.outboundPeerConnected,
  2498. GetNewAddress: newAddressFunc,
  2499. })
  2500. if err != nil {
  2501. return nil, err
  2502. }
  2503. s.connManager = cmgr
  2504. // Start up persistent peers.
  2505. permanentPeers := cfg.ConnectPeers
  2506. if len(permanentPeers) == 0 {
  2507. permanentPeers = cfg.AddPeers
  2508. }
  2509. for _, addr := range permanentPeers {
  2510. netAddr, err := addrStringToNetAddr(addr)
  2511. if err != nil {
  2512. return nil, err
  2513. }
  2514. go s.connManager.Connect(&connmgr.ConnReq{
  2515. Addr: netAddr,
  2516. Permanent: true,
  2517. })
  2518. }
  2519. if !cfg.DisableRPC {
  2520. // Setup listeners for the configured RPC listen addresses and
  2521. // TLS settings.
  2522. rpcListeners, err := setupRPCListeners()
  2523. if err != nil {
  2524. return nil, err
  2525. }
  2526. if len(rpcListeners) == 0 {
  2527. return nil, er.New("RPCS: No valid listen address")
  2528. }
  2529. s.rpcServer, err = newRPCServer(&rpcserverConfig{
  2530. Listeners: rpcListeners,
  2531. StartupTime: s.startupTime,
  2532. ConnMgr: &rpcConnManager{&s},
  2533. SyncMgr: &rpcSyncMgr{&s, s.syncManager},
  2534. TimeSource: s.timeSource,
  2535. Chain: s.chain,
  2536. ChainParams: chainParams,
  2537. DB: db,
  2538. TxMemPool: s.txMemPool,
  2539. Generator: blockTemplateGenerator,
  2540. CPUMiner: s.cpuMiner,
  2541. TxIndexOrNil: s.txIndex,
  2542. AddrIndex: s.addrIndex,
  2543. CfIndex: s.cfIndex,
  2544. FeeEstimator: s.feeEstimator,
  2545. ServiceFlags: services,
  2546. })
  2547. if err != nil {
  2548. return nil, err
  2549. }
  2550. // Signal process shutdown when the RPC server requests it.
  2551. go func() {
  2552. <-s.rpcServer.RequestedProcessShutdown()
  2553. shutdownRequestChannel <- struct{}{}
  2554. }()
  2555. }
  2556. return &s, nil
  2557. }
  2558. // initListeners initializes the configured net listeners and adds any bound
  2559. // addresses to the address manager. Returns the listeners and a NAT interface,
  2560. // which is non-nil if UPnP is in use.
  2561. func initListeners(amgr *addrmgr.AddrManager, listenAddrs []string, services protocol.ServiceFlag) ([]net.Listener, NAT, er.R) {
  2562. // Listen for TCP connections at the configured addresses
  2563. netAddrs, err := parseListeners(listenAddrs)
  2564. if err != nil {
  2565. return nil, nil, err
  2566. }
  2567. listeners := make([]net.Listener, 0, len(netAddrs))
  2568. for _, addr := range netAddrs {
  2569. listener, err := net.Listen(addr.Network(), addr.String())
  2570. if err != nil {
  2571. log.Warnf("Can't listen on %s: %v", addr, err)
  2572. continue
  2573. }
  2574. listeners = append(listeners, listener)
  2575. }
  2576. var nat NAT
  2577. if len(cfg.ExternalIPs) != 0 {
  2578. defaultPort, errr := strconv.ParseUint(activeNetParams.DefaultPort, 10, 16)
  2579. if errr != nil {
  2580. log.Errorf("Can not parse default port %s for active chain: %v",
  2581. activeNetParams.DefaultPort, errr)
  2582. return nil, nil, er.E(errr)
  2583. }
  2584. for _, sip := range cfg.ExternalIPs {
  2585. eport := uint16(defaultPort)
  2586. host, portstr, errr := net.SplitHostPort(sip)
  2587. if errr != nil {
  2588. // no port, use default.
  2589. host = sip
  2590. } else {
  2591. port, err := strconv.ParseUint(portstr, 10, 16)
  2592. if err != nil {
  2593. log.Warnf("Can not parse port from %s for "+
  2594. "externalip: %v", sip, err)
  2595. continue
  2596. }
  2597. eport = uint16(port)
  2598. }
  2599. na, err := amgr.HostToNetAddress(host, eport, services)
  2600. if err != nil {
  2601. log.Warnf("Not adding %s as externalip: %v", sip, err)
  2602. continue
  2603. }
  2604. err = amgr.AddLocalAddress(na, addrmgr.ManualPrio)
  2605. if err != nil {
  2606. log.Warnf("Skipping specified external IP: %v", err)
  2607. }
  2608. }
  2609. } else {
  2610. if cfg.Upnp {
  2611. var err er.R
  2612. nat, err = Discover()
  2613. if err != nil {
  2614. log.Warnf("Can't discover upnp: %v", err)
  2615. }
  2616. // nil nat here is fine, just means no upnp on network.
  2617. }
  2618. // Add bound addresses to address manager to be advertised to peers.
  2619. for _, listener := range listeners {
  2620. addr := listener.Addr().String()
  2621. err := addLocalAddress(amgr, addr, services)
  2622. if err != nil {
  2623. log.Warnf("Skipping bound address %s: %v", addr, err)
  2624. }
  2625. }
  2626. }
  2627. return listeners, nat, nil
  2628. }
  2629. // addrStringToNetAddr takes an address in the form of 'host:port'
  2630. // and returns a net.Addr which maps to the original address with
  2631. // any host names resolved to IP addresses.
  2632. func addrStringToNetAddr(addr string) (net.Addr, er.R) {
  2633. host, strPort, errr := net.SplitHostPort(addr)
  2634. if errr != nil {
  2635. return nil, er.E(errr)
  2636. }
  2637. port, errr := strconv.Atoi(strPort)
  2638. if errr != nil {
  2639. return nil, er.E(errr)
  2640. }
  2641. // Skip if host is already an IP address.
  2642. if ip := net.ParseIP(host); ip != nil {
  2643. return &net.TCPAddr{
  2644. IP: ip,
  2645. Port: port,
  2646. }, nil
  2647. }
  2648. // Attempt to look up an IP address associated with the parsed host.
  2649. ips, err := pktdLookup(host)
  2650. if err != nil {
  2651. return nil, err
  2652. }
  2653. if len(ips) == 0 {
  2654. return nil, er.Errorf("no addresses found for %s", host)
  2655. }
  2656. return &net.TCPAddr{
  2657. IP: ips[0],
  2658. Port: port,
  2659. }, nil
  2660. }
  2661. // addLocalAddress adds an address that this node is listening on to the
  2662. // address manager so that it may be relayed to peers.
  2663. func addLocalAddress(addrMgr *addrmgr.AddrManager, addr string, services protocol.ServiceFlag) er.R {
  2664. host, portStr, errr := net.SplitHostPort(addr)
  2665. if errr != nil {
  2666. return er.E(errr)
  2667. }
  2668. port, errr := strconv.ParseUint(portStr, 10, 16)
  2669. if errr != nil {
  2670. return er.E(errr)
  2671. }
  2672. if ip := net.ParseIP(host); ip != nil && ip.IsUnspecified() {
  2673. // If bound to unspecified address, advertise all local interfaces
  2674. addrs, errr := net.InterfaceAddrs()
  2675. if errr != nil {
  2676. return er.E(errr)
  2677. }
  2678. for _, addr := range addrs {
  2679. ifaceIP, _, err := net.ParseCIDR(addr.String())
  2680. if err != nil {
  2681. continue
  2682. }
  2683. // If bound to 0.0.0.0, do not add IPv6 interfaces and if bound to
  2684. // ::, do not add IPv4 interfaces.
  2685. if (ip.To4() == nil) != (ifaceIP.To4() == nil) {
  2686. continue
  2687. }
  2688. netAddr := wire.NewNetAddressIPPort(ifaceIP, uint16(port), services)
  2689. addrMgr.AddLocalAddress(netAddr, addrmgr.BoundPrio)
  2690. }
  2691. } else {
  2692. netAddr, err := addrMgr.HostToNetAddress(host, uint16(port), services)
  2693. if err != nil {
  2694. return err
  2695. }
  2696. addrMgr.AddLocalAddress(netAddr, addrmgr.BoundPrio)
  2697. }
  2698. return nil
  2699. }
  2700. // isWhitelisted returns whether the IP address is included in the whitelisted
  2701. // networks and IPs.
  2702. func isWhitelisted(addr net.Addr) bool {
  2703. if len(cfg.whitelists) == 0 {
  2704. return false
  2705. }
  2706. host, _, err := net.SplitHostPort(addr.String())
  2707. if err != nil {
  2708. log.Warnf("Unable to SplitHostPort on '%s': %v", addr, err)
  2709. return false
  2710. }
  2711. ip := net.ParseIP(host)
  2712. if ip == nil {
  2713. log.Warnf("Unable to parse IP '%s'", addr)
  2714. return false
  2715. }
  2716. for _, ipnet := range cfg.whitelists {
  2717. if ipnet.Contains(ip) {
  2718. return true
  2719. }
  2720. }
  2721. return false
  2722. }
  2723. // checkpointSorter implements sort.Interface to allow a slice of checkpoints to
  2724. // be sorted.
  2725. type checkpointSorter []chaincfg.Checkpoint
  2726. // Len returns the number of checkpoints in the slice. It is part of the
  2727. // sort.Interface implementation.
  2728. func (s checkpointSorter) Len() int {
  2729. return len(s)
  2730. }
  2731. // Swap swaps the checkpoints at the passed indices. It is part of the
  2732. // sort.Interface implementation.
  2733. func (s checkpointSorter) Swap(i, j int) {
  2734. s[i], s[j] = s[j], s[i]
  2735. }
  2736. // Less returns whether the checkpoint with index i should sort before the
  2737. // checkpoint with index j. It is part of the sort.Interface implementation.
  2738. func (s checkpointSorter) Less(i, j int) bool {
  2739. return s[i].Height < s[j].Height
  2740. }
  2741. // mergeCheckpoints returns two slices of checkpoints merged into one slice
  2742. // such that the checkpoints are sorted by height. In the case the additional
  2743. // checkpoints contain a checkpoint with the same height as a checkpoint in the
  2744. // default checkpoints, the additional checkpoint will take precedence and
  2745. // overwrite the default one.
  2746. func mergeCheckpoints(defaultCheckpoints, additional []chaincfg.Checkpoint) []chaincfg.Checkpoint {
  2747. // Create a map of the additional checkpoints to remove duplicates while
  2748. // leaving the most recently-specified checkpoint.
  2749. extra := make(map[int32]chaincfg.Checkpoint)
  2750. for _, checkpoint := range additional {
  2751. extra[checkpoint.Height] = checkpoint
  2752. }
  2753. // Add all default checkpoints that do not have an override in the
  2754. // additional checkpoints.
  2755. numDefault := len(defaultCheckpoints)
  2756. checkpoints := make([]chaincfg.Checkpoint, 0, numDefault+len(extra))
  2757. for _, checkpoint := range defaultCheckpoints {
  2758. if _, exists := extra[checkpoint.Height]; !exists {
  2759. checkpoints = append(checkpoints, checkpoint)
  2760. }
  2761. }
  2762. // Append the additional checkpoints and return the sorted results.
  2763. for _, checkpoint := range extra {
  2764. checkpoints = append(checkpoints, checkpoint)
  2765. }
  2766. sort.Sort(checkpointSorter(checkpoints))
  2767. return checkpoints
  2768. }
  2769. // HasUndesiredUserAgent determines whether the server should continue to pursue
  2770. // a connection with this peer based on its advertised user agent. It performs
  2771. // the following steps:
  2772. // 1) Reject the peer if it contains a blacklisted agent.
  2773. // 2) If no whitelist is provided, accept all user agents.
  2774. // 3) Accept the peer if it contains a whitelisted agent.
  2775. // 4) Reject all other peers.
  2776. func (sp *serverPeer) HasUndesiredUserAgent(blacklistedAgents,
  2777. whitelistedAgents []string) bool {
  2778. agent := sp.UserAgent()
  2779. // First, if peer's user agent contains any blacklisted substring, we
  2780. // will ignore the connection request.
  2781. for _, blacklistedAgent := range blacklistedAgents {
  2782. if strings.Contains(agent, blacklistedAgent) {
  2783. log.Debugf("Ignoring peer %s, user agent "+
  2784. "contains blacklisted user agent: %s", sp,
  2785. agent)
  2786. return true
  2787. }
  2788. }
  2789. // If no whitelist is provided, we will accept all user agents.
  2790. if len(whitelistedAgents) == 0 {
  2791. return false
  2792. }
  2793. // Peer's user agent passed blacklist. Now check to see if it contains
  2794. // one of our whitelisted user agents, if so accept.
  2795. for _, whitelistedAgent := range whitelistedAgents {
  2796. if strings.Contains(agent, whitelistedAgent) {
  2797. return false
  2798. }
  2799. }
  2800. // Otherwise, the peer's user agent was not included in our whitelist.
  2801. // Ignore just in case it could stall the initial block download.
  2802. log.Debugf("Ignoring peer %s, user agent: %s not found in "+
  2803. "whitelist", sp, agent)
  2804. return true
  2805. }