123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971 |
- package htlcswitch
- import (
- "bytes"
- "container/list"
- "errors"
- "fmt"
- "sync"
- "time"
- "github.com/lightningnetwork/lnd/clock"
- "github.com/lightningnetwork/lnd/lnwallet/chainfee"
- "github.com/lightningnetwork/lnd/lnwire"
- )
- var (
- // ErrMailBoxShuttingDown is returned when the mailbox is interrupted by
- // a shutdown request.
- ErrMailBoxShuttingDown = errors.New("mailbox is shutting down")
- // ErrPacketAlreadyExists signals that an attempt to add a packet failed
- // because it already exists in the mailbox.
- ErrPacketAlreadyExists = errors.New("mailbox already has packet")
- )
- // MailBox is an interface which represents a concurrent-safe, in-order
- // delivery queue for messages from the network and also from the main switch.
- // This struct serves as a buffer between incoming messages, and messages to
- // the handled by the link. Each of the mutating methods within this interface
- // should be implemented in a non-blocking manner.
- type MailBox interface {
- // AddMessage appends a new message to the end of the message queue.
- AddMessage(msg lnwire.Message) error
- // AddPacket appends a new message to the end of the packet queue.
- AddPacket(pkt *htlcPacket) error
- // HasPacket queries the packets for a circuit key, this is used to drop
- // packets bound for the switch that already have a queued response.
- HasPacket(CircuitKey) bool
- // AckPacket removes a packet from the mailboxes in-memory replay
- // buffer. This will prevent a packet from being delivered after a link
- // restarts if the switch has remained online. The returned boolean
- // indicates whether or not a packet with the passed incoming circuit
- // key was removed.
- AckPacket(CircuitKey) bool
- // FailAdd fails an UpdateAddHTLC that exists within the mailbox,
- // removing it from the in-memory replay buffer. This will prevent the
- // packet from being delivered after the link restarts if the switch has
- // remained online. The generated LinkError will show an
- // OutgoingFailureDownstreamHtlcAdd FailureDetail.
- FailAdd(pkt *htlcPacket)
- // MessageOutBox returns a channel that any new messages ready for
- // delivery will be sent on.
- MessageOutBox() chan lnwire.Message
- // PacketOutBox returns a channel that any new packets ready for
- // delivery will be sent on.
- PacketOutBox() chan *htlcPacket
- // Clears any pending wire messages from the inbox.
- ResetMessages() error
- // Reset the packet head to point at the first element in the list.
- ResetPackets() error
- // SetDustClosure takes in a closure that is used to evaluate whether
- // mailbox HTLC's are dust.
- SetDustClosure(isDust dustClosure)
- // SetFeeRate sets the feerate to be used when evaluating dust.
- SetFeeRate(feerate chainfee.SatPerKWeight)
- // DustPackets returns the dust sum for Adds in the mailbox for the
- // local and remote commitments.
- DustPackets() (lnwire.MilliSatoshi, lnwire.MilliSatoshi)
- // Start starts the mailbox and any goroutines it needs to operate
- // properly.
- Start()
- // Stop signals the mailbox and its goroutines for a graceful shutdown.
- Stop()
- }
- type mailBoxConfig struct {
- // shortChanID is the short channel id of the channel this mailbox
- // belongs to.
- shortChanID lnwire.ShortChannelID
- // forwardPackets send a varidic number of htlcPackets to the switch to
- // be routed. A quit channel should be provided so that the call can
- // properly exit during shutdown.
- forwardPackets func(chan struct{}, ...*htlcPacket) error
- // clock is a time source for the mailbox.
- clock clock.Clock
- // expiry is the interval after which Adds will be cancelled if they
- // have not been yet been delivered. The computed deadline will expiry
- // this long after the Adds are added via AddPacket.
- expiry time.Duration
- // failMailboxUpdate is used to fail an expired HTLC and use the
- // correct SCID if the underlying channel uses aliases.
- failMailboxUpdate func(outScid,
- mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage
- }
- // memoryMailBox is an implementation of the MailBox struct backed by purely
- // in-memory queues.
- //
- // TODO(morehouse): use typed lists instead of list.Lists to avoid type asserts.
- type memoryMailBox struct {
- started sync.Once
- stopped sync.Once
- cfg *mailBoxConfig
- wireMessages *list.List
- wireMtx sync.Mutex
- wireCond *sync.Cond
- messageOutbox chan lnwire.Message
- msgReset chan chan struct{}
- // repPkts is a queue for reply packets, e.g. Settles and Fails.
- repPkts *list.List
- repIndex map[CircuitKey]*list.Element
- repHead *list.Element
- // addPkts is a dedicated queue for Adds.
- addPkts *list.List
- addIndex map[CircuitKey]*list.Element
- addHead *list.Element
- pktMtx sync.Mutex
- pktCond *sync.Cond
- pktOutbox chan *htlcPacket
- pktReset chan chan struct{}
- wireShutdown chan struct{}
- pktShutdown chan struct{}
- quit chan struct{}
- // feeRate is set when the link receives or sends out fee updates. It
- // is refreshed when AttachMailBox is called in case a fee update did
- // not get committed. In some cases it may be out of sync with the
- // channel's feerate, but it should eventually get back in sync.
- feeRate chainfee.SatPerKWeight
- // isDust is set when AttachMailBox is called and serves to evaluate
- // the outstanding dust in the memoryMailBox given the current set
- // feeRate.
- isDust dustClosure
- }
- // newMemoryMailBox creates a new instance of the memoryMailBox.
- func newMemoryMailBox(cfg *mailBoxConfig) *memoryMailBox {
- box := &memoryMailBox{
- cfg: cfg,
- wireMessages: list.New(),
- repPkts: list.New(),
- addPkts: list.New(),
- messageOutbox: make(chan lnwire.Message),
- pktOutbox: make(chan *htlcPacket),
- msgReset: make(chan chan struct{}, 1),
- pktReset: make(chan chan struct{}, 1),
- repIndex: make(map[CircuitKey]*list.Element),
- addIndex: make(map[CircuitKey]*list.Element),
- wireShutdown: make(chan struct{}),
- pktShutdown: make(chan struct{}),
- quit: make(chan struct{}),
- }
- box.wireCond = sync.NewCond(&box.wireMtx)
- box.pktCond = sync.NewCond(&box.pktMtx)
- return box
- }
- // A compile time assertion to ensure that memoryMailBox meets the MailBox
- // interface.
- var _ MailBox = (*memoryMailBox)(nil)
- // courierType is an enum that reflects the distinct types of messages a
- // MailBox can handle. Each type will be placed in an isolated mail box and
- // will have a dedicated goroutine for delivering the messages.
- type courierType uint8
- const (
- // wireCourier is a type of courier that handles wire messages.
- wireCourier courierType = iota
- // pktCourier is a type of courier that handles htlc packets.
- pktCourier
- )
- // Start starts the mailbox and any goroutines it needs to operate properly.
- //
- // NOTE: This method is part of the MailBox interface.
- func (m *memoryMailBox) Start() {
- m.started.Do(func() {
- go m.wireMailCourier()
- go m.pktMailCourier()
- })
- }
- // ResetMessages blocks until all buffered wire messages are cleared.
- func (m *memoryMailBox) ResetMessages() error {
- msgDone := make(chan struct{})
- select {
- case m.msgReset <- msgDone:
- return m.signalUntilReset(wireCourier, msgDone)
- case <-m.quit:
- return ErrMailBoxShuttingDown
- }
- }
- // ResetPackets blocks until the head of packets buffer is reset, causing the
- // packets to be redelivered in order.
- func (m *memoryMailBox) ResetPackets() error {
- pktDone := make(chan struct{})
- select {
- case m.pktReset <- pktDone:
- return m.signalUntilReset(pktCourier, pktDone)
- case <-m.quit:
- return ErrMailBoxShuttingDown
- }
- }
- // signalUntilReset strobes the condition variable for the specified inbox type
- // until receiving a response that the mailbox has processed a reset.
- func (m *memoryMailBox) signalUntilReset(cType courierType,
- done chan struct{}) error {
- for {
- switch cType {
- case wireCourier:
- m.wireCond.Signal()
- case pktCourier:
- m.pktCond.Signal()
- }
- select {
- case <-time.After(time.Millisecond):
- continue
- case <-done:
- return nil
- case <-m.quit:
- return ErrMailBoxShuttingDown
- }
- }
- }
- // AckPacket removes the packet identified by it's incoming circuit key from the
- // queue of packets to be delivered. The returned boolean indicates whether or
- // not a packet with the passed incoming circuit key was removed.
- //
- // NOTE: It is safe to call this method multiple times for the same circuit key.
- func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool {
- m.pktCond.L.Lock()
- defer m.pktCond.L.Unlock()
- if entry, ok := m.repIndex[inKey]; ok {
- // Check whether we are removing the head of the queue. If so,
- // we must advance the head to the next packet before removing.
- // It's possible that the courier has already advanced the
- // repHead, so this check prevents the repHead from getting
- // desynchronized.
- if entry == m.repHead {
- m.repHead = entry.Next()
- }
- m.repPkts.Remove(entry)
- delete(m.repIndex, inKey)
- return true
- }
- if entry, ok := m.addIndex[inKey]; ok {
- // Check whether we are removing the head of the queue. If so,
- // we must advance the head to the next add before removing.
- // It's possible that the courier has already advanced the
- // addHead, so this check prevents the addHead from getting
- // desynchronized.
- //
- // NOTE: While this event is rare for Settles or Fails, it could
- // be very common for Adds since the mailbox has the ability to
- // cancel Adds before they are delivered. When that occurs, the
- // head of addPkts has only been peeked and we expect to be
- // removing the head of the queue.
- if entry == m.addHead {
- m.addHead = entry.Next()
- }
- m.addPkts.Remove(entry)
- delete(m.addIndex, inKey)
- return true
- }
- return false
- }
- // HasPacket queries the packets for a circuit key, this is used to drop packets
- // bound for the switch that already have a queued response.
- func (m *memoryMailBox) HasPacket(inKey CircuitKey) bool {
- m.pktCond.L.Lock()
- _, ok := m.repIndex[inKey]
- m.pktCond.L.Unlock()
- return ok
- }
- // Stop signals the mailbox and its goroutines for a graceful shutdown.
- //
- // NOTE: This method is part of the MailBox interface.
- func (m *memoryMailBox) Stop() {
- m.stopped.Do(func() {
- close(m.quit)
- m.signalUntilShutdown(wireCourier)
- m.signalUntilShutdown(pktCourier)
- })
- }
- // signalUntilShutdown strobes the condition variable of the passed courier
- // type, blocking until the worker has exited.
- func (m *memoryMailBox) signalUntilShutdown(cType courierType) {
- var (
- cond *sync.Cond
- shutdown chan struct{}
- )
- switch cType {
- case wireCourier:
- cond = m.wireCond
- shutdown = m.wireShutdown
- case pktCourier:
- cond = m.pktCond
- shutdown = m.pktShutdown
- }
- for {
- select {
- case <-time.After(time.Millisecond):
- cond.Signal()
- case <-shutdown:
- return
- }
- }
- }
- // pktWithExpiry wraps an incoming packet and records the time at which it it
- // should be canceled from the mailbox. This will be used to detect if it gets
- // stuck in the mailbox and inform when to cancel back.
- type pktWithExpiry struct {
- pkt *htlcPacket
- expiry time.Time
- }
- func (p *pktWithExpiry) deadline(clock clock.Clock) <-chan time.Time {
- return clock.TickAfter(p.expiry.Sub(clock.Now()))
- }
- // wireMailCourier is a dedicated goroutine whose job is to reliably deliver
- // wire messages.
- func (m *memoryMailBox) wireMailCourier() {
- defer close(m.wireShutdown)
- for {
- // First, we'll check our condition. If our mailbox is empty,
- // then we'll wait until a new item is added.
- m.wireCond.L.Lock()
- for m.wireMessages.Front() == nil {
- m.wireCond.Wait()
- select {
- case msgDone := <-m.msgReset:
- m.wireMessages.Init()
- close(msgDone)
- case <-m.quit:
- m.wireCond.L.Unlock()
- return
- default:
- }
- }
- // Grab the datum off the front of the queue, shifting the
- // slice's reference down one in order to remove the datum from
- // the queue.
- entry := m.wireMessages.Front()
- //nolint:forcetypeassert
- nextMsg := m.wireMessages.Remove(entry).(lnwire.Message)
- // Now that we're done with the condition, we can unlock it to
- // allow any callers to append to the end of our target queue.
- m.wireCond.L.Unlock()
- // With the next message obtained, we'll now select to attempt
- // to deliver the message. If we receive a kill signal, then
- // we'll bail out.
- select {
- case m.messageOutbox <- nextMsg:
- case msgDone := <-m.msgReset:
- m.wireCond.L.Lock()
- m.wireMessages.Init()
- m.wireCond.L.Unlock()
- close(msgDone)
- case <-m.quit:
- return
- }
- }
- }
- // pktMailCourier is a dedicated goroutine whose job is to reliably deliver
- // packet messages.
- func (m *memoryMailBox) pktMailCourier() {
- defer close(m.pktShutdown)
- for {
- // First, we'll check our condition. If our mailbox is empty,
- // then we'll wait until a new item is added.
- m.pktCond.L.Lock()
- for m.repHead == nil && m.addHead == nil {
- m.pktCond.Wait()
- select {
- // Resetting the packet queue means just moving our
- // pointer to the front. This ensures that any un-ACK'd
- // messages are re-delivered upon reconnect.
- case pktDone := <-m.pktReset:
- m.repHead = m.repPkts.Front()
- m.addHead = m.addPkts.Front()
- close(pktDone)
- case <-m.quit:
- m.pktCond.L.Unlock()
- return
- default:
- }
- }
- var (
- nextRep *htlcPacket
- nextRepEl *list.Element
- nextAdd *pktWithExpiry
- nextAddEl *list.Element
- )
- // For packets, we actually never remove an item until it has
- // been ACK'd by the link. This ensures that if a read packet
- // doesn't make it into a commitment, then it'll be
- // re-delivered once the link comes back online.
- // Peek at the head of the Settle/Fails and Add queues. We peak
- // both even if there is a Settle/Fail present because we need
- // to set a deadline for the next pending Add if it's present.
- // Due to clock monotonicity, we know that the head of the Adds
- // is the next to expire.
- if m.repHead != nil {
- //nolint:forcetypeassert
- nextRep = m.repHead.Value.(*htlcPacket)
- nextRepEl = m.repHead
- }
- if m.addHead != nil {
- //nolint:forcetypeassert
- nextAdd = m.addHead.Value.(*pktWithExpiry)
- nextAddEl = m.addHead
- }
- // Now that we're done with the condition, we can unlock it to
- // allow any callers to append to the end of our target queue.
- m.pktCond.L.Unlock()
- var (
- pktOutbox chan *htlcPacket
- addOutbox chan *htlcPacket
- add *htlcPacket
- deadline <-chan time.Time
- )
- // Prioritize delivery of Settle/Fail packets over Adds. This
- // ensures that we actively clear the commitment of existing
- // HTLCs before trying to add new ones. This can help to improve
- // forwarding performance since the time to sign a commitment is
- // linear in the number of HTLCs manifested on the commitments.
- //
- // NOTE: Both types are eventually delivered over the same
- // channel, but we can control which is delivered by exclusively
- // making one nil and the other non-nil. We know from our loop
- // condition that at least one nextRep and nextAdd are non-nil.
- if nextRep != nil {
- pktOutbox = m.pktOutbox
- } else {
- addOutbox = m.pktOutbox
- }
- // If we have a pending Add, we'll also construct the deadline
- // so we can fail it back if we are unable to deliver any
- // message in time. We also dereference the nextAdd's packet,
- // since we will need access to it in the case we are delivering
- // it and/or if the deadline expires.
- //
- // NOTE: It's possible after this point for add to be nil, but
- // this can only occur when addOutbox is also nil, hence we
- // won't accidentally deliver a nil packet.
- if nextAdd != nil {
- add = nextAdd.pkt
- deadline = nextAdd.deadline(m.cfg.clock)
- }
- select {
- case pktOutbox <- nextRep:
- m.pktCond.L.Lock()
- // Only advance the repHead if this Settle or Fail is
- // still at the head of the queue.
- if m.repHead != nil && m.repHead == nextRepEl {
- m.repHead = m.repHead.Next()
- }
- m.pktCond.L.Unlock()
- case addOutbox <- add:
- m.pktCond.L.Lock()
- // Only advance the addHead if this Add is still at the
- // head of the queue.
- if m.addHead != nil && m.addHead == nextAddEl {
- m.addHead = m.addHead.Next()
- }
- m.pktCond.L.Unlock()
- case <-deadline:
- log.Debugf("Expiring add htlc with "+
- "keystone=%v", add.keystone())
- m.FailAdd(add)
- case pktDone := <-m.pktReset:
- m.pktCond.L.Lock()
- m.repHead = m.repPkts.Front()
- m.addHead = m.addPkts.Front()
- m.pktCond.L.Unlock()
- close(pktDone)
- case <-m.quit:
- return
- }
- }
- }
- // AddMessage appends a new message to the end of the message queue.
- //
- // NOTE: This method is safe for concrete use and part of the MailBox
- // interface.
- func (m *memoryMailBox) AddMessage(msg lnwire.Message) error {
- // First, we'll lock the condition, and add the message to the end of
- // the wire message inbox.
- m.wireCond.L.Lock()
- m.wireMessages.PushBack(msg)
- m.wireCond.L.Unlock()
- // With the message added, we signal to the mailCourier that there are
- // additional messages to deliver.
- m.wireCond.Signal()
- return nil
- }
- // AddPacket appends a new message to the end of the packet queue.
- //
- // NOTE: This method is safe for concrete use and part of the MailBox
- // interface.
- func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
- m.pktCond.L.Lock()
- switch htlc := pkt.htlc.(type) {
- // Split off Settle/Fail packets into the repPkts queue.
- case *lnwire.UpdateFulfillHTLC, *lnwire.UpdateFailHTLC:
- if _, ok := m.repIndex[pkt.inKey()]; ok {
- m.pktCond.L.Unlock()
- return ErrPacketAlreadyExists
- }
- entry := m.repPkts.PushBack(pkt)
- m.repIndex[pkt.inKey()] = entry
- if m.repHead == nil {
- m.repHead = entry
- }
- // Split off Add packets into the addPkts queue.
- case *lnwire.UpdateAddHTLC:
- if _, ok := m.addIndex[pkt.inKey()]; ok {
- m.pktCond.L.Unlock()
- return ErrPacketAlreadyExists
- }
- entry := m.addPkts.PushBack(&pktWithExpiry{
- pkt: pkt,
- expiry: m.cfg.clock.Now().Add(m.cfg.expiry),
- })
- m.addIndex[pkt.inKey()] = entry
- if m.addHead == nil {
- m.addHead = entry
- }
- default:
- m.pktCond.L.Unlock()
- return fmt.Errorf("unknown htlc type: %T", htlc)
- }
- m.pktCond.L.Unlock()
- // With the packet added, we signal to the mailCourier that there are
- // additional packets to consume.
- m.pktCond.Signal()
- return nil
- }
- // SetFeeRate sets the memoryMailBox's feerate for use in DustPackets.
- func (m *memoryMailBox) SetFeeRate(feeRate chainfee.SatPerKWeight) {
- m.pktCond.L.Lock()
- defer m.pktCond.L.Unlock()
- m.feeRate = feeRate
- }
- // SetDustClosure sets the memoryMailBox's dustClosure for use in DustPackets.
- func (m *memoryMailBox) SetDustClosure(isDust dustClosure) {
- m.pktCond.L.Lock()
- defer m.pktCond.L.Unlock()
- m.isDust = isDust
- }
- // DustPackets returns the dust sum for add packets in the mailbox. The first
- // return value is the local dust sum and the second is the remote dust sum.
- // This will keep track of a given dust HTLC from the time it is added via
- // AddPacket until it is removed via AckPacket.
- func (m *memoryMailBox) DustPackets() (lnwire.MilliSatoshi,
- lnwire.MilliSatoshi) {
- m.pktCond.L.Lock()
- defer m.pktCond.L.Unlock()
- var (
- localDustSum lnwire.MilliSatoshi
- remoteDustSum lnwire.MilliSatoshi
- )
- // Run through the map of HTLC's and determine the dust sum with calls
- // to the memoryMailBox's isDust closure. Note that all mailbox packets
- // are outgoing so the second argument to isDust will be false.
- for _, e := range m.addIndex {
- addPkt := e.Value.(*pktWithExpiry).pkt
- // Evaluate whether this HTLC is dust on the local commitment.
- if m.isDust(
- m.feeRate, false, true, addPkt.amount.ToSatoshis(),
- ) {
- localDustSum += addPkt.amount
- }
- // Evaluate whether this HTLC is dust on the remote commitment.
- if m.isDust(
- m.feeRate, false, false, addPkt.amount.ToSatoshis(),
- ) {
- remoteDustSum += addPkt.amount
- }
- }
- return localDustSum, remoteDustSum
- }
- // FailAdd fails an UpdateAddHTLC that exists within the mailbox, removing it
- // from the in-memory replay buffer. This will prevent the packet from being
- // delivered after the link restarts if the switch has remained online. The
- // generated LinkError will show an OutgoingFailureDownstreamHtlcAdd
- // FailureDetail.
- func (m *memoryMailBox) FailAdd(pkt *htlcPacket) {
- // First, remove the packet from mailbox. If we didn't find the packet
- // because it has already been acked, we'll exit early to avoid sending
- // a duplicate fail message through the switch.
- if !m.AckPacket(pkt.inKey()) {
- return
- }
- var (
- localFailure = false
- reason lnwire.OpaqueReason
- )
- // Create a temporary channel failure which we will send back to our
- // peer if this is a forward, or report to the user if the failed
- // payment was locally initiated.
- failure := m.cfg.failMailboxUpdate(
- pkt.originalOutgoingChanID, m.cfg.shortChanID,
- )
- // If the payment was locally initiated (which is indicated by a nil
- // obfuscator), we do not need to encrypt it back to the sender.
- if pkt.obfuscator == nil {
- var b bytes.Buffer
- err := lnwire.EncodeFailure(&b, failure, 0)
- if err != nil {
- log.Errorf("Unable to encode failure: %v", err)
- return
- }
- reason = lnwire.OpaqueReason(b.Bytes())
- localFailure = true
- } else {
- // If the packet is part of a forward, (identified by a non-nil
- // obfuscator) we need to encrypt the error back to the source.
- var err error
- reason, err = pkt.obfuscator.EncryptFirstHop(failure)
- if err != nil {
- log.Errorf("Unable to obfuscate error: %v", err)
- return
- }
- }
- // Create a link error containing the temporary channel failure and a
- // detail which indicates the we failed to add the htlc.
- linkError := NewDetailedLinkError(
- failure, OutgoingFailureDownstreamHtlcAdd,
- )
- failPkt := &htlcPacket{
- incomingChanID: pkt.incomingChanID,
- incomingHTLCID: pkt.incomingHTLCID,
- circuit: pkt.circuit,
- sourceRef: pkt.sourceRef,
- hasSource: true,
- localFailure: localFailure,
- obfuscator: pkt.obfuscator,
- linkFailure: linkError,
- htlc: &lnwire.UpdateFailHTLC{
- Reason: reason,
- },
- }
- if err := m.cfg.forwardPackets(m.quit, failPkt); err != nil {
- log.Errorf("Unhandled error while reforwarding packets "+
- "settle/fail over htlcswitch: %v", err)
- }
- }
- // MessageOutBox returns a channel that any new messages ready for delivery
- // will be sent on.
- //
- // NOTE: This method is part of the MailBox interface.
- func (m *memoryMailBox) MessageOutBox() chan lnwire.Message {
- return m.messageOutbox
- }
- // PacketOutBox returns a channel that any new packets ready for delivery will
- // be sent on.
- //
- // NOTE: This method is part of the MailBox interface.
- func (m *memoryMailBox) PacketOutBox() chan *htlcPacket {
- return m.pktOutbox
- }
- // mailOrchestrator is responsible for coordinating the creation and lifecycle
- // of mailboxes used within the switch. It supports the ability to create
- // mailboxes, reassign their short channel id's, deliver htlc packets, and
- // queue packets for mailboxes that have not been created due to a link's late
- // registration.
- type mailOrchestrator struct {
- mu sync.RWMutex
- cfg *mailOrchConfig
- // mailboxes caches exactly one mailbox for all known channels.
- mailboxes map[lnwire.ChannelID]MailBox
- // liveIndex maps a live short chan id to the primary mailbox key.
- // An index in liveIndex map is only entered under two conditions:
- // 1. A link has a non-zero short channel id at time of AddLink.
- // 2. A link receives a non-zero short channel via UpdateShortChanID.
- liveIndex map[lnwire.ShortChannelID]lnwire.ChannelID
- // TODO(conner): add another pair of indexes:
- // chan_id -> short_chan_id
- // short_chan_id -> mailbox
- // so that Deliver can lookup mailbox directly once live,
- // but still queryable by channel_id.
- // unclaimedPackets maps a live short chan id to queue of packets if no
- // mailbox has been created.
- unclaimedPackets map[lnwire.ShortChannelID][]*htlcPacket
- }
- type mailOrchConfig struct {
- // forwardPackets send a varidic number of htlcPackets to the switch to
- // be routed. A quit channel should be provided so that the call can
- // properly exit during shutdown.
- forwardPackets func(chan struct{}, ...*htlcPacket) error
- // clock is a time source for the generated mailboxes.
- clock clock.Clock
- // expiry is the interval after which Adds will be cancelled if they
- // have not been yet been delivered. The computed deadline will expiry
- // this long after the Adds are added to a mailbox via AddPacket.
- expiry time.Duration
- // failMailboxUpdate is used to fail an expired HTLC and use the
- // correct SCID if the underlying channel uses aliases.
- failMailboxUpdate func(outScid,
- mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage
- }
- // newMailOrchestrator initializes a fresh mailOrchestrator.
- func newMailOrchestrator(cfg *mailOrchConfig) *mailOrchestrator {
- return &mailOrchestrator{
- cfg: cfg,
- mailboxes: make(map[lnwire.ChannelID]MailBox),
- liveIndex: make(map[lnwire.ShortChannelID]lnwire.ChannelID),
- unclaimedPackets: make(map[lnwire.ShortChannelID][]*htlcPacket),
- }
- }
- // Stop instructs the orchestrator to stop all active mailboxes.
- func (mo *mailOrchestrator) Stop() {
- for _, mailbox := range mo.mailboxes {
- mailbox.Stop()
- }
- }
- // GetOrCreateMailBox returns an existing mailbox belonging to `chanID`, or
- // creates and returns a new mailbox if none is found.
- func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID,
- shortChanID lnwire.ShortChannelID) MailBox {
- // First, try lookup the mailbox directly using only the shared mutex.
- mo.mu.RLock()
- mailbox, ok := mo.mailboxes[chanID]
- if ok {
- mo.mu.RUnlock()
- return mailbox
- }
- mo.mu.RUnlock()
- // Otherwise, we will try again with exclusive lock, creating a mailbox
- // if one still has not been created.
- mo.mu.Lock()
- mailbox = mo.exclusiveGetOrCreateMailBox(chanID, shortChanID)
- mo.mu.Unlock()
- return mailbox
- }
- // exclusiveGetOrCreateMailBox checks for the existence of a mailbox for the
- // given channel id. If none is found, a new one is creates, started, and
- // recorded.
- //
- // NOTE: This method MUST be invoked with the mailOrchestrator's exclusive lock.
- func (mo *mailOrchestrator) exclusiveGetOrCreateMailBox(
- chanID lnwire.ChannelID, shortChanID lnwire.ShortChannelID) MailBox {
- mailbox, ok := mo.mailboxes[chanID]
- if !ok {
- mailbox = newMemoryMailBox(&mailBoxConfig{
- shortChanID: shortChanID,
- forwardPackets: mo.cfg.forwardPackets,
- clock: mo.cfg.clock,
- expiry: mo.cfg.expiry,
- failMailboxUpdate: mo.cfg.failMailboxUpdate,
- })
- mailbox.Start()
- mo.mailboxes[chanID] = mailbox
- }
- return mailbox
- }
- // BindLiveShortChanID registers that messages bound for a particular short
- // channel id should be forwarded to the mailbox corresponding to the given
- // channel id. This method also checks to see if there are any unclaimed
- // packets for this short_chan_id. If any are found, they are delivered to the
- // mailbox and removed (marked as claimed).
- func (mo *mailOrchestrator) BindLiveShortChanID(mailbox MailBox,
- cid lnwire.ChannelID, sid lnwire.ShortChannelID) {
- mo.mu.Lock()
- // Update the mapping from short channel id to mailbox's channel id.
- mo.liveIndex[sid] = cid
- // Retrieve any unclaimed packets destined for this mailbox.
- pkts := mo.unclaimedPackets[sid]
- delete(mo.unclaimedPackets, sid)
- mo.mu.Unlock()
- // Deliver the unclaimed packets.
- for _, pkt := range pkts {
- mailbox.AddPacket(pkt)
- }
- }
- // Deliver lookups the target mailbox using the live index from short_chan_id
- // to channel_id. If the mailbox is found, the message is delivered directly.
- // Otherwise the packet is recorded as unclaimed, and will be delivered to the
- // mailbox upon the subsequent call to BindLiveShortChanID.
- func (mo *mailOrchestrator) Deliver(
- sid lnwire.ShortChannelID, pkt *htlcPacket) error {
- var (
- mailbox MailBox
- found bool
- )
- // First, try to find the channel id for the target short_chan_id. If
- // the link is live, we will also look up the created mailbox.
- mo.mu.RLock()
- chanID, isLive := mo.liveIndex[sid]
- if isLive {
- mailbox, found = mo.mailboxes[chanID]
- }
- mo.mu.RUnlock()
- // The link is live and target mailbox was found, deliver immediately.
- if isLive && found {
- return mailbox.AddPacket(pkt)
- }
- // If we detected that the link has not been made live, we will acquire
- // the exclusive lock preemptively in order to queue this packet in the
- // list of unclaimed packets.
- mo.mu.Lock()
- // Double check to see if the mailbox has been not made live since the
- // release of the shared lock.
- //
- // NOTE: Checking again with the exclusive lock held prevents a race
- // condition where BindLiveShortChanID is interleaved between the
- // release of the shared lock, and acquiring the exclusive lock. The
- // result would be stuck packets, as they wouldn't be redelivered until
- // the next call to BindLiveShortChanID, which is expected to occur
- // infrequently.
- chanID, isLive = mo.liveIndex[sid]
- if isLive {
- // Reaching this point indicates the mailbox is actually live.
- // We'll try to load the mailbox using the fresh channel id.
- //
- // NOTE: This should never create a new mailbox, as the live
- // index should only be set if the mailbox had been initialized
- // beforehand. However, this does ensure that this case is
- // handled properly in the event that it could happen.
- mailbox = mo.exclusiveGetOrCreateMailBox(chanID, sid)
- mo.mu.Unlock()
- // Deliver the packet to the mailbox if it was found or created.
- return mailbox.AddPacket(pkt)
- }
- // Finally, if the channel id is still not found in the live index,
- // we'll add this to the list of unclaimed packets. These will be
- // delivered upon the next call to BindLiveShortChanID.
- mo.unclaimedPackets[sid] = append(mo.unclaimedPackets[sid], pkt)
- mo.mu.Unlock()
- return nil
- }
|