12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044 |
- package htlcswitch
- import (
- "bytes"
- "errors"
- "fmt"
- "math/rand"
- "sync"
- "sync/atomic"
- "time"
- "github.com/btcsuite/btcd/btcec/v2/ecdsa"
- "github.com/btcsuite/btcd/btcutil"
- "github.com/btcsuite/btcd/wire"
- "github.com/davecgh/go-spew/spew"
- "github.com/lightningnetwork/lnd/chainntnfs"
- "github.com/lightningnetwork/lnd/channeldb"
- "github.com/lightningnetwork/lnd/channeldb/models"
- "github.com/lightningnetwork/lnd/clock"
- "github.com/lightningnetwork/lnd/contractcourt"
- "github.com/lightningnetwork/lnd/htlcswitch/hop"
- "github.com/lightningnetwork/lnd/kvdb"
- "github.com/lightningnetwork/lnd/lntypes"
- "github.com/lightningnetwork/lnd/lnwallet"
- "github.com/lightningnetwork/lnd/lnwallet/chainfee"
- "github.com/lightningnetwork/lnd/lnwire"
- "github.com/lightningnetwork/lnd/ticker"
- )
- const (
- // DefaultFwdEventInterval is the duration between attempts to flush
- // pending forwarding events to disk.
- DefaultFwdEventInterval = 15 * time.Second
- // DefaultLogInterval is the duration between attempts to log statistics
- // about forwarding events.
- DefaultLogInterval = 10 * time.Second
- // DefaultAckInterval is the duration between attempts to ack any settle
- // fails in a forwarding package.
- DefaultAckInterval = 15 * time.Second
- // DefaultMailboxDeliveryTimeout is the duration after which Adds will
- // be cancelled if they could not get added to an outgoing commitment.
- DefaultMailboxDeliveryTimeout = time.Minute
- )
- var (
- // ErrChannelLinkNotFound is used when channel link hasn't been found.
- ErrChannelLinkNotFound = errors.New("channel link not found")
- // ErrDuplicateAdd signals that the ADD htlc was already forwarded
- // through the switch and is locked into another commitment txn.
- ErrDuplicateAdd = errors.New("duplicate add HTLC detected")
- // ErrUnknownErrorDecryptor signals that we were unable to locate the
- // error decryptor for this payment. This is likely due to restarting
- // the daemon.
- ErrUnknownErrorDecryptor = errors.New("unknown error decryptor")
- // ErrSwitchExiting signaled when the switch has received a shutdown
- // request.
- ErrSwitchExiting = errors.New("htlcswitch shutting down")
- // ErrNoLinksFound is an error returned when we attempt to retrieve the
- // active links in the switch for a specific destination.
- ErrNoLinksFound = errors.New("no channel links found")
- // ErrUnreadableFailureMessage is returned when the failure message
- // cannot be decrypted.
- ErrUnreadableFailureMessage = errors.New("unreadable failure message")
- // ErrLocalAddFailed signals that the ADD htlc for a local payment
- // failed to be processed.
- ErrLocalAddFailed = errors.New("local add HTLC failed")
- // errDustThresholdExceeded is only surfaced to callers of SendHTLC and
- // signals that sending the HTLC would exceed the outgoing link's dust
- // threshold.
- errDustThresholdExceeded = errors.New("dust threshold exceeded")
- // DefaultDustThreshold is the default threshold after which we'll fail
- // payments if they are dust. This is currently set to 500m msats.
- DefaultDustThreshold = lnwire.MilliSatoshi(500_000_000)
- )
- // plexPacket encapsulates switch packet and adds error channel to receive
- // error from request handler.
- type plexPacket struct {
- pkt *htlcPacket
- err chan error
- }
- // ChanClose represents a request which close a particular channel specified by
- // its id.
- type ChanClose struct {
- // CloseType is a variable which signals the type of channel closure the
- // peer should execute.
- CloseType contractcourt.ChannelCloseType
- // ChanPoint represent the id of the channel which should be closed.
- ChanPoint *wire.OutPoint
- // TargetFeePerKw is the ideal fee that was specified by the caller.
- // This value is only utilized if the closure type is CloseRegular.
- // This will be the starting offered fee when the fee negotiation
- // process for the cooperative closure transaction kicks off.
- TargetFeePerKw chainfee.SatPerKWeight
- // MaxFee is the highest fee the caller is willing to pay.
- //
- // NOTE: This field is only respected if the caller is the initiator of
- // the channel.
- MaxFee chainfee.SatPerKWeight
- // DeliveryScript is an optional delivery script to pay funds out to.
- DeliveryScript lnwire.DeliveryAddress
- // Updates is used by request creator to receive the notifications about
- // execution of the close channel request.
- Updates chan interface{}
- // Err is used by request creator to receive request execution error.
- Err chan error
- }
- // Config defines the configuration for the service. ALL elements within the
- // configuration MUST be non-nil for the service to carry out its duties.
- type Config struct {
- // FwdingLog is an interface that will be used by the switch to log
- // forwarding events. A forwarding event happens each time a payment
- // circuit is successfully completed. So when we forward an HTLC, and a
- // settle is eventually received.
- FwdingLog ForwardingLog
- // LocalChannelClose kicks-off the workflow to execute a cooperative or
- // forced unilateral closure of the channel initiated by a local
- // subsystem.
- LocalChannelClose func(pubKey []byte, request *ChanClose)
- // DB is the database backend that will be used to back the switch's
- // persistent circuit map.
- DB kvdb.Backend
- // FetchAllOpenChannels is a function that fetches all currently open
- // channels from the channel database.
- FetchAllOpenChannels func() ([]*channeldb.OpenChannel, error)
- // FetchAllChannels is a function that fetches all pending open, open,
- // and waiting close channels from the database.
- FetchAllChannels func() ([]*channeldb.OpenChannel, error)
- // FetchClosedChannels is a function that fetches all closed channels
- // from the channel database.
- FetchClosedChannels func(
- pendingOnly bool) ([]*channeldb.ChannelCloseSummary, error)
- // SwitchPackager provides access to the forwarding packages of all
- // active channels. This gives the switch the ability to read arbitrary
- // forwarding packages, and ack settles and fails contained within them.
- SwitchPackager channeldb.FwdOperator
- // ExtractErrorEncrypter is an interface allowing switch to reextract
- // error encrypters stored in the circuit map on restarts, since they
- // are not stored directly within the database.
- ExtractErrorEncrypter hop.ErrorEncrypterExtracter
- // FetchLastChannelUpdate retrieves the latest routing policy for a
- // target channel. This channel will typically be the outgoing channel
- // specified when we receive an incoming HTLC. This will be used to
- // provide payment senders our latest policy when sending encrypted
- // error messages.
- FetchLastChannelUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error)
- // Notifier is an instance of a chain notifier that we'll use to signal
- // the switch when a new block has arrived.
- Notifier chainntnfs.ChainNotifier
- // HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
- // events through.
- HtlcNotifier htlcNotifier
- // FwdEventTicker is a signal that instructs the htlcswitch to flush any
- // pending forwarding events.
- FwdEventTicker ticker.Ticker
- // LogEventTicker is a signal instructing the htlcswitch to log
- // aggregate stats about it's forwarding during the last interval.
- LogEventTicker ticker.Ticker
- // AckEventTicker is a signal instructing the htlcswitch to ack any settle
- // fails in forwarding packages.
- AckEventTicker ticker.Ticker
- // AllowCircularRoute is true if the user has configured their node to
- // allow forwards that arrive and depart our node over the same channel.
- AllowCircularRoute bool
- // RejectHTLC is a flag that instructs the htlcswitch to reject any
- // HTLCs that are not from the source hop.
- RejectHTLC bool
- // Clock is a time source for the switch.
- Clock clock.Clock
- // MailboxDeliveryTimeout is the interval after which Adds will be
- // cancelled if they have not been yet been delivered to a link. The
- // computed deadline will expiry this long after the Adds are added to
- // a mailbox via AddPacket.
- MailboxDeliveryTimeout time.Duration
- // DustThreshold is the threshold in milli-satoshis after which we'll
- // fail incoming or outgoing dust payments for a particular channel.
- DustThreshold lnwire.MilliSatoshi
- // SignAliasUpdate is used when sending FailureMessages backwards for
- // option_scid_alias channels. This avoids a potential privacy leak by
- // replacing the public, confirmed SCID with the alias in the
- // ChannelUpdate.
- SignAliasUpdate func(u *lnwire.ChannelUpdate) (*ecdsa.Signature,
- error)
- // IsAlias returns whether or not a given SCID is an alias.
- IsAlias func(scid lnwire.ShortChannelID) bool
- }
- // Switch is the central messaging bus for all incoming/outgoing HTLCs.
- // Connected peers with active channels are treated as named interfaces which
- // refer to active channels as links. A link is the switch's message
- // communication point with the goroutine that manages an active channel. New
- // links are registered each time a channel is created, and unregistered once
- // the channel is closed. The switch manages the hand-off process for multi-hop
- // HTLCs, forwarding HTLCs initiated from within the daemon, and finally
- // notifies users local-systems concerning their outstanding payment requests.
- type Switch struct {
- started int32 // To be used atomically.
- shutdown int32 // To be used atomically.
- // bestHeight is the best known height of the main chain. The links will
- // be used this information to govern decisions based on HTLC timeouts.
- // This will be retrieved by the registered links atomically.
- bestHeight uint32
- wg sync.WaitGroup
- quit chan struct{}
- // cfg is a copy of the configuration struct that the htlc switch
- // service was initialized with.
- cfg *Config
- // networkResults stores the results of payments initiated by the user.
- // The store is used to later look up the payments and notify the
- // user of the result when they are complete. Each payment attempt
- // should be given a unique integer ID when it is created, otherwise
- // results might be overwritten.
- networkResults *networkResultStore
- // circuits is storage for payment circuits which are used to
- // forward the settle/fail htlc updates back to the add htlc initiator.
- circuits CircuitMap
- // mailOrchestrator manages the lifecycle of mailboxes used throughout
- // the switch, and facilitates delayed delivery of packets to links that
- // later come online.
- mailOrchestrator *mailOrchestrator
- // indexMtx is a read/write mutex that protects the set of indexes
- // below.
- indexMtx sync.RWMutex
- // pendingLinkIndex holds links that have not had their final, live
- // short_chan_id assigned.
- pendingLinkIndex map[lnwire.ChannelID]ChannelLink
- // links is a map of channel id and channel link which manages
- // this channel.
- linkIndex map[lnwire.ChannelID]ChannelLink
- // forwardingIndex is an index which is consulted by the switch when it
- // needs to locate the next hop to forward an incoming/outgoing HTLC
- // update to/from.
- //
- // TODO(roasbeef): eventually add a NetworkHop mapping before the
- // ChannelLink
- forwardingIndex map[lnwire.ShortChannelID]ChannelLink
- // interfaceIndex maps the compressed public key of a peer to all the
- // channels that the switch maintains with that peer.
- interfaceIndex map[[33]byte]map[lnwire.ChannelID]ChannelLink
- // linkStopIndex stores the currently stopping ChannelLinks,
- // represented by their ChannelID. The key is the link's ChannelID and
- // the value is a chan that is closed when the link has fully stopped.
- // This map is only added to if RemoveLink is called and is not added
- // to when the Switch is shutting down and calls Stop() on each link.
- //
- // MUST be used with the indexMtx.
- linkStopIndex map[lnwire.ChannelID]chan struct{}
- // htlcPlex is the channel which all connected links use to coordinate
- // the setup/teardown of Sphinx (onion routing) payment circuits.
- // Active links forward any add/settle messages over this channel each
- // state transition, sending new adds/settles which are fully locked
- // in.
- htlcPlex chan *plexPacket
- // chanCloseRequests is used to transfer the channel close request to
- // the channel close handler.
- chanCloseRequests chan *ChanClose
- // resolutionMsgs is the channel that all external contract resolution
- // messages will be sent over.
- resolutionMsgs chan *resolutionMsg
- // pendingFwdingEvents is the set of forwarding events which have been
- // collected during the current interval, but hasn't yet been written
- // to the forwarding log.
- fwdEventMtx sync.Mutex
- pendingFwdingEvents []channeldb.ForwardingEvent
- // blockEpochStream is an active block epoch event stream backed by an
- // active ChainNotifier instance. This will be used to retrieve the
- // latest height of the chain.
- blockEpochStream *chainntnfs.BlockEpochEvent
- // pendingSettleFails is the set of settle/fail entries that we need to
- // ack in the forwarding package of the outgoing link. This was added to
- // make pipelining settles more efficient.
- pendingSettleFails []channeldb.SettleFailRef
- // resMsgStore is used to store the set of ResolutionMsg that come from
- // contractcourt. This is used so the Switch can properly forward them,
- // even on restarts.
- resMsgStore *resolutionStore
- // aliasToReal is a map used for option-scid-alias feature-bit links.
- // The alias SCID is the key and the real, confirmed SCID is the value.
- // If the channel is unconfirmed, there will not be a mapping for it.
- // Since channels can have multiple aliases, this map is essentially a
- // N->1 mapping for a channel. This MUST be accessed with the indexMtx.
- aliasToReal map[lnwire.ShortChannelID]lnwire.ShortChannelID
- // baseIndex is a map used for option-scid-alias feature-bit links.
- // The value is the SCID of the link's ShortChannelID. This value may
- // be an alias for zero-conf channels or a confirmed SCID for
- // non-zero-conf channels with the option-scid-alias feature-bit. The
- // key includes the value itself and also any other aliases. This MUST
- // be accessed with the indexMtx.
- baseIndex map[lnwire.ShortChannelID]lnwire.ShortChannelID
- }
- // New creates the new instance of htlc switch.
- func New(cfg Config, currentHeight uint32) (*Switch, error) {
- resStore := newResolutionStore(cfg.DB)
- circuitMap, err := NewCircuitMap(&CircuitMapConfig{
- DB: cfg.DB,
- FetchAllOpenChannels: cfg.FetchAllOpenChannels,
- FetchClosedChannels: cfg.FetchClosedChannels,
- ExtractErrorEncrypter: cfg.ExtractErrorEncrypter,
- CheckResolutionMsg: resStore.checkResolutionMsg,
- })
- if err != nil {
- return nil, err
- }
- s := &Switch{
- bestHeight: currentHeight,
- cfg: &cfg,
- circuits: circuitMap,
- linkIndex: make(map[lnwire.ChannelID]ChannelLink),
- forwardingIndex: make(map[lnwire.ShortChannelID]ChannelLink),
- interfaceIndex: make(map[[33]byte]map[lnwire.ChannelID]ChannelLink),
- pendingLinkIndex: make(map[lnwire.ChannelID]ChannelLink),
- linkStopIndex: make(map[lnwire.ChannelID]chan struct{}),
- networkResults: newNetworkResultStore(cfg.DB),
- htlcPlex: make(chan *plexPacket),
- chanCloseRequests: make(chan *ChanClose),
- resolutionMsgs: make(chan *resolutionMsg),
- resMsgStore: resStore,
- quit: make(chan struct{}),
- }
- s.aliasToReal = make(map[lnwire.ShortChannelID]lnwire.ShortChannelID)
- s.baseIndex = make(map[lnwire.ShortChannelID]lnwire.ShortChannelID)
- s.mailOrchestrator = newMailOrchestrator(&mailOrchConfig{
- forwardPackets: s.ForwardPackets,
- clock: s.cfg.Clock,
- expiry: s.cfg.MailboxDeliveryTimeout,
- failMailboxUpdate: s.failMailboxUpdate,
- })
- return s, nil
- }
- // resolutionMsg is a struct that wraps an existing ResolutionMsg with a done
- // channel. We'll use this channel to synchronize delivery of the message with
- // the caller.
- type resolutionMsg struct {
- contractcourt.ResolutionMsg
- errChan chan error
- }
- // ProcessContractResolution is called by active contract resolvers once a
- // contract they are watching over has been fully resolved. The message carries
- // an external signal that *would* have been sent if the outgoing channel
- // didn't need to go to the chain in order to fulfill a contract. We'll process
- // this message just as if it came from an active outgoing channel.
- func (s *Switch) ProcessContractResolution(msg contractcourt.ResolutionMsg) error {
- errChan := make(chan error, 1)
- select {
- case s.resolutionMsgs <- &resolutionMsg{
- ResolutionMsg: msg,
- errChan: errChan,
- }:
- case <-s.quit:
- return ErrSwitchExiting
- }
- select {
- case err := <-errChan:
- return err
- case <-s.quit:
- return ErrSwitchExiting
- }
- }
- // GetAttemptResult returns the result of the payment attempt with the given
- // attemptID. The paymentHash should be set to the payment's overall hash, or
- // in case of AMP payments the payment's unique identifier.
- //
- // The method returns a channel where the payment result will be sent when
- // available, or an error is encountered during forwarding. When a result is
- // received on the channel, the HTLC is guaranteed to no longer be in flight.
- // The switch shutting down is signaled by closing the channel. If the
- // attemptID is unknown, ErrPaymentIDNotFound will be returned.
- func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash,
- deobfuscator ErrorDecrypter) (<-chan *PaymentResult, error) {
- var (
- nChan <-chan *networkResult
- err error
- inKey = CircuitKey{
- ChanID: hop.Source,
- HtlcID: attemptID,
- }
- )
- // If the payment is not found in the circuit map, check whether a
- // result is already available.
- // Assumption: no one will add this payment ID other than the caller.
- if s.circuits.LookupCircuit(inKey) == nil {
- res, err := s.networkResults.getResult(attemptID)
- if err != nil {
- return nil, err
- }
- c := make(chan *networkResult, 1)
- c <- res
- nChan = c
- } else {
- // The payment was committed to the circuits, subscribe for a
- // result.
- nChan, err = s.networkResults.subscribeResult(attemptID)
- if err != nil {
- return nil, err
- }
- }
- resultChan := make(chan *PaymentResult, 1)
- // Since the payment was known, we can start a goroutine that can
- // extract the result when it is available, and pass it on to the
- // caller.
- s.wg.Add(1)
- go func() {
- defer s.wg.Done()
- var n *networkResult
- select {
- case n = <-nChan:
- case <-s.quit:
- // We close the result channel to signal a shutdown. We
- // don't send any result in this case since the HTLC is
- // still in flight.
- close(resultChan)
- return
- }
- log.Debugf("Received network result %T for attemptID=%v", n.msg,
- attemptID)
- // Extract the result and pass it to the result channel.
- result, err := s.extractResult(
- deobfuscator, n, attemptID, paymentHash,
- )
- if err != nil {
- e := fmt.Errorf("unable to extract result: %w", err)
- log.Error(e)
- resultChan <- &PaymentResult{
- Error: e,
- }
- return
- }
- resultChan <- result
- }()
- return resultChan, nil
- }
- // CleanStore calls the underlying result store, telling it is safe to delete
- // all entries except the ones in the keepPids map. This should be called
- // preiodically to let the switch clean up payment results that we have
- // handled.
- func (s *Switch) CleanStore(keepPids map[uint64]struct{}) error {
- return s.networkResults.cleanStore(keepPids)
- }
- // SendHTLC is used by other subsystems which aren't belong to htlc switch
- // package in order to send the htlc update. The attemptID used MUST be unique
- // for this HTLC, and MUST be used only once, otherwise the switch might reject
- // it.
- func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, attemptID uint64,
- htlc *lnwire.UpdateAddHTLC) error {
- // Generate and send new update packet, if error will be received on
- // this stage it means that packet haven't left boundaries of our
- // system and something wrong happened.
- packet := &htlcPacket{
- incomingChanID: hop.Source,
- incomingHTLCID: attemptID,
- outgoingChanID: firstHop,
- htlc: htlc,
- amount: htlc.Amount,
- }
- // Attempt to fetch the target link before creating a circuit so that
- // we don't leave dangling circuits. The getLocalLink method does not
- // require the circuit variable to be set on the *htlcPacket.
- link, linkErr := s.getLocalLink(packet, htlc)
- if linkErr != nil {
- // Notify the htlc notifier of a link failure on our outgoing
- // link. Incoming timelock/amount values are not set because
- // they are not present for local sends.
- s.cfg.HtlcNotifier.NotifyLinkFailEvent(
- newHtlcKey(packet),
- HtlcInfo{
- OutgoingTimeLock: htlc.Expiry,
- OutgoingAmt: htlc.Amount,
- },
- HtlcEventTypeSend,
- linkErr,
- false,
- )
- return linkErr
- }
- // Evaluate whether this HTLC would increase our exposure to dust. If
- // it does, don't send it out and instead return an error.
- if s.evaluateDustThreshold(link, htlc.Amount, false) {
- // Notify the htlc notifier of a link failure on our outgoing
- // link. We use the FailTemporaryChannelFailure in place of a
- // more descriptive error message.
- linkErr := NewLinkError(
- &lnwire.FailTemporaryChannelFailure{},
- )
- s.cfg.HtlcNotifier.NotifyLinkFailEvent(
- newHtlcKey(packet),
- HtlcInfo{
- OutgoingTimeLock: htlc.Expiry,
- OutgoingAmt: htlc.Amount,
- },
- HtlcEventTypeSend,
- linkErr,
- false,
- )
- return errDustThresholdExceeded
- }
- circuit := newPaymentCircuit(&htlc.PaymentHash, packet)
- actions, err := s.circuits.CommitCircuits(circuit)
- if err != nil {
- log.Errorf("unable to commit circuit in switch: %v", err)
- return err
- }
- // Drop duplicate packet if it has already been seen.
- switch {
- case len(actions.Drops) == 1:
- return ErrDuplicateAdd
- case len(actions.Fails) == 1:
- return ErrLocalAddFailed
- }
- // Give the packet to the link's mailbox so that HTLC's are properly
- // canceled back if the mailbox timeout elapses.
- packet.circuit = circuit
- return link.handleSwitchPacket(packet)
- }
- // UpdateForwardingPolicies sends a message to the switch to update the
- // forwarding policies for the set of target channels, keyed in chanPolicies.
- //
- // NOTE: This function is synchronous and will block until either the
- // forwarding policies for all links have been updated, or the switch shuts
- // down.
- func (s *Switch) UpdateForwardingPolicies(
- chanPolicies map[wire.OutPoint]models.ForwardingPolicy) {
- log.Tracef("Updating link policies: %v", newLogClosure(func() string {
- return spew.Sdump(chanPolicies)
- }))
- s.indexMtx.RLock()
- // Update each link in chanPolicies.
- for targetLink, policy := range chanPolicies {
- cid := lnwire.NewChanIDFromOutPoint(targetLink)
- link, ok := s.linkIndex[cid]
- if !ok {
- log.Debugf("Unable to find ChannelPoint(%v) to update "+
- "link policy", targetLink)
- continue
- }
- link.UpdateForwardingPolicy(policy)
- }
- s.indexMtx.RUnlock()
- }
- // IsForwardedHTLC checks for a given channel and htlc index if it is related
- // to an opened circuit that represents a forwarded payment.
- func (s *Switch) IsForwardedHTLC(chanID lnwire.ShortChannelID,
- htlcIndex uint64) bool {
- circuit := s.circuits.LookupOpenCircuit(models.CircuitKey{
- ChanID: chanID,
- HtlcID: htlcIndex,
- })
- return circuit != nil && circuit.Incoming.ChanID != hop.Source
- }
- // ForwardPackets adds a list of packets to the switch for processing. Fails
- // and settles are added on a first past, simultaneously constructing circuits
- // for any adds. After persisting the circuits, another pass of the adds is
- // given to forward them through the router. The sending link's quit channel is
- // used to prevent deadlocks when the switch stops a link in the midst of
- // forwarding.
- func (s *Switch) ForwardPackets(linkQuit chan struct{},
- packets ...*htlcPacket) error {
- var (
- // fwdChan is a buffered channel used to receive err msgs from
- // the htlcPlex when forwarding this batch.
- fwdChan = make(chan error, len(packets))
- // numSent keeps a running count of how many packets are
- // forwarded to the switch, which determines how many responses
- // we will wait for on the fwdChan..
- numSent int
- )
- // No packets, nothing to do.
- if len(packets) == 0 {
- return nil
- }
- // Setup a barrier to prevent the background tasks from processing
- // responses until this function returns to the user.
- var wg sync.WaitGroup
- wg.Add(1)
- defer wg.Done()
- // Before spawning the following goroutine to proxy our error responses,
- // check to see if we have already been issued a shutdown request. If
- // so, we exit early to avoid incrementing the switch's waitgroup while
- // it is already in the process of shutting down.
- select {
- case <-linkQuit:
- return nil
- case <-s.quit:
- return nil
- default:
- // Spawn a goroutine to log the errors returned from failed packets.
- s.wg.Add(1)
- go s.logFwdErrs(&numSent, &wg, fwdChan)
- }
- // Make a first pass over the packets, forwarding any settles or fails.
- // As adds are found, we create a circuit and append it to our set of
- // circuits to be written to disk.
- var circuits []*PaymentCircuit
- var addBatch []*htlcPacket
- for _, packet := range packets {
- switch htlc := packet.htlc.(type) {
- case *lnwire.UpdateAddHTLC:
- circuit := newPaymentCircuit(&htlc.PaymentHash, packet)
- packet.circuit = circuit
- circuits = append(circuits, circuit)
- addBatch = append(addBatch, packet)
- default:
- err := s.routeAsync(packet, fwdChan, linkQuit)
- if err != nil {
- return fmt.Errorf("failed to forward packet %w",
- err)
- }
- numSent++
- }
- }
- // If this batch did not contain any circuits to commit, we can return
- // early.
- if len(circuits) == 0 {
- return nil
- }
- // Write any circuits that we found to disk.
- actions, err := s.circuits.CommitCircuits(circuits...)
- if err != nil {
- log.Errorf("unable to commit circuits in switch: %v", err)
- }
- // Split the htlc packets by comparing an in-order seek to the head of
- // the added, dropped, or failed circuits.
- //
- // NOTE: This assumes each list is guaranteed to be a subsequence of the
- // circuits, and that the union of the sets results in the original set
- // of circuits.
- var addedPackets, failedPackets []*htlcPacket
- for _, packet := range addBatch {
- switch {
- case len(actions.Adds) > 0 && packet.circuit == actions.Adds[0]:
- addedPackets = append(addedPackets, packet)
- actions.Adds = actions.Adds[1:]
- case len(actions.Drops) > 0 && packet.circuit == actions.Drops[0]:
- actions.Drops = actions.Drops[1:]
- case len(actions.Fails) > 0 && packet.circuit == actions.Fails[0]:
- failedPackets = append(failedPackets, packet)
- actions.Fails = actions.Fails[1:]
- }
- }
- // Now, forward any packets for circuits that were successfully added to
- // the switch's circuit map.
- for _, packet := range addedPackets {
- err := s.routeAsync(packet, fwdChan, linkQuit)
- if err != nil {
- return fmt.Errorf("failed to forward packet %w", err)
- }
- numSent++
- }
- // Lastly, for any packets that failed, this implies that they were
- // left in a half added state, which can happen when recovering from
- // failures.
- if len(failedPackets) > 0 {
- var failure lnwire.FailureMessage
- incomingID := failedPackets[0].incomingChanID
- // If the incoming channel is an option_scid_alias channel,
- // then we'll need to replace the SCID in the ChannelUpdate.
- update := s.failAliasUpdate(incomingID, true)
- if update == nil {
- // Fallback to the original non-option behavior.
- update, err := s.cfg.FetchLastChannelUpdate(
- incomingID,
- )
- if err != nil {
- failure = &lnwire.FailTemporaryNodeFailure{}
- } else {
- failure = lnwire.NewTemporaryChannelFailure(
- update,
- )
- }
- } else {
- // This is an option_scid_alias channel.
- failure = lnwire.NewTemporaryChannelFailure(update)
- }
- linkError := NewDetailedLinkError(
- failure, OutgoingFailureIncompleteForward,
- )
- for _, packet := range failedPackets {
- // We don't handle the error here since this method
- // always returns an error.
- _ = s.failAddPacket(packet, linkError)
- }
- }
- return nil
- }
- // logFwdErrs logs any errors received on `fwdChan`.
- func (s *Switch) logFwdErrs(num *int, wg *sync.WaitGroup, fwdChan chan error) {
- defer s.wg.Done()
- // Wait here until the outer function has finished persisting
- // and routing the packets. This guarantees we don't read from num until
- // the value is accurate.
- wg.Wait()
- numSent := *num
- for i := 0; i < numSent; i++ {
- select {
- case err := <-fwdChan:
- if err != nil {
- log.Errorf("Unhandled error while reforwarding htlc "+
- "settle/fail over htlcswitch: %v", err)
- }
- case <-s.quit:
- log.Errorf("unable to forward htlc packet " +
- "htlc switch was stopped")
- return
- }
- }
- }
- // routeAsync sends a packet through the htlc switch, using the provided err
- // chan to propagate errors back to the caller. The link's quit channel is
- // provided so that the send can be canceled if either the link or the switch
- // receive a shutdown requuest. This method does not wait for a response from
- // the htlcForwarder before returning.
- func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error,
- linkQuit chan struct{}) error {
- command := &plexPacket{
- pkt: packet,
- err: errChan,
- }
- select {
- case s.htlcPlex <- command:
- return nil
- case <-linkQuit:
- return ErrLinkShuttingDown
- case <-s.quit:
- return errors.New("htlc switch was stopped")
- }
- }
- // getLocalLink handles the addition of a htlc for a send that originates from
- // our node. It returns the link that the htlc should be forwarded outwards on,
- // and a link error if the htlc cannot be forwarded.
- func (s *Switch) getLocalLink(pkt *htlcPacket, htlc *lnwire.UpdateAddHTLC) (
- ChannelLink, *LinkError) {
- // Try to find links by node destination.
- s.indexMtx.RLock()
- link, err := s.getLinkByShortID(pkt.outgoingChanID)
- defer s.indexMtx.RUnlock()
- if err != nil {
- // If the link was not found for the outgoingChanID, an outside
- // subsystem may be using the confirmed SCID of a zero-conf
- // channel. In this case, we'll consult the Switch maps to see
- // if an alias exists and use the alias to lookup the link.
- // This extra step is a consequence of not updating the Switch
- // forwardingIndex when a zero-conf channel is confirmed. We
- // don't need to change the outgoingChanID since the link will
- // do that upon receiving the packet.
- baseScid, ok := s.baseIndex[pkt.outgoingChanID]
- if !ok {
- log.Errorf("Link %v not found", pkt.outgoingChanID)
- return nil, NewLinkError(&lnwire.FailUnknownNextPeer{})
- }
- // The base SCID was found, so we'll use that to fetch the
- // link.
- link, err = s.getLinkByShortID(baseScid)
- if err != nil {
- log.Errorf("Link %v not found", baseScid)
- return nil, NewLinkError(&lnwire.FailUnknownNextPeer{})
- }
- }
- if !link.EligibleToForward() {
- log.Errorf("Link %v is not available to forward",
- pkt.outgoingChanID)
- // The update does not need to be populated as the error
- // will be returned back to the router.
- return nil, NewDetailedLinkError(
- lnwire.NewTemporaryChannelFailure(nil),
- OutgoingFailureLinkNotEligible,
- )
- }
- // Ensure that the htlc satisfies the outgoing channel policy.
- currentHeight := atomic.LoadUint32(&s.bestHeight)
- htlcErr := link.CheckHtlcTransit(
- htlc.PaymentHash, htlc.Amount, htlc.Expiry, currentHeight,
- )
- if htlcErr != nil {
- log.Errorf("Link %v policy for local forward not "+
- "satisfied", pkt.outgoingChanID)
- return nil, htlcErr
- }
- return link, nil
- }
- // handleLocalResponse processes a Settle or Fail responding to a
- // locally-initiated payment. This is handled asynchronously to avoid blocking
- // the main event loop within the switch, as these operations can require
- // multiple db transactions. The guarantees of the circuit map are stringent
- // enough such that we are able to tolerate reordering of these operations
- // without side effects. The primary operations handled are:
- // 1. Save the payment result to the pending payment store.
- // 2. Notify subscribers about the payment result.
- // 3. Ack settle/fail references, to avoid resending this response internally
- // 4. Teardown the closing circuit in the circuit map
- //
- // NOTE: This method MUST be spawned as a goroutine.
- func (s *Switch) handleLocalResponse(pkt *htlcPacket) {
- defer s.wg.Done()
- attemptID := pkt.incomingHTLCID
- // The error reason will be unencypted in case this a local
- // failure or a converted error.
- unencrypted := pkt.localFailure || pkt.convertedError
- n := &networkResult{
- msg: pkt.htlc,
- unencrypted: unencrypted,
- isResolution: pkt.isResolution,
- }
- // Store the result to the db. This will also notify subscribers about
- // the result.
- if err := s.networkResults.storeResult(attemptID, n); err != nil {
- log.Errorf("Unable to complete payment for pid=%v: %v",
- attemptID, err)
- return
- }
- // First, we'll clean up any fwdpkg references, circuit entries, and
- // mark in our db that the payment for this payment hash has either
- // succeeded or failed.
- //
- // If this response is contained in a forwarding package, we'll start by
- // acking the settle/fail so that we don't continue to retransmit the
- // HTLC internally.
- if pkt.destRef != nil {
- if err := s.ackSettleFail(*pkt.destRef); err != nil {
- log.Warnf("Unable to ack settle/fail reference: %s: %v",
- *pkt.destRef, err)
- return
- }
- }
- // Next, we'll remove the circuit since we are about to complete an
- // fulfill/fail of this HTLC. Since we've already removed the
- // settle/fail fwdpkg reference, the response from the peer cannot be
- // replayed internally if this step fails. If this happens, this logic
- // will be executed when a provided resolution message comes through.
- // This can only happen if the circuit is still open, which is why this
- // ordering is chosen.
- if err := s.teardownCircuit(pkt); err != nil {
- log.Warnf("Unable to teardown circuit %s: %v",
- pkt.inKey(), err)
- return
- }
- // Finally, notify on the htlc failure or success that has been handled.
- key := newHtlcKey(pkt)
- eventType := getEventType(pkt)
- switch htlc := pkt.htlc.(type) {
- case *lnwire.UpdateFulfillHTLC:
- s.cfg.HtlcNotifier.NotifySettleEvent(key, htlc.PaymentPreimage,
- eventType)
- case *lnwire.UpdateFailHTLC:
- s.cfg.HtlcNotifier.NotifyForwardingFailEvent(key, eventType)
- }
- }
- // extractResult uses the given deobfuscator to extract the payment result from
- // the given network message.
- func (s *Switch) extractResult(deobfuscator ErrorDecrypter, n *networkResult,
- attemptID uint64, paymentHash lntypes.Hash) (*PaymentResult, error) {
- switch htlc := n.msg.(type) {
- // We've received a settle update which means we can finalize the user
- // payment and return successful response.
- case *lnwire.UpdateFulfillHTLC:
- return &PaymentResult{
- Preimage: htlc.PaymentPreimage,
- }, nil
- // We've received a fail update which means we can finalize the
- // user payment and return fail response.
- case *lnwire.UpdateFailHTLC:
- // TODO(yy): construct deobfuscator here to avoid creating it
- // in paymentLifecycle even for settled HTLCs.
- paymentErr := s.parseFailedPayment(
- deobfuscator, attemptID, paymentHash, n.unencrypted,
- n.isResolution, htlc,
- )
- return &PaymentResult{
- Error: paymentErr,
- }, nil
- default:
- return nil, fmt.Errorf("received unknown response type: %T",
- htlc)
- }
- }
- // parseFailedPayment determines the appropriate failure message to return to
- // a user initiated payment. The three cases handled are:
- // 1. An unencrypted failure, which should already plaintext.
- // 2. A resolution from the chain arbitrator, which possibly has no failure
- // reason attached.
- // 3. A failure from the remote party, which will need to be decrypted using
- // the payment deobfuscator.
- func (s *Switch) parseFailedPayment(deobfuscator ErrorDecrypter,
- attemptID uint64, paymentHash lntypes.Hash, unencrypted,
- isResolution bool, htlc *lnwire.UpdateFailHTLC) error {
- switch {
- // The payment never cleared the link, so we don't need to
- // decrypt the error, simply decode it them report back to the
- // user.
- case unencrypted:
- r := bytes.NewReader(htlc.Reason)
- failureMsg, err := lnwire.DecodeFailure(r, 0)
- if err != nil {
- // If we could not decode the failure reason, return a link
- // error indicating that we failed to decode the onion.
- linkError := NewDetailedLinkError(
- // As this didn't even clear the link, we don't
- // need to apply an update here since it goes
- // directly to the router.
- lnwire.NewTemporaryChannelFailure(nil),
- OutgoingFailureDecodeError,
- )
- log.Errorf("%v: (hash=%v, pid=%d): %v",
- linkError.FailureDetail.FailureString(),
- paymentHash, attemptID, err)
- return linkError
- }
- // If we successfully decoded the failure reason, return it.
- return NewLinkError(failureMsg)
- // A payment had to be timed out on chain before it got past
- // the first hop. In this case, we'll report a permanent
- // channel failure as this means us, or the remote party had to
- // go on chain.
- case isResolution && htlc.Reason == nil:
- linkError := NewDetailedLinkError(
- &lnwire.FailPermanentChannelFailure{},
- OutgoingFailureOnChainTimeout,
- )
- log.Infof("%v: hash=%v, pid=%d",
- linkError.FailureDetail.FailureString(),
- paymentHash, attemptID)
- return linkError
- // A regular multi-hop payment error that we'll need to
- // decrypt.
- default:
- // We'll attempt to fully decrypt the onion encrypted
- // error. If we're unable to then we'll bail early.
- failure, err := deobfuscator.DecryptError(htlc.Reason)
- if err != nil {
- log.Errorf("unable to de-obfuscate onion failure "+
- "(hash=%v, pid=%d): %v",
- paymentHash, attemptID, err)
- return ErrUnreadableFailureMessage
- }
- return failure
- }
- }
- // handlePacketForward is used in cases when we need forward the htlc update
- // from one channel link to another and be able to propagate the settle/fail
- // updates back. This behaviour is achieved by creation of payment circuits.
- func (s *Switch) handlePacketForward(packet *htlcPacket) error {
- switch htlc := packet.htlc.(type) {
- // Channel link forwarded us a new htlc, therefore we initiate the
- // payment circuit within our internal state so we can properly forward
- // the ultimate settle message back latter.
- case *lnwire.UpdateAddHTLC:
- // Check if the node is set to reject all onward HTLCs and also make
- // sure that HTLC is not from the source node.
- if s.cfg.RejectHTLC {
- failure := NewDetailedLinkError(
- &lnwire.FailChannelDisabled{},
- OutgoingFailureForwardsDisabled,
- )
- return s.failAddPacket(packet, failure)
- }
- // Before we attempt to find a non-strict forwarding path for
- // this htlc, check whether the htlc is being routed over the
- // same incoming and outgoing channel. If our node does not
- // allow forwards of this nature, we fail the htlc early. This
- // check is in place to disallow inefficiently routed htlcs from
- // locking up our balance. With channels where the
- // option-scid-alias feature was negotiated, we also have to be
- // sure that the IDs aren't the same since one or both could be
- // an alias.
- linkErr := s.checkCircularForward(
- packet.incomingChanID, packet.outgoingChanID,
- s.cfg.AllowCircularRoute, htlc.PaymentHash,
- )
- if linkErr != nil {
- return s.failAddPacket(packet, linkErr)
- }
- s.indexMtx.RLock()
- targetLink, err := s.getLinkByMapping(packet)
- if err != nil {
- s.indexMtx.RUnlock()
- log.Debugf("unable to find link with "+
- "destination %v", packet.outgoingChanID)
- // If packet was forwarded from another channel link
- // than we should notify this link that some error
- // occurred.
- linkError := NewLinkError(
- &lnwire.FailUnknownNextPeer{},
- )
- return s.failAddPacket(packet, linkError)
- }
- targetPeerKey := targetLink.PeerPubKey()
- interfaceLinks, _ := s.getLinks(targetPeerKey)
- s.indexMtx.RUnlock()
- // We'll keep track of any HTLC failures during the link
- // selection process. This way we can return the error for
- // precise link that the sender selected, while optimistically
- // trying all links to utilize our available bandwidth.
- linkErrs := make(map[lnwire.ShortChannelID]*LinkError)
- // Find all destination channel links with appropriate
- // bandwidth.
- var destinations []ChannelLink
- for _, link := range interfaceLinks {
- var failure *LinkError
- // We'll skip any links that aren't yet eligible for
- // forwarding.
- if !link.EligibleToForward() {
- failure = NewDetailedLinkError(
- &lnwire.FailUnknownNextPeer{},
- OutgoingFailureLinkNotEligible,
- )
- } else {
- // We'll ensure that the HTLC satisfies the
- // current forwarding conditions of this target
- // link.
- currentHeight := atomic.LoadUint32(&s.bestHeight)
- failure = link.CheckHtlcForward(
- htlc.PaymentHash, packet.incomingAmount,
- packet.amount, packet.incomingTimeout,
- packet.outgoingTimeout,
- packet.inboundFee,
- currentHeight,
- packet.originalOutgoingChanID,
- )
- }
- // If this link can forward the htlc, add it to the set
- // of destinations.
- if failure == nil {
- destinations = append(destinations, link)
- continue
- }
- linkErrs[link.ShortChanID()] = failure
- }
- // If we had a forwarding failure due to the HTLC not
- // satisfying the current policy, then we'll send back an
- // error, but ensure we send back the error sourced at the
- // *target* link.
- if len(destinations) == 0 {
- // At this point, some or all of the links rejected the
- // HTLC so we couldn't forward it. So we'll try to look
- // up the error that came from the source.
- linkErr, ok := linkErrs[packet.outgoingChanID]
- if !ok {
- // If we can't find the error of the source,
- // then we'll return an unknown next peer,
- // though this should never happen.
- linkErr = NewLinkError(
- &lnwire.FailUnknownNextPeer{},
- )
- log.Warnf("unable to find err source for "+
- "outgoing_link=%v, errors=%v",
- packet.outgoingChanID, newLogClosure(func() string {
- return spew.Sdump(linkErrs)
- }))
- }
- log.Tracef("incoming HTLC(%x) violated "+
- "target outgoing link (id=%v) policy: %v",
- htlc.PaymentHash[:], packet.outgoingChanID,
- linkErr)
- return s.failAddPacket(packet, linkErr)
- }
- // Choose a random link out of the set of links that can forward
- // this htlc. The reason for randomization is to evenly
- // distribute the htlc load without making assumptions about
- // what the best channel is.
- destination := destinations[rand.Intn(len(destinations))] // nolint:gosec
- // Retrieve the incoming link by its ShortChannelID. Note that
- // the incomingChanID is never set to hop.Source here.
- s.indexMtx.RLock()
- incomingLink, err := s.getLinkByShortID(packet.incomingChanID)
- s.indexMtx.RUnlock()
- if err != nil {
- // If we couldn't find the incoming link, we can't
- // evaluate the incoming's exposure to dust, so we just
- // fail the HTLC back.
- linkErr := NewLinkError(
- &lnwire.FailTemporaryChannelFailure{},
- )
- return s.failAddPacket(packet, linkErr)
- }
- // Evaluate whether this HTLC would increase our exposure to
- // dust on the incoming link. If it does, fail it backwards.
- if s.evaluateDustThreshold(
- incomingLink, packet.incomingAmount, true,
- ) {
- // The incoming dust exceeds the threshold, so we fail
- // the add back.
- linkErr := NewLinkError(
- &lnwire.FailTemporaryChannelFailure{},
- )
- return s.failAddPacket(packet, linkErr)
- }
- // Also evaluate whether this HTLC would increase our exposure
- // to dust on the destination link. If it does, fail it back.
- if s.evaluateDustThreshold(
- destination, packet.amount, false,
- ) {
- // The outgoing dust exceeds the threshold, so we fail
- // the add back.
- linkErr := NewLinkError(
- &lnwire.FailTemporaryChannelFailure{},
- )
- return s.failAddPacket(packet, linkErr)
- }
- // Send the packet to the destination channel link which
- // manages the channel.
- packet.outgoingChanID = destination.ShortChanID()
- return destination.handleSwitchPacket(packet)
- case *lnwire.UpdateFailHTLC, *lnwire.UpdateFulfillHTLC:
- // If the source of this packet has not been set, use the
- // circuit map to lookup the origin.
- circuit, err := s.closeCircuit(packet)
- if err != nil {
- return err
- }
- // closeCircuit returns a nil circuit when a settle packet returns an
- // ErrUnknownCircuit error upon the inner call to CloseCircuit.
- if circuit == nil {
- return nil
- }
- fail, isFail := htlc.(*lnwire.UpdateFailHTLC)
- if isFail && !packet.hasSource {
- // HTLC resolutions and messages restored from disk
- // don't have the obfuscator set from the original htlc
- // add packet - set it here for use in blinded errors.
- packet.obfuscator = circuit.ErrorEncrypter
- switch {
- // No message to encrypt, locally sourced payment.
- case circuit.ErrorEncrypter == nil:
- // If this is a resolution message, then we'll need to
- // encrypt it as it's actually internally sourced.
- case packet.isResolution:
- var err error
- // TODO(roasbeef): don't need to pass actually?
- failure := &lnwire.FailPermanentChannelFailure{}
- fail.Reason, err = circuit.ErrorEncrypter.EncryptFirstHop(
- failure,
- )
- if err != nil {
- err = fmt.Errorf("unable to obfuscate "+
- "error: %v", err)
- log.Error(err)
- }
- // Alternatively, if the remote party send us an
- // UpdateFailMalformedHTLC, then we'll need to convert
- // this into a proper well formatted onion error as
- // there's no HMAC currently.
- case packet.convertedError:
- log.Infof("Converting malformed HTLC error "+
- "for circuit for Circuit(%x: "+
- "(%s, %d) <-> (%s, %d))", packet.circuit.PaymentHash,
- packet.incomingChanID, packet.incomingHTLCID,
- packet.outgoingChanID, packet.outgoingHTLCID)
- fail.Reason = circuit.ErrorEncrypter.EncryptMalformedError(
- fail.Reason,
- )
- default:
- // Otherwise, it's a forwarded error, so we'll perform a
- // wrapper encryption as normal.
- fail.Reason = circuit.ErrorEncrypter.IntermediateEncrypt(
- fail.Reason,
- )
- }
- } else if !isFail && circuit.Outgoing != nil {
- // If this is an HTLC settle, and it wasn't from a
- // locally initiated HTLC, then we'll log a forwarding
- // event so we can flush it to disk later.
- //
- // TODO(roasbeef): only do this once link actually
- // fully settles?
- localHTLC := packet.incomingChanID == hop.Source
- if !localHTLC {
- log.Infof("Forwarded HTLC(%x) of %v (fee: %v) "+
- "from IncomingChanID(%v) to OutgoingChanID(%v)",
- circuit.PaymentHash[:], circuit.OutgoingAmount,
- circuit.IncomingAmount-circuit.OutgoingAmount,
- circuit.Incoming.ChanID, circuit.Outgoing.ChanID)
- s.fwdEventMtx.Lock()
- s.pendingFwdingEvents = append(
- s.pendingFwdingEvents,
- channeldb.ForwardingEvent{
- Timestamp: time.Now(),
- IncomingChanID: circuit.Incoming.ChanID,
- OutgoingChanID: circuit.Outgoing.ChanID,
- AmtIn: circuit.IncomingAmount,
- AmtOut: circuit.OutgoingAmount,
- },
- )
- s.fwdEventMtx.Unlock()
- }
- }
- // A blank IncomingChanID in a circuit indicates that it is a pending
- // user-initiated payment.
- if packet.incomingChanID == hop.Source {
- s.wg.Add(1)
- go s.handleLocalResponse(packet)
- return nil
- }
- // Check to see that the source link is online before removing
- // the circuit.
- return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
- default:
- return errors.New("wrong update type")
- }
- }
- // checkCircularForward checks whether a forward is circular (arrives and
- // departs on the same link) and returns a link error if the switch is
- // configured to disallow this behaviour.
- func (s *Switch) checkCircularForward(incoming, outgoing lnwire.ShortChannelID,
- allowCircular bool, paymentHash lntypes.Hash) *LinkError {
- // If they are equal, we can skip the alias mapping checks.
- if incoming == outgoing {
- // The switch may be configured to allow circular routes, so
- // just log and return nil.
- if allowCircular {
- log.Debugf("allowing circular route over link: %v "+
- "(payment hash: %x)", incoming, paymentHash)
- return nil
- }
- // Otherwise, we'll return a temporary channel failure.
- return NewDetailedLinkError(
- lnwire.NewTemporaryChannelFailure(nil),
- OutgoingFailureCircularRoute,
- )
- }
- // We'll fetch the "base" SCID from the baseIndex for the incoming and
- // outgoing SCIDs. If either one does not have a base SCID, then the
- // two channels are not equal since one will be a channel that does not
- // need a mapping and SCID equality was checked above. If the "base"
- // SCIDs are equal, then this is a circular route. Otherwise, it isn't.
- s.indexMtx.RLock()
- incomingBaseScid, ok := s.baseIndex[incoming]
- if !ok {
- // This channel does not use baseIndex, bail out.
- s.indexMtx.RUnlock()
- return nil
- }
- outgoingBaseScid, ok := s.baseIndex[outgoing]
- if !ok {
- // This channel does not use baseIndex, bail out.
- s.indexMtx.RUnlock()
- return nil
- }
- s.indexMtx.RUnlock()
- // Check base SCID equality.
- if incomingBaseScid != outgoingBaseScid {
- // The base SCIDs are not equal so these are not the same
- // channel.
- return nil
- }
- // If the incoming and outgoing link are equal, the htlc is part of a
- // circular route which may be used to lock up our liquidity. If the
- // switch is configured to allow circular routes, log that we are
- // allowing the route then return nil.
- if allowCircular {
- log.Debugf("allowing circular route over link: %v "+
- "(payment hash: %x)", incoming, paymentHash)
- return nil
- }
- // If our node disallows circular routes, return a temporary channel
- // failure. There is nothing wrong with the policy used by the remote
- // node, so we do not include a channel update.
- return NewDetailedLinkError(
- lnwire.NewTemporaryChannelFailure(nil),
- OutgoingFailureCircularRoute,
- )
- }
- // failAddPacket encrypts a fail packet back to an add packet's source.
- // The ciphertext will be derived from the failure message proivded by context.
- // This method returns the failErr if all other steps complete successfully.
- func (s *Switch) failAddPacket(packet *htlcPacket, failure *LinkError) error {
- // Encrypt the failure so that the sender will be able to read the error
- // message. Since we failed this packet, we use EncryptFirstHop to
- // obfuscate the failure for their eyes only.
- reason, err := packet.obfuscator.EncryptFirstHop(failure.WireMessage())
- if err != nil {
- err := fmt.Errorf("unable to obfuscate "+
- "error: %v", err)
- log.Error(err)
- return err
- }
- log.Error(failure.Error())
- // Create a failure packet for this htlc. The full set of
- // information about the htlc failure is included so that they can
- // be included in link failure notifications.
- failPkt := &htlcPacket{
- sourceRef: packet.sourceRef,
- incomingChanID: packet.incomingChanID,
- incomingHTLCID: packet.incomingHTLCID,
- outgoingChanID: packet.outgoingChanID,
- outgoingHTLCID: packet.outgoingHTLCID,
- incomingAmount: packet.incomingAmount,
- amount: packet.amount,
- incomingTimeout: packet.incomingTimeout,
- outgoingTimeout: packet.outgoingTimeout,
- circuit: packet.circuit,
- obfuscator: packet.obfuscator,
- linkFailure: failure,
- htlc: &lnwire.UpdateFailHTLC{
- Reason: reason,
- },
- }
- // Route a fail packet back to the source link.
- err = s.mailOrchestrator.Deliver(failPkt.incomingChanID, failPkt)
- if err != nil {
- err = fmt.Errorf("source chanid=%v unable to "+
- "handle switch packet: %v",
- packet.incomingChanID, err)
- log.Error(err)
- return err
- }
- return failure
- }
- // closeCircuit accepts a settle or fail htlc and the associated htlc packet and
- // attempts to determine the source that forwarded this htlc. This method will
- // set the incoming chan and htlc ID of the given packet if the source was
- // found, and will properly [re]encrypt any failure messages.
- func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) {
- // If the packet has its source, that means it was failed locally by
- // the outgoing link. We fail it here to make sure only one response
- // makes it through the switch.
- if pkt.hasSource {
- circuit, err := s.circuits.FailCircuit(pkt.inKey())
- switch err {
- // Circuit successfully closed.
- case nil:
- return circuit, nil
- // Circuit was previously closed, but has not been deleted.
- // We'll just drop this response until the circuit has been
- // fully removed.
- case ErrCircuitClosing:
- return nil, err
- // Failed to close circuit because it does not exist. This is
- // likely because the circuit was already successfully closed.
- // Since this packet failed locally, there is no forwarding
- // package entry to acknowledge.
- case ErrUnknownCircuit:
- return nil, err
- // Unexpected error.
- default:
- return nil, err
- }
- }
- // Otherwise, this is packet was received from the remote party. Use
- // circuit map to find the incoming link to receive the settle/fail.
- circuit, err := s.circuits.CloseCircuit(pkt.outKey())
- switch err {
- // Open circuit successfully closed.
- case nil:
- pkt.incomingChanID = circuit.Incoming.ChanID
- pkt.incomingHTLCID = circuit.Incoming.HtlcID
- pkt.circuit = circuit
- pkt.sourceRef = &circuit.AddRef
- pktType := "SETTLE"
- if _, ok := pkt.htlc.(*lnwire.UpdateFailHTLC); ok {
- pktType = "FAIL"
- }
- log.Debugf("Closed completed %s circuit for %x: "+
- "(%s, %d) <-> (%s, %d)", pktType, pkt.circuit.PaymentHash,
- pkt.incomingChanID, pkt.incomingHTLCID,
- pkt.outgoingChanID, pkt.outgoingHTLCID)
- return circuit, nil
- // Circuit was previously closed, but has not been deleted. We'll just
- // drop this response until the circuit has been removed.
- case ErrCircuitClosing:
- return nil, err
- // Failed to close circuit because it does not exist. This is likely
- // because the circuit was already successfully closed.
- case ErrUnknownCircuit:
- if pkt.destRef != nil {
- // Add this SettleFailRef to the set of pending settle/fail entries
- // awaiting acknowledgement.
- s.pendingSettleFails = append(s.pendingSettleFails, *pkt.destRef)
- }
- // If this is a settle, we will not log an error message as settles
- // are expected to hit the ErrUnknownCircuit case. The only way fails
- // can hit this case if the link restarts after having just sent a fail
- // to the switch.
- _, isSettle := pkt.htlc.(*lnwire.UpdateFulfillHTLC)
- if !isSettle {
- err := fmt.Errorf("unable to find target channel "+
- "for HTLC fail: channel ID = %s, "+
- "HTLC ID = %d", pkt.outgoingChanID,
- pkt.outgoingHTLCID)
- log.Error(err)
- return nil, err
- }
- return nil, nil
- // Unexpected error.
- default:
- return nil, err
- }
- }
- // ackSettleFail is used by the switch to ACK any settle/fail entries in the
- // forwarding package of the outgoing link for a payment circuit. We do this if
- // we're the originator of the payment, so the link stops attempting to
- // re-broadcast.
- func (s *Switch) ackSettleFail(settleFailRefs ...channeldb.SettleFailRef) error {
- return kvdb.Batch(s.cfg.DB, func(tx kvdb.RwTx) error {
- return s.cfg.SwitchPackager.AckSettleFails(tx, settleFailRefs...)
- })
- }
- // teardownCircuit removes a pending or open circuit from the switch's circuit
- // map and prints useful logging statements regarding the outcome.
- func (s *Switch) teardownCircuit(pkt *htlcPacket) error {
- var pktType string
- switch htlc := pkt.htlc.(type) {
- case *lnwire.UpdateFulfillHTLC:
- pktType = "SETTLE"
- case *lnwire.UpdateFailHTLC:
- pktType = "FAIL"
- default:
- err := fmt.Errorf("cannot tear down packet of type: %T", htlc)
- log.Errorf(err.Error())
- return err
- }
- switch {
- case pkt.circuit.HasKeystone():
- log.Debugf("Tearing down open circuit with %s pkt, removing circuit=%v "+
- "with keystone=%v", pktType, pkt.inKey(), pkt.outKey())
- err := s.circuits.DeleteCircuits(pkt.inKey())
- if err != nil {
- log.Warnf("Failed to tear down open circuit (%s, %d) <-> (%s, %d) "+
- "with payment_hash-%v using %s pkt",
- pkt.incomingChanID, pkt.incomingHTLCID,
- pkt.outgoingChanID, pkt.outgoingHTLCID,
- pkt.circuit.PaymentHash, pktType)
- return err
- }
- log.Debugf("Closed completed %s circuit for %x: "+
- "(%s, %d) <-> (%s, %d)", pktType, pkt.circuit.PaymentHash,
- pkt.incomingChanID, pkt.incomingHTLCID,
- pkt.outgoingChanID, pkt.outgoingHTLCID)
- default:
- log.Debugf("Tearing down incomplete circuit with %s for inkey=%v",
- pktType, pkt.inKey())
- err := s.circuits.DeleteCircuits(pkt.inKey())
- if err != nil {
- log.Warnf("Failed to tear down pending %s circuit for %x: "+
- "(%s, %d)", pktType, pkt.circuit.PaymentHash,
- pkt.incomingChanID, pkt.incomingHTLCID)
- return err
- }
- log.Debugf("Removed pending onion circuit for %x: "+
- "(%s, %d)", pkt.circuit.PaymentHash,
- pkt.incomingChanID, pkt.incomingHTLCID)
- }
- return nil
- }
- // CloseLink creates and sends the close channel command to the target link
- // directing the specified closure type. If the closure type is CloseRegular,
- // targetFeePerKw parameter should be the ideal fee-per-kw that will be used as
- // a starting point for close negotiation. The deliveryScript parameter is an
- // optional parameter which sets a user specified script to close out to.
- func (s *Switch) CloseLink(chanPoint *wire.OutPoint,
- closeType contractcourt.ChannelCloseType,
- targetFeePerKw, maxFee chainfee.SatPerKWeight,
- deliveryScript lnwire.DeliveryAddress) (chan interface{}, chan error) {
- // TODO(roasbeef) abstract out the close updates.
- updateChan := make(chan interface{}, 2)
- errChan := make(chan error, 1)
- command := &ChanClose{
- CloseType: closeType,
- ChanPoint: chanPoint,
- Updates: updateChan,
- TargetFeePerKw: targetFeePerKw,
- MaxFee: maxFee,
- DeliveryScript: deliveryScript,
- Err: errChan,
- }
- select {
- case s.chanCloseRequests <- command:
- return updateChan, errChan
- case <-s.quit:
- errChan <- ErrSwitchExiting
- close(updateChan)
- return updateChan, errChan
- }
- }
- // htlcForwarder is responsible for optimally forwarding (and possibly
- // fragmenting) incoming/outgoing HTLCs amongst all active interfaces and their
- // links. The duties of the forwarder are similar to that of a network switch,
- // in that it facilitates multi-hop payments by acting as a central messaging
- // bus. The switch communicates will active links to create, manage, and tear
- // down active onion routed payments. Each active channel is modeled as
- // networked device with metadata such as the available payment bandwidth, and
- // total link capacity.
- //
- // NOTE: This MUST be run as a goroutine.
- func (s *Switch) htlcForwarder() {
- defer s.wg.Done()
- defer func() {
- s.blockEpochStream.Cancel()
- // Remove all links once we've been signalled for shutdown.
- var linksToStop []ChannelLink
- s.indexMtx.Lock()
- for _, link := range s.linkIndex {
- activeLink := s.removeLink(link.ChanID())
- if activeLink == nil {
- log.Errorf("unable to remove ChannelLink(%v) "+
- "on stop", link.ChanID())
- continue
- }
- linksToStop = append(linksToStop, activeLink)
- }
- for _, link := range s.pendingLinkIndex {
- pendingLink := s.removeLink(link.ChanID())
- if pendingLink == nil {
- log.Errorf("unable to remove ChannelLink(%v) "+
- "on stop", link.ChanID())
- continue
- }
- linksToStop = append(linksToStop, pendingLink)
- }
- s.indexMtx.Unlock()
- // Now that all pending and live links have been removed from
- // the forwarding indexes, stop each one before shutting down.
- // We'll shut them down in parallel to make exiting as fast as
- // possible.
- var wg sync.WaitGroup
- for _, link := range linksToStop {
- wg.Add(1)
- go func(l ChannelLink) {
- defer wg.Done()
- l.Stop()
- }(link)
- }
- wg.Wait()
- // Before we exit fully, we'll attempt to flush out any
- // forwarding events that may still be lingering since the last
- // batch flush.
- if err := s.FlushForwardingEvents(); err != nil {
- log.Errorf("unable to flush forwarding events: %v", err)
- }
- }()
- // TODO(roasbeef): cleared vs settled distinction
- var (
- totalNumUpdates uint64
- totalSatSent btcutil.Amount
- totalSatRecv btcutil.Amount
- )
- s.cfg.LogEventTicker.Resume()
- defer s.cfg.LogEventTicker.Stop()
- // Every 15 seconds, we'll flush out the forwarding events that
- // occurred during that period.
- s.cfg.FwdEventTicker.Resume()
- defer s.cfg.FwdEventTicker.Stop()
- defer s.cfg.AckEventTicker.Stop()
- out:
- for {
- // If the set of pending settle/fail entries is non-zero,
- // reinstate the ack ticker so we can batch ack them.
- if len(s.pendingSettleFails) > 0 {
- s.cfg.AckEventTicker.Resume()
- }
- select {
- case blockEpoch, ok := <-s.blockEpochStream.Epochs:
- if !ok {
- break out
- }
- atomic.StoreUint32(&s.bestHeight, uint32(blockEpoch.Height))
- // A local close request has arrived, we'll forward this to the
- // relevant link (if it exists) so the channel can be
- // cooperatively closed (if possible).
- case req := <-s.chanCloseRequests:
- chanID := lnwire.NewChanIDFromOutPoint(*req.ChanPoint)
- s.indexMtx.RLock()
- link, ok := s.linkIndex[chanID]
- if !ok {
- s.indexMtx.RUnlock()
- req.Err <- fmt.Errorf("no peer for channel with "+
- "chan_id=%x", chanID[:])
- continue
- }
- s.indexMtx.RUnlock()
- peerPub := link.PeerPubKey()
- log.Debugf("Requesting local channel close: peer=%v, "+
- "chan_id=%x", link.PeerPubKey(), chanID[:])
- go s.cfg.LocalChannelClose(peerPub[:], req)
- case resolutionMsg := <-s.resolutionMsgs:
- // We'll persist the resolution message to the Switch's
- // resolution store.
- resMsg := resolutionMsg.ResolutionMsg
- err := s.resMsgStore.addResolutionMsg(&resMsg)
- if err != nil {
- // This will only fail if there is a database
- // error or a serialization error. Sending the
- // error prevents the contractcourt from being
- // in a state where it believes the send was
- // successful, when it wasn't.
- log.Errorf("unable to add resolution msg: %v",
- err)
- resolutionMsg.errChan <- err
- continue
- }
- // At this point, the resolution message has been
- // persisted. It is safe to signal success by sending
- // a nil error since the Switch will re-deliver the
- // resolution message on restart.
- resolutionMsg.errChan <- nil
- // Create a htlc packet for this resolution. We do
- // not have some of the information that we'll need
- // for blinded error handling here , so we'll rely on
- // our forwarding logic to fill it in later.
- pkt := &htlcPacket{
- outgoingChanID: resolutionMsg.SourceChan,
- outgoingHTLCID: resolutionMsg.HtlcIndex,
- isResolution: true,
- }
- // Resolution messages will either be cancelling
- // backwards an existing HTLC, or settling a previously
- // outgoing HTLC. Based on this, we'll map the message
- // to the proper htlcPacket.
- if resolutionMsg.Failure != nil {
- pkt.htlc = &lnwire.UpdateFailHTLC{}
- } else {
- pkt.htlc = &lnwire.UpdateFulfillHTLC{
- PaymentPreimage: *resolutionMsg.PreImage,
- }
- }
- log.Infof("Received outside contract resolution, "+
- "mapping to: %v", spew.Sdump(pkt))
- // We don't check the error, as the only failure we can
- // encounter is due to the circuit already being
- // closed. This is fine, as processing this message is
- // meant to be idempotent.
- err = s.handlePacketForward(pkt)
- if err != nil {
- log.Errorf("Unable to forward resolution msg: %v", err)
- }
- // A new packet has arrived for forwarding, we'll interpret the
- // packet concretely, then either forward it along, or
- // interpret a return packet to a locally initialized one.
- case cmd := <-s.htlcPlex:
- cmd.err <- s.handlePacketForward(cmd.pkt)
- // When this time ticks, then it indicates that we should
- // collect all the forwarding events since the last internal,
- // and write them out to our log.
- case <-s.cfg.FwdEventTicker.Ticks():
- s.wg.Add(1)
- go func() {
- defer s.wg.Done()
- if err := s.FlushForwardingEvents(); err != nil {
- log.Errorf("unable to flush "+
- "forwarding events: %v", err)
- }
- }()
- // The log ticker has fired, so we'll calculate some forwarding
- // stats for the last 10 seconds to display within the logs to
- // users.
- case <-s.cfg.LogEventTicker.Ticks():
- // First, we'll collate the current running tally of
- // our forwarding stats.
- prevSatSent := totalSatSent
- prevSatRecv := totalSatRecv
- prevNumUpdates := totalNumUpdates
- var (
- newNumUpdates uint64
- newSatSent btcutil.Amount
- newSatRecv btcutil.Amount
- )
- // Next, we'll run through all the registered links and
- // compute their up-to-date forwarding stats.
- s.indexMtx.RLock()
- for _, link := range s.linkIndex {
- // TODO(roasbeef): when links first registered
- // stats printed.
- updates, sent, recv := link.Stats()
- newNumUpdates += updates
- newSatSent += sent.ToSatoshis()
- newSatRecv += recv.ToSatoshis()
- }
- s.indexMtx.RUnlock()
- var (
- diffNumUpdates uint64
- diffSatSent btcutil.Amount
- diffSatRecv btcutil.Amount
- )
- // If this is the first time we're computing these
- // stats, then the diff is just the new value. We do
- // this in order to avoid integer underflow issues.
- if prevNumUpdates == 0 {
- diffNumUpdates = newNumUpdates
- diffSatSent = newSatSent
- diffSatRecv = newSatRecv
- } else {
- diffNumUpdates = newNumUpdates - prevNumUpdates
- diffSatSent = newSatSent - prevSatSent
- diffSatRecv = newSatRecv - prevSatRecv
- }
- // If the diff of num updates is zero, then we haven't
- // forwarded anything in the last 10 seconds, so we can
- // skip this update.
- if diffNumUpdates == 0 {
- continue
- }
- // If the diff of num updates is negative, then some
- // links may have been unregistered from the switch, so
- // we'll update our stats to only include our registered
- // links.
- if int64(diffNumUpdates) < 0 {
- totalNumUpdates = newNumUpdates
- totalSatSent = newSatSent
- totalSatRecv = newSatRecv
- continue
- }
- // Otherwise, we'll log this diff, then accumulate the
- // new stats into the running total.
- log.Debugf("Sent %d satoshis and received %d satoshis "+
- "in the last 10 seconds (%f tx/sec)",
- diffSatSent, diffSatRecv,
- float64(diffNumUpdates)/10)
- totalNumUpdates += diffNumUpdates
- totalSatSent += diffSatSent
- totalSatRecv += diffSatRecv
- // The ack ticker has fired so if we have any settle/fail entries
- // for a forwarding package to ack, we will do so here in a batch
- // db call.
- case <-s.cfg.AckEventTicker.Ticks():
- // If the current set is empty, pause the ticker.
- if len(s.pendingSettleFails) == 0 {
- s.cfg.AckEventTicker.Pause()
- continue
- }
- // Batch ack the settle/fail entries.
- if err := s.ackSettleFail(s.pendingSettleFails...); err != nil {
- log.Errorf("Unable to ack batch of settle/fails: %v", err)
- continue
- }
- log.Tracef("Acked %d settle fails: %v", len(s.pendingSettleFails),
- newLogClosure(func() string {
- return spew.Sdump(s.pendingSettleFails)
- }))
- // Reset the pendingSettleFails buffer while keeping acquired
- // memory.
- s.pendingSettleFails = s.pendingSettleFails[:0]
- case <-s.quit:
- return
- }
- }
- }
- // Start starts all helper goroutines required for the operation of the switch.
- func (s *Switch) Start() error {
- if !atomic.CompareAndSwapInt32(&s.started, 0, 1) {
- log.Warn("Htlc Switch already started")
- return errors.New("htlc switch already started")
- }
- log.Infof("HTLC Switch starting")
- blockEpochStream, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil)
- if err != nil {
- return err
- }
- s.blockEpochStream = blockEpochStream
- s.wg.Add(1)
- go s.htlcForwarder()
- if err := s.reforwardResponses(); err != nil {
- s.Stop()
- log.Errorf("unable to reforward responses: %v", err)
- return err
- }
- if err := s.reforwardResolutions(); err != nil {
- // We are already stopping so we can ignore the error.
- _ = s.Stop()
- log.Errorf("unable to reforward resolutions: %v", err)
- return err
- }
- return nil
- }
- // reforwardResolutions fetches the set of resolution messages stored on-disk
- // and reforwards them if their circuits are still open. If the circuits have
- // been deleted, then we will delete the resolution message from the database.
- func (s *Switch) reforwardResolutions() error {
- // Fetch all stored resolution messages, deleting the ones that are
- // resolved.
- resMsgs, err := s.resMsgStore.fetchAllResolutionMsg()
- if err != nil {
- return err
- }
- switchPackets := make([]*htlcPacket, 0, len(resMsgs))
- for _, resMsg := range resMsgs {
- // If the open circuit no longer exists, then we can remove the
- // message from the store.
- outKey := CircuitKey{
- ChanID: resMsg.SourceChan,
- HtlcID: resMsg.HtlcIndex,
- }
- if s.circuits.LookupOpenCircuit(outKey) == nil {
- // The open circuit doesn't exist.
- err := s.resMsgStore.deleteResolutionMsg(&outKey)
- if err != nil {
- return err
- }
- continue
- }
- // The circuit is still open, so we can assume that the link or
- // switch (if we are the source) hasn't cleaned it up yet.
- // We rely on our forwarding logic to fill in details that
- // are not currently available to us.
- resPkt := &htlcPacket{
- outgoingChanID: resMsg.SourceChan,
- outgoingHTLCID: resMsg.HtlcIndex,
- isResolution: true,
- }
- if resMsg.Failure != nil {
- resPkt.htlc = &lnwire.UpdateFailHTLC{}
- } else {
- resPkt.htlc = &lnwire.UpdateFulfillHTLC{
- PaymentPreimage: *resMsg.PreImage,
- }
- }
- switchPackets = append(switchPackets, resPkt)
- }
- // We'll now dispatch the set of resolution messages to the proper
- // destination. An error is only encountered here if the switch is
- // shutting down.
- if err := s.ForwardPackets(nil, switchPackets...); err != nil {
- return err
- }
- return nil
- }
- // reforwardResponses for every known, non-pending channel, loads all associated
- // forwarding packages and reforwards any Settle or Fail HTLCs found. This is
- // used to resurrect the switch's mailboxes after a restart. This also runs for
- // waiting close channels since there may be settles or fails that need to be
- // reforwarded before they completely close.
- func (s *Switch) reforwardResponses() error {
- openChannels, err := s.cfg.FetchAllChannels()
- if err != nil {
- return err
- }
- for _, openChannel := range openChannels {
- shortChanID := openChannel.ShortChanID()
- // Locally-initiated payments never need reforwarding.
- if shortChanID == hop.Source {
- continue
- }
- // If the channel is pending, it should have no forwarding
- // packages, and nothing to reforward.
- if openChannel.IsPending {
- continue
- }
- // Channels in open or waiting-close may still have responses in
- // their forwarding packages. We will continue to reattempt
- // forwarding on startup until the channel is fully-closed.
- //
- // Load this channel's forwarding packages, and deliver them to
- // the switch.
- fwdPkgs, err := s.loadChannelFwdPkgs(shortChanID)
- if err != nil {
- log.Errorf("unable to load forwarding "+
- "packages for %v: %v", shortChanID, err)
- return err
- }
- s.reforwardSettleFails(fwdPkgs)
- }
- return nil
- }
- // loadChannelFwdPkgs loads all forwarding packages owned by the `source` short
- // channel identifier.
- func (s *Switch) loadChannelFwdPkgs(source lnwire.ShortChannelID) ([]*channeldb.FwdPkg, error) {
- var fwdPkgs []*channeldb.FwdPkg
- if err := kvdb.View(s.cfg.DB, func(tx kvdb.RTx) error {
- var err error
- fwdPkgs, err = s.cfg.SwitchPackager.LoadChannelFwdPkgs(
- tx, source,
- )
- return err
- }, func() {
- fwdPkgs = nil
- }); err != nil {
- return nil, err
- }
- return fwdPkgs, nil
- }
- // reforwardSettleFails parses the Settle and Fail HTLCs from the list of
- // forwarding packages, and reforwards those that have not been acknowledged.
- // This is intended to occur on startup, in order to recover the switch's
- // mailboxes, and to ensure that responses can be propagated in case the
- // outgoing link never comes back online.
- //
- // NOTE: This should mimic the behavior processRemoteSettleFails.
- func (s *Switch) reforwardSettleFails(fwdPkgs []*channeldb.FwdPkg) {
- for _, fwdPkg := range fwdPkgs {
- settleFails, err := lnwallet.PayDescsFromRemoteLogUpdates(
- fwdPkg.Source, fwdPkg.Height, fwdPkg.SettleFails,
- )
- if err != nil {
- log.Errorf("Unable to process remote log updates: %v",
- err)
- continue
- }
- switchPackets := make([]*htlcPacket, 0, len(settleFails))
- for i, pd := range settleFails {
- // Skip any settles or fails that have already been
- // acknowledged by the incoming link that originated the
- // forwarded Add.
- if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
- continue
- }
- switch pd.EntryType {
- // A settle for an HTLC we previously forwarded HTLC has
- // been received. So we'll forward the HTLC to the
- // switch which will handle propagating the settle to
- // the prior hop.
- case lnwallet.Settle:
- settlePacket := &htlcPacket{
- outgoingChanID: fwdPkg.Source,
- outgoingHTLCID: pd.ParentIndex,
- destRef: pd.DestRef,
- htlc: &lnwire.UpdateFulfillHTLC{
- PaymentPreimage: pd.RPreimage,
- },
- }
- // Add the packet to the batch to be forwarded, and
- // notify the overflow queue that a spare spot has been
- // freed up within the commitment state.
- switchPackets = append(switchPackets, settlePacket)
- // A failureCode message for a previously forwarded HTLC has been
- // received. As a result a new slot will be freed up in our
- // commitment state, so we'll forward this to the switch so the
- // backwards undo can continue.
- case lnwallet.Fail:
- // Fetch the reason the HTLC was canceled so
- // we can continue to propagate it. This
- // failure originated from another node, so
- // the linkFailure field is not set on this
- // packet. We rely on the link to fill in
- // additional circuit information for us.
- failPacket := &htlcPacket{
- outgoingChanID: fwdPkg.Source,
- outgoingHTLCID: pd.ParentIndex,
- destRef: pd.DestRef,
- htlc: &lnwire.UpdateFailHTLC{
- Reason: lnwire.OpaqueReason(pd.FailReason),
- },
- }
- // Add the packet to the batch to be forwarded, and
- // notify the overflow queue that a spare spot has been
- // freed up within the commitment state.
- switchPackets = append(switchPackets, failPacket)
- }
- }
- // Since this send isn't tied to a specific link, we pass a nil
- // link quit channel, meaning the send will fail only if the
- // switch receives a shutdown request.
- if err := s.ForwardPackets(nil, switchPackets...); err != nil {
- log.Errorf("Unhandled error while reforwarding packets "+
- "settle/fail over htlcswitch: %v", err)
- }
- }
- }
- // Stop gracefully stops all active helper goroutines, then waits until they've
- // exited.
- func (s *Switch) Stop() error {
- if !atomic.CompareAndSwapInt32(&s.shutdown, 0, 1) {
- log.Warn("Htlc Switch already stopped")
- return errors.New("htlc switch already shutdown")
- }
- log.Info("HTLC Switch shutting down...")
- defer log.Debug("HTLC Switch shutdown complete")
- close(s.quit)
- s.wg.Wait()
- // Wait until all active goroutines have finished exiting before
- // stopping the mailboxes, otherwise the mailbox map could still be
- // accessed and modified.
- s.mailOrchestrator.Stop()
- return nil
- }
- // CreateAndAddLink will create a link and then add it to the internal maps
- // when given a ChannelLinkConfig and LightningChannel.
- func (s *Switch) CreateAndAddLink(linkCfg ChannelLinkConfig,
- lnChan *lnwallet.LightningChannel) error {
- link := NewChannelLink(linkCfg, lnChan)
- return s.AddLink(link)
- }
- // AddLink is used to initiate the handling of the add link command. The
- // request will be propagated and handled in the main goroutine.
- func (s *Switch) AddLink(link ChannelLink) error {
- s.indexMtx.Lock()
- defer s.indexMtx.Unlock()
- chanID := link.ChanID()
- // First, ensure that this link is not already active in the switch.
- _, err := s.getLink(chanID)
- if err == nil {
- return fmt.Errorf("unable to add ChannelLink(%v), already "+
- "active", chanID)
- }
- // Get and attach the mailbox for this link, which buffers packets in
- // case there packets that we tried to deliver while this link was
- // offline.
- shortChanID := link.ShortChanID()
- mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID, shortChanID)
- link.AttachMailBox(mailbox)
- // Attach the Switch's failAliasUpdate function to the link.
- link.attachFailAliasUpdate(s.failAliasUpdate)
- if err := link.Start(); err != nil {
- log.Errorf("AddLink failed to start link with chanID=%v: %v",
- chanID, err)
- s.removeLink(chanID)
- return err
- }
- if shortChanID == hop.Source {
- log.Infof("Adding pending link chan_id=%v, short_chan_id=%v",
- chanID, shortChanID)
- s.pendingLinkIndex[chanID] = link
- } else {
- log.Infof("Adding live link chan_id=%v, short_chan_id=%v",
- chanID, shortChanID)
- s.addLiveLink(link)
- s.mailOrchestrator.BindLiveShortChanID(
- mailbox, chanID, shortChanID,
- )
- }
- return nil
- }
- // addLiveLink adds a link to all associated forwarding index, this makes it a
- // candidate for forwarding HTLCs.
- func (s *Switch) addLiveLink(link ChannelLink) {
- linkScid := link.ShortChanID()
- // We'll add the link to the linkIndex which lets us quickly
- // look up a channel when we need to close or register it, and
- // the forwarding index which'll be used when forwarding HTLC's
- // in the multi-hop setting.
- s.linkIndex[link.ChanID()] = link
- s.forwardingIndex[linkScid] = link
- // Next we'll add the link to the interface index so we can
- // quickly look up all the channels for a particular node.
- peerPub := link.PeerPubKey()
- if _, ok := s.interfaceIndex[peerPub]; !ok {
- s.interfaceIndex[peerPub] = make(map[lnwire.ChannelID]ChannelLink)
- }
- s.interfaceIndex[peerPub][link.ChanID()] = link
- aliases := link.getAliases()
- if link.isZeroConf() {
- if link.zeroConfConfirmed() {
- // Since the zero-conf channel has confirmed, we can
- // populate the aliasToReal mapping.
- confirmedScid := link.confirmedScid()
- for _, alias := range aliases {
- s.aliasToReal[alias] = confirmedScid
- }
- // Add the confirmed SCID as a key in the baseIndex.
- s.baseIndex[confirmedScid] = linkScid
- }
- // Now we populate the baseIndex which will be used to fetch
- // the link given any of the channel's alias SCIDs or the real
- // SCID. The link's SCID is an alias, so we don't need to
- // special-case it like the option-scid-alias feature-bit case
- // further down.
- for _, alias := range aliases {
- s.baseIndex[alias] = linkScid
- }
- } else if link.negotiatedAliasFeature() {
- // The link's SCID is the confirmed SCID for non-zero-conf
- // option-scid-alias feature bit channels.
- for _, alias := range aliases {
- s.aliasToReal[alias] = linkScid
- s.baseIndex[alias] = linkScid
- }
- // Since the link's SCID is confirmed, it was not included in
- // the baseIndex above as a key. Add it now.
- s.baseIndex[linkScid] = linkScid
- }
- }
- // GetLink is used to initiate the handling of the get link command. The
- // request will be propagated/handled to/in the main goroutine.
- func (s *Switch) GetLink(chanID lnwire.ChannelID) (ChannelUpdateHandler,
- error) {
- s.indexMtx.RLock()
- defer s.indexMtx.RUnlock()
- return s.getLink(chanID)
- }
- // getLink returns the link stored in either the pending index or the live
- // lindex.
- func (s *Switch) getLink(chanID lnwire.ChannelID) (ChannelLink, error) {
- link, ok := s.linkIndex[chanID]
- if !ok {
- link, ok = s.pendingLinkIndex[chanID]
- if !ok {
- return nil, ErrChannelLinkNotFound
- }
- }
- return link, nil
- }
- // GetLinkByShortID attempts to return the link which possesses the target short
- // channel ID.
- func (s *Switch) GetLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink,
- error) {
- s.indexMtx.RLock()
- defer s.indexMtx.RUnlock()
- link, err := s.getLinkByShortID(chanID)
- if err != nil {
- // If we failed to find the link under the passed-in SCID, we
- // consult the Switch's baseIndex map to see if the confirmed
- // SCID was used for a zero-conf channel.
- aliasID, ok := s.baseIndex[chanID]
- if !ok {
- return nil, err
- }
- // An alias was found, use it to lookup if a link exists.
- return s.getLinkByShortID(aliasID)
- }
- return link, nil
- }
- // getLinkByShortID attempts to return the link which possesses the target
- // short channel ID.
- //
- // NOTE: This MUST be called with the indexMtx held.
- func (s *Switch) getLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink, error) {
- link, ok := s.forwardingIndex[chanID]
- if !ok {
- return nil, ErrChannelLinkNotFound
- }
- return link, nil
- }
- // getLinkByMapping attempts to fetch the link via the htlcPacket's
- // outgoingChanID, possibly using a mapping. If it finds the link via mapping,
- // the outgoingChanID will be changed so that an error can be properly
- // attributed when looping over linkErrs in handlePacketForward.
- //
- // * If the outgoingChanID is an alias, we'll fetch the link regardless if it's
- // public or not.
- //
- // * If the outgoingChanID is a confirmed SCID, we'll need to do more checks.
- // - If there is no entry found in baseIndex, fetch the link. This channel
- // did not have the option-scid-alias feature negotiated (which includes
- // zero-conf and option-scid-alias channel-types).
- // - If there is an entry found, fetch the link from forwardingIndex and
- // fail if this is a private link.
- //
- // NOTE: This MUST be called with the indexMtx read lock held.
- func (s *Switch) getLinkByMapping(pkt *htlcPacket) (ChannelLink, error) {
- // Determine if this ShortChannelID is an alias or a confirmed SCID.
- chanID := pkt.outgoingChanID
- aliasID := s.cfg.IsAlias(chanID)
- // Set the originalOutgoingChanID so the proper channel_update can be
- // sent back if the option-scid-alias feature bit was negotiated.
- pkt.originalOutgoingChanID = chanID
- if aliasID {
- // Since outgoingChanID is an alias, we'll fetch the link via
- // baseIndex.
- baseScid, ok := s.baseIndex[chanID]
- if !ok {
- // No mapping exists, bail.
- return nil, ErrChannelLinkNotFound
- }
- // A mapping exists, so use baseScid to find the link in the
- // forwardingIndex.
- link, ok := s.forwardingIndex[baseScid]
- if !ok {
- // Link not found, bail.
- return nil, ErrChannelLinkNotFound
- }
- // Change the packet's outgoingChanID field so that errors are
- // properly attributed.
- pkt.outgoingChanID = baseScid
- // Return the link without checking if it's private or not.
- return link, nil
- }
- // The outgoingChanID is a confirmed SCID. Attempt to fetch the base
- // SCID from baseIndex.
- baseScid, ok := s.baseIndex[chanID]
- if !ok {
- // outgoingChanID is not a key in base index meaning this
- // channel did not have the option-scid-alias feature bit
- // negotiated. We'll fetch the link and return it.
- link, ok := s.forwardingIndex[chanID]
- if !ok {
- // The link wasn't found, bail out.
- return nil, ErrChannelLinkNotFound
- }
- return link, nil
- }
- // Fetch the link whose internal SCID is baseScid.
- link, ok := s.forwardingIndex[baseScid]
- if !ok {
- // Link wasn't found, bail out.
- return nil, ErrChannelLinkNotFound
- }
- // If the link is unadvertised, we fail since the real SCID was used to
- // forward over it and this is a channel where the option-scid-alias
- // feature bit was negotiated.
- if link.IsUnadvertised() {
- return nil, ErrChannelLinkNotFound
- }
- // The link is public so the confirmed SCID can be used to forward over
- // it. We'll also replace pkt's outgoingChanID field so errors can
- // properly be attributed in the calling function.
- pkt.outgoingChanID = baseScid
- return link, nil
- }
- // HasActiveLink returns true if the given channel ID has a link in the link
- // index AND the link is eligible to forward.
- func (s *Switch) HasActiveLink(chanID lnwire.ChannelID) bool {
- s.indexMtx.RLock()
- defer s.indexMtx.RUnlock()
- if link, ok := s.linkIndex[chanID]; ok {
- return link.EligibleToForward()
- }
- return false
- }
- // RemoveLink purges the switch of any link associated with chanID. If a pending
- // or active link is not found, this method does nothing. Otherwise, the method
- // returns after the link has been completely shutdown.
- func (s *Switch) RemoveLink(chanID lnwire.ChannelID) {
- s.indexMtx.Lock()
- link, err := s.getLink(chanID)
- if err != nil {
- // If err is non-nil, this means that link is also nil. The
- // link variable cannot be nil without err being non-nil.
- s.indexMtx.Unlock()
- log.Tracef("Unable to remove link for ChannelID(%v): %v",
- chanID, err)
- return
- }
- // Check if the link is already stopping and grab the stop chan if it
- // is.
- stopChan, ok := s.linkStopIndex[chanID]
- if !ok {
- // If the link is non-nil, it is not currently stopping, so
- // we'll add a stop chan to the linkStopIndex.
- stopChan = make(chan struct{})
- s.linkStopIndex[chanID] = stopChan
- }
- s.indexMtx.Unlock()
- if ok {
- // If the stop chan exists, we will wait for it to be closed.
- // Once it is closed, we will exit.
- select {
- case <-stopChan:
- return
- case <-s.quit:
- return
- }
- }
- // Stop the link before removing it from the maps.
- link.Stop()
- s.indexMtx.Lock()
- _ = s.removeLink(chanID)
- // Close stopChan and remove this link from the linkStopIndex.
- // Deleting from the index and removing from the link must be done
- // in the same block while the mutex is held.
- close(stopChan)
- delete(s.linkStopIndex, chanID)
- s.indexMtx.Unlock()
- }
- // removeLink is used to remove and stop the channel link.
- //
- // NOTE: This MUST be called with the indexMtx held.
- func (s *Switch) removeLink(chanID lnwire.ChannelID) ChannelLink {
- log.Infof("Removing channel link with ChannelID(%v)", chanID)
- link, err := s.getLink(chanID)
- if err != nil {
- return nil
- }
- // Remove the channel from live link indexes.
- delete(s.pendingLinkIndex, link.ChanID())
- delete(s.linkIndex, link.ChanID())
- delete(s.forwardingIndex, link.ShortChanID())
- // If the link has been added to the peer index, then we'll move to
- // delete the entry within the index.
- peerPub := link.PeerPubKey()
- if peerIndex, ok := s.interfaceIndex[peerPub]; ok {
- delete(peerIndex, link.ChanID())
- // If after deletion, there are no longer any links, then we'll
- // remove the interface map all together.
- if len(peerIndex) == 0 {
- delete(s.interfaceIndex, peerPub)
- }
- }
- return link
- }
- // UpdateShortChanID locates the link with the passed-in chanID and updates the
- // underlying channel state. This is only used in zero-conf channels to allow
- // the confirmed SCID to be updated.
- func (s *Switch) UpdateShortChanID(chanID lnwire.ChannelID) error {
- s.indexMtx.Lock()
- defer s.indexMtx.Unlock()
- // Locate the target link in the link index. If no such link exists,
- // then we will ignore the request.
- link, ok := s.linkIndex[chanID]
- if !ok {
- return fmt.Errorf("link %v not found", chanID)
- }
- // Try to update the link's underlying channel state, returning early
- // if this update failed.
- _, err := link.UpdateShortChanID()
- if err != nil {
- return err
- }
- // Since the zero-conf channel is confirmed, we should populate the
- // aliasToReal map and update the baseIndex.
- aliases := link.getAliases()
- confirmedScid := link.confirmedScid()
- for _, alias := range aliases {
- s.aliasToReal[alias] = confirmedScid
- }
- s.baseIndex[confirmedScid] = link.ShortChanID()
- return nil
- }
- // GetLinksByInterface fetches all the links connected to a particular node
- // identified by the serialized compressed form of its public key.
- func (s *Switch) GetLinksByInterface(hop [33]byte) ([]ChannelUpdateHandler,
- error) {
- s.indexMtx.RLock()
- defer s.indexMtx.RUnlock()
- var handlers []ChannelUpdateHandler
- links, err := s.getLinks(hop)
- if err != nil {
- return nil, err
- }
- // Range over the returned []ChannelLink to convert them into
- // []ChannelUpdateHandler.
- for _, link := range links {
- handlers = append(handlers, link)
- }
- return handlers, nil
- }
- // getLinks is function which returns the channel links of the peer by hop
- // destination id.
- //
- // NOTE: This MUST be called with the indexMtx held.
- func (s *Switch) getLinks(destination [33]byte) ([]ChannelLink, error) {
- links, ok := s.interfaceIndex[destination]
- if !ok {
- return nil, ErrNoLinksFound
- }
- channelLinks := make([]ChannelLink, 0, len(links))
- for _, link := range links {
- channelLinks = append(channelLinks, link)
- }
- return channelLinks, nil
- }
- // CircuitModifier returns a reference to subset of the interfaces provided by
- // the circuit map, to allow links to open and close circuits.
- func (s *Switch) CircuitModifier() CircuitModifier {
- return s.circuits
- }
- // CircuitLookup returns a reference to subset of the interfaces provided by the
- // circuit map, to allow looking up circuits.
- func (s *Switch) CircuitLookup() CircuitLookup {
- return s.circuits
- }
- // commitCircuits persistently adds a circuit to the switch's circuit map.
- func (s *Switch) commitCircuits(circuits ...*PaymentCircuit) (
- *CircuitFwdActions, error) {
- return s.circuits.CommitCircuits(circuits...)
- }
- // FlushForwardingEvents flushes out the set of pending forwarding events to
- // the persistent log. This will be used by the switch to periodically flush
- // out the set of forwarding events to disk. External callers can also use this
- // method to ensure all data is flushed to dis before querying the log.
- func (s *Switch) FlushForwardingEvents() error {
- // First, we'll obtain a copy of the current set of pending forwarding
- // events.
- s.fwdEventMtx.Lock()
- // If we won't have any forwarding events, then we can exit early.
- if len(s.pendingFwdingEvents) == 0 {
- s.fwdEventMtx.Unlock()
- return nil
- }
- events := make([]channeldb.ForwardingEvent, len(s.pendingFwdingEvents))
- copy(events[:], s.pendingFwdingEvents[:])
- // With the copy obtained, we can now clear out the header pointer of
- // the current slice. This way, we can re-use the underlying storage
- // allocated for the slice.
- s.pendingFwdingEvents = s.pendingFwdingEvents[:0]
- s.fwdEventMtx.Unlock()
- // Finally, we'll write out the copied events to the persistent
- // forwarding log.
- return s.cfg.FwdingLog.AddForwardingEvents(events)
- }
- // BestHeight returns the best height known to the switch.
- func (s *Switch) BestHeight() uint32 {
- return atomic.LoadUint32(&s.bestHeight)
- }
- // evaluateDustThreshold takes in a ChannelLink, HTLC amount, and a boolean to
- // determine whether the default dust threshold has been exceeded. This
- // heuristic takes into account the trimmed-to-dust mechanism. The sum of the
- // commitment's dust with the mailbox's dust with the amount is checked against
- // the default threshold. If incoming is true, then the amount is not included
- // in the sum as it was already included in the commitment's dust. A boolean is
- // returned telling the caller whether the HTLC should be failed back.
- func (s *Switch) evaluateDustThreshold(link ChannelLink,
- amount lnwire.MilliSatoshi, incoming bool) bool {
- // Retrieve the link's current commitment feerate and dustClosure.
- feeRate := link.getFeeRate()
- isDust := link.getDustClosure()
- // Evaluate if the HTLC is dust on either sides' commitment.
- isLocalDust := isDust(feeRate, incoming, true, amount.ToSatoshis())
- isRemoteDust := isDust(feeRate, incoming, false, amount.ToSatoshis())
- if !(isLocalDust || isRemoteDust) {
- // If the HTLC is not dust on either commitment, it's fine to
- // forward.
- return false
- }
- // Fetch the dust sums currently in the mailbox for this link.
- cid := link.ChanID()
- sid := link.ShortChanID()
- mailbox := s.mailOrchestrator.GetOrCreateMailBox(cid, sid)
- localMailDust, remoteMailDust := mailbox.DustPackets()
- // If the htlc is dust on the local commitment, we'll obtain the dust
- // sum for it.
- if isLocalDust {
- localSum := link.getDustSum(false)
- localSum += localMailDust
- // Optionally include the HTLC amount only for outgoing
- // HTLCs.
- if !incoming {
- localSum += amount
- }
- // Finally check against the defined dust threshold.
- if localSum > s.cfg.DustThreshold {
- return true
- }
- }
- // Also check if the htlc is dust on the remote commitment, if we've
- // reached this point.
- if isRemoteDust {
- remoteSum := link.getDustSum(true)
- remoteSum += remoteMailDust
- // Optionally include the HTLC amount only for outgoing
- // HTLCs.
- if !incoming {
- remoteSum += amount
- }
- // Finally check against the defined dust threshold.
- if remoteSum > s.cfg.DustThreshold {
- return true
- }
- }
- // If we reached this point, this HTLC is fine to forward.
- return false
- }
- // failMailboxUpdate is passed to the mailbox orchestrator which in turn passes
- // it to individual mailboxes. It allows the mailboxes to construct a
- // FailureMessage when failing back HTLC's due to expiry and may include an
- // alias in the ShortChannelID field. The outgoingScid is the SCID originally
- // used in the onion. The mailboxScid is the SCID that the mailbox and link
- // use. The mailboxScid is only used in the non-alias case, so it is always
- // the confirmed SCID.
- func (s *Switch) failMailboxUpdate(outgoingScid,
- mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage {
- // Try to use the failAliasUpdate function in case this is a channel
- // that uses aliases. If it returns nil, we'll fallback to the original
- // pre-alias behavior.
- update := s.failAliasUpdate(outgoingScid, false)
- if update == nil {
- // Execute the fallback behavior.
- var err error
- update, err = s.cfg.FetchLastChannelUpdate(mailboxScid)
- if err != nil {
- return &lnwire.FailTemporaryNodeFailure{}
- }
- }
- return lnwire.NewTemporaryChannelFailure(update)
- }
- // failAliasUpdate prepares a ChannelUpdate for a failed incoming or outgoing
- // HTLC on a channel where the option-scid-alias feature bit was negotiated. If
- // the associated channel is not one of these, this function will return nil
- // and the caller is expected to handle this properly. In this case, a return
- // to the original non-alias behavior is expected.
- func (s *Switch) failAliasUpdate(scid lnwire.ShortChannelID,
- incoming bool) *lnwire.ChannelUpdate {
- // This function does not defer the unlocking because of the database
- // lookups for ChannelUpdate.
- s.indexMtx.RLock()
- if s.cfg.IsAlias(scid) {
- // The alias SCID was used. In the incoming case this means
- // the channel is zero-conf as the link sets the scid. In the
- // outgoing case, the sender set the scid to use and may be
- // either the alias or the confirmed one, if it exists.
- realScid, ok := s.aliasToReal[scid]
- if !ok {
- // The real, confirmed SCID does not exist yet. Find
- // the "base" SCID that the link uses via the
- // baseIndex. If we can't find it, return nil. This
- // means the channel is zero-conf.
- baseScid, ok := s.baseIndex[scid]
- s.indexMtx.RUnlock()
- if !ok {
- return nil
- }
- update, err := s.cfg.FetchLastChannelUpdate(baseScid)
- if err != nil {
- return nil
- }
- // Replace the baseScid with the passed-in alias.
- update.ShortChannelID = scid
- sig, err := s.cfg.SignAliasUpdate(update)
- if err != nil {
- return nil
- }
- update.Signature, err = lnwire.NewSigFromSignature(sig)
- if err != nil {
- return nil
- }
- return update
- }
- s.indexMtx.RUnlock()
- // Fetch the SCID via the confirmed SCID and replace it with
- // the alias.
- update, err := s.cfg.FetchLastChannelUpdate(realScid)
- if err != nil {
- return nil
- }
- // In the incoming case, we want to ensure that we don't leak
- // the UTXO in case the channel is private. In the outgoing
- // case, since the alias was used, we do the same thing.
- update.ShortChannelID = scid
- sig, err := s.cfg.SignAliasUpdate(update)
- if err != nil {
- return nil
- }
- update.Signature, err = lnwire.NewSigFromSignature(sig)
- if err != nil {
- return nil
- }
- return update
- }
- // If the confirmed SCID is not in baseIndex, this is not an
- // option-scid-alias or zero-conf channel.
- baseScid, ok := s.baseIndex[scid]
- if !ok {
- s.indexMtx.RUnlock()
- return nil
- }
- // Fetch the link so we can get an alias to use in the ShortChannelID
- // of the ChannelUpdate.
- link, ok := s.forwardingIndex[baseScid]
- s.indexMtx.RUnlock()
- if !ok {
- // This should never happen, but if it does for some reason,
- // fallback to the old behavior.
- return nil
- }
- aliases := link.getAliases()
- if len(aliases) == 0 {
- // This should never happen, but if it does, fallback.
- return nil
- }
- // Fetch the ChannelUpdate via the real, confirmed SCID.
- update, err := s.cfg.FetchLastChannelUpdate(scid)
- if err != nil {
- return nil
- }
- // The incoming case will replace the ShortChannelID in the retrieved
- // ChannelUpdate with the alias to ensure no privacy leak occurs. This
- // would happen if a private non-zero-conf option-scid-alias
- // feature-bit channel leaked its UTXO here rather than supplying an
- // alias. In the outgoing case, the confirmed SCID was actually used
- // for forwarding in the onion, so no replacement is necessary as the
- // sender knows the scid.
- if incoming {
- // We will replace and sign the update with the first alias.
- // Since this happens on the incoming side, it's not actually
- // possible to know what the sender used in the onion.
- update.ShortChannelID = aliases[0]
- sig, err := s.cfg.SignAliasUpdate(update)
- if err != nil {
- return nil
- }
- update.Signature, err = lnwire.NewSigFromSignature(sig)
- if err != nil {
- return nil
- }
- }
- return update
- }
- // AddAliasForLink instructs the Switch to update its in-memory maps to reflect
- // that a link has a new alias.
- func (s *Switch) AddAliasForLink(chanID lnwire.ChannelID,
- alias lnwire.ShortChannelID) error {
- // Fetch the link so that we can update the underlying channel's set of
- // aliases.
- s.indexMtx.RLock()
- link, err := s.getLink(chanID)
- s.indexMtx.RUnlock()
- if err != nil {
- return err
- }
- // If the link is a channel where the option-scid-alias feature bit was
- // not negotiated, we'll return an error.
- if !link.negotiatedAliasFeature() {
- return fmt.Errorf("attempted to update non-alias channel")
- }
- linkScid := link.ShortChanID()
- // We'll update the maps so the Switch includes this alias in its
- // forwarding decisions.
- if link.isZeroConf() {
- if link.zeroConfConfirmed() {
- // If the channel has confirmed on-chain, we'll
- // add this alias to the aliasToReal map.
- confirmedScid := link.confirmedScid()
- s.aliasToReal[alias] = confirmedScid
- }
- // Add this alias to the baseIndex mapping.
- s.baseIndex[alias] = linkScid
- } else if link.negotiatedAliasFeature() {
- // The channel is confirmed, so we'll populate the aliasToReal
- // and baseIndex maps.
- s.aliasToReal[alias] = linkScid
- s.baseIndex[alias] = linkScid
- }
- return nil
- }
|