123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174 |
- // Copyright (c) 2013-2017 The btcsuite developers
- // Copyright (c) 2015-2018 The Decred developers
- // Use of this source code is governed by an ISC
- // license that can be found in the LICENSE file.
- package main
- import (
- "bytes"
- "crypto/rand"
- "crypto/tls"
- "encoding/binary"
- "fmt"
- "math"
- mathrand "math/rand"
- "net"
- "runtime"
- "sort"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "github.com/pkt-cash/pktd/addrmgr"
- "github.com/pkt-cash/pktd/blockchain"
- "github.com/pkt-cash/pktd/blockchain/indexers"
- "github.com/pkt-cash/pktd/btcutil"
- "github.com/pkt-cash/pktd/btcutil/bloom"
- "github.com/pkt-cash/pktd/btcutil/er"
- "github.com/pkt-cash/pktd/chaincfg"
- "github.com/pkt-cash/pktd/chaincfg/chainhash"
- "github.com/pkt-cash/pktd/connmgr"
- "github.com/pkt-cash/pktd/database"
- "github.com/pkt-cash/pktd/mempool"
- "github.com/pkt-cash/pktd/mining"
- "github.com/pkt-cash/pktd/mining/cpuminer"
- "github.com/pkt-cash/pktd/netsync"
- "github.com/pkt-cash/pktd/peer"
- "github.com/pkt-cash/pktd/pktconfig/version"
- "github.com/pkt-cash/pktd/pktlog/log"
- "github.com/pkt-cash/pktd/txscript"
- "github.com/pkt-cash/pktd/wire"
- "github.com/pkt-cash/pktd/wire/protocol"
- )
- const (
- // defaultServices describes the default services that are supported by
- // the server.
- defaultServices = protocol.SFNodeNetwork | protocol.SFNodeBloom |
- protocol.SFNodeWitness | protocol.SFNodeCF
- // defaultRequiredServices describes the default services that are
- // required to be supported by outbound peers.
- defaultRequiredServices = protocol.SFNodeNetwork
- // defaultTargetOutbound is the default number of outbound peers to
- // target. We are normalizing the Bitcoin Core in allowing 16 here,
- // For Bitcoin Core latest Bitcoin Core, 14 connections are used for
- // full relaying and 2 are used for "block only" "fast" connections,
- // although we don't yet make such a distinction.
- defaultTargetOutbound = 14
- // connectionRetryInterval is the base amount of time to wait in between
- // retries when connecting to persistent peers. It is adjusted by the
- // number of retries such that there is a retry backoff.
- connectionRetryInterval = time.Second * 5
- )
- // simpleAddr implements the net.Addr interface with two struct fields
- type simpleAddr struct {
- net, addr string
- }
- // String returns the address.
- //
- // This is part of the net.Addr interface.
- func (a simpleAddr) String() string {
- return a.addr
- }
- // Network returns the network.
- //
- // This is part of the net.Addr interface.
- func (a simpleAddr) Network() string {
- return a.net
- }
- // Ensure simpleAddr implements the net.Addr interface.
- var _ net.Addr = simpleAddr{}
- // broadcastMsg provides the ability to house a bitcoin message to be broadcast
- // to all connected peers except specified excluded peers.
- type broadcastMsg struct {
- message wire.Message
- excludePeers []*serverPeer
- }
- // broadcastInventoryAdd is a type used to declare that the InvVect it contains
- // needs to be added to the rebroadcast map
- type broadcastInventoryAdd relayMsg
- // broadcastInventoryDel is a type used to declare that the InvVect it contains
- // needs to be removed from the rebroadcast map
- type broadcastInventoryDel *wire.InvVect
- // relayMsg packages an inventory vector along with the newly discovered
- // inventory so the relay has access to that information.
- type relayMsg struct {
- invVect *wire.InvVect
- data interface{}
- }
- // updatePeerHeightsMsg is a message sent from the blockmanager to the server
- // after a new block has been accepted. The purpose of the message is to update
- // the heights of peers that were known to announce the block before we
- // connected it to the main chain or recognized it as an orphan. With these
- // updates, peer heights will be kept up to date, allowing for fresh data when
- // selecting sync peer candidacy.
- type updatePeerHeightsMsg struct {
- newHash *chainhash.Hash
- newHeight int32
- originPeer *peer.Peer
- }
- // peerState maintains state of inbound, persistent, outbound peers as well
- // as banned peers and outbound groups.
- type peerState struct {
- inboundPeers map[int32]*serverPeer
- outboundPeers map[int32]*serverPeer
- persistentPeers map[int32]*serverPeer
- banned map[string]time.Time
- outboundGroups map[string]int
- }
- // Count returns the count of all known peers.
- func (ps *peerState) Count() int {
- return len(ps.inboundPeers) + len(ps.outboundPeers) +
- len(ps.persistentPeers)
- }
- // forAllOutboundPeers is a helper function that runs closure on all outbound
- // peers known to peerState.
- func (ps *peerState) forAllOutboundPeers(closure func(sp *serverPeer)) {
- for _, e := range ps.outboundPeers {
- closure(e)
- }
- for _, e := range ps.persistentPeers {
- closure(e)
- }
- }
- // forAllPeers is a helper function that runs closure on all peers known to
- // peerState.
- func (ps *peerState) forAllPeers(closure func(sp *serverPeer)) {
- for _, e := range ps.inboundPeers {
- closure(e)
- }
- ps.forAllOutboundPeers(closure)
- }
- // cfHeaderKV is a tuple of a filter header and its associated block hash. The
- // struct is used to cache cfcheckpt responses.
- type cfHeaderKV struct {
- blockHash chainhash.Hash
- filterHeader chainhash.Hash
- }
- // server provides a bitcoin server for handling communications to and from
- // bitcoin peers.
- type server struct {
- // The following variables must only be used atomically.
- // Putting the uint64s first makes them 64-bit aligned for 32-bit systems.
- bytesReceived uint64 // Total bytes received from all peers since start.
- bytesSent uint64 // Total bytes sent by all peers since start.
- started int32
- shutdown int32
- startupTime int64
- chainParams *chaincfg.Params
- addrManager *addrmgr.AddrManager
- connManager *connmgr.ConnManager
- sigCache *txscript.SigCache
- hashCache *txscript.HashCache
- rpcServer *rpcServer
- syncManager *netsync.SyncManager
- chain *blockchain.BlockChain
- txMemPool *mempool.TxPool
- cpuMiner *cpuminer.CPUMiner
- modifyRebroadcastInv chan interface{}
- newPeers chan *serverPeer
- donePeers chan *serverPeer
- banPeers chan *serverPeer
- query chan interface{}
- relayInv chan relayMsg
- broadcast chan broadcastMsg
- peerHeightsUpdate chan updatePeerHeightsMsg
- wg sync.WaitGroup
- quit chan struct{}
- nat NAT
- db database.DB
- timeSource blockchain.MedianTimeSource
- services protocol.ServiceFlag
- // The following fields are used for optional indexes. They will be nil
- // if the associated index is not enabled. These fields are set during
- // initial creation of the server and never changed afterwards, so they
- // do not need to be protected for concurrent access.
- txIndex *indexers.TxIndex
- addrIndex *indexers.AddrIndex
- cfIndex *indexers.CfIndex
- // The fee estimator keeps track of how long transactions are left in
- // the mempool before they are mined into blocks.
- feeEstimator *mempool.FeeEstimator
- // cfCheckptCaches stores a cached slice of filter headers for cfcheckpt
- // messages for each filter type.
- cfCheckptCaches map[wire.FilterType][]cfHeaderKV
- cfCheckptCachesMtx sync.RWMutex
- // agentBlacklist is a list of blacklisted substrings by which to filter
- // user agents.
- agentBlacklist []string
- // agentWhitelist is a list of whitelisted user agent substrings, no
- // whitelisting will be applied if the list is empty or nil.
- agentWhitelist []string
- }
- // serverPeer extends the peer to maintain state shared by the server and
- // the blockmanager.
- type serverPeer struct {
- // The following variables must only be used atomically
- feeFilter int64
- *peer.Peer
- connReq *connmgr.ConnReq
- server *server
- persistent bool
- continueHash *chainhash.Hash
- relayMtx sync.Mutex
- disableRelayTx bool
- sentAddrs bool
- isWhitelisted bool
- filter *bloom.Filter
- addressesMtx sync.RWMutex
- knownAddresses map[string]struct{}
- banScore connmgr.DynamicBanScore
- quit chan struct{}
- // The following chans are used to sync blockmanager and server.
- txProcessed chan struct{}
- blockProcessed chan struct{}
- }
- // newServerPeer returns a new serverPeer instance. The peer needs to be set by
- // the caller.
- func newServerPeer(s *server, isPersistent bool) *serverPeer {
- return &serverPeer{
- server: s,
- persistent: isPersistent,
- filter: bloom.LoadFilter(nil),
- knownAddresses: make(map[string]struct{}),
- quit: make(chan struct{}),
- txProcessed: make(chan struct{}, 1),
- blockProcessed: make(chan struct{}, 1),
- }
- }
- // newestBlock returns the current best block hash and height using the format
- // required by the configuration for the peer package.
- func (sp *serverPeer) newestBlock() (*chainhash.Hash, int32, er.R) {
- best := sp.server.chain.BestSnapshot()
- return &best.Hash, best.Height, nil
- }
- // addKnownAddresses adds the given addresses to the set of known addresses to
- // the peer to prevent sending duplicate addresses.
- func (sp *serverPeer) addKnownAddresses(addresses []*wire.NetAddress) {
- sp.addressesMtx.Lock()
- for _, na := range addresses {
- sp.knownAddresses[addrmgr.NetAddressKey(na)] = struct{}{}
- }
- sp.addressesMtx.Unlock()
- }
- // addressKnown true if the given address is already known to the peer.
- // XXX trn safe for concurrent callers?
- func (sp *serverPeer) addressKnown(na *wire.NetAddress) bool {
- sp.addressesMtx.Lock()
- defer sp.addressesMtx.Unlock()
- _, exists := sp.knownAddresses[addrmgr.NetAddressKey(na)]
- return exists
- }
- // setDisableRelayTx toggles relaying of transactions for the given peer.
- // It is safe for concurrent access.
- func (sp *serverPeer) setDisableRelayTx(disable bool) {
- sp.relayMtx.Lock()
- sp.disableRelayTx = disable
- sp.relayMtx.Unlock()
- }
- // relayTxDisabled returns whether or not relaying of transactions for the given
- // peer is disabled.
- // It is safe for concurrent access.
- func (sp *serverPeer) relayTxDisabled() bool {
- sp.relayMtx.Lock()
- isDisabled := sp.disableRelayTx
- sp.relayMtx.Unlock()
- return isDisabled
- }
- // pushAddrMsg sends an addr message to the connected peer using the provided
- // addresses.
- func (sp *serverPeer) pushAddrMsg(addresses []*wire.NetAddress) {
- // Filter addresses already known to the peer.
- addrs := make([]*wire.NetAddress, 0, len(addresses))
- for _, addr := range addresses {
- if !sp.addressKnown(addr) {
- addrs = append(addrs, addr)
- }
- }
- known, err := sp.PushAddrMsg(addrs)
- if err != nil {
- log.Errorf("Can't push address message to %s: %v", sp.Peer, err)
- sp.Disconnect()
- return
- }
- sp.addKnownAddresses(known)
- }
- // addBanScore increases the persistent and decaying ban score fields by the
- // values passed as parameters. If the resulting score exceeds half of the ban
- // threshold, a warning is logged including the reason provided. Further, if
- // the score is above the ban threshold, the peer will be banned and
- // disconnected.
- func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) {
- // No warning is logged and no score is calculated if banning is disabled.
- if cfg.DisableBanning {
- return
- }
- if sp.isWhitelisted {
- log.Debugf("Misbehaving whitelisted peer %s: %s", sp, reason)
- return
- }
- warnThreshold := cfg.BanThreshold >> 1
- if transient == 0 && persistent == 0 {
- // The score is not being increased, but a warning message is still
- // logged if the score is above the warn threshold.
- score := sp.banScore.Int()
- if score > warnThreshold {
- log.Warnf("Misbehaving peer %s: %s -- ban score is %d, "+
- "it was not increased this time", sp, reason, score)
- }
- return
- }
- score := sp.banScore.Increase(persistent, transient)
- if score > warnThreshold {
- log.Warnf("Misbehaving peer %s: %s -- ban score increased to %d",
- sp, reason, score)
- if score > cfg.BanThreshold {
- log.Warnf("Misbehaving peer %s -- banning and disconnecting",
- sp)
- sp.server.BanPeer(sp)
- sp.Disconnect()
- }
- }
- }
- // hasServices returns whether or not the provided advertised service flags have
- // all of the provided desired service flags set.
- func hasServices(advertised, desired protocol.ServiceFlag) bool {
- return advertised&desired == desired
- }
- // OnVersion is invoked when a peer receives a version bitcoin message
- // and is used to negotiate the protocol version details as well as kick start
- // the communications.
- func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) *wire.MsgReject {
- // Update the address manager with the advertised services for outbound
- // connections in case they have changed. This is not done for inbound
- // connections to help prevent malicious behavior and is skipped when
- // running on the simulation test network since it is only intended to
- // connect to specified peers and actively avoids advertising and
- // connecting to discovered peers.
- //
- // NOTE: This is done before rejecting peers that are too old to ensure
- // it is updated regardless in the case a new minimum protocol version is
- // enforced and the remote node has not upgraded yet.
- isInbound := sp.Inbound()
- remoteAddr := sp.NA()
- addrManager := sp.server.addrManager
- if !cfg.SimNet && !isInbound {
- addrManager.SetServices(remoteAddr, msg.Services)
- }
- // Ignore peers that have a protcol version that is too old. The peer
- // negotiation logic will disconnect it after this callback returns.
- if msg.ProtocolVersion < int32(peer.MinAcceptableProtocolVersion) {
- return nil
- }
- // Reject outbound peers that are not full nodes.
- wantServices := protocol.SFNodeNetwork
- if !isInbound && !hasServices(msg.Services, wantServices) {
- missingServices := wantServices & ^msg.Services
- log.Debugf("Rejecting peer %s with services %v due to not "+
- "providing desired services %v", sp.Peer, msg.Services,
- missingServices)
- reason := fmt.Sprintf("required services %#x not offered",
- uint64(missingServices))
- return wire.NewMsgReject(msg.Command(), wire.RejectNonstandard, reason)
- }
- if !cfg.SimNet && !isInbound {
- // After soft-fork activation, only make outbound
- // connection to peers if they flag that they're segwit
- // enabled.
- chain := sp.server.chain
- segwitActive, err := chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
- if err != nil {
- log.Errorf("Unable to query for segwit soft-fork state: %v",
- err)
- return nil
- }
- if segwitActive && !sp.IsWitnessEnabled() {
- log.Infof("Disconnecting non-segwit peer %v, isn't segwit "+
- "enabled and we need more segwit enabled peers", sp)
- sp.Disconnect()
- return nil
- }
- }
- // Add the remote peer time as a sample for creating an offset against
- // the local clock to keep the network time in sync.
- sp.server.timeSource.AddTimeSample(sp.Addr(), msg.Timestamp)
- // Choose whether or not to relay transactions before a filter command
- // is received.
- sp.setDisableRelayTx(msg.DisableRelayTx)
- return nil
- }
- // OnVerAck is invoked when a peer receives a verack bitcoin message and is used
- // to kick start communication with them.
- func (sp *serverPeer) OnVerAck(_ *peer.Peer, _ *wire.MsgVerAck) {
- sp.server.AddPeer(sp)
- }
- // OnMemPool is invoked when a peer receives a mempool bitcoin message.
- // It creates and sends an inventory message with the contents of the memory
- // pool up to the maximum inventory allowed per message. When the peer has a
- // bloom filter loaded, the contents are filtered accordingly.
- func (sp *serverPeer) OnMemPool(_ *peer.Peer, msg *wire.MsgMemPool) {
- // Only allow mempool requests if the server has bloom filtering
- // enabled.
- if sp.server.services&protocol.SFNodeBloom != protocol.SFNodeBloom {
- log.Debugf("peer %v sent mempool request with bloom "+
- "filtering disabled -- disconnecting", sp)
- sp.Disconnect()
- return
- }
- // A decaying ban score increase is applied to prevent flooding.
- // The ban score accumulates and passes the ban threshold if a burst of
- // mempool messages comes from a peer. The score decays each minute to
- // half of its value.
- sp.addBanScore(0, 33, "mempool")
- // Generate inventory message with the available transactions in the
- // transaction memory pool. Limit it to the max allowed inventory
- // per message. The NewMsgInvSizeHint function automatically limits
- // the passed hint to the maximum allowed, so it's safe to pass it
- // without double checking it here.
- txMemPool := sp.server.txMemPool
- txDescs := txMemPool.TxDescs()
- invMsg := wire.NewMsgInvSizeHint(uint(len(txDescs)))
- for _, txDesc := range txDescs {
- // Either add all transactions when there is no bloom filter,
- // or only the transactions that match the filter when there is
- // one.
- if !sp.filter.IsLoaded() || sp.filter.MatchTxAndUpdate(txDesc.Tx) {
- iv := wire.NewInvVect(wire.InvTypeTx, txDesc.Tx.Hash())
- invMsg.AddInvVect(iv)
- if len(invMsg.InvList)+1 > wire.MaxInvPerMsg {
- break
- }
- }
- }
- // Send the inventory message if there is anything to send.
- if len(invMsg.InvList) > 0 {
- sp.QueueMessage(invMsg, nil)
- }
- }
- // OnTx is invoked when a peer receives a tx bitcoin message. It blocks
- // until the bitcoin transaction has been fully processed. Unlock the block
- // handler this does not serialize all transactions through a single thread
- // transactions don't rely on the previous one in a linear fashion like blocks.
- func (sp *serverPeer) OnTx(_ *peer.Peer, msg *wire.MsgTx) {
- if cfg.BlocksOnly {
- log.Tracef("Ignoring tx %v from %v - blocksonly enabled",
- msg.TxHash(), sp)
- return
- }
- // Add the transaction to the known inventory for the peer.
- // Convert the raw MsgTx to a btcutil.Tx which provides some convenience
- // methods and things such as hash caching.
- tx := btcutil.NewTx(msg)
- iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
- sp.AddKnownInventory(iv)
- // Queue the transaction up to be handled by the sync manager and
- // intentionally block further receives until the transaction is fully
- // processed and known good or bad. This helps prevent a malicious peer
- // from queuing up a bunch of bad transactions before disconnecting (or
- // being disconnected) and wasting memory.
- sp.server.syncManager.QueueTx(tx, sp.Peer, sp.txProcessed)
- <-sp.txProcessed
- }
- // OnBlock is invoked when a peer receives a block bitcoin message. It
- // blocks until the bitcoin block has been fully processed.
- func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) {
- // Convert the raw MsgBlock to a btcutil.Block which provides some
- // convenience methods and things such as hash caching.
- block := btcutil.NewBlockFromBlockAndBytes(msg, buf)
- // Add the block to the known inventory for the peer.
- iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
- sp.AddKnownInventory(iv)
- // Queue the block up to be handled by the block
- // manager and intentionally block further receives
- // until the bitcoin block is fully processed and known
- // good or bad. This helps prevent a malicious peer
- // from queuing up a bunch of bad blocks before
- // disconnecting (or being disconnected) and wasting
- // memory. Additionally, this behavior is depended on
- // by at least the block acceptance test tool as the
- // reference implementation processes blocks in the same
- // thread and therefore blocks further messages until
- // the bitcoin block has been fully processed.
- sp.server.syncManager.QueueBlock(block, sp.Peer, sp.blockProcessed)
- <-sp.blockProcessed
- }
- // OnInv is invoked when a peer receives an inv bitcoin message and is
- // used to examine the inventory being advertised by the remote peer and react
- // accordingly. We pass the message down to blockmanager which will call
- // QueueMessage with any appropriate responses.
- func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) {
- if !cfg.BlocksOnly {
- if len(msg.InvList) > 0 {
- sp.server.syncManager.QueueInv(msg, sp.Peer)
- }
- return
- }
- newInv := wire.NewMsgInvSizeHint(uint(len(msg.InvList)))
- for _, invVect := range msg.InvList {
- if invVect.Type == wire.InvTypeTx {
- log.Tracef("Ignoring tx %v in inv from %v -- "+
- "blocksonly enabled", invVect.Hash, sp)
- if sp.ProtocolVersion() >= protocol.BIP0037Version {
- log.Infof("Peer %v is announcing "+
- "transactions -- disconnecting", sp)
- sp.Disconnect()
- return
- }
- continue
- }
- err := newInv.AddInvVect(invVect)
- if err != nil {
- log.Errorf("Failed to add inventory vector: %v", err)
- break
- }
- }
- if len(newInv.InvList) > 0 {
- sp.server.syncManager.QueueInv(newInv, sp.Peer)
- }
- }
- // OnHeaders is invoked when a peer receives a headers bitcoin
- // message. The message is passed down to the sync manager.
- func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) {
- sp.server.syncManager.QueueHeaders(msg, sp.Peer)
- }
- // handleGetData is invoked when a peer receives a getdata bitcoin message and
- // is used to deliver block and transaction information.
- func (sp *serverPeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) {
- numAdded := 0
- notFound := wire.NewMsgNotFound()
- length := len(msg.InvList)
- // A decaying ban score increase is applied to prevent exhausting resources
- // with unusually large inventory queries.
- // Requesting more than the maximum inventory vector length within a short
- // period of time yields a score above the default ban threshold. Sustained
- // bursts of small requests are not penalized as that would potentially ban
- // peers performing IBD.
- // This incremental score decays each minute to half of its value.
- sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata")
- // We wait on this wait channel periodically to prevent queuing
- // far more data than we can send in a reasonable time, wasting memory.
- // The waiting occurs after the database fetch for the next one to
- // provide a little pipelining.
- var waitChan chan struct{}
- doneChan := make(chan struct{}, 1)
- for i, iv := range msg.InvList {
- var c chan struct{}
- // If this will be the last message we send.
- if i == length-1 && len(notFound.InvList) == 0 {
- c = doneChan
- } else if (i+1)%3 == 0 {
- // Buffered so as to not make the send goroutine block.
- c = make(chan struct{}, 1)
- }
- var err er.R
- switch iv.Type {
- case wire.InvTypeWitnessTx:
- err = sp.server.pushTxMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding)
- case wire.InvTypeTx:
- err = sp.server.pushTxMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding)
- case wire.InvTypeWitnessBlock:
- err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding)
- case wire.InvTypeBlock:
- err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding)
- case wire.InvTypeFilteredWitnessBlock:
- err = sp.server.pushMerkleBlockMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding)
- case wire.InvTypeFilteredBlock:
- err = sp.server.pushMerkleBlockMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding)
- default:
- log.Warnf("Unknown type in inventory request %d",
- iv.Type)
- continue
- }
- if err != nil {
- notFound.AddInvVect(iv)
- // When there is a failure fetching the final entry
- // and the done channel was sent in due to there
- // being no outstanding not found inventory, consume
- // it here because there is now not found inventory
- // that will use the channel momentarily.
- if i == len(msg.InvList)-1 && c != nil {
- <-c
- }
- }
- numAdded++
- waitChan = c
- }
- if len(notFound.InvList) != 0 {
- sp.QueueMessage(notFound, doneChan)
- }
- // Wait for messages to be sent. We can send quite a lot of data at this
- // point and this will keep the peer busy for a decent amount of time.
- // We don't process anything else by them in this time so that we
- // have an idea of when we should hear back from them - else the idle
- // timeout could fire when we were only half done sending the blocks.
- if numAdded > 0 {
- <-doneChan
- }
- }
- // OnGetBlocks is invoked when a peer receives a getblocks bitcoin
- // message.
- func (sp *serverPeer) OnGetBlocks(_ *peer.Peer, msg *wire.MsgGetBlocks) {
- // Find the most recent known block in the best chain based on the block
- // locator and fetch all of the block hashes after it until either
- // wire.MaxBlocksPerMsg have been fetched or the provided stop hash is
- // encountered.
- //
- // Use the block after the genesis block if no other blocks in the
- // provided locator are known. This does mean the client will start
- // over with the genesis block if unknown block locators are provided.
- //
- // This mirrors the behavior in the reference implementation.
- chain := sp.server.chain
- hashList := chain.LocateBlocks(msg.BlockLocatorHashes, &msg.HashStop,
- wire.MaxBlocksPerMsg)
- // Generate inventory message.
- invMsg := wire.NewMsgInv()
- for i := range hashList {
- iv := wire.NewInvVect(wire.InvTypeBlock, &hashList[i])
- invMsg.AddInvVect(iv)
- }
- // Send the inventory message if there is anything to send.
- if len(invMsg.InvList) > 0 {
- invListLen := len(invMsg.InvList)
- if invListLen == wire.MaxBlocksPerMsg {
- // Intentionally use a copy of the final hash so there
- // is not a reference into the inventory slice which
- // would prevent the entire slice from being eligible
- // for GC as soon as it's sent.
- continueHash := invMsg.InvList[invListLen-1].Hash
- sp.continueHash = &continueHash
- }
- sp.QueueMessage(invMsg, nil)
- }
- }
- // OnGetHeaders is invoked when a peer receives a getheaders bitcoin
- // message.
- func (sp *serverPeer) OnGetHeaders(_ *peer.Peer, msg *wire.MsgGetHeaders) {
- // Ignore getheaders requests if not in sync.
- if !sp.server.syncManager.IsCurrent() {
- return
- }
- // Find the most recent known block in the best chain based on the block
- // locator and fetch all of the headers after it until either
- // wire.MaxBlockHeadersPerMsg have been fetched or the provided stop
- // hash is encountered.
- //
- // Use the block after the genesis block if no other blocks in the
- // provided locator are known. This does mean the client will start
- // over with the genesis block if unknown block locators are provided.
- //
- // This mirrors the behavior in the reference implementation.
- chain := sp.server.chain
- headers := chain.LocateHeaders(msg.BlockLocatorHashes, &msg.HashStop)
- // Send found headers to the requesting peer.
- blockHeaders := make([]*wire.BlockHeader, len(headers))
- for i := range headers {
- blockHeaders[i] = &headers[i]
- }
- sp.QueueMessage(&wire.MsgHeaders{Headers: blockHeaders}, nil)
- }
- // OnGetCFilters is invoked when a peer receives a getcfilters bitcoin message.
- func (sp *serverPeer) OnGetCFilters(_ *peer.Peer, msg *wire.MsgGetCFilters) {
- // Ignore getcfilters requests if not in sync.
- if !sp.server.syncManager.IsCurrent() {
- return
- }
- // We'll also ensure that the remote party is requesting a set of
- // filters that we actually currently maintain.
- switch msg.FilterType {
- case wire.GCSFilterRegular:
- break
- default:
- log.Debugf("Filter request for unknown filter: %v",
- msg.FilterType)
- return
- }
- hashes, err := sp.server.chain.HeightToHashRange(
- int32(msg.StartHeight), &msg.StopHash, wire.MaxGetCFiltersReqRange,
- )
- if err != nil {
- log.Debugf("Invalid getcfilters request: %v", err)
- return
- }
- // Create []*chainhash.Hash from []chainhash.Hash to pass to
- // FiltersByBlockHashes.
- hashPtrs := make([]*chainhash.Hash, len(hashes))
- for i := range hashes {
- hashPtrs[i] = &hashes[i]
- }
- filters, err := sp.server.cfIndex.FiltersByBlockHashes(
- hashPtrs, msg.FilterType,
- )
- if err != nil {
- log.Errorf("Error retrieving cfilters: %v", err)
- return
- }
- for i, filterBytes := range filters {
- if len(filterBytes) == 0 {
- log.Warnf("Could not obtain cfilter for %v",
- hashes[i])
- return
- }
- filterMsg := wire.NewMsgCFilter(
- msg.FilterType, &hashes[i], filterBytes,
- )
- sp.QueueMessage(filterMsg, nil)
- }
- }
- // OnGetCFHeaders is invoked when a peer receives a getcfheader bitcoin message.
- func (sp *serverPeer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) {
- // Ignore getcfilterheader requests if not in sync.
- if !sp.server.syncManager.IsCurrent() {
- return
- }
- // We'll also ensure that the remote party is requesting a set of
- // headers for filters that we actually currently maintain.
- switch msg.FilterType {
- case wire.GCSFilterRegular:
- break
- default:
- log.Debug("Filter request for unknown headers for "+
- "filter: %v", msg.FilterType)
- return
- }
- startHeight := int32(msg.StartHeight)
- maxResults := wire.MaxCFHeadersPerMsg
- // If StartHeight is positive, fetch the predecessor block hash so we
- // can populate the PrevFilterHeader field.
- if msg.StartHeight > 0 {
- startHeight--
- maxResults++
- }
- // Fetch the hashes from the block index.
- hashList, err := sp.server.chain.HeightToHashRange(
- startHeight, &msg.StopHash, maxResults,
- )
- if err != nil {
- log.Debugf("Invalid getcfheaders request: %v", err)
- }
- // This is possible if StartHeight is one greater that the height of
- // StopHash, and we pull a valid range of hashes including the previous
- // filter header.
- if len(hashList) == 0 || (msg.StartHeight > 0 && len(hashList) == 1) {
- log.Debug("No results for getcfheaders request")
- return
- }
- // Create []*chainhash.Hash from []chainhash.Hash to pass to
- // FilterHeadersByBlockHashes.
- hashPtrs := make([]*chainhash.Hash, len(hashList))
- for i := range hashList {
- hashPtrs[i] = &hashList[i]
- }
- // Fetch the raw filter hash bytes from the database for all blocks.
- filterHashes, err := sp.server.cfIndex.FilterHashesByBlockHashes(
- hashPtrs, msg.FilterType,
- )
- if err != nil {
- log.Errorf("Error retrieving cfilter hashes: %v", err)
- return
- }
- // Generate cfheaders message and send it.
- headersMsg := wire.NewMsgCFHeaders()
- // Populate the PrevFilterHeader field.
- if msg.StartHeight > 0 {
- prevBlockHash := &hashList[0]
- // Fetch the raw committed filter header bytes from the
- // database.
- headerBytes, err := sp.server.cfIndex.FilterHeaderByBlockHash(
- prevBlockHash, msg.FilterType)
- if err != nil {
- log.Errorf("Error retrieving CF header: %v", err)
- return
- }
- if len(headerBytes) == 0 {
- log.Warnf("Could not obtain CF header for %v", prevBlockHash)
- return
- }
- // Deserialize the hash into PrevFilterHeader.
- err = headersMsg.PrevFilterHeader.SetBytes(headerBytes)
- if err != nil {
- log.Warnf("Committed filter header deserialize "+
- "failed: %v", err)
- return
- }
- hashList = hashList[1:]
- filterHashes = filterHashes[1:]
- }
- // Populate HeaderHashes.
- for i, hashBytes := range filterHashes {
- if len(hashBytes) == 0 {
- log.Warnf("Could not obtain CF hash for %v", hashList[i])
- return
- }
- // Deserialize the hash.
- filterHash, err := chainhash.NewHash(hashBytes)
- if err != nil {
- log.Warnf("Committed filter hash deserialize "+
- "failed: %v", err)
- return
- }
- headersMsg.AddCFHash(filterHash)
- }
- headersMsg.FilterType = msg.FilterType
- headersMsg.StopHash = msg.StopHash
- sp.QueueMessage(headersMsg, nil)
- }
- // OnGetCFCheckpt is invoked when a peer receives a getcfcheckpt bitcoin message.
- func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
- // Ignore getcfcheckpt requests if not in sync.
- if !sp.server.syncManager.IsCurrent() {
- return
- }
- // We'll also ensure that the remote party is requesting a set of
- // checkpoints for filters that we actually currently maintain.
- switch msg.FilterType {
- case wire.GCSFilterRegular:
- break
- default:
- log.Debug("Filter request for unknown checkpoints for "+
- "filter: %v", msg.FilterType)
- return
- }
- // Now that we know the client is fetching a filter that we know of,
- // we'll fetch the block hashes et each check point interval so we can
- // compare against our cache, and create new check points if necessary.
- blockHashes, err := sp.server.chain.IntervalBlockHashes(
- &msg.StopHash, wire.CFCheckptInterval,
- )
- if err != nil {
- log.Debugf("Invalid getcfilters request: %v", err)
- return
- }
- checkptMsg := wire.NewMsgCFCheckpt(
- msg.FilterType, &msg.StopHash, len(blockHashes),
- )
- // Fetch the current existing cache so we can decide if we need to
- // extend it or if its adequate as is.
- sp.server.cfCheckptCachesMtx.RLock()
- checkptCache := sp.server.cfCheckptCaches[msg.FilterType]
- // If the set of block hashes is beyond the current size of the cache,
- // then we'll expand the size of the cache and also retain the write
- // lock.
- var updateCache bool
- if len(blockHashes) > len(checkptCache) {
- // Now that we know we'll need to modify the size of the cache,
- // we'll release the read lock and grab the write lock to
- // possibly expand the cache size.
- sp.server.cfCheckptCachesMtx.RUnlock()
- sp.server.cfCheckptCachesMtx.Lock()
- defer sp.server.cfCheckptCachesMtx.Unlock()
- // Now that we have the write lock, we'll check again as it's
- // possible that the cache has already been expanded.
- checkptCache = sp.server.cfCheckptCaches[msg.FilterType]
- // If we still need to expand the cache, then We'll mark that
- // we need to update the cache for below and also expand the
- // size of the cache in place.
- if len(blockHashes) > len(checkptCache) {
- updateCache = true
- additionalLength := len(blockHashes) - len(checkptCache)
- newEntries := make([]cfHeaderKV, additionalLength)
- log.Infof("Growing size of checkpoint cache from %v to %v "+
- "block hashes", len(checkptCache), len(blockHashes))
- checkptCache = append(
- sp.server.cfCheckptCaches[msg.FilterType],
- newEntries...,
- )
- }
- } else {
- // Otherwise, we'll hold onto the read lock for the remainder
- // of this method.
- defer sp.server.cfCheckptCachesMtx.RUnlock()
- log.Tracef("Serving stale cache of size %v",
- len(checkptCache))
- }
- // Now that we know the cache is of an appropriate size, we'll iterate
- // backwards until the find the block hash. We do this as it's possible
- // a re-org has occurred so items in the db are now in the main china
- // while the cache has been partially invalidated.
- var forkIdx int
- for forkIdx = len(blockHashes); forkIdx > 0; forkIdx-- {
- if checkptCache[forkIdx-1].blockHash == blockHashes[forkIdx-1] {
- break
- }
- }
- // Now that we know the how much of the cache is relevant for this
- // query, we'll populate our check point message with the cache as is.
- // Shortly below, we'll populate the new elements of the cache.
- for i := 0; i < forkIdx; i++ {
- checkptMsg.AddCFHeader(&checkptCache[i].filterHeader)
- }
- // We'll now collect the set of hashes that are beyond our cache so we
- // can look up the filter headers to populate the final cache.
- blockHashPtrs := make([]*chainhash.Hash, 0, len(blockHashes)-forkIdx)
- for i := forkIdx; i < len(blockHashes); i++ {
- blockHashPtrs = append(blockHashPtrs, &blockHashes[i])
- }
- filterHeaders, err := sp.server.cfIndex.FilterHeadersByBlockHashes(
- blockHashPtrs, msg.FilterType,
- )
- if err != nil {
- log.Errorf("Error retrieving cfilter headers: %v", err)
- return
- }
- // Now that we have the full set of filter headers, we'll add them to
- // the checkpoint message, and also update our cache in line.
- for i, filterHeaderBytes := range filterHeaders {
- if len(filterHeaderBytes) == 0 {
- log.Warnf("Could not obtain CF header for %v",
- blockHashPtrs[i])
- return
- }
- filterHeader, err := chainhash.NewHash(filterHeaderBytes)
- if err != nil {
- log.Warnf("Committed filter header deserialize "+
- "failed: %v", err)
- return
- }
- checkptMsg.AddCFHeader(filterHeader)
- // If the new main chain is longer than what's in the cache,
- // then we'll override it beyond the fork point.
- if updateCache {
- checkptCache[forkIdx+i] = cfHeaderKV{
- blockHash: blockHashes[forkIdx+i],
- filterHeader: *filterHeader,
- }
- }
- }
- // Finally, we'll update the cache if we need to, and send the final
- // message back to the requesting peer.
- if updateCache {
- sp.server.cfCheckptCaches[msg.FilterType] = checkptCache
- }
- sp.QueueMessage(checkptMsg, nil)
- }
- // enforceNodeBloomFlag disconnects the peer if the server is not configured to
- // allow bloom filters. Additionally, if the peer has negotiated to a protocol
- // version that is high enough to observe the bloom filter service support bit,
- // it will be banned since it is intentionally violating the protocol.
- func (sp *serverPeer) enforceNodeBloomFlag(cmd string) bool {
- if sp.server.services&protocol.SFNodeBloom != protocol.SFNodeBloom {
- // Ban the peer if the protocol version is high enough that the
- // peer is knowingly violating the protocol and banning is
- // enabled.
- //
- // NOTE: Even though the addBanScore function already examines
- // whether or not banning is enabled, it is checked here as well
- // to ensure the violation is logged and the peer is
- // disconnected regardless.
- if sp.ProtocolVersion() >= protocol.BIP0111Version &&
- !cfg.DisableBanning {
- // Disconnect the peer regardless of whether it was
- // banned.
- sp.addBanScore(100, 0, cmd)
- sp.Disconnect()
- return false
- }
- // Disconnect the peer regardless of protocol version or banning
- // state.
- log.Debugf("%s sent an unsupported %s request -- "+
- "disconnecting", sp, cmd)
- sp.Disconnect()
- return false
- }
- return true
- }
- // OnFeeFilter is invoked when a peer receives a feefilter bitcoin message and
- // is used by remote peers to request that no transactions which have a fee rate
- // lower than provided value are inventoried to them. The peer will be
- // disconnected if an invalid fee filter value is provided.
- func (sp *serverPeer) OnFeeFilter(_ *peer.Peer, msg *wire.MsgFeeFilter) {
- // Check that the passed minimum fee is a valid amount.
- if msg.MinFee < 0 || msg.MinFee > int64(btcutil.MaxUnits()) {
- log.Debugf("Peer %v sent an invalid feefilter '%v' -- "+
- "disconnecting", sp, btcutil.Amount(msg.MinFee))
- sp.Disconnect()
- return
- }
- atomic.StoreInt64(&sp.feeFilter, msg.MinFee)
- }
- // OnFilterAdd is invoked when a peer receives a filteradd bitcoin
- // message and is used by remote peers to add data to an already loaded bloom
- // filter. The peer will be disconnected if a filter is not loaded when this
- // message is received or the server is not configured to allow bloom filters.
- func (sp *serverPeer) OnFilterAdd(_ *peer.Peer, msg *wire.MsgFilterAdd) {
- // Disconnect and/or ban depending on the node bloom services flag and
- // negotiated protocol version.
- if !sp.enforceNodeBloomFlag(msg.Command()) {
- return
- }
- if !sp.filter.IsLoaded() {
- log.Debugf("%s sent a filteradd request with no filter "+
- "loaded -- disconnecting", sp)
- sp.Disconnect()
- return
- }
- sp.filter.Add(msg.Data)
- }
- // OnFilterClear is invoked when a peer receives a filterclear bitcoin
- // message and is used by remote peers to clear an already loaded bloom filter.
- // The peer will be disconnected if a filter is not loaded when this message is
- // received or the server is not configured to allow bloom filters.
- func (sp *serverPeer) OnFilterClear(_ *peer.Peer, msg *wire.MsgFilterClear) {
- // Disconnect and/or ban depending on the node bloom services flag and
- // negotiated protocol version.
- if !sp.enforceNodeBloomFlag(msg.Command()) {
- return
- }
- if !sp.filter.IsLoaded() {
- log.Debugf("%s sent a filterclear request with no "+
- "filter loaded -- disconnecting", sp)
- sp.Disconnect()
- return
- }
- sp.filter.Unload()
- }
- // OnFilterLoad is invoked when a peer receives a filterload bitcoin
- // message and it used to load a bloom filter that should be used for
- // delivering merkle blocks and associated transactions that match the filter.
- // The peer will be disconnected if the server is not configured to allow bloom
- // filters.
- func (sp *serverPeer) OnFilterLoad(_ *peer.Peer, msg *wire.MsgFilterLoad) {
- // Disconnect and/or ban depending on the node bloom services flag and
- // negotiated protocol version.
- if !sp.enforceNodeBloomFlag(msg.Command()) {
- return
- }
- sp.setDisableRelayTx(false)
- sp.filter.Reload(msg)
- }
- // OnGetAddr is invoked when a peer receives a getaddr bitcoin message
- // and is used to provide the peer with known addresses from the address
- // manager.
- func (sp *serverPeer) OnGetAddr(_ *peer.Peer, msg *wire.MsgGetAddr) {
- // Don't return any addresses when running on the simulation test
- // network. This helps prevent the network from becoming another
- // public test network since it will not be able to learn about other
- // peers that have not specifically been provided.
- if cfg.SimNet {
- return
- }
- // Do not accept getaddr requests from outbound peers. This reduces
- // fingerprinting attacks.
- if !sp.Inbound() {
- log.Debugf("Ignoring getaddr request from outbound peer [%v]", sp)
- return
- }
- // Only allow one getaddr request per connection to discourage
- // address stamping of inv announcements.
- if sp.sentAddrs {
- log.Debugf("Ignoring repeated getaddr request from peer [%v]", sp)
- return
- }
- sp.sentAddrs = true
- // Get the current known addresses from the address manager.
- addrCache := sp.server.addrManager.AddressCache()
- // Add the best addresses we have for peer discovery here - if
- // we have a port of 0 then that means nothing good was found,
- // so don't rebroracast that. At this point, we trim the cache
- // size by one entry if we add a record so we don't flood past
- // the maximum allowed size and trigger bans.
- bestAddress := sp.server.addrManager.GetBestLocalAddress(sp.NA())
- if bestAddress.Port != 0 {
- if len(addrCache) > 0 {
- addrCache = addrCache[1:]
- }
- addrCache = append(addrCache, bestAddress)
- }
- // Now, push the addresses we got.
- sp.pushAddrMsg(addrCache)
- }
- // OnAddr is invoked when a peer receives an addr bitcoin message and is
- // used to notify the server about advertised addresses.
- func (sp *serverPeer) OnAddr(_ *peer.Peer, msg *wire.MsgAddr) {
- // Ignore addresses when running on the simulation test network. This
- // helps prevent the network from becoming another public test network
- // since it will not be able to learn about other peers that have not
- // specifically been provided.
- if cfg.SimNet {
- return
- }
- // Ignore old style addresses which don't include a timestamp.
- if sp.ProtocolVersion() < protocol.NetAddressTimeVersion {
- return
- }
- // A message that has no addresses produces a warning.
- if len(msg.AddrList) == 0 {
- log.Warnf("Command [%s] from %s does not contain any addresses",
- msg.Command(), sp.Peer)
- }
- for _, na := range msg.AddrList {
- // Don't add more address if we're disconnecting.
- if !sp.Connected() {
- return
- }
- // Set the timestamp to 5 days ago if it's more than 24 hours
- // in the future so this address is one of the first to be
- // removed when space is needed.
- now := time.Now()
- if na.Timestamp.After(now.Add(time.Minute * 10)) {
- na.Timestamp = now.Add(-1 * time.Hour * 24 * 5)
- }
- // Add address to known addresses for this peer.
- sp.addKnownAddresses([]*wire.NetAddress{na})
- }
- // Add addresses to server address manager. The address manager handles
- // the details of things such as preventing duplicate addresses, max
- // addresses, and last seen updates.
- // XXX bitcoind gives a 2 hour time penalty here, do we want to do the
- // same?
- sp.server.addrManager.AddAddresses(msg.AddrList, sp.NA())
- }
- // OnRead is invoked when a peer receives a message and it is used to update
- // the bytes received by the server.
- func (sp *serverPeer) OnRead(_ *peer.Peer, bytesRead int, msg wire.Message, err er.R) {
- sp.server.AddBytesReceived(uint64(bytesRead))
- }
- // OnWrite is invoked when a peer sends a message and it is used to update
- // the bytes sent by the server.
- func (sp *serverPeer) OnWrite(_ *peer.Peer, bytesWritten int, msg wire.Message, err er.R) {
- sp.server.AddBytesSent(uint64(bytesWritten))
- }
- // randomUint16Number returns a random uint16 in a specified input range. Note
- // that the range is in zeroth ordering; if you pass it 1800, you will get
- // values from 0 to 1800.
- func randomUint16Number(max uint16) uint16 {
- // In order to avoid modulo bias and ensure every possible outcome in
- // [0, max) has equal probability, the random number must be sampled
- // from a random source that has a range limited to a multiple of the
- // modulus.
- var randomNumber uint16
- limitRange := (math.MaxUint16 / max) * max
- for {
- errr := binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
- if errr != nil {
- panic("randomUint16Number: binary.Read failed.")
- }
- if randomNumber < limitRange {
- return (randomNumber % max)
- }
- }
- }
- // AddRebroadcastInventory adds 'iv' to the list of inventories to be
- // rebroadcasted at random intervals until they show up in a block.
- func (s *server) AddRebroadcastInventory(iv *wire.InvVect, data interface{}) {
- // Ignore if shutting down.
- if atomic.LoadInt32(&s.shutdown) != 0 {
- return
- }
- s.modifyRebroadcastInv <- broadcastInventoryAdd{invVect: iv, data: data}
- }
- // RemoveRebroadcastInventory removes 'iv' from the list of items to be
- // rebroadcasted if present.
- func (s *server) RemoveRebroadcastInventory(iv *wire.InvVect) {
- // Ignore if shutting down.
- if atomic.LoadInt32(&s.shutdown) != 0 {
- return
- }
- s.modifyRebroadcastInv <- broadcastInventoryDel(iv)
- }
- // relayTransactions generates and relays inventory vectors for all of the
- // passed transactions to all connected peers.
- func (s *server) relayTransactions(txns []*mempool.TxDesc) {
- for _, txD := range txns {
- iv := wire.NewInvVect(wire.InvTypeTx, txD.Tx.Hash())
- s.RelayInventory(iv, txD)
- }
- }
- // AnnounceNewTransactions generates and relays inventory vectors and notifies
- // both websocket and getblocktemplate long poll clients of the passed
- // transactions. This function should be called whenever new transactions
- // are added to the mempool.
- func (s *server) AnnounceNewTransactions(txns []*mempool.TxDesc) {
- // Generate and relay inventory vectors for all newly accepted
- // transactions.
- s.relayTransactions(txns)
- // Notify both websocket and getblocktemplate long poll clients of all
- // newly accepted transactions.
- if s.rpcServer != nil {
- s.rpcServer.NotifyNewTransactions(txns)
- }
- }
- // Transaction has one confirmation on the main chain. Now we can mark it as no
- // longer needing rebroadcasting.
- func (s *server) TransactionConfirmed(tx *btcutil.Tx) {
- // Rebroadcasting is only necessary when the RPC server is active.
- if s.rpcServer == nil {
- return
- }
- iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
- s.RemoveRebroadcastInventory(iv)
- }
- // pushTxMsg sends a tx message for the provided transaction hash to the
- // connected peer. An error is returned if the transaction hash is not known.
- func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{},
- waitChan <-chan struct{}, encoding wire.MessageEncoding) er.R {
- // Attempt to fetch the requested transaction from the pool. A
- // call could be made to check for existence first, but simply trying
- // to fetch a missing transaction results in the same behavior.
- tx, err := s.txMemPool.FetchTransaction(hash)
- if err != nil {
- log.Tracef("Unable to fetch tx %v from transaction "+
- "pool: %v", hash, err)
- if doneChan != nil {
- doneChan <- struct{}{}
- }
- return err
- }
- // Once we have fetched data wait for any previous operation to finish.
- if waitChan != nil {
- <-waitChan
- }
- sp.QueueMessageWithEncoding(tx.MsgTx(), doneChan, encoding)
- return nil
- }
- // pushBlockMsg sends a block message for the provided block hash to the
- // connected peer. An error is returned if the block hash is not known.
- func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{},
- waitChan <-chan struct{}, encoding wire.MessageEncoding) er.R {
- // Fetch the raw block bytes from the database.
- var blockBytes []byte
- err := sp.server.db.View(func(dbTx database.Tx) er.R {
- var err er.R
- blockBytes, err = dbTx.FetchBlock(hash)
- return err
- })
- if err != nil {
- log.Tracef("Unable to fetch requested block hash %v: %v",
- hash, err)
- if doneChan != nil {
- doneChan <- struct{}{}
- }
- return err
- }
- // Deserialize the block.
- var msgBlock wire.MsgBlock
- err = msgBlock.Deserialize(bytes.NewReader(blockBytes))
- if err != nil {
- log.Tracef("Unable to deserialize requested block hash "+
- "%v: %v", hash, err)
- if doneChan != nil {
- doneChan <- struct{}{}
- }
- return err
- }
- // Once we have fetched data wait for any previous operation to finish.
- if waitChan != nil {
- <-waitChan
- }
- // We only send the channel for this message if we aren't sending
- // an inv straight after.
- var dc chan<- struct{}
- continueHash := sp.continueHash
- sendInv := continueHash != nil && continueHash.IsEqual(hash)
- if !sendInv {
- dc = doneChan
- }
- sp.QueueMessageWithEncoding(&msgBlock, dc, encoding)
- // When the peer requests the final block that was advertised in
- // response to a getblocks message which requested more blocks than
- // would fit into a single message, send it a new inventory message
- // to trigger it to issue another getblocks message for the next
- // batch of inventory.
- if sendInv {
- best := sp.server.chain.BestSnapshot()
- invMsg := wire.NewMsgInvSizeHint(1)
- iv := wire.NewInvVect(wire.InvTypeBlock, &best.Hash)
- invMsg.AddInvVect(iv)
- sp.QueueMessage(invMsg, doneChan)
- sp.continueHash = nil
- }
- return nil
- }
- // pushMerkleBlockMsg sends a merkleblock message for the provided block hash to
- // the connected peer. Since a merkle block requires the peer to have a filter
- // loaded, this call will simply be ignored if there is no filter loaded. An
- // error is returned if the block hash is not known.
- func (s *server) pushMerkleBlockMsg(sp *serverPeer, hash *chainhash.Hash,
- doneChan chan<- struct{}, waitChan <-chan struct{}, encoding wire.MessageEncoding) er.R {
- // Do not send a response if the peer doesn't have a filter loaded.
- if !sp.filter.IsLoaded() {
- if doneChan != nil {
- doneChan <- struct{}{}
- }
- return nil
- }
- // Fetch the raw block bytes from the database.
- blk, err := sp.server.chain.BlockByHash(hash)
- if err != nil {
- log.Tracef("Unable to fetch requested block hash %v: %v",
- hash, err)
- if doneChan != nil {
- doneChan <- struct{}{}
- }
- return err
- }
- // Generate a merkle block by filtering the requested block according
- // to the filter for the peer.
- merkle, matchedTxIndices := bloom.NewMerkleBlock(blk, sp.filter)
- // Once we have fetched data wait for any previous operation to finish.
- if waitChan != nil {
- <-waitChan
- }
- // Send the merkleblock. Only send the done channel with this message
- // if no transactions will be sent afterwards.
- var dc chan<- struct{}
- if len(matchedTxIndices) == 0 {
- dc = doneChan
- }
- sp.QueueMessage(merkle, dc)
- // Finally, send any matched transactions.
- blkTransactions := blk.MsgBlock().Transactions
- for i, txIndex := range matchedTxIndices {
- // Only send the done channel on the final transaction.
- var dc chan<- struct{}
- if i == len(matchedTxIndices)-1 {
- dc = doneChan
- }
- if txIndex < uint32(len(blkTransactions)) {
- sp.QueueMessageWithEncoding(blkTransactions[txIndex], dc,
- encoding)
- }
- }
- return nil
- }
- // handleUpdatePeerHeight updates the heights of all peers who were known to
- // announce a block we recently accepted.
- func (s *server) handleUpdatePeerHeights(state *peerState, umsg updatePeerHeightsMsg) {
- state.forAllPeers(func(sp *serverPeer) {
- // The origin peer should already have the updated height.
- if sp.Peer == umsg.originPeer {
- return
- }
- // This is a pointer to the underlying memory which doesn't
- // change.
- latestBlkHash := sp.LastAnnouncedBlock()
- // Skip this peer if it hasn't recently announced any new blocks.
- if latestBlkHash == nil {
- return
- }
- // If the peer has recently announced a block, and this block
- // matches our newly accepted block, then update their block
- // height.
- if *latestBlkHash == *umsg.newHash {
- sp.UpdateLastBlockHeight(umsg.newHeight)
- sp.UpdateLastAnnouncedBlock(nil)
- }
- })
- }
- // handleAddPeerMsg deals with adding new peers. It is invoked from the
- // peerHandler goroutine.
- func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool {
- if sp == nil || !sp.Connected() {
- return false
- }
- // Disconnect peers with unwanted user agents.
- if sp.HasUndesiredUserAgent(s.agentBlacklist, s.agentWhitelist) {
- sp.Disconnect()
- return false
- }
- // Ignore new peers if we're shutting down.
- if atomic.LoadInt32(&s.shutdown) != 0 {
- log.Infof("New peer %s ignored - server is shutting down", sp)
- sp.Disconnect()
- return false
- }
- // Disconnect banned peers.
- host, _, err := net.SplitHostPort(sp.Addr())
- if err != nil {
- log.Debugf("can't split hostport %v", err)
- sp.Disconnect()
- return false
- }
- if banEnd, ok := state.banned[host]; ok {
- if time.Now().Before(banEnd) {
- log.Debugf("Peer %s is banned for another %v - disconnecting",
- host, time.Until(banEnd))
- sp.Disconnect()
- return false
- }
- log.Infof("Peer %s is no longer banned", host)
- delete(state.banned, host)
- }
- // TODO: Check for max peers from a single IP.
- // Limit max number of total peers.
- if state.Count() >= cfg.MaxPeers {
- log.Infof("Max peers reached [%d] - disconnecting peer %s",
- cfg.MaxPeers, sp)
- sp.Disconnect()
- // TODO: how to handle permanent peers here?
- // they should be rescheduled.
- return false
- }
- // Add the new peer and start it.
- log.Debugf("New peer %s", sp)
- if sp.Inbound() {
- state.inboundPeers[sp.ID()] = sp
- } else {
- state.outboundGroups[addrmgr.GroupKey(sp.NA())]++
- if sp.persistent {
- state.persistentPeers[sp.ID()] = sp
- } else {
- state.outboundPeers[sp.ID()] = sp
- }
- }
- // Update the address' last seen time if the peer has acknowledged
- // our version and has sent us its version as well.
- if sp.VerAckReceived() && sp.VersionKnown() && sp.NA() != nil {
- s.addrManager.Connected(sp.NA())
- }
- // Signal the sync manager this peer is a new sync candidate.
- s.syncManager.NewPeer(sp.Peer)
- // Update the address manager and request known addresses from the
- // remote peer for outbound connections. This is skipped when running on
- // the simulation test network since it is only intended to connect to
- // specified peers and actively avoids advertising and connecting to
- // discovered peers.
- if !cfg.SimNet && !sp.Inbound() {
- // Advertise the local address when the server accepts incoming
- // connections and it believes itself to be close to the best
- // known tip.
- if !cfg.DisableListen && s.syncManager.IsCurrent() {
- // Get address that best matches.
- lna := s.addrManager.GetBestLocalAddress(sp.NA())
- if addrmgr.IsRoutable(lna) {
- // Filter addresses the peer already knows about.
- addresses := []*wire.NetAddress{lna}
- sp.pushAddrMsg(addresses)
- }
- }
- // Request known addresses if the server address manager needs
- // more and the peer has a protocol version new enough to
- // include a timestamp with addresses.
- hasTimestamp := sp.ProtocolVersion() >= protocol.NetAddressTimeVersion
- if s.addrManager.NeedMoreAddresses() && hasTimestamp {
- sp.QueueMessage(wire.NewMsgGetAddr(), nil)
- }
- // Mark the address as a known good address.
- s.addrManager.Good(sp.NA())
- }
- // Notify the connection manager of finality
- s.connManager.NotifyConnectionRequestActuallyCompleted()
- return true
- }
- // handleDonePeerMsg deals with peers that have signaled they are done. It is
- // invoked from the peerHandler goroutine.
- func (s *server) handleDonePeerMsg(state *peerState, sp *serverPeer) {
- var list map[int32]*serverPeer
- if sp.persistent {
- list = state.persistentPeers
- } else if sp.Inbound() {
- list = state.inboundPeers
- } else {
- list = state.outboundPeers
- }
- // Regardless of whether the peer was found in our list, we'll inform
- // our connection manager about the disconnection. This can happen if we
- // process a peer's `done` message before its `add`.
- if !sp.Inbound() {
- if sp.persistent {
- s.connManager.Disconnect(sp.connReq.ID())
- } else {
- s.connManager.Remove(sp.connReq.ID())
- go s.connManager.NewConnReq()
- }
- }
- if _, ok := list[sp.ID()]; ok {
- if !sp.Inbound() && sp.VersionKnown() {
- state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
- }
- delete(list, sp.ID())
- log.Debugf("Removed peer %s", sp)
- return
- }
- }
- // handleBanPeerMsg deals with banning peers. It is invoked from the
- // peerHandler goroutine.
- func (s *server) handleBanPeerMsg(state *peerState, sp *serverPeer) {
- host, _, err := net.SplitHostPort(sp.Addr())
- if err != nil {
- log.Debugf("can't split ban peer %s %v", sp.Addr(), err)
- return
- }
- direction := directionString(sp.Inbound())
- log.Infof("Banned peer %s (%s) for %v", host, direction,
- cfg.BanDuration)
- state.banned[host] = time.Now().Add(cfg.BanDuration)
- }
- func (s *server) sendInvMsgToPeer(sp *serverPeer, msg relayMsg) bool {
- if !sp.Connected() {
- return false
- }
- // If the inventory is a block and the peer prefers headers,
- // generate and send a headers message instead of an inventory
- // message.
- if msg.invVect.Type == wire.InvTypeBlock && sp.WantsHeaders() {
- blockHeader, ok := msg.data.(wire.BlockHeader)
- if !ok {
- log.Warnf("Underlying data for headers" +
- " is not a block header")
- return false
- }
- msgHeaders := wire.NewMsgHeaders()
- if err := msgHeaders.AddBlockHeader(&blockHeader); err != nil {
- log.Errorf("Failed to add block"+
- " header: %v", err)
- return false
- }
- sp.QueueMessage(msgHeaders, nil)
- return false
- }
- if msg.invVect.Type == wire.InvTypeTx {
- // Don't relay the transaction to the peer when it has
- // transaction relaying disabled.
- if sp.relayTxDisabled() {
- return false
- }
- txD, ok := msg.data.(*mempool.TxDesc)
- if !ok {
- log.Warnf("Underlying data for tx inv "+
- "relay is not a *mempool.TxDesc: %T",
- msg.data)
- return false
- }
- // Don't relay the transaction if the transaction fee-per-kb
- // is less than the peer's feefilter.
- feeFilter := atomic.LoadInt64(&sp.feeFilter)
- if feeFilter > 0 && txD.FeePerKB < feeFilter {
- return false
- }
- // Don't relay the transaction if there is a bloom
- // filter loaded and the transaction doesn't match it.
- if sp.filter.IsLoaded() {
- if !sp.filter.MatchTxAndUpdate(txD.Tx) {
- return false
- }
- }
- }
- // Queue the inventory to be relayed with the next batch.
- // It will be ignored if the peer is already known to
- // have the inventory.
- sp.QueueInventory(msg.invVect)
- return true
- }
- // handleRelayInvMsg deals with relaying inventory to peers that are not already
- // known to have it. It is invoked from the peerHandler goroutine.
- func (s *server) handleRelayInvMsg(state *peerState, msg relayMsg) {
- state.forAllPeers(func(sp *serverPeer) {
- s.sendInvMsgToPeer(sp, msg)
- })
- }
- // handleBroadcastMsg deals with broadcasting messages to peers. It is invoked
- // from the peerHandler goroutine.
- func (s *server) handleBroadcastMsg(state *peerState, bmsg *broadcastMsg) {
- state.forAllPeers(func(sp *serverPeer) {
- if !sp.Connected() {
- return
- }
- for _, ep := range bmsg.excludePeers {
- if sp == ep {
- return
- }
- }
- sp.QueueMessage(bmsg.message, nil)
- })
- }
- type getConnCountMsg struct {
- reply chan int32
- }
- type getPeersMsg struct {
- reply chan []*serverPeer
- }
- type getOutboundGroup struct {
- key string
- reply chan int
- }
- type getAddedNodesMsg struct {
- reply chan []*serverPeer
- }
- type disconnectNodeMsg struct {
- cmp func(*serverPeer) bool
- reply chan er.R
- }
- type connectNodeMsg struct {
- addr string
- permanent bool
- reply chan er.R
- }
- type removeNodeMsg struct {
- cmp func(*serverPeer) bool
- reply chan er.R
- }
- // handleQuery is the central handler for all queries and commands from other
- // goroutines related to peer state.
- func (s *server) handleQuery(state *peerState, querymsg interface{}) {
- switch msg := querymsg.(type) {
- case getConnCountMsg:
- nconnected := int32(0)
- state.forAllPeers(func(sp *serverPeer) {
- if sp.Connected() {
- nconnected++
- }
- })
- msg.reply <- nconnected
- case getPeersMsg:
- peers := make([]*serverPeer, 0, state.Count())
- state.forAllPeers(func(sp *serverPeer) {
- if !sp.Connected() {
- return
- }
- peers = append(peers, sp)
- })
- msg.reply <- peers
- case connectNodeMsg:
- // TODO: duplicate oneshots?
- // Limit max number of total peers.
- if state.Count() >= cfg.MaxPeers {
- msg.reply <- er.New("max peers reached")
- return
- }
- for _, peer := range state.persistentPeers {
- if peer.Addr() == msg.addr {
- if msg.permanent {
- msg.reply <- er.New("peer already connected")
- } else {
- msg.reply <- er.New("peer exists as a permanent peer")
- }
- return
- }
- }
- netAddr, err := addrStringToNetAddr(msg.addr)
- if err != nil {
- msg.reply <- err
- return
- }
- // TODO: if too many, nuke a non-perm peer.
- go s.connManager.Connect(&connmgr.ConnReq{
- Addr: netAddr,
- Permanent: msg.permanent,
- })
- msg.reply <- nil
- case removeNodeMsg:
- found := disconnectPeer(state.persistentPeers, msg.cmp, func(sp *serverPeer) {
- // Keep group counts ok since we remove from
- // the list now.
- state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
- })
- if found {
- msg.reply <- nil
- } else {
- msg.reply <- er.New("peer not found")
- }
- case getOutboundGroup:
- count, ok := state.outboundGroups[msg.key]
- if ok {
- msg.reply <- count
- } else {
- msg.reply <- 0
- }
- // Request a list of the persistent (added) peers.
- case getAddedNodesMsg:
- // Respond with a slice of the relevant peers.
- peers := make([]*serverPeer, 0, len(state.persistentPeers))
- for _, sp := range state.persistentPeers {
- peers = append(peers, sp)
- }
- msg.reply <- peers
- case disconnectNodeMsg:
- // Check inbound peers. We pass a nil callback since we don't
- // require any additional actions on disconnect for inbound peers.
- found := disconnectPeer(state.inboundPeers, msg.cmp, nil)
- if found {
- msg.reply <- nil
- return
- }
- // Check outbound peers.
- found = disconnectPeer(state.outboundPeers, msg.cmp, func(sp *serverPeer) {
- // Keep group counts ok since we remove from
- // the list now.
- state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
- })
- if found {
- // If there are multiple outbound connections to the same
- // ip:port, continue disconnecting them all until no such
- // peers are found.
- for found {
- found = disconnectPeer(state.outboundPeers, msg.cmp, func(sp *serverPeer) {
- state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
- })
- }
- msg.reply <- nil
- return
- }
- msg.reply <- er.New("peer not found")
- }
- }
- // disconnectPeer attempts to drop the connection of a targeted peer in the
- // passed peer list. Targets are identified via usage of the passed
- // `compareFunc`, which should return `true` if the passed peer is the target
- // peer. This function returns true on success and false if the peer is unable
- // to be located. If the peer is found, and the passed callback: `whenFound'
- // isn't nil, we call it with the peer as the argument before it is removed
- // from the peerList, and is disconnected from the server.
- func disconnectPeer(peerList map[int32]*serverPeer, compareFunc func(*serverPeer) bool, whenFound func(*serverPeer)) bool {
- for addr, peer := range peerList {
- if compareFunc(peer) {
- if whenFound != nil {
- whenFound(peer)
- }
- // This is ok because we are not continuing
- // to iterate so won't corrupt the loop.
- delete(peerList, addr)
- peer.Disconnect()
- return true
- }
- }
- return false
- }
- // newPeerConfig returns the configuration for the given serverPeer.
- func newPeerConfig(sp *serverPeer) *peer.Config {
- return &peer.Config{
- Listeners: peer.MessageListeners{
- OnVersion: sp.OnVersion,
- OnVerAck: sp.OnVerAck,
- OnMemPool: sp.OnMemPool,
- OnTx: sp.OnTx,
- OnBlock: sp.OnBlock,
- OnInv: sp.OnInv,
- OnHeaders: sp.OnHeaders,
- OnGetData: sp.OnGetData,
- OnGetBlocks: sp.OnGetBlocks,
- OnGetHeaders: sp.OnGetHeaders,
- OnGetCFilters: sp.OnGetCFilters,
- OnGetCFHeaders: sp.OnGetCFHeaders,
- OnGetCFCheckpt: sp.OnGetCFCheckpt,
- OnFeeFilter: sp.OnFeeFilter,
- OnFilterAdd: sp.OnFilterAdd,
- OnFilterClear: sp.OnFilterClear,
- OnFilterLoad: sp.OnFilterLoad,
- OnGetAddr: sp.OnGetAddr,
- OnAddr: sp.OnAddr,
- OnRead: sp.OnRead,
- OnWrite: sp.OnWrite,
- },
- NewestBlock: sp.newestBlock,
- HostToNetAddress: sp.server.addrManager.HostToNetAddress,
- UserAgentName: version.UserAgentName(),
- UserAgentVersion: version.UserAgentVersion(),
- UserAgentComments: cfg.UserAgentComments,
- ChainParams: sp.server.chainParams,
- Services: sp.server.services,
- DisableRelayTx: cfg.BlocksOnly,
- ProtocolVersion: peer.MaxProtocolVersion,
- TrickleInterval: cfg.TrickleInterval,
- }
- }
- // inboundPeerConnected is invoked by the connection manager when a new inbound
- // connection is established. It initializes a new inbound server peer
- // instance, associates it with the connection, and starts a goroutine to wait
- // for disconnection.
- func (s *server) inboundPeerConnected(conn net.Conn) {
- sp := newServerPeer(s, false)
- sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
- sp.Peer = peer.NewInboundPeer(newPeerConfig(sp))
- sp.AssociateConnection(conn)
- go s.peerDoneHandler(sp)
- }
- // outboundPeerConnected is invoked by the connection manager when a new
- // outbound connection is established. It initializes a new outbound server
- // peer instance, associates it with the relevant state such as the connection
- // request instance and the connection itself, and finally notifies the address
- // manager of the attempt.
- func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) {
- sp := newServerPeer(s, c.Permanent)
- p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String())
- if err != nil {
- log.Debugf("Cannot create outbound peer %s: %v", c.Addr, err)
- if c.Permanent {
- s.connManager.Disconnect(c.ID())
- } else {
- s.connManager.Remove(c.ID())
- go s.connManager.NewConnReq()
- }
- return
- }
- sp.Peer = p
- sp.connReq = c
- sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
- sp.AssociateConnection(conn)
- go s.peerDoneHandler(sp)
- }
- // peerDoneHandler handles peer disconnects by notifiying the server that it's
- // done along with other performing other desirable cleanup.
- func (s *server) peerDoneHandler(sp *serverPeer) {
- sp.WaitForDisconnect()
- s.donePeers <- sp
- if sp.VerAckReceived() {
- s.syncManager.DonePeer(sp.Peer)
- // Evict any remaining orphans that were sent by the peer.
- numEvicted := s.txMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID()))
- if numEvicted > 0 {
- log.Debugf("Evicted %d orphan(s) from peer %v (id %d)",
- numEvicted, sp, sp.ID())
- }
- }
- close(sp.quit)
- }
- func (s *server) sendRandomInv(state *peerState) {
- descs := s.txMemPool.TxDescs()
- if len(descs) == 0 {
- return
- }
- winningTx := descs[mathrand.Intn(len(descs))]
- candidates := make([]*serverPeer, 0, state.Count())
- state.forAllPeers(func(sp *serverPeer) {
- if !sp.Connected() {
- return
- }
- candidates = append(candidates, sp)
- })
- iv := wire.NewInvVect(wire.InvTypeTx, winningTx.Tx.Hash())
- msg := relayMsg{invVect: iv, data: winningTx}
- for {
- if len(candidates) == 0 {
- return
- }
- i := mathrand.Intn(len(candidates))
- winningPeer := candidates[i]
- if !s.sendInvMsgToPeer(winningPeer, msg) {
- candidates = append(candidates[:i], candidates[i+1:]...)
- continue
- }
- log.Debugf("Sending random tx [%s] to random peer [%s]",
- winningTx.Tx.Hash().String(), winningPeer.String())
- break
- }
- }
- // peerHandler is used to handle peer operations such as adding and removing
- // peers to and from the server, banning peers, and broadcasting messages to
- // peers. It must be run in a goroutine.
- func (s *server) peerHandler() {
- // Start the address manager and sync manager, both of which are needed
- // by peers. This is done here since their lifecycle is closely tied
- // to this handler and rather than adding more channels to sychronize
- // things, it's easier and slightly faster to simply start and stop them
- // in this handler.
- s.addrManager.Start()
- s.syncManager.Start()
- log.Tracef("Starting peer handler")
- randomInvTicker := time.NewTicker(time.Second * 10)
- state := &peerState{
- inboundPeers: make(map[int32]*serverPeer),
- persistentPeers: make(map[int32]*serverPeer),
- outboundPeers: make(map[int32]*serverPeer),
- banned: make(map[string]time.Time),
- outboundGroups: make(map[string]int),
- }
- if !cfg.DisableDNSSeed {
- // Add peers discovered through DNS to the address manager.
- connmgr.SeedFromDNS(activeNetParams.Params, defaultRequiredServices,
- pktdLookup, func(addrs []*wire.NetAddress) {
- // Bitcoind uses a lookup of the dns seeder here. This
- // is rather strange since the values looked up by the
- // DNS seed lookups will vary quite a lot.
- // to replicate this behavior we put all addresses as
- // having come from the first one.
- s.addrManager.AddAddresses(addrs, addrs[0])
- })
- }
- go s.connManager.Start()
- out:
- for {
- select {
- // New peers connected to the server.
- case p := <-s.newPeers:
- s.handleAddPeerMsg(state, p)
- // Disconnected peers.
- case p := <-s.donePeers:
- s.handleDonePeerMsg(state, p)
- // Block accepted in mainchain or orphan, update peer height.
- case umsg := <-s.peerHeightsUpdate:
- s.handleUpdatePeerHeights(state, umsg)
- // Peer to ban.
- case p := <-s.banPeers:
- s.handleBanPeerMsg(state, p)
- // New inventory to potentially be relayed to other peers.
- case invMsg := <-s.relayInv:
- s.handleRelayInvMsg(state, invMsg)
- // Message to broadcast to all connected peers except those
- // which are excluded by the message.
- case bmsg := <-s.broadcast:
- s.handleBroadcastMsg(state, &bmsg)
- case qmsg := <-s.query:
- s.handleQuery(state, qmsg)
- case <-s.quit:
- // Disconnect all peers on server shutdown.
- state.forAllPeers(func(sp *serverPeer) {
- log.Tracef("Shutdown peer %s", sp)
- sp.Disconnect()
- })
- break out
- case <-randomInvTicker.C:
- s.sendRandomInv(state)
- }
- }
- s.connManager.Stop()
- s.syncManager.Stop()
- s.addrManager.Stop()
- // Drain channels before exiting so nothing is left waiting around
- // to send.
- cleanup:
- for {
- select {
- case <-s.newPeers:
- case <-s.donePeers:
- case <-s.peerHeightsUpdate:
- case <-s.relayInv:
- case <-s.broadcast:
- case <-s.query:
- default:
- break cleanup
- }
- }
- s.wg.Done()
- log.Tracef("Peer handler done")
- }
- // AddPeer adds a new peer that has already been connected to the server.
- func (s *server) AddPeer(sp *serverPeer) {
- s.newPeers <- sp
- }
- // BanPeer bans a peer that has already been connected to the server by ip.
- func (s *server) BanPeer(sp *serverPeer) {
- s.banPeers <- sp
- }
- // RelayInventory relays the passed inventory vector to all connected peers
- // that are not already known to have it.
- func (s *server) RelayInventory(invVect *wire.InvVect, data interface{}) {
- s.relayInv <- relayMsg{invVect: invVect, data: data}
- }
- // BroadcastMessage sends msg to all peers currently connected to the server
- // except those in the passed peers to exclude.
- func (s *server) BroadcastMessage(msg wire.Message, exclPeers ...*serverPeer) {
- bmsg := broadcastMsg{message: msg, excludePeers: exclPeers}
- s.broadcast <- bmsg
- }
- // ConnectedCount returns the number of currently connected peers.
- func (s *server) ConnectedCount() int32 {
- replyChan := make(chan int32)
- s.query <- getConnCountMsg{reply: replyChan}
- return <-replyChan
- }
- // OutboundGroupCount returns the number of peers connected to the given
- // outbound group key.
- func (s *server) OutboundGroupCount(key string) int {
- replyChan := make(chan int)
- s.query <- getOutboundGroup{key: key, reply: replyChan}
- return <-replyChan
- }
- // AddBytesSent adds the passed number of bytes to the total bytes sent counter
- // for the server. It is safe for concurrent access.
- func (s *server) AddBytesSent(bytesSent uint64) {
- atomic.AddUint64(&s.bytesSent, bytesSent)
- }
- // AddBytesReceived adds the passed number of bytes to the total bytes received
- // counter for the server. It is safe for concurrent access.
- func (s *server) AddBytesReceived(bytesReceived uint64) {
- atomic.AddUint64(&s.bytesReceived, bytesReceived)
- }
- // NetTotals returns the sum of all bytes received and sent across the network
- // for all peers. It is safe for concurrent access.
- func (s *server) NetTotals() (uint64, uint64) {
- return atomic.LoadUint64(&s.bytesReceived),
- atomic.LoadUint64(&s.bytesSent)
- }
- // UpdatePeerHeights updates the heights of all peers who have have announced
- // the latest connected main chain block, or a recognized orphan. These height
- // updates allow us to dynamically refresh peer heights, ensuring sync peer
- // selection has access to the latest block heights for each peer.
- func (s *server) UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int32, updateSource *peer.Peer) {
- s.peerHeightsUpdate <- updatePeerHeightsMsg{
- newHash: latestBlkHash,
- newHeight: latestHeight,
- originPeer: updateSource,
- }
- }
- // rebroadcastHandler keeps track of user submitted inventories that we have
- // sent out but have not yet made it into a block. We periodically rebroadcast
- // them in case our peers restarted or otherwise lost track of them.
- func (s *server) rebroadcastHandler() {
- // Wait 5 min before first tx rebroadcast.
- timer := time.NewTimer(5 * time.Minute)
- pendingInvs := make(map[wire.InvVect]interface{})
- out:
- for {
- select {
- case riv := <-s.modifyRebroadcastInv:
- switch msg := riv.(type) {
- // Incoming InvVects are added to our map of RPC txs.
- case broadcastInventoryAdd:
- pendingInvs[*msg.invVect] = msg.data
- // When an InvVect has been added to a block, we can
- // now remove it, if it was present.
- case broadcastInventoryDel:
- delete(pendingInvs, *msg)
- }
- case <-timer.C:
- // Any inventory we have has not made it into a block
- // yet. We periodically resubmit them until they have.
- for iv, data := range pendingInvs {
- ivCopy := iv
- s.RelayInventory(&ivCopy, data)
- }
- // Process at a random time up to 30mins (in seconds)
- // in the future.
- timer.Reset(time.Second *
- time.Duration(randomUint16Number(1800)))
- case <-s.quit:
- break out
- }
- }
- timer.Stop()
- // Drain channels before exiting so nothing is left waiting around
- // to send.
- cleanup:
- for {
- select {
- case <-s.modifyRebroadcastInv:
- default:
- break cleanup
- }
- }
- s.wg.Done()
- }
- // Start begins accepting connections from peers.
- func (s *server) Start() {
- // Already started?
- if atomic.AddInt32(&s.started, 1) != 1 {
- return
- }
- log.Trace("Starting server")
- // Start the peer handler which in turn starts the address and block
- // managers.
- s.wg.Add(1)
- go s.peerHandler()
- if s.nat != nil {
- s.wg.Add(1)
- go s.upnpUpdateThread()
- }
- if !cfg.DisableRPC {
- s.wg.Add(1)
- // Start the rebroadcastHandler, which ensures user tx received by
- // the RPC server are rebroadcast until being included in a block.
- go s.rebroadcastHandler()
- s.rpcServer.Start()
- }
- // Start the CPU miner if generation is enabled.
- if cfg.Generate {
- s.cpuMiner.Start()
- }
- }
- // Stop gracefully shuts down the server by stopping and disconnecting all
- // peers and the main listener.
- func (s *server) Stop() er.R {
- // Make sure this only happens once.
- if atomic.AddInt32(&s.shutdown, 1) != 1 {
- log.Infof("Server is already in the process of shutting down")
- return nil
- }
- log.Warnf("Server shutting down")
- // Stop the CPU miner if needed
- s.cpuMiner.Stop()
- // Shutdown the RPC server if it's not disabled.
- if !cfg.DisableRPC {
- s.rpcServer.Stop()
- }
- // Save fee estimator state in the database.
- s.db.Update(func(tx database.Tx) er.R {
- metadata := tx.Metadata()
- metadata.Put(mempool.EstimateFeeDatabaseKey, s.feeEstimator.Save())
- return nil
- })
- // Signal the remaining goroutines to quit.
- close(s.quit)
- return nil
- }
- // WaitForShutdown blocks until the main listener and peer handlers are stopped.
- func (s *server) WaitForShutdown() {
- s.wg.Wait()
- }
- // parseListeners determines whether each listen address is IPv4 and IPv6 and
- // returns a slice of appropriate net.Addrs to listen on with TCP. It also
- // properly detects addresses which apply to "all interfaces" and adds the
- // address as both IPv4 and IPv6.
- func parseListeners(addrs []string) ([]net.Addr, er.R) {
- netAddrs := make([]net.Addr, 0, len(addrs)*2)
- for _, addr := range addrs {
- host, _, errr := net.SplitHostPort(addr)
- if errr != nil {
- // Shouldn't happen due to already being normalized.
- return nil, er.E(errr)
- }
- // Empty host or host of * on plan9 is both IPv4 and IPv6.
- if host == "" || (host == "*" && runtime.GOOS == "plan9") {
- netAddrs = append(netAddrs, simpleAddr{net: "tcp4", addr: addr}, simpleAddr{net: "tcp6", addr: addr})
- continue
- }
- // Strip IPv6 zone id if present since net.ParseIP does not
- // handle it.
- zoneIndex := strings.LastIndex(host, "%")
- if zoneIndex > 0 {
- host = host[:zoneIndex]
- }
- // Parse the IP.
- ip := net.ParseIP(host)
- if ip == nil {
- return nil, er.Errorf("'%s' is not a valid IP address", host)
- }
- // To4 returns nil when the IP is not an IPv4 address, so use
- // this determine the address type.
- if ip.To4() == nil {
- netAddrs = append(netAddrs, simpleAddr{net: "tcp6", addr: addr})
- } else {
- netAddrs = append(netAddrs, simpleAddr{net: "tcp4", addr: addr})
- }
- }
- return netAddrs, nil
- }
- func (s *server) upnpUpdateThread() {
- // Go off immediately to prevent code duplication, thereafter we renew
- // lease every 15 minutes.
- timer := time.NewTimer(0 * time.Second)
- lport, errr := strconv.ParseInt(activeNetParams.DefaultPort, 10, 16)
- if errr == nil {
- panic("upnpUpdateThread: lport == nil")
- }
- first := true
- out:
- for {
- select {
- case <-timer.C:
- // TODO: pick external port more cleverly
- // TODO: know which ports we are listening to on an external net.
- // TODO: if specific listen port doesn't work then ask for wildcard
- // listen port?
- // XXX this assumes timeout is in seconds.
- listenPort, err := s.nat.AddPortMapping("tcp", int(lport), int(lport),
- "pktd listen port", 20*60)
- if err != nil {
- log.Warnf("can't add UPnP port mapping: %v", err)
- }
- if first && err == nil {
- // TODO: look this up periodically to see if upnp domain changed
- // and so did ip.
- externalip, err := s.nat.GetExternalAddress()
- if err != nil {
- log.Warnf("UPnP can't get external address: %v", err)
- continue out
- }
- na := wire.NewNetAddressIPPort(externalip, uint16(listenPort),
- s.services)
- err = s.addrManager.AddLocalAddress(na, addrmgr.UpnpPrio)
- if err != nil {
- log.Warnf("UPnP AddLocalAddress() failed %v", err)
- err = s.nat.DeletePortMapping("tcp", int(lport), int(lport))
- if err != nil {
- log.Warnf("UPnP DeletePortMapping() failed %v", err)
- }
- continue
- }
- log.Warnf("Successfully bound via UPnP to %s", addrmgr.NetAddressKey(na))
- first = false
- }
- timer.Reset(time.Minute * 15)
- case <-s.quit:
- break out
- }
- }
- timer.Stop()
- if err := s.nat.DeletePortMapping("tcp", int(lport), int(lport)); err != nil {
- log.Warnf("unable to remove UPnP port mapping: %v", err)
- } else {
- log.Debugf("successfully disestablished UPnP port mapping")
- }
- s.wg.Done()
- }
- // setupRPCListeners returns a slice of listeners that are configured for use
- // with the RPC server depending on the configuration settings for listen
- // addresses and TLS.
- func setupRPCListeners() ([]net.Listener, er.R) {
- // Setup TLS if not disabled.
- listenFunc := net.Listen
- if cfg.EnableTLS {
- // Generate the TLS cert and key file if both don't already
- // exist.
- if !fileExists(cfg.RPCKey) && !fileExists(cfg.RPCCert) {
- err := genCertPair(cfg.RPCCert, cfg.RPCKey)
- if err != nil {
- return nil, err
- }
- }
- keypair, errr := tls.LoadX509KeyPair(cfg.RPCCert, cfg.RPCKey)
- if errr != nil {
- return nil, er.E(errr)
- }
- tlsConfig := tls.Config{
- Certificates: []tls.Certificate{keypair},
- MinVersion: tls.VersionTLS12,
- }
- // Change the standard net.Listen function to the tls one.
- listenFunc = func(net, laddr string) (net.Listener, error) {
- return tls.Listen(net, laddr, &tlsConfig)
- }
- }
- netAddrs, err := parseListeners(cfg.RPCListeners)
- if err != nil {
- return nil, err
- }
- listeners := make([]net.Listener, 0, len(netAddrs))
- for _, addr := range netAddrs {
- listener, err := listenFunc(addr.Network(), addr.String())
- if err != nil {
- log.Warnf("Can't listen on %s: %v", addr, err)
- continue
- }
- listeners = append(listeners, listener)
- }
- return listeners, nil
- }
- // newServer returns a new pktd server configured to listen on addr for the
- // bitcoin network type specified by chainParams. Use start to begin accepting
- // connections from peers.
- func newServer(listenAddrs, agentBlacklist, agentWhitelist []string,
- db database.DB, chainParams *chaincfg.Params,
- interrupt <-chan struct{}) (*server, er.R) {
- services := defaultServices
- if cfg.NoPeerBloomFilters {
- services &^= protocol.SFNodeBloom
- }
- if cfg.NoCFilters {
- services &^= protocol.SFNodeCF
- }
- amgr := addrmgr.New(cfg.DataDir, pktdLookup)
- var listeners []net.Listener
- var nat NAT
- if !cfg.DisableListen {
- var err er.R
- listeners, nat, err = initListeners(amgr, listenAddrs, services)
- if err != nil {
- return nil, err
- }
- if len(listeners) == 0 {
- return nil, er.New("no valid listen address")
- }
- }
- if len(agentBlacklist) > 0 {
- log.Infof("User-agent blacklist %s", agentBlacklist)
- }
- if len(agentWhitelist) > 0 {
- log.Infof("User-agent whitelist %s", agentWhitelist)
- }
- s := server{
- startupTime: time.Now().Unix(),
- chainParams: chainParams,
- addrManager: amgr,
- newPeers: make(chan *serverPeer, cfg.MaxPeers),
- donePeers: make(chan *serverPeer, cfg.MaxPeers),
- banPeers: make(chan *serverPeer, cfg.MaxPeers),
- query: make(chan interface{}),
- relayInv: make(chan relayMsg, cfg.MaxPeers),
- broadcast: make(chan broadcastMsg, cfg.MaxPeers),
- quit: make(chan struct{}),
- modifyRebroadcastInv: make(chan interface{}),
- peerHeightsUpdate: make(chan updatePeerHeightsMsg),
- nat: nat,
- db: db,
- timeSource: blockchain.NewMedianTime(),
- services: services,
- sigCache: txscript.NewSigCache(cfg.SigCacheMaxSize),
- hashCache: txscript.NewHashCache(cfg.SigCacheMaxSize),
- cfCheckptCaches: make(map[wire.FilterType][]cfHeaderKV),
- agentBlacklist: agentBlacklist,
- agentWhitelist: agentWhitelist,
- }
- // Create the transaction and address indexes if needed.
- //
- // CAUTION: the txindex needs to be first in the indexes array because
- // the addrindex uses data from the txindex during catchup. If the
- // addrindex is run first, it may not have the transactions from the
- // current block indexed.
- var indexes []indexers.Indexer
- if cfg.TxIndex || cfg.AddrIndex {
- // Enable transaction index if address index is enabled since it
- // requires it.
- if !cfg.TxIndex {
- log.Infof("Transaction index enabled because it " +
- "is required by the address index")
- cfg.TxIndex = true
- } else {
- log.Info("Transaction index is enabled")
- }
- s.txIndex = indexers.NewTxIndex(db)
- indexes = append(indexes, s.txIndex)
- }
- if cfg.AddrIndex {
- log.Info("Address index is enabled")
- s.addrIndex = indexers.NewAddrIndex(db, chainParams)
- indexes = append(indexes, s.addrIndex)
- }
- if !cfg.NoCFilters {
- log.Info("Committed filter index is enabled")
- s.cfIndex = indexers.NewCfIndex(db, chainParams)
- indexes = append(indexes, s.cfIndex)
- }
- // Create an index manager if any of the optional indexes are enabled.
- var indexManager blockchain.IndexManager
- if len(indexes) > 0 {
- indexManager = indexers.NewManager(db, indexes)
- }
- // Merge given checkpoints with the default ones unless they are disabled.
- var checkpoints []chaincfg.Checkpoint
- if !cfg.DisableCheckpoints {
- checkpoints = mergeCheckpoints(s.chainParams.Checkpoints, cfg.addCheckpoints)
- }
- // Create a new block chain instance with the appropriate configuration.
- var err er.R
- s.chain, err = blockchain.New(&blockchain.Config{
- DB: s.db,
- Interrupt: interrupt,
- ChainParams: s.chainParams,
- Checkpoints: checkpoints,
- TimeSource: s.timeSource,
- SigCache: s.sigCache,
- IndexManager: indexManager,
- HashCache: s.hashCache,
- })
- if err != nil {
- return nil, err
- }
- // Search for a FeeEstimator state in the database. If none can be found
- // or if it cannot be loaded, create a new one.
- db.Update(func(tx database.Tx) er.R {
- metadata := tx.Metadata()
- feeEstimationData := metadata.Get(mempool.EstimateFeeDatabaseKey)
- if feeEstimationData != nil {
- // delete it from the database so that we don't try to restore the
- // same thing again somehow.
- metadata.Delete(mempool.EstimateFeeDatabaseKey)
- // If there is an error, log it and make a new fee estimator.
- var err er.R
- s.feeEstimator, err = mempool.RestoreFeeEstimator(feeEstimationData)
- if err != nil {
- log.Errorf("Failed to restore fee estimator %v", err)
- }
- }
- return nil
- })
- // If no feeEstimator has been found, or if the one that has been found
- // is behind somehow, create a new one and start over.
- if s.feeEstimator == nil || s.feeEstimator.LastKnownHeight() != s.chain.BestSnapshot().Height {
- s.feeEstimator = mempool.NewFeeEstimator(
- mempool.DefaultEstimateFeeMaxRollback,
- mempool.DefaultEstimateFeeMinRegisteredBlocks)
- }
- txC := mempool.Config{
- Policy: mempool.Policy{
- DisableRelayPriority: cfg.NoRelayPriority,
- AcceptNonStd: cfg.RelayNonStd,
- FreeTxRelayLimit: cfg.FreeTxRelayLimit,
- MaxOrphanTxs: cfg.MaxOrphanTxs,
- MaxOrphanTxSize: defaultMaxOrphanTxSize,
- MaxSigOpCostPerTx: blockchain.MaxBlockSigOpsCost / 4,
- MinRelayTxFee: cfg.minRelayTxFee,
- MaxTxVersion: 2,
- RejectReplacement: cfg.RejectReplacement,
- },
- ChainParams: chainParams,
- FetchUtxoView: s.chain.FetchUtxoView,
- BestHeight: func() int32 { return s.chain.BestSnapshot().Height },
- MedianTimePast: func() time.Time { return s.chain.BestSnapshot().MedianTime },
- CalcSequenceLock: func(tx *btcutil.Tx, view *blockchain.UtxoViewpoint) (*blockchain.SequenceLock, er.R) {
- return s.chain.CalcSequenceLock(tx, view, true)
- },
- IsDeploymentActive: s.chain.IsDeploymentActive,
- SigCache: s.sigCache,
- HashCache: s.hashCache,
- AddrIndex: s.addrIndex,
- FeeEstimator: s.feeEstimator,
- }
- s.txMemPool = mempool.New(&txC)
- s.syncManager, err = netsync.New(&netsync.Config{
- PeerNotifier: &s,
- Chain: s.chain,
- TxMemPool: s.txMemPool,
- ChainParams: s.chainParams,
- DisableCheckpoints: cfg.DisableCheckpoints,
- MaxPeers: cfg.MaxPeers,
- FeeEstimator: s.feeEstimator,
- })
- if err != nil {
- return nil, err
- }
- msc := 0
- switch cfg.MiningSkipChecks {
- case "txns":
- msc = mining.CheckTxns
- case "template":
- msc = mining.CheckBlkTemplate
- case "both":
- msc = mining.CheckBoth
- }
- // Create the mining policy and block template generator based on the
- // configuration options.
- //
- // NOTE: The CPU miner relies on the mempool, so the mempool has to be
- // created before calling the function to create the CPU miner.
- policy := mining.Policy{
- SkipChecks: msc,
- BlockMinWeight: cfg.BlockMinWeight,
- BlockMaxWeight: cfg.BlockMaxWeight,
- BlockMinSize: cfg.BlockMinSize,
- BlockMaxSize: cfg.BlockMaxSize,
- BlockPrioritySize: cfg.BlockPrioritySize,
- TxMinFreeFee: cfg.minRelayTxFee,
- }
- blockTemplateGenerator := mining.NewBlkTmplGenerator(&policy,
- s.chainParams, s.txMemPool, s.chain, s.timeSource,
- s.sigCache, s.hashCache)
- s.cpuMiner = cpuminer.New(&cpuminer.Config{
- ChainParams: chainParams,
- BlockTemplateGenerator: blockTemplateGenerator,
- MiningAddrs: cfg.miningAddrs,
- ProcessBlock: s.syncManager.ProcessBlock,
- ConnectedCount: s.ConnectedCount,
- IsCurrent: s.syncManager.IsCurrent,
- })
- // Only setup a function to return new addresses to connect to when
- // not running in connect-only mode. The simulation network is always
- // in connect-only mode since it is only intended to connect to
- // specified peers and actively avoid advertising and connecting to
- // discovered peers in order to prevent it from becoming a public test
- // network.
- var newAddressFunc func() (net.Addr, er.R)
- if !cfg.SimNet && !cfg.RegressionTest && len(cfg.ConnectPeers) == 0 {
- newAddressFunc = func() (net.Addr, er.R) {
- for tries := 0; tries < 100; tries++ {
- addr := s.addrManager.GetAddress()
- if addr == nil {
- break
- }
- // Address will not be invalid, local or unroutable
- // because addrmanager rejects those on addition.
- // Just check that we don't already have an address
- // in the same group so that we are not connecting
- // to the same network segment at the expense of
- // others.
- key := addrmgr.GroupKey(addr.NetAddress())
- if s.OutboundGroupCount(key) != 0 {
- continue
- }
- // only allow recent nodes (10mins) after we failed 10
- // times
- lastTime := s.addrManager.GetLastAttempt(addr.NetAddress())
- if tries < 10 && time.Since(lastTime) < 10*time.Minute {
- continue
- }
- // allow nondefault ports after 20 failed tries.
- if tries < 20 && fmt.Sprintf("%d", addr.NetAddress().Port) !=
- activeNetParams.DefaultPort {
- continue
- }
- // Mark an attempt for the valid address.
- s.addrManager.Attempt(addr.NetAddress())
- addrString := addrmgr.NetAddressKey(addr.NetAddress())
- return addrStringToNetAddr(addrString)
- }
- return nil, er.New("no valid connect address")
- }
- }
- // Create a connection manager.
- targetOutbound := defaultTargetOutbound
- if cfg.MaxPeers < targetOutbound {
- targetOutbound = cfg.MaxPeers
- }
- cmgr, err := connmgr.New(&connmgr.Config{
- Listeners: listeners,
- OnAccept: s.inboundPeerConnected,
- RetryDuration: connectionRetryInterval,
- TargetOutbound: uint32(targetOutbound),
- Dial: pktdDial,
- OnConnection: s.outboundPeerConnected,
- GetNewAddress: newAddressFunc,
- })
- if err != nil {
- return nil, err
- }
- s.connManager = cmgr
- // Start up persistent peers.
- permanentPeers := cfg.ConnectPeers
- if len(permanentPeers) == 0 {
- permanentPeers = cfg.AddPeers
- }
- for _, addr := range permanentPeers {
- netAddr, err := addrStringToNetAddr(addr)
- if err != nil {
- return nil, err
- }
- go s.connManager.Connect(&connmgr.ConnReq{
- Addr: netAddr,
- Permanent: true,
- })
- }
- if !cfg.DisableRPC {
- // Setup listeners for the configured RPC listen addresses and
- // TLS settings.
- rpcListeners, err := setupRPCListeners()
- if err != nil {
- return nil, err
- }
- if len(rpcListeners) == 0 {
- return nil, er.New("RPCS: No valid listen address")
- }
- s.rpcServer, err = newRPCServer(&rpcserverConfig{
- Listeners: rpcListeners,
- StartupTime: s.startupTime,
- ConnMgr: &rpcConnManager{&s},
- SyncMgr: &rpcSyncMgr{&s, s.syncManager},
- TimeSource: s.timeSource,
- Chain: s.chain,
- ChainParams: chainParams,
- DB: db,
- TxMemPool: s.txMemPool,
- Generator: blockTemplateGenerator,
- CPUMiner: s.cpuMiner,
- TxIndexOrNil: s.txIndex,
- AddrIndex: s.addrIndex,
- CfIndex: s.cfIndex,
- FeeEstimator: s.feeEstimator,
- ServiceFlags: services,
- })
- if err != nil {
- return nil, err
- }
- // Signal process shutdown when the RPC server requests it.
- go func() {
- <-s.rpcServer.RequestedProcessShutdown()
- shutdownRequestChannel <- struct{}{}
- }()
- }
- return &s, nil
- }
- // initListeners initializes the configured net listeners and adds any bound
- // addresses to the address manager. Returns the listeners and a NAT interface,
- // which is non-nil if UPnP is in use.
- func initListeners(amgr *addrmgr.AddrManager, listenAddrs []string, services protocol.ServiceFlag) ([]net.Listener, NAT, er.R) {
- // Listen for TCP connections at the configured addresses
- netAddrs, err := parseListeners(listenAddrs)
- if err != nil {
- return nil, nil, err
- }
- listeners := make([]net.Listener, 0, len(netAddrs))
- for _, addr := range netAddrs {
- listener, err := net.Listen(addr.Network(), addr.String())
- if err != nil {
- log.Warnf("Can't listen on %s: %v", addr, err)
- continue
- }
- listeners = append(listeners, listener)
- }
- var nat NAT
- if len(cfg.ExternalIPs) != 0 {
- defaultPort, errr := strconv.ParseUint(activeNetParams.DefaultPort, 10, 16)
- if errr != nil {
- log.Errorf("Can not parse default port %s for active chain: %v",
- activeNetParams.DefaultPort, errr)
- return nil, nil, er.E(errr)
- }
- for _, sip := range cfg.ExternalIPs {
- eport := uint16(defaultPort)
- host, portstr, errr := net.SplitHostPort(sip)
- if errr != nil {
- // no port, use default.
- host = sip
- } else {
- port, err := strconv.ParseUint(portstr, 10, 16)
- if err != nil {
- log.Warnf("Can not parse port from %s for "+
- "externalip: %v", sip, err)
- continue
- }
- eport = uint16(port)
- }
- na, err := amgr.HostToNetAddress(host, eport, services)
- if err != nil {
- log.Warnf("Not adding %s as externalip: %v", sip, err)
- continue
- }
- err = amgr.AddLocalAddress(na, addrmgr.ManualPrio)
- if err != nil {
- log.Warnf("Skipping specified external IP: %v", err)
- }
- }
- } else {
- if cfg.Upnp {
- var err er.R
- nat, err = Discover()
- if err != nil {
- log.Warnf("Can't discover upnp: %v", err)
- }
- // nil nat here is fine, just means no upnp on network.
- }
- // Add bound addresses to address manager to be advertised to peers.
- for _, listener := range listeners {
- addr := listener.Addr().String()
- err := addLocalAddress(amgr, addr, services)
- if err != nil {
- log.Warnf("Skipping bound address %s: %v", addr, err)
- }
- }
- }
- return listeners, nat, nil
- }
- // addrStringToNetAddr takes an address in the form of 'host:port'
- // and returns a net.Addr which maps to the original address with
- // any host names resolved to IP addresses.
- func addrStringToNetAddr(addr string) (net.Addr, er.R) {
- host, strPort, errr := net.SplitHostPort(addr)
- if errr != nil {
- return nil, er.E(errr)
- }
- port, errr := strconv.Atoi(strPort)
- if errr != nil {
- return nil, er.E(errr)
- }
- // Skip if host is already an IP address.
- if ip := net.ParseIP(host); ip != nil {
- return &net.TCPAddr{
- IP: ip,
- Port: port,
- }, nil
- }
- // Attempt to look up an IP address associated with the parsed host.
- ips, err := pktdLookup(host)
- if err != nil {
- return nil, err
- }
- if len(ips) == 0 {
- return nil, er.Errorf("no addresses found for %s", host)
- }
- return &net.TCPAddr{
- IP: ips[0],
- Port: port,
- }, nil
- }
- // addLocalAddress adds an address that this node is listening on to the
- // address manager so that it may be relayed to peers.
- func addLocalAddress(addrMgr *addrmgr.AddrManager, addr string, services protocol.ServiceFlag) er.R {
- host, portStr, errr := net.SplitHostPort(addr)
- if errr != nil {
- return er.E(errr)
- }
- port, errr := strconv.ParseUint(portStr, 10, 16)
- if errr != nil {
- return er.E(errr)
- }
- if ip := net.ParseIP(host); ip != nil && ip.IsUnspecified() {
- // If bound to unspecified address, advertise all local interfaces
- addrs, errr := net.InterfaceAddrs()
- if errr != nil {
- return er.E(errr)
- }
- for _, addr := range addrs {
- ifaceIP, _, err := net.ParseCIDR(addr.String())
- if err != nil {
- continue
- }
- // If bound to 0.0.0.0, do not add IPv6 interfaces and if bound to
- // ::, do not add IPv4 interfaces.
- if (ip.To4() == nil) != (ifaceIP.To4() == nil) {
- continue
- }
- netAddr := wire.NewNetAddressIPPort(ifaceIP, uint16(port), services)
- addrMgr.AddLocalAddress(netAddr, addrmgr.BoundPrio)
- }
- } else {
- netAddr, err := addrMgr.HostToNetAddress(host, uint16(port), services)
- if err != nil {
- return err
- }
- addrMgr.AddLocalAddress(netAddr, addrmgr.BoundPrio)
- }
- return nil
- }
- // isWhitelisted returns whether the IP address is included in the whitelisted
- // networks and IPs.
- func isWhitelisted(addr net.Addr) bool {
- if len(cfg.whitelists) == 0 {
- return false
- }
- host, _, err := net.SplitHostPort(addr.String())
- if err != nil {
- log.Warnf("Unable to SplitHostPort on '%s': %v", addr, err)
- return false
- }
- ip := net.ParseIP(host)
- if ip == nil {
- log.Warnf("Unable to parse IP '%s'", addr)
- return false
- }
- for _, ipnet := range cfg.whitelists {
- if ipnet.Contains(ip) {
- return true
- }
- }
- return false
- }
- // checkpointSorter implements sort.Interface to allow a slice of checkpoints to
- // be sorted.
- type checkpointSorter []chaincfg.Checkpoint
- // Len returns the number of checkpoints in the slice. It is part of the
- // sort.Interface implementation.
- func (s checkpointSorter) Len() int {
- return len(s)
- }
- // Swap swaps the checkpoints at the passed indices. It is part of the
- // sort.Interface implementation.
- func (s checkpointSorter) Swap(i, j int) {
- s[i], s[j] = s[j], s[i]
- }
- // Less returns whether the checkpoint with index i should sort before the
- // checkpoint with index j. It is part of the sort.Interface implementation.
- func (s checkpointSorter) Less(i, j int) bool {
- return s[i].Height < s[j].Height
- }
- // mergeCheckpoints returns two slices of checkpoints merged into one slice
- // such that the checkpoints are sorted by height. In the case the additional
- // checkpoints contain a checkpoint with the same height as a checkpoint in the
- // default checkpoints, the additional checkpoint will take precedence and
- // overwrite the default one.
- func mergeCheckpoints(defaultCheckpoints, additional []chaincfg.Checkpoint) []chaincfg.Checkpoint {
- // Create a map of the additional checkpoints to remove duplicates while
- // leaving the most recently-specified checkpoint.
- extra := make(map[int32]chaincfg.Checkpoint)
- for _, checkpoint := range additional {
- extra[checkpoint.Height] = checkpoint
- }
- // Add all default checkpoints that do not have an override in the
- // additional checkpoints.
- numDefault := len(defaultCheckpoints)
- checkpoints := make([]chaincfg.Checkpoint, 0, numDefault+len(extra))
- for _, checkpoint := range defaultCheckpoints {
- if _, exists := extra[checkpoint.Height]; !exists {
- checkpoints = append(checkpoints, checkpoint)
- }
- }
- // Append the additional checkpoints and return the sorted results.
- for _, checkpoint := range extra {
- checkpoints = append(checkpoints, checkpoint)
- }
- sort.Sort(checkpointSorter(checkpoints))
- return checkpoints
- }
- // HasUndesiredUserAgent determines whether the server should continue to pursue
- // a connection with this peer based on its advertised user agent. It performs
- // the following steps:
- // 1) Reject the peer if it contains a blacklisted agent.
- // 2) If no whitelist is provided, accept all user agents.
- // 3) Accept the peer if it contains a whitelisted agent.
- // 4) Reject all other peers.
- func (sp *serverPeer) HasUndesiredUserAgent(blacklistedAgents,
- whitelistedAgents []string) bool {
- agent := sp.UserAgent()
- // First, if peer's user agent contains any blacklisted substring, we
- // will ignore the connection request.
- for _, blacklistedAgent := range blacklistedAgents {
- if strings.Contains(agent, blacklistedAgent) {
- log.Debugf("Ignoring peer %s, user agent "+
- "contains blacklisted user agent: %s", sp,
- agent)
- return true
- }
- }
- // If no whitelist is provided, we will accept all user agents.
- if len(whitelistedAgents) == 0 {
- return false
- }
- // Peer's user agent passed blacklist. Now check to see if it contains
- // one of our whitelisted user agents, if so accept.
- for _, whitelistedAgent := range whitelistedAgents {
- if strings.Contains(agent, whitelistedAgent) {
- return false
- }
- }
- // Otherwise, the peer's user agent was not included in our whitelist.
- // Ignore just in case it could stall the initial block download.
- log.Debugf("Ignoring peer %s, user agent: %s not found in "+
- "whitelist", sp, agent)
- return true
- }
|