mailbox.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971
  1. package htlcswitch
  2. import (
  3. "bytes"
  4. "container/list"
  5. "errors"
  6. "fmt"
  7. "sync"
  8. "time"
  9. "github.com/lightningnetwork/lnd/clock"
  10. "github.com/lightningnetwork/lnd/lnwallet/chainfee"
  11. "github.com/lightningnetwork/lnd/lnwire"
  12. )
  // ErrMailBoxShuttingDown is the sentinel returned when a mailbox operation
  // is cut short because the mailbox received a shutdown request.
  var ErrMailBoxShuttingDown = errors.New("mailbox is shutting down")

  // ErrPacketAlreadyExists is the sentinel returned when a packet cannot be
  // added because the mailbox already holds a packet with the same key.
  var ErrPacketAlreadyExists = errors.New("mailbox already has packet")
  21. // MailBox is an interface which represents a concurrent-safe, in-order
  22. // delivery queue for messages from the network and also from the main switch.
  23. // This struct serves as a buffer between incoming messages, and messages to
  24. // the handled by the link. Each of the mutating methods within this interface
  25. // should be implemented in a non-blocking manner.
  26. type MailBox interface {
  27. // AddMessage appends a new message to the end of the message queue.
  28. AddMessage(msg lnwire.Message) error
  29. // AddPacket appends a new message to the end of the packet queue.
  30. AddPacket(pkt *htlcPacket) error
  31. // HasPacket queries the packets for a circuit key, this is used to drop
  32. // packets bound for the switch that already have a queued response.
  33. HasPacket(CircuitKey) bool
  34. // AckPacket removes a packet from the mailboxes in-memory replay
  35. // buffer. This will prevent a packet from being delivered after a link
  36. // restarts if the switch has remained online. The returned boolean
  37. // indicates whether or not a packet with the passed incoming circuit
  38. // key was removed.
  39. AckPacket(CircuitKey) bool
  40. // FailAdd fails an UpdateAddHTLC that exists within the mailbox,
  41. // removing it from the in-memory replay buffer. This will prevent the
  42. // packet from being delivered after the link restarts if the switch has
  43. // remained online. The generated LinkError will show an
  44. // OutgoingFailureDownstreamHtlcAdd FailureDetail.
  45. FailAdd(pkt *htlcPacket)
  46. // MessageOutBox returns a channel that any new messages ready for
  47. // delivery will be sent on.
  48. MessageOutBox() chan lnwire.Message
  49. // PacketOutBox returns a channel that any new packets ready for
  50. // delivery will be sent on.
  51. PacketOutBox() chan *htlcPacket
  52. // Clears any pending wire messages from the inbox.
  53. ResetMessages() error
  54. // Reset the packet head to point at the first element in the list.
  55. ResetPackets() error
  56. // SetDustClosure takes in a closure that is used to evaluate whether
  57. // mailbox HTLC's are dust.
  58. SetDustClosure(isDust dustClosure)
  59. // SetFeeRate sets the feerate to be used when evaluating dust.
  60. SetFeeRate(feerate chainfee.SatPerKWeight)
  61. // DustPackets returns the dust sum for Adds in the mailbox for the
  62. // local and remote commitments.
  63. DustPackets() (lnwire.MilliSatoshi, lnwire.MilliSatoshi)
  64. // Start starts the mailbox and any goroutines it needs to operate
  65. // properly.
  66. Start()
  67. // Stop signals the mailbox and its goroutines for a graceful shutdown.
  68. Stop()
  69. }
  70. type mailBoxConfig struct {
  71. // shortChanID is the short channel id of the channel this mailbox
  72. // belongs to.
  73. shortChanID lnwire.ShortChannelID
  74. // forwardPackets send a varidic number of htlcPackets to the switch to
  75. // be routed. A quit channel should be provided so that the call can
  76. // properly exit during shutdown.
  77. forwardPackets func(chan struct{}, ...*htlcPacket) error
  78. // clock is a time source for the mailbox.
  79. clock clock.Clock
  80. // expiry is the interval after which Adds will be cancelled if they
  81. // have not been yet been delivered. The computed deadline will expiry
  82. // this long after the Adds are added via AddPacket.
  83. expiry time.Duration
  84. // failMailboxUpdate is used to fail an expired HTLC and use the
  85. // correct SCID if the underlying channel uses aliases.
  86. failMailboxUpdate func(outScid,
  87. mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage
  88. }
  89. // memoryMailBox is an implementation of the MailBox struct backed by purely
  90. // in-memory queues.
  91. //
  92. // TODO(morehouse): use typed lists instead of list.Lists to avoid type asserts.
  93. type memoryMailBox struct {
  94. started sync.Once
  95. stopped sync.Once
  96. cfg *mailBoxConfig
  97. wireMessages *list.List
  98. wireMtx sync.Mutex
  99. wireCond *sync.Cond
  100. messageOutbox chan lnwire.Message
  101. msgReset chan chan struct{}
  102. // repPkts is a queue for reply packets, e.g. Settles and Fails.
  103. repPkts *list.List
  104. repIndex map[CircuitKey]*list.Element
  105. repHead *list.Element
  106. // addPkts is a dedicated queue for Adds.
  107. addPkts *list.List
  108. addIndex map[CircuitKey]*list.Element
  109. addHead *list.Element
  110. pktMtx sync.Mutex
  111. pktCond *sync.Cond
  112. pktOutbox chan *htlcPacket
  113. pktReset chan chan struct{}
  114. wireShutdown chan struct{}
  115. pktShutdown chan struct{}
  116. quit chan struct{}
  117. // feeRate is set when the link receives or sends out fee updates. It
  118. // is refreshed when AttachMailBox is called in case a fee update did
  119. // not get committed. In some cases it may be out of sync with the
  120. // channel's feerate, but it should eventually get back in sync.
  121. feeRate chainfee.SatPerKWeight
  122. // isDust is set when AttachMailBox is called and serves to evaluate
  123. // the outstanding dust in the memoryMailBox given the current set
  124. // feeRate.
  125. isDust dustClosure
  126. }
  127. // newMemoryMailBox creates a new instance of the memoryMailBox.
  128. func newMemoryMailBox(cfg *mailBoxConfig) *memoryMailBox {
  129. box := &memoryMailBox{
  130. cfg: cfg,
  131. wireMessages: list.New(),
  132. repPkts: list.New(),
  133. addPkts: list.New(),
  134. messageOutbox: make(chan lnwire.Message),
  135. pktOutbox: make(chan *htlcPacket),
  136. msgReset: make(chan chan struct{}, 1),
  137. pktReset: make(chan chan struct{}, 1),
  138. repIndex: make(map[CircuitKey]*list.Element),
  139. addIndex: make(map[CircuitKey]*list.Element),
  140. wireShutdown: make(chan struct{}),
  141. pktShutdown: make(chan struct{}),
  142. quit: make(chan struct{}),
  143. }
  144. box.wireCond = sync.NewCond(&box.wireMtx)
  145. box.pktCond = sync.NewCond(&box.pktMtx)
  146. return box
  147. }
  148. // A compile time assertion to ensure that memoryMailBox meets the MailBox
  149. // interface.
  150. var _ MailBox = (*memoryMailBox)(nil)
  // courierType enumerates the distinct kinds of messages a MailBox handles.
  // Each kind lives in its own isolated mail box and is delivered by its own
  // dedicated goroutine.
  type courierType uint8

  // The courier kinds known to the mailbox.
  const (
  	// wireCourier identifies the courier that delivers wire messages.
  	wireCourier courierType = iota

  	// pktCourier identifies the courier that delivers htlc packets.
  	pktCourier
  )
  161. // Start starts the mailbox and any goroutines it needs to operate properly.
  162. //
  163. // NOTE: This method is part of the MailBox interface.
  164. func (m *memoryMailBox) Start() {
  165. m.started.Do(func() {
  166. go m.wireMailCourier()
  167. go m.pktMailCourier()
  168. })
  169. }
  170. // ResetMessages blocks until all buffered wire messages are cleared.
  171. func (m *memoryMailBox) ResetMessages() error {
  172. msgDone := make(chan struct{})
  173. select {
  174. case m.msgReset <- msgDone:
  175. return m.signalUntilReset(wireCourier, msgDone)
  176. case <-m.quit:
  177. return ErrMailBoxShuttingDown
  178. }
  179. }
  180. // ResetPackets blocks until the head of packets buffer is reset, causing the
  181. // packets to be redelivered in order.
  182. func (m *memoryMailBox) ResetPackets() error {
  183. pktDone := make(chan struct{})
  184. select {
  185. case m.pktReset <- pktDone:
  186. return m.signalUntilReset(pktCourier, pktDone)
  187. case <-m.quit:
  188. return ErrMailBoxShuttingDown
  189. }
  190. }
  191. // signalUntilReset strobes the condition variable for the specified inbox type
  192. // until receiving a response that the mailbox has processed a reset.
  193. func (m *memoryMailBox) signalUntilReset(cType courierType,
  194. done chan struct{}) error {
  195. for {
  196. switch cType {
  197. case wireCourier:
  198. m.wireCond.Signal()
  199. case pktCourier:
  200. m.pktCond.Signal()
  201. }
  202. select {
  203. case <-time.After(time.Millisecond):
  204. continue
  205. case <-done:
  206. return nil
  207. case <-m.quit:
  208. return ErrMailBoxShuttingDown
  209. }
  210. }
  211. }
  212. // AckPacket removes the packet identified by it's incoming circuit key from the
  213. // queue of packets to be delivered. The returned boolean indicates whether or
  214. // not a packet with the passed incoming circuit key was removed.
  215. //
  216. // NOTE: It is safe to call this method multiple times for the same circuit key.
  217. func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool {
  218. m.pktCond.L.Lock()
  219. defer m.pktCond.L.Unlock()
  220. if entry, ok := m.repIndex[inKey]; ok {
  221. // Check whether we are removing the head of the queue. If so,
  222. // we must advance the head to the next packet before removing.
  223. // It's possible that the courier has already advanced the
  224. // repHead, so this check prevents the repHead from getting
  225. // desynchronized.
  226. if entry == m.repHead {
  227. m.repHead = entry.Next()
  228. }
  229. m.repPkts.Remove(entry)
  230. delete(m.repIndex, inKey)
  231. return true
  232. }
  233. if entry, ok := m.addIndex[inKey]; ok {
  234. // Check whether we are removing the head of the queue. If so,
  235. // we must advance the head to the next add before removing.
  236. // It's possible that the courier has already advanced the
  237. // addHead, so this check prevents the addHead from getting
  238. // desynchronized.
  239. //
  240. // NOTE: While this event is rare for Settles or Fails, it could
  241. // be very common for Adds since the mailbox has the ability to
  242. // cancel Adds before they are delivered. When that occurs, the
  243. // head of addPkts has only been peeked and we expect to be
  244. // removing the head of the queue.
  245. if entry == m.addHead {
  246. m.addHead = entry.Next()
  247. }
  248. m.addPkts.Remove(entry)
  249. delete(m.addIndex, inKey)
  250. return true
  251. }
  252. return false
  253. }
  254. // HasPacket queries the packets for a circuit key, this is used to drop packets
  255. // bound for the switch that already have a queued response.
  256. func (m *memoryMailBox) HasPacket(inKey CircuitKey) bool {
  257. m.pktCond.L.Lock()
  258. _, ok := m.repIndex[inKey]
  259. m.pktCond.L.Unlock()
  260. return ok
  261. }
  262. // Stop signals the mailbox and its goroutines for a graceful shutdown.
  263. //
  264. // NOTE: This method is part of the MailBox interface.
  265. func (m *memoryMailBox) Stop() {
  266. m.stopped.Do(func() {
  267. close(m.quit)
  268. m.signalUntilShutdown(wireCourier)
  269. m.signalUntilShutdown(pktCourier)
  270. })
  271. }
  272. // signalUntilShutdown strobes the condition variable of the passed courier
  273. // type, blocking until the worker has exited.
  274. func (m *memoryMailBox) signalUntilShutdown(cType courierType) {
  275. var (
  276. cond *sync.Cond
  277. shutdown chan struct{}
  278. )
  279. switch cType {
  280. case wireCourier:
  281. cond = m.wireCond
  282. shutdown = m.wireShutdown
  283. case pktCourier:
  284. cond = m.pktCond
  285. shutdown = m.pktShutdown
  286. }
  287. for {
  288. select {
  289. case <-time.After(time.Millisecond):
  290. cond.Signal()
  291. case <-shutdown:
  292. return
  293. }
  294. }
  295. }
  296. // pktWithExpiry wraps an incoming packet and records the time at which it it
  297. // should be canceled from the mailbox. This will be used to detect if it gets
  298. // stuck in the mailbox and inform when to cancel back.
  299. type pktWithExpiry struct {
  300. pkt *htlcPacket
  301. expiry time.Time
  302. }
  303. func (p *pktWithExpiry) deadline(clock clock.Clock) <-chan time.Time {
  304. return clock.TickAfter(p.expiry.Sub(clock.Now()))
  305. }
  306. // wireMailCourier is a dedicated goroutine whose job is to reliably deliver
  307. // wire messages.
  308. func (m *memoryMailBox) wireMailCourier() {
  309. defer close(m.wireShutdown)
  310. for {
  311. // First, we'll check our condition. If our mailbox is empty,
  312. // then we'll wait until a new item is added.
  313. m.wireCond.L.Lock()
  314. for m.wireMessages.Front() == nil {
  315. m.wireCond.Wait()
  316. select {
  317. case msgDone := <-m.msgReset:
  318. m.wireMessages.Init()
  319. close(msgDone)
  320. case <-m.quit:
  321. m.wireCond.L.Unlock()
  322. return
  323. default:
  324. }
  325. }
  326. // Grab the datum off the front of the queue, shifting the
  327. // slice's reference down one in order to remove the datum from
  328. // the queue.
  329. entry := m.wireMessages.Front()
  330. //nolint:forcetypeassert
  331. nextMsg := m.wireMessages.Remove(entry).(lnwire.Message)
  332. // Now that we're done with the condition, we can unlock it to
  333. // allow any callers to append to the end of our target queue.
  334. m.wireCond.L.Unlock()
  335. // With the next message obtained, we'll now select to attempt
  336. // to deliver the message. If we receive a kill signal, then
  337. // we'll bail out.
  338. select {
  339. case m.messageOutbox <- nextMsg:
  340. case msgDone := <-m.msgReset:
  341. m.wireCond.L.Lock()
  342. m.wireMessages.Init()
  343. m.wireCond.L.Unlock()
  344. close(msgDone)
  345. case <-m.quit:
  346. return
  347. }
  348. }
  349. }
  350. // pktMailCourier is a dedicated goroutine whose job is to reliably deliver
  351. // packet messages.
  352. func (m *memoryMailBox) pktMailCourier() {
  353. defer close(m.pktShutdown)
  354. for {
  355. // First, we'll check our condition. If our mailbox is empty,
  356. // then we'll wait until a new item is added.
  357. m.pktCond.L.Lock()
  358. for m.repHead == nil && m.addHead == nil {
  359. m.pktCond.Wait()
  360. select {
  361. // Resetting the packet queue means just moving our
  362. // pointer to the front. This ensures that any un-ACK'd
  363. // messages are re-delivered upon reconnect.
  364. case pktDone := <-m.pktReset:
  365. m.repHead = m.repPkts.Front()
  366. m.addHead = m.addPkts.Front()
  367. close(pktDone)
  368. case <-m.quit:
  369. m.pktCond.L.Unlock()
  370. return
  371. default:
  372. }
  373. }
  374. var (
  375. nextRep *htlcPacket
  376. nextRepEl *list.Element
  377. nextAdd *pktWithExpiry
  378. nextAddEl *list.Element
  379. )
  380. // For packets, we actually never remove an item until it has
  381. // been ACK'd by the link. This ensures that if a read packet
  382. // doesn't make it into a commitment, then it'll be
  383. // re-delivered once the link comes back online.
  384. // Peek at the head of the Settle/Fails and Add queues. We peak
  385. // both even if there is a Settle/Fail present because we need
  386. // to set a deadline for the next pending Add if it's present.
  387. // Due to clock monotonicity, we know that the head of the Adds
  388. // is the next to expire.
  389. if m.repHead != nil {
  390. //nolint:forcetypeassert
  391. nextRep = m.repHead.Value.(*htlcPacket)
  392. nextRepEl = m.repHead
  393. }
  394. if m.addHead != nil {
  395. //nolint:forcetypeassert
  396. nextAdd = m.addHead.Value.(*pktWithExpiry)
  397. nextAddEl = m.addHead
  398. }
  399. // Now that we're done with the condition, we can unlock it to
  400. // allow any callers to append to the end of our target queue.
  401. m.pktCond.L.Unlock()
  402. var (
  403. pktOutbox chan *htlcPacket
  404. addOutbox chan *htlcPacket
  405. add *htlcPacket
  406. deadline <-chan time.Time
  407. )
  408. // Prioritize delivery of Settle/Fail packets over Adds. This
  409. // ensures that we actively clear the commitment of existing
  410. // HTLCs before trying to add new ones. This can help to improve
  411. // forwarding performance since the time to sign a commitment is
  412. // linear in the number of HTLCs manifested on the commitments.
  413. //
  414. // NOTE: Both types are eventually delivered over the same
  415. // channel, but we can control which is delivered by exclusively
  416. // making one nil and the other non-nil. We know from our loop
  417. // condition that at least one nextRep and nextAdd are non-nil.
  418. if nextRep != nil {
  419. pktOutbox = m.pktOutbox
  420. } else {
  421. addOutbox = m.pktOutbox
  422. }
  423. // If we have a pending Add, we'll also construct the deadline
  424. // so we can fail it back if we are unable to deliver any
  425. // message in time. We also dereference the nextAdd's packet,
  426. // since we will need access to it in the case we are delivering
  427. // it and/or if the deadline expires.
  428. //
  429. // NOTE: It's possible after this point for add to be nil, but
  430. // this can only occur when addOutbox is also nil, hence we
  431. // won't accidentally deliver a nil packet.
  432. if nextAdd != nil {
  433. add = nextAdd.pkt
  434. deadline = nextAdd.deadline(m.cfg.clock)
  435. }
  436. select {
  437. case pktOutbox <- nextRep:
  438. m.pktCond.L.Lock()
  439. // Only advance the repHead if this Settle or Fail is
  440. // still at the head of the queue.
  441. if m.repHead != nil && m.repHead == nextRepEl {
  442. m.repHead = m.repHead.Next()
  443. }
  444. m.pktCond.L.Unlock()
  445. case addOutbox <- add:
  446. m.pktCond.L.Lock()
  447. // Only advance the addHead if this Add is still at the
  448. // head of the queue.
  449. if m.addHead != nil && m.addHead == nextAddEl {
  450. m.addHead = m.addHead.Next()
  451. }
  452. m.pktCond.L.Unlock()
  453. case <-deadline:
  454. log.Debugf("Expiring add htlc with "+
  455. "keystone=%v", add.keystone())
  456. m.FailAdd(add)
  457. case pktDone := <-m.pktReset:
  458. m.pktCond.L.Lock()
  459. m.repHead = m.repPkts.Front()
  460. m.addHead = m.addPkts.Front()
  461. m.pktCond.L.Unlock()
  462. close(pktDone)
  463. case <-m.quit:
  464. return
  465. }
  466. }
  467. }
  468. // AddMessage appends a new message to the end of the message queue.
  469. //
  470. // NOTE: This method is safe for concrete use and part of the MailBox
  471. // interface.
  472. func (m *memoryMailBox) AddMessage(msg lnwire.Message) error {
  473. // First, we'll lock the condition, and add the message to the end of
  474. // the wire message inbox.
  475. m.wireCond.L.Lock()
  476. m.wireMessages.PushBack(msg)
  477. m.wireCond.L.Unlock()
  478. // With the message added, we signal to the mailCourier that there are
  479. // additional messages to deliver.
  480. m.wireCond.Signal()
  481. return nil
  482. }
  483. // AddPacket appends a new message to the end of the packet queue.
  484. //
  485. // NOTE: This method is safe for concrete use and part of the MailBox
  486. // interface.
  487. func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
  488. m.pktCond.L.Lock()
  489. switch htlc := pkt.htlc.(type) {
  490. // Split off Settle/Fail packets into the repPkts queue.
  491. case *lnwire.UpdateFulfillHTLC, *lnwire.UpdateFailHTLC:
  492. if _, ok := m.repIndex[pkt.inKey()]; ok {
  493. m.pktCond.L.Unlock()
  494. return ErrPacketAlreadyExists
  495. }
  496. entry := m.repPkts.PushBack(pkt)
  497. m.repIndex[pkt.inKey()] = entry
  498. if m.repHead == nil {
  499. m.repHead = entry
  500. }
  501. // Split off Add packets into the addPkts queue.
  502. case *lnwire.UpdateAddHTLC:
  503. if _, ok := m.addIndex[pkt.inKey()]; ok {
  504. m.pktCond.L.Unlock()
  505. return ErrPacketAlreadyExists
  506. }
  507. entry := m.addPkts.PushBack(&pktWithExpiry{
  508. pkt: pkt,
  509. expiry: m.cfg.clock.Now().Add(m.cfg.expiry),
  510. })
  511. m.addIndex[pkt.inKey()] = entry
  512. if m.addHead == nil {
  513. m.addHead = entry
  514. }
  515. default:
  516. m.pktCond.L.Unlock()
  517. return fmt.Errorf("unknown htlc type: %T", htlc)
  518. }
  519. m.pktCond.L.Unlock()
  520. // With the packet added, we signal to the mailCourier that there are
  521. // additional packets to consume.
  522. m.pktCond.Signal()
  523. return nil
  524. }
  525. // SetFeeRate sets the memoryMailBox's feerate for use in DustPackets.
  526. func (m *memoryMailBox) SetFeeRate(feeRate chainfee.SatPerKWeight) {
  527. m.pktCond.L.Lock()
  528. defer m.pktCond.L.Unlock()
  529. m.feeRate = feeRate
  530. }
  531. // SetDustClosure sets the memoryMailBox's dustClosure for use in DustPackets.
  532. func (m *memoryMailBox) SetDustClosure(isDust dustClosure) {
  533. m.pktCond.L.Lock()
  534. defer m.pktCond.L.Unlock()
  535. m.isDust = isDust
  536. }
  537. // DustPackets returns the dust sum for add packets in the mailbox. The first
  538. // return value is the local dust sum and the second is the remote dust sum.
  539. // This will keep track of a given dust HTLC from the time it is added via
  540. // AddPacket until it is removed via AckPacket.
  541. func (m *memoryMailBox) DustPackets() (lnwire.MilliSatoshi,
  542. lnwire.MilliSatoshi) {
  543. m.pktCond.L.Lock()
  544. defer m.pktCond.L.Unlock()
  545. var (
  546. localDustSum lnwire.MilliSatoshi
  547. remoteDustSum lnwire.MilliSatoshi
  548. )
  549. // Run through the map of HTLC's and determine the dust sum with calls
  550. // to the memoryMailBox's isDust closure. Note that all mailbox packets
  551. // are outgoing so the second argument to isDust will be false.
  552. for _, e := range m.addIndex {
  553. addPkt := e.Value.(*pktWithExpiry).pkt
  554. // Evaluate whether this HTLC is dust on the local commitment.
  555. if m.isDust(
  556. m.feeRate, false, true, addPkt.amount.ToSatoshis(),
  557. ) {
  558. localDustSum += addPkt.amount
  559. }
  560. // Evaluate whether this HTLC is dust on the remote commitment.
  561. if m.isDust(
  562. m.feeRate, false, false, addPkt.amount.ToSatoshis(),
  563. ) {
  564. remoteDustSum += addPkt.amount
  565. }
  566. }
  567. return localDustSum, remoteDustSum
  568. }
  569. // FailAdd fails an UpdateAddHTLC that exists within the mailbox, removing it
  570. // from the in-memory replay buffer. This will prevent the packet from being
  571. // delivered after the link restarts if the switch has remained online. The
  572. // generated LinkError will show an OutgoingFailureDownstreamHtlcAdd
  573. // FailureDetail.
  574. func (m *memoryMailBox) FailAdd(pkt *htlcPacket) {
  575. // First, remove the packet from mailbox. If we didn't find the packet
  576. // because it has already been acked, we'll exit early to avoid sending
  577. // a duplicate fail message through the switch.
  578. if !m.AckPacket(pkt.inKey()) {
  579. return
  580. }
  581. var (
  582. localFailure = false
  583. reason lnwire.OpaqueReason
  584. )
  585. // Create a temporary channel failure which we will send back to our
  586. // peer if this is a forward, or report to the user if the failed
  587. // payment was locally initiated.
  588. failure := m.cfg.failMailboxUpdate(
  589. pkt.originalOutgoingChanID, m.cfg.shortChanID,
  590. )
  591. // If the payment was locally initiated (which is indicated by a nil
  592. // obfuscator), we do not need to encrypt it back to the sender.
  593. if pkt.obfuscator == nil {
  594. var b bytes.Buffer
  595. err := lnwire.EncodeFailure(&b, failure, 0)
  596. if err != nil {
  597. log.Errorf("Unable to encode failure: %v", err)
  598. return
  599. }
  600. reason = lnwire.OpaqueReason(b.Bytes())
  601. localFailure = true
  602. } else {
  603. // If the packet is part of a forward, (identified by a non-nil
  604. // obfuscator) we need to encrypt the error back to the source.
  605. var err error
  606. reason, err = pkt.obfuscator.EncryptFirstHop(failure)
  607. if err != nil {
  608. log.Errorf("Unable to obfuscate error: %v", err)
  609. return
  610. }
  611. }
  612. // Create a link error containing the temporary channel failure and a
  613. // detail which indicates the we failed to add the htlc.
  614. linkError := NewDetailedLinkError(
  615. failure, OutgoingFailureDownstreamHtlcAdd,
  616. )
  617. failPkt := &htlcPacket{
  618. incomingChanID: pkt.incomingChanID,
  619. incomingHTLCID: pkt.incomingHTLCID,
  620. circuit: pkt.circuit,
  621. sourceRef: pkt.sourceRef,
  622. hasSource: true,
  623. localFailure: localFailure,
  624. obfuscator: pkt.obfuscator,
  625. linkFailure: linkError,
  626. htlc: &lnwire.UpdateFailHTLC{
  627. Reason: reason,
  628. },
  629. }
  630. if err := m.cfg.forwardPackets(m.quit, failPkt); err != nil {
  631. log.Errorf("Unhandled error while reforwarding packets "+
  632. "settle/fail over htlcswitch: %v", err)
  633. }
  634. }
  635. // MessageOutBox returns a channel that any new messages ready for delivery
  636. // will be sent on.
  637. //
  638. // NOTE: This method is part of the MailBox interface.
  639. func (m *memoryMailBox) MessageOutBox() chan lnwire.Message {
  640. return m.messageOutbox
  641. }
  642. // PacketOutBox returns a channel that any new packets ready for delivery will
  643. // be sent on.
  644. //
  645. // NOTE: This method is part of the MailBox interface.
  646. func (m *memoryMailBox) PacketOutBox() chan *htlcPacket {
  647. return m.pktOutbox
  648. }
  649. // mailOrchestrator is responsible for coordinating the creation and lifecycle
  650. // of mailboxes used within the switch. It supports the ability to create
  651. // mailboxes, reassign their short channel id's, deliver htlc packets, and
  652. // queue packets for mailboxes that have not been created due to a link's late
  653. // registration.
  654. type mailOrchestrator struct {
  655. mu sync.RWMutex
  656. cfg *mailOrchConfig
  657. // mailboxes caches exactly one mailbox for all known channels.
  658. mailboxes map[lnwire.ChannelID]MailBox
  659. // liveIndex maps a live short chan id to the primary mailbox key.
  660. // An index in liveIndex map is only entered under two conditions:
  661. // 1. A link has a non-zero short channel id at time of AddLink.
  662. // 2. A link receives a non-zero short channel via UpdateShortChanID.
  663. liveIndex map[lnwire.ShortChannelID]lnwire.ChannelID
  664. // TODO(conner): add another pair of indexes:
  665. // chan_id -> short_chan_id
  666. // short_chan_id -> mailbox
  667. // so that Deliver can lookup mailbox directly once live,
  668. // but still queryable by channel_id.
  669. // unclaimedPackets maps a live short chan id to queue of packets if no
  670. // mailbox has been created.
  671. unclaimedPackets map[lnwire.ShortChannelID][]*htlcPacket
  672. }
  673. type mailOrchConfig struct {
  674. // forwardPackets send a varidic number of htlcPackets to the switch to
  675. // be routed. A quit channel should be provided so that the call can
  676. // properly exit during shutdown.
  677. forwardPackets func(chan struct{}, ...*htlcPacket) error
  678. // clock is a time source for the generated mailboxes.
  679. clock clock.Clock
  680. // expiry is the interval after which Adds will be cancelled if they
  681. // have not been yet been delivered. The computed deadline will expiry
  682. // this long after the Adds are added to a mailbox via AddPacket.
  683. expiry time.Duration
  684. // failMailboxUpdate is used to fail an expired HTLC and use the
  685. // correct SCID if the underlying channel uses aliases.
  686. failMailboxUpdate func(outScid,
  687. mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage
  688. }
  689. // newMailOrchestrator initializes a fresh mailOrchestrator.
  690. func newMailOrchestrator(cfg *mailOrchConfig) *mailOrchestrator {
  691. return &mailOrchestrator{
  692. cfg: cfg,
  693. mailboxes: make(map[lnwire.ChannelID]MailBox),
  694. liveIndex: make(map[lnwire.ShortChannelID]lnwire.ChannelID),
  695. unclaimedPackets: make(map[lnwire.ShortChannelID][]*htlcPacket),
  696. }
  697. }
  698. // Stop instructs the orchestrator to stop all active mailboxes.
  699. func (mo *mailOrchestrator) Stop() {
  700. for _, mailbox := range mo.mailboxes {
  701. mailbox.Stop()
  702. }
  703. }
  704. // GetOrCreateMailBox returns an existing mailbox belonging to `chanID`, or
  705. // creates and returns a new mailbox if none is found.
  706. func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID,
  707. shortChanID lnwire.ShortChannelID) MailBox {
  708. // First, try lookup the mailbox directly using only the shared mutex.
  709. mo.mu.RLock()
  710. mailbox, ok := mo.mailboxes[chanID]
  711. if ok {
  712. mo.mu.RUnlock()
  713. return mailbox
  714. }
  715. mo.mu.RUnlock()
  716. // Otherwise, we will try again with exclusive lock, creating a mailbox
  717. // if one still has not been created.
  718. mo.mu.Lock()
  719. mailbox = mo.exclusiveGetOrCreateMailBox(chanID, shortChanID)
  720. mo.mu.Unlock()
  721. return mailbox
  722. }
  723. // exclusiveGetOrCreateMailBox checks for the existence of a mailbox for the
  724. // given channel id. If none is found, a new one is creates, started, and
  725. // recorded.
  726. //
  727. // NOTE: This method MUST be invoked with the mailOrchestrator's exclusive lock.
  728. func (mo *mailOrchestrator) exclusiveGetOrCreateMailBox(
  729. chanID lnwire.ChannelID, shortChanID lnwire.ShortChannelID) MailBox {
  730. mailbox, ok := mo.mailboxes[chanID]
  731. if !ok {
  732. mailbox = newMemoryMailBox(&mailBoxConfig{
  733. shortChanID: shortChanID,
  734. forwardPackets: mo.cfg.forwardPackets,
  735. clock: mo.cfg.clock,
  736. expiry: mo.cfg.expiry,
  737. failMailboxUpdate: mo.cfg.failMailboxUpdate,
  738. })
  739. mailbox.Start()
  740. mo.mailboxes[chanID] = mailbox
  741. }
  742. return mailbox
  743. }
  744. // BindLiveShortChanID registers that messages bound for a particular short
  745. // channel id should be forwarded to the mailbox corresponding to the given
  746. // channel id. This method also checks to see if there are any unclaimed
  747. // packets for this short_chan_id. If any are found, they are delivered to the
  748. // mailbox and removed (marked as claimed).
  749. func (mo *mailOrchestrator) BindLiveShortChanID(mailbox MailBox,
  750. cid lnwire.ChannelID, sid lnwire.ShortChannelID) {
  751. mo.mu.Lock()
  752. // Update the mapping from short channel id to mailbox's channel id.
  753. mo.liveIndex[sid] = cid
  754. // Retrieve any unclaimed packets destined for this mailbox.
  755. pkts := mo.unclaimedPackets[sid]
  756. delete(mo.unclaimedPackets, sid)
  757. mo.mu.Unlock()
  758. // Deliver the unclaimed packets.
  759. for _, pkt := range pkts {
  760. mailbox.AddPacket(pkt)
  761. }
  762. }
  763. // Deliver lookups the target mailbox using the live index from short_chan_id
  764. // to channel_id. If the mailbox is found, the message is delivered directly.
  765. // Otherwise the packet is recorded as unclaimed, and will be delivered to the
  766. // mailbox upon the subsequent call to BindLiveShortChanID.
  767. func (mo *mailOrchestrator) Deliver(
  768. sid lnwire.ShortChannelID, pkt *htlcPacket) error {
  769. var (
  770. mailbox MailBox
  771. found bool
  772. )
  773. // First, try to find the channel id for the target short_chan_id. If
  774. // the link is live, we will also look up the created mailbox.
  775. mo.mu.RLock()
  776. chanID, isLive := mo.liveIndex[sid]
  777. if isLive {
  778. mailbox, found = mo.mailboxes[chanID]
  779. }
  780. mo.mu.RUnlock()
  781. // The link is live and target mailbox was found, deliver immediately.
  782. if isLive && found {
  783. return mailbox.AddPacket(pkt)
  784. }
  785. // If we detected that the link has not been made live, we will acquire
  786. // the exclusive lock preemptively in order to queue this packet in the
  787. // list of unclaimed packets.
  788. mo.mu.Lock()
  789. // Double check to see if the mailbox has been not made live since the
  790. // release of the shared lock.
  791. //
  792. // NOTE: Checking again with the exclusive lock held prevents a race
  793. // condition where BindLiveShortChanID is interleaved between the
  794. // release of the shared lock, and acquiring the exclusive lock. The
  795. // result would be stuck packets, as they wouldn't be redelivered until
  796. // the next call to BindLiveShortChanID, which is expected to occur
  797. // infrequently.
  798. chanID, isLive = mo.liveIndex[sid]
  799. if isLive {
  800. // Reaching this point indicates the mailbox is actually live.
  801. // We'll try to load the mailbox using the fresh channel id.
  802. //
  803. // NOTE: This should never create a new mailbox, as the live
  804. // index should only be set if the mailbox had been initialized
  805. // beforehand. However, this does ensure that this case is
  806. // handled properly in the event that it could happen.
  807. mailbox = mo.exclusiveGetOrCreateMailBox(chanID, sid)
  808. mo.mu.Unlock()
  809. // Deliver the packet to the mailbox if it was found or created.
  810. return mailbox.AddPacket(pkt)
  811. }
  812. // Finally, if the channel id is still not found in the live index,
  813. // we'll add this to the list of unclaimed packets. These will be
  814. // delivered upon the next call to BindLiveShortChanID.
  815. mo.unclaimedPackets[sid] = append(mo.unclaimedPackets[sid], pkt)
  816. mo.mu.Unlock()
  817. return nil
  818. }