circuit_map.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213
  1. package htlcswitch
  2. import (
  3. "bytes"
  4. "fmt"
  5. "sync"
  6. "github.com/davecgh/go-spew/spew"
  7. "github.com/go-errors/errors"
  8. "github.com/lightningnetwork/lnd/channeldb"
  9. "github.com/lightningnetwork/lnd/htlcswitch/hop"
  10. "github.com/lightningnetwork/lnd/kvdb"
  11. "github.com/lightningnetwork/lnd/lnwire"
  12. )
var (
	// ErrCorruptedCircuitMap indicates that the on-disk bucketing structure
	// has been altered since the circuit map instance was initialized. In
	// this file it is returned whenever one of the top-level circuit
	// buckets cannot be found inside a database transaction.
	ErrCorruptedCircuitMap = errors.New("circuit map has been corrupted")

	// ErrCircuitNotInHashIndex indicates that a particular circuit did not
	// appear in the in-memory hash index.
	ErrCircuitNotInHashIndex = errors.New("payment circuit not found in " +
		"hash index")

	// ErrUnknownCircuit signals that a circuit could not be removed from
	// the map because it was not found.
	ErrUnknownCircuit = errors.New("unknown payment circuit")

	// ErrCircuitClosing signals that an htlc has already closed this
	// circuit in-memory.
	ErrCircuitClosing = errors.New("circuit has already been closed")

	// ErrDuplicateCircuit signals that this circuit was previously
	// added.
	ErrDuplicateCircuit = errors.New("duplicate circuit add")

	// ErrUnknownKeystone signals that no circuit was found using the
	// outgoing circuit key.
	ErrUnknownKeystone = errors.New("unknown circuit keystone")

	// ErrDuplicateKeystone signals that this circuit was previously
	// assigned a keystone.
	ErrDuplicateKeystone = errors.New("cannot add duplicate keystone")
)
// CircuitModifier is a common interface used by channel links to modify the
// contents of the circuit map maintained by the switch.
type CircuitModifier interface {
	// OpenCircuits preemptively records a batch of keystones that will
	// mark currently pending circuits as open. These changes can be rolled
	// back on restart if the outgoing Adds do not make it into a
	// commitment txn.
	OpenCircuits(...Keystone) error

	// TrimOpenCircuits removes a channel's open circuits (keystones) whose
	// htlc indexes start at `start`. The circuitMap implementation scans
	// upwards from `start` and stops at the first index that has no open
	// circuit.
	TrimOpenCircuits(chanID lnwire.ShortChannelID, start uint64) error

	// DeleteCircuits removes the incoming circuit key to remove all
	// persistent references to a circuit. Returns an ErrUnknownCircuit if
	// any of the incoming keys are not known.
	DeleteCircuits(inKeys ...CircuitKey) error
}
// CircuitLookup is a common interface used to look up information that is
// stored in the circuit map.
type CircuitLookup interface {
	// LookupCircuit queries the circuit map for the circuit identified by
	// inKey. The circuitMap implementation returns nil when no circuit is
	// registered under inKey.
	LookupCircuit(inKey CircuitKey) *PaymentCircuit

	// LookupOpenCircuit queries the circuit map for a circuit identified
	// by its outgoing circuit key. The circuitMap implementation returns
	// nil when no open circuit is registered under outKey.
	LookupOpenCircuit(outKey CircuitKey) *PaymentCircuit
}
// CircuitFwdActions represents the forwarding decision made by the circuit
// map, and is returned from CommitCircuits. The sequence of circuits provided
// to CommitCircuits is split into three sub-sequences, allowing the caller to
// do an in-order scan, comparing the head of each subsequence, to determine
// the decision made by the circuit map for every circuit it submitted.
type CircuitFwdActions struct {
	// Adds is the subsequence of circuits that were successfully committed
	// in the circuit map.
	Adds []*PaymentCircuit

	// Drops is the subsequence of circuits for which no action should be
	// taken.
	Drops []*PaymentCircuit

	// Fails is the subsequence of circuits that should be failed back by
	// the calling link.
	Fails []*PaymentCircuit
}
// CircuitMap is an interface for managing the construction and teardown of
// payment circuits used by the switch. It combines the CircuitModifier and
// CircuitLookup interfaces with the operations listed below.
type CircuitMap interface {
	CircuitModifier

	CircuitLookup

	// CommitCircuits attempts to add the given circuits to the circuit
	// map. The list of circuits is split into three distinct
	// sub-sequences, corresponding to adds, drops, and fails. Adds should
	// be forwarded to the switch, while fails should be failed back
	// locally within the calling link.
	CommitCircuits(circuit ...*PaymentCircuit) (*CircuitFwdActions, error)

	// CloseCircuit marks the circuit identified by `outKey` as closing
	// in-memory, which prevents duplicate settles/fails from completing an
	// open circuit twice.
	CloseCircuit(outKey CircuitKey) (*PaymentCircuit, error)

	// FailCircuit is used by locally failed HTLCs to mark the circuit
	// identified by `inKey` as closing in-memory, which prevents duplicate
	// settles/fails from being accepted for the same circuit.
	FailCircuit(inKey CircuitKey) (*PaymentCircuit, error)

	// LookupByPaymentHash queries the circuit map and returns all open
	// circuits that use the given payment hash.
	LookupByPaymentHash(hash [32]byte) []*PaymentCircuit

	// NumPending returns the total number of active circuits added by
	// CommitCircuits.
	NumPending() int

	// NumOpen returns the number of circuits with HTLCs that have been
	// forwarded via an outgoing link.
	NumOpen() int
}
var (
	// circuitAddKey is the key used to retrieve the bucket containing
	// payment circuits. A circuit records information about how to return
	// a packet to the source link, potentially including an error
	// encrypter for applying this hop's encryption to the payload in the
	// reverse direction.
	//
	// Bucket hierarchy:
	//
	// circuitAddKey(root-bucket)
	//      |
	//      |-- <incoming-circuit-key>: <encoded bytes of PaymentCircuit>
	//      |-- <incoming-circuit-key>: <encoded bytes of PaymentCircuit>
	//      |
	//      ...
	//
	circuitAddKey = []byte("circuit-adds")

	// circuitKeystoneKey is used to retrieve the bucket containing circuit
	// keystones, which are set in place once a forwarded packet is
	// assigned an index on an outgoing commitment txn. Each entry maps an
	// outgoing circuit key to the incoming circuit key of the same
	// circuit.
	//
	// Bucket hierarchy:
	//
	// circuitKeystoneKey(root-bucket)
	//      |
	//      |-- <outgoing-circuit-key>: <incoming-circuit-key>
	//      |-- <outgoing-circuit-key>: <incoming-circuit-key>
	//      |
	//      ...
	//
	circuitKeystoneKey = []byte("circuit-keystones")
)
// circuitMap is a data structure that implements thread safe, persistent
// storage of circuit routing information. The switch consults a circuit map to
// determine where to forward returning HTLC update messages. Circuits are
// always identifiable by their incoming CircuitKey, in addition to their
// outgoing CircuitKey if the circuit is fully-opened.
type circuitMap struct {
	// cfg holds the dependencies handed to NewCircuitMap.
	cfg *CircuitMapConfig

	// mtx is taken by the visible lookup and trim methods around their
	// accesses to pending, opened and hashIndex.
	mtx sync.RWMutex

	// pending is an in-memory mapping of all half payment circuits, keyed
	// by incoming circuit key, and is kept in sync with the on-disk
	// contents of the circuit map.
	pending map[CircuitKey]*PaymentCircuit

	// opened is an in-memory mapping of all full payment circuits, keyed
	// by outgoing circuit key, which is also synchronized with the
	// persistent state of the circuit map.
	opened map[CircuitKey]*PaymentCircuit

	// closed is an in-memory set of circuits for which the switch has
	// received a settle or fail. This precedes the actual deletion of a
	// circuit from disk.
	closed map[CircuitKey]struct{}

	// hashIndex is a volatile index that facilitates fast queries by
	// payment hash against the contents of circuits. Each payment hash
	// maps to a set of keys that are resolved through the opened map. This
	// index can be reconstructed entirely from the set of persisted full
	// circuits on startup.
	hashIndex map[[32]byte]map[CircuitKey]struct{}
}
// CircuitMapConfig houses the critical interfaces and references necessary to
// parameterize an instance of circuitMap.
type CircuitMapConfig struct {
	// DB provides the persistent storage engine for the circuit map.
	DB kvdb.Backend

	// FetchAllOpenChannels is a function that fetches all currently open
	// channels from the channel database. It is used on startup to trim
	// keystones of every non-pending channel.
	FetchAllOpenChannels func() ([]*channeldb.OpenChannel, error)

	// FetchClosedChannels is a function that fetches all closed channels
	// from the channel database. It is used on startup to find the
	// channels whose circuits and keystones can be removed from disk.
	FetchClosedChannels func(
		pendingOnly bool) ([]*channeldb.ChannelCloseSummary, error)

	// ExtractErrorEncrypter derives the shared secret used to encrypt
	// errors from the obfuscator's ephemeral public key.
	ExtractErrorEncrypter hop.ErrorEncrypterExtracter

	// CheckResolutionMsg checks whether a given resolution message exists
	// for the passed CircuitKey. A nil return value is interpreted by
	// cleanClosedChannels as "a resolution message exists", in which case
	// the corresponding keystone and circuit are kept.
	CheckResolutionMsg func(outKey *CircuitKey) error
}
  183. // NewCircuitMap creates a new instance of the circuitMap.
  184. func NewCircuitMap(cfg *CircuitMapConfig) (CircuitMap, error) {
  185. cm := &circuitMap{
  186. cfg: cfg,
  187. }
  188. // Initialize the on-disk buckets used by the circuit map.
  189. if err := cm.initBuckets(); err != nil {
  190. return nil, err
  191. }
  192. // Delete old circuits and keystones of closed channels.
  193. if err := cm.cleanClosedChannels(); err != nil {
  194. return nil, err
  195. }
  196. // Load any previously persisted circuit into back into memory.
  197. if err := cm.restoreMemState(); err != nil {
  198. return nil, err
  199. }
  200. // Trim any keystones that were not committed in an outgoing commit txn.
  201. //
  202. // NOTE: This operation will be applied to the persistent state of all
  203. // active channels. Therefore, it must be called before any links are
  204. // created to avoid interfering with normal operation.
  205. if err := cm.trimAllOpenCircuits(); err != nil {
  206. return nil, err
  207. }
  208. return cm, nil
  209. }
  210. // initBuckets ensures that the primary buckets used by the circuit are
  211. // initialized so that we can assume their existence after startup.
  212. func (cm *circuitMap) initBuckets() error {
  213. return kvdb.Update(cm.cfg.DB, func(tx kvdb.RwTx) error {
  214. _, err := tx.CreateTopLevelBucket(circuitKeystoneKey)
  215. if err != nil {
  216. return err
  217. }
  218. _, err = tx.CreateTopLevelBucket(circuitAddKey)
  219. return err
  220. }, func() {})
  221. }
  222. // cleanClosedChannels deletes all circuits and keystones related to closed
  223. // channels. It first reads all the closed channels and caches the ShortChanIDs
  224. // into a map for fast lookup. Then it iterates the circuit bucket and keystone
  225. // bucket and deletes items whose ChanID matches the ShortChanID.
  226. //
  227. // NOTE: this operation can also be built into restoreMemState since the latter
  228. // already opens and iterates the two root buckets, circuitAddKey and
  229. // circuitKeystoneKey. Depending on the size of the buckets, this marginal gain
  230. // may be worth investigating. Atm, for clarity, this operation is wrapped into
  231. // its own function.
  232. func (cm *circuitMap) cleanClosedChannels() error {
  233. log.Infof("Cleaning circuits from disk for closed channels")
  234. // closedChanIDSet stores the short channel IDs for closed channels.
  235. closedChanIDSet := make(map[lnwire.ShortChannelID]struct{})
  236. // circuitKeySet stores the incoming circuit keys of the payment
  237. // circuits that need to be deleted.
  238. circuitKeySet := make(map[CircuitKey]struct{})
  239. // keystoneKeySet stores the outgoing keys of the keystones that need
  240. // to be deleted.
  241. keystoneKeySet := make(map[CircuitKey]struct{})
  242. // isClosedChannel is a helper closure that returns a bool indicating
  243. // the chanID belongs to a closed channel.
  244. isClosedChannel := func(chanID lnwire.ShortChannelID) bool {
  245. // Skip if the channel ID is zero value. This has the effect
  246. // that a zero value incoming or outgoing key will never be
  247. // matched and its corresponding circuits or keystones are not
  248. // deleted.
  249. if chanID.ToUint64() == 0 {
  250. return false
  251. }
  252. _, ok := closedChanIDSet[chanID]
  253. return ok
  254. }
  255. // Find closed channels and cache their ShortChannelIDs into a map.
  256. // This map will be used for looking up relative circuits and keystones.
  257. closedChannels, err := cm.cfg.FetchClosedChannels(false)
  258. if err != nil {
  259. return err
  260. }
  261. for _, closedChannel := range closedChannels {
  262. // Skip if the channel close is pending.
  263. if closedChannel.IsPending {
  264. continue
  265. }
  266. closedChanIDSet[closedChannel.ShortChanID] = struct{}{}
  267. }
  268. log.Debugf("Found %v closed channels", len(closedChanIDSet))
  269. // Exit early if there are no closed channels.
  270. if len(closedChanIDSet) == 0 {
  271. log.Infof("Finished cleaning: no closed channels found, " +
  272. "no actions taken.",
  273. )
  274. return nil
  275. }
  276. // Find the payment circuits and keystones that need to be deleted.
  277. if err := kvdb.View(cm.cfg.DB, func(tx kvdb.RTx) error {
  278. circuitBkt := tx.ReadBucket(circuitAddKey)
  279. if circuitBkt == nil {
  280. return ErrCorruptedCircuitMap
  281. }
  282. keystoneBkt := tx.ReadBucket(circuitKeystoneKey)
  283. if keystoneBkt == nil {
  284. return ErrCorruptedCircuitMap
  285. }
  286. // If a circuit's incoming/outgoing key prefix matches the
  287. // ShortChanID, it will be deleted. However, if the ShortChanID
  288. // of the incoming key is zero, the circuit will be kept as it
  289. // indicates a locally initiated payment.
  290. if err := circuitBkt.ForEach(func(_, v []byte) error {
  291. circuit, err := cm.decodeCircuit(v)
  292. if err != nil {
  293. return err
  294. }
  295. // Check if the incoming channel ID can be found in the
  296. // closed channel ID map.
  297. if !isClosedChannel(circuit.Incoming.ChanID) {
  298. return nil
  299. }
  300. circuitKeySet[circuit.Incoming] = struct{}{}
  301. return nil
  302. }); err != nil {
  303. return err
  304. }
  305. // If a keystone's InKey or OutKey matches the short channel id
  306. // in the closed channel ID map, it will be deleted.
  307. err := keystoneBkt.ForEach(func(k, v []byte) error {
  308. var (
  309. inKey CircuitKey
  310. outKey CircuitKey
  311. )
  312. // Decode the incoming and outgoing circuit keys.
  313. if err := inKey.SetBytes(v); err != nil {
  314. return err
  315. }
  316. if err := outKey.SetBytes(k); err != nil {
  317. return err
  318. }
  319. // Check if the incoming channel ID can be found in the
  320. // closed channel ID map.
  321. if isClosedChannel(inKey.ChanID) {
  322. // If the incoming channel is closed, we can
  323. // skip checking on outgoing channel ID because
  324. // this keystone will be deleted.
  325. keystoneKeySet[outKey] = struct{}{}
  326. // Technically the incoming keys found in
  327. // keystone bucket should be a subset of
  328. // circuit bucket. So a previous loop should
  329. // have this inKey put inside circuitAddKey map
  330. // already. We do this again to be sure the
  331. // circuits are properly cleaned. Even this
  332. // inKey doesn't exist in circuit bucket, we
  333. // are fine as db deletion is a noop.
  334. circuitKeySet[inKey] = struct{}{}
  335. return nil
  336. }
  337. // Check if the outgoing channel ID can be found in the
  338. // closed channel ID map. Notice that we need to store
  339. // the outgoing key because it's used for db query.
  340. //
  341. // NOTE: We skip this if a resolution message can be
  342. // found under the outKey. This means that there is an
  343. // existing resolution message(s) that need to get to
  344. // the incoming links.
  345. if isClosedChannel(outKey.ChanID) {
  346. // Check the resolution message store. A return
  347. // value of nil means we need to skip deleting
  348. // these circuits.
  349. if cm.cfg.CheckResolutionMsg(&outKey) == nil {
  350. return nil
  351. }
  352. keystoneKeySet[outKey] = struct{}{}
  353. // Also update circuitKeySet to mark the
  354. // payment circuit needs to be deleted.
  355. circuitKeySet[inKey] = struct{}{}
  356. }
  357. return nil
  358. })
  359. return err
  360. }, func() {
  361. // Reset the sets.
  362. circuitKeySet = make(map[CircuitKey]struct{})
  363. keystoneKeySet = make(map[CircuitKey]struct{})
  364. }); err != nil {
  365. return err
  366. }
  367. log.Debugf("To be deleted: num_circuits=%v, num_keystones=%v",
  368. len(circuitKeySet), len(keystoneKeySet),
  369. )
  370. numCircuitsDeleted := 0
  371. numKeystonesDeleted := 0
  372. // Delete all the circuits and keystones for closed channels.
  373. if err := kvdb.Update(cm.cfg.DB, func(tx kvdb.RwTx) error {
  374. circuitBkt := tx.ReadWriteBucket(circuitAddKey)
  375. if circuitBkt == nil {
  376. return ErrCorruptedCircuitMap
  377. }
  378. keystoneBkt := tx.ReadWriteBucket(circuitKeystoneKey)
  379. if keystoneBkt == nil {
  380. return ErrCorruptedCircuitMap
  381. }
  382. // Delete the circuit.
  383. for inKey := range circuitKeySet {
  384. if err := circuitBkt.Delete(inKey.Bytes()); err != nil {
  385. return err
  386. }
  387. numCircuitsDeleted++
  388. }
  389. // Delete the keystone using the outgoing key.
  390. for outKey := range keystoneKeySet {
  391. err := keystoneBkt.Delete(outKey.Bytes())
  392. if err != nil {
  393. return err
  394. }
  395. numKeystonesDeleted++
  396. }
  397. return nil
  398. }, func() {}); err != nil {
  399. numCircuitsDeleted = 0
  400. numKeystonesDeleted = 0
  401. return err
  402. }
  403. log.Infof("Finished cleaning: num_closed_channel=%v, "+
  404. "num_circuits=%v, num_keystone=%v",
  405. len(closedChannels), numCircuitsDeleted, numKeystonesDeleted,
  406. )
  407. return nil
  408. }
  409. // restoreMemState loads the contents of the half circuit and full circuit
  410. // buckets from disk and reconstructs the in-memory representation of the
  411. // circuit map. Afterwards, the state of the hash index is reconstructed using
  412. // the recovered set of full circuits. This method will also remove any stray
  413. // keystones, which are those that appear fully-opened, but have no pending
  414. // circuit related to the intended incoming link.
  415. func (cm *circuitMap) restoreMemState() error {
  416. log.Infof("Restoring in-memory circuit state from disk")
  417. var (
  418. opened map[CircuitKey]*PaymentCircuit
  419. pending map[CircuitKey]*PaymentCircuit
  420. )
  421. if err := kvdb.Update(cm.cfg.DB, func(tx kvdb.RwTx) error {
  422. // Restore any of the circuits persisted in the circuit bucket
  423. // back into memory.
  424. circuitBkt := tx.ReadWriteBucket(circuitAddKey)
  425. if circuitBkt == nil {
  426. return ErrCorruptedCircuitMap
  427. }
  428. if err := circuitBkt.ForEach(func(_, v []byte) error {
  429. circuit, err := cm.decodeCircuit(v)
  430. if err != nil {
  431. return err
  432. }
  433. circuit.LoadedFromDisk = true
  434. pending[circuit.Incoming] = circuit
  435. return nil
  436. }); err != nil {
  437. return err
  438. }
  439. // Furthermore, load the keystone bucket and resurrect the
  440. // keystones used in any open circuits.
  441. keystoneBkt := tx.ReadWriteBucket(circuitKeystoneKey)
  442. if keystoneBkt == nil {
  443. return ErrCorruptedCircuitMap
  444. }
  445. var strayKeystones []Keystone
  446. if err := keystoneBkt.ForEach(func(k, v []byte) error {
  447. var (
  448. inKey CircuitKey
  449. outKey = &CircuitKey{}
  450. )
  451. // Decode the incoming and outgoing circuit keys.
  452. if err := inKey.SetBytes(v); err != nil {
  453. return err
  454. }
  455. if err := outKey.SetBytes(k); err != nil {
  456. return err
  457. }
  458. // Retrieve the pending circuit, set its keystone, then
  459. // add it to the opened map.
  460. circuit, ok := pending[inKey]
  461. if ok {
  462. circuit.Outgoing = outKey
  463. opened[*outKey] = circuit
  464. } else {
  465. strayKeystones = append(strayKeystones, Keystone{
  466. InKey: inKey,
  467. OutKey: *outKey,
  468. })
  469. }
  470. return nil
  471. }); err != nil {
  472. return err
  473. }
  474. // If any stray keystones were found, we'll proceed to prune
  475. // them from the circuit map's persistent storage. This may
  476. // manifest on older nodes that had updated channels before
  477. // their short channel id was set properly. We believe this
  478. // issue has been fixed, though this will allow older nodes to
  479. // recover without additional intervention.
  480. for _, strayKeystone := range strayKeystones {
  481. // As a precaution, we will only cleanup keystones
  482. // related to locally-initiated payments. If a
  483. // documented case of stray keystones emerges for
  484. // forwarded payments, this check should be removed, but
  485. // with extreme caution.
  486. if strayKeystone.OutKey.ChanID != hop.Source {
  487. continue
  488. }
  489. log.Infof("Removing stray keystone: %v", strayKeystone)
  490. err := keystoneBkt.Delete(strayKeystone.OutKey.Bytes())
  491. if err != nil {
  492. return err
  493. }
  494. }
  495. return nil
  496. }, func() {
  497. opened = make(map[CircuitKey]*PaymentCircuit)
  498. pending = make(map[CircuitKey]*PaymentCircuit)
  499. }); err != nil {
  500. return err
  501. }
  502. cm.pending = pending
  503. cm.opened = opened
  504. cm.closed = make(map[CircuitKey]struct{})
  505. log.Infof("Payment circuits loaded: num_pending=%v, num_open=%v",
  506. len(pending), len(opened))
  507. // Finally, reconstruct the hash index by running through our set of
  508. // open circuits.
  509. cm.hashIndex = make(map[[32]byte]map[CircuitKey]struct{})
  510. for _, circuit := range opened {
  511. cm.addCircuitToHashIndex(circuit)
  512. }
  513. return nil
  514. }
  515. // decodeCircuit reconstructs an in-memory payment circuit from a byte slice.
  516. // The byte slice is assumed to have been generated by the circuit's Encode
  517. // method. If the decoding is successful, the onion obfuscator will be
  518. // reextracted, since it is not stored in plaintext on disk.
  519. func (cm *circuitMap) decodeCircuit(v []byte) (*PaymentCircuit, error) {
  520. var circuit = &PaymentCircuit{}
  521. circuitReader := bytes.NewReader(v)
  522. if err := circuit.Decode(circuitReader); err != nil {
  523. return nil, err
  524. }
  525. // If the error encrypter is nil, this is locally-source payment so
  526. // there is no encrypter.
  527. if circuit.ErrorEncrypter == nil {
  528. return circuit, nil
  529. }
  530. // Otherwise, we need to reextract the encrypter, so that the shared
  531. // secret is rederived from what was decoded.
  532. err := circuit.ErrorEncrypter.Reextract(
  533. cm.cfg.ExtractErrorEncrypter,
  534. )
  535. if err != nil {
  536. return nil, err
  537. }
  538. return circuit, nil
  539. }
  540. // trimAllOpenCircuits reads the set of active channels from disk and trims
  541. // keystones for any non-pending channels using the next unallocated htlc index.
  542. // This method is intended to be called on startup. Each link will also trim
  543. // it's own circuits upon startup.
  544. //
  545. // NOTE: This operation will be applied to the persistent state of all active
  546. // channels. Therefore, it must be called before any links are created to avoid
  547. // interfering with normal operation.
  548. func (cm *circuitMap) trimAllOpenCircuits() error {
  549. activeChannels, err := cm.cfg.FetchAllOpenChannels()
  550. if err != nil {
  551. return err
  552. }
  553. for _, activeChannel := range activeChannels {
  554. if activeChannel.IsPending {
  555. continue
  556. }
  557. // First, skip any channels that have not been assigned their
  558. // final channel identifier, otherwise we would try to trim
  559. // htlcs belonging to the all-zero, hop.Source ID.
  560. chanID := activeChannel.ShortChanID()
  561. if chanID == hop.Source {
  562. continue
  563. }
  564. // Next, retrieve the next unallocated htlc index, which bounds
  565. // the cutoff of confirmed htlc indexes.
  566. start, err := activeChannel.NextLocalHtlcIndex()
  567. if err != nil {
  568. return err
  569. }
  570. // Finally, remove all pending circuits above at or above the
  571. // next unallocated local htlc indexes. This has the effect of
  572. // reverting any circuits that have either not been locked in,
  573. // or had not been included in a pending commitment.
  574. err = cm.TrimOpenCircuits(chanID, start)
  575. if err != nil {
  576. return err
  577. }
  578. }
  579. return nil
  580. }
  581. // TrimOpenCircuits removes a channel's keystones above the short chan id's
  582. // highest committed htlc index. This has the effect of returning those
  583. // circuits to a half-open state. Since opening of circuits is done in advance
  584. // of actually committing the Add htlcs into a commitment txn, this allows
  585. // circuits to be opened preemptively, since we can roll them back after any
  586. // failures.
  587. func (cm *circuitMap) TrimOpenCircuits(chanID lnwire.ShortChannelID,
  588. start uint64) error {
  589. log.Infof("Trimming open circuits for chan_id=%v, start_htlc_id=%v",
  590. chanID, start)
  591. var trimmedOutKeys []CircuitKey
  592. // Scan forward from the last unacked htlc id, stopping as soon as we
  593. // don't find any more. Outgoing htlc id's must be assigned in order,
  594. // so there should never be disjoint segments of keystones to trim.
  595. cm.mtx.Lock()
  596. for i := start; ; i++ {
  597. outKey := CircuitKey{
  598. ChanID: chanID,
  599. HtlcID: i,
  600. }
  601. circuit, ok := cm.opened[outKey]
  602. if !ok {
  603. break
  604. }
  605. circuit.Outgoing = nil
  606. delete(cm.opened, outKey)
  607. trimmedOutKeys = append(trimmedOutKeys, outKey)
  608. cm.removeCircuitFromHashIndex(circuit)
  609. }
  610. cm.mtx.Unlock()
  611. if len(trimmedOutKeys) == 0 {
  612. return nil
  613. }
  614. return kvdb.Update(cm.cfg.DB, func(tx kvdb.RwTx) error {
  615. keystoneBkt := tx.ReadWriteBucket(circuitKeystoneKey)
  616. if keystoneBkt == nil {
  617. return ErrCorruptedCircuitMap
  618. }
  619. for _, outKey := range trimmedOutKeys {
  620. err := keystoneBkt.Delete(outKey.Bytes())
  621. if err != nil {
  622. return err
  623. }
  624. }
  625. return nil
  626. }, func() {})
  627. }
  628. // LookupCircuit queries the circuit map for the circuit identified by its
  629. // incoming circuit key. Returns nil if there is no such circuit.
  630. func (cm *circuitMap) LookupCircuit(inKey CircuitKey) *PaymentCircuit {
  631. cm.mtx.RLock()
  632. defer cm.mtx.RUnlock()
  633. return cm.pending[inKey]
  634. }
  635. // LookupOpenCircuit searches for the circuit identified by its outgoing circuit
  636. // key.
  637. func (cm *circuitMap) LookupOpenCircuit(outKey CircuitKey) *PaymentCircuit {
  638. cm.mtx.RLock()
  639. defer cm.mtx.RUnlock()
  640. return cm.opened[outKey]
  641. }
  642. // LookupByPaymentHash looks up and returns any payment circuits with a given
  643. // payment hash.
  644. func (cm *circuitMap) LookupByPaymentHash(hash [32]byte) []*PaymentCircuit {
  645. cm.mtx.RLock()
  646. defer cm.mtx.RUnlock()
  647. var circuits []*PaymentCircuit
  648. if circuitSet, ok := cm.hashIndex[hash]; ok {
  649. // Iterate over the outgoing circuit keys found with this hash,
  650. // and retrieve the circuit from the opened map.
  651. circuits = make([]*PaymentCircuit, 0, len(circuitSet))
  652. for key := range circuitSet {
  653. if circuit, ok := cm.opened[key]; ok {
  654. circuits = append(circuits, circuit)
  655. }
  656. }
  657. }
  658. return circuits
  659. }
  660. // CommitCircuits accepts any number of circuits and persistently adds them to
  661. // the switch's circuit map. The method returns a list of circuits that had not
  662. // been seen prior by the switch. A link should only forward HTLCs corresponding
  663. // to the returned circuits to the switch.
  664. //
  665. // NOTE: This method uses batched writes to improve performance, gains will only
  666. // be realized if it is called concurrently from separate goroutines.
  667. func (cm *circuitMap) CommitCircuits(circuits ...*PaymentCircuit) (
  668. *CircuitFwdActions, error) {
  669. inKeys := make([]CircuitKey, 0, len(circuits))
  670. for _, circuit := range circuits {
  671. inKeys = append(inKeys, circuit.Incoming)
  672. }
  673. log.Tracef("Committing fresh circuits: %v", newLogClosure(func() string {
  674. return spew.Sdump(inKeys)
  675. }))
  676. actions := &CircuitFwdActions{}
  677. // If an empty list was passed, return early to avoid grabbing the lock.
  678. if len(circuits) == 0 {
  679. return actions, nil
  680. }
  681. // First, we reconcile the provided circuits with our set of pending
  682. // circuits to construct a set of new circuits that need to be written
  683. // to disk. The circuit's pointer is stored so that we only permit this
  684. // exact circuit to be forwarded through the switch. If a circuit is
  685. // already pending, the htlc will be reforwarded by the switch.
  686. //
  687. // NOTE: We track an additional addFails subsequence, which permits us
  688. // to fail back all packets that weren't dropped if we encounter an
  689. // error when committing the circuits.
  690. cm.mtx.Lock()
  691. var adds, drops, fails, addFails []*PaymentCircuit
  692. for _, circuit := range circuits {
  693. inKey := circuit.InKey()
  694. if foundCircuit, ok := cm.pending[inKey]; ok {
  695. switch {
  696. // This circuit has a keystone, it's waiting for a
  697. // response from the remote peer on the outgoing link.
  698. // Drop it like it's hot, ensure duplicates get caught.
  699. case foundCircuit.HasKeystone():
  700. drops = append(drops, circuit)
  701. // If no keystone is set and the switch has not been
  702. // restarted, the corresponding packet should still be
  703. // in the outgoing link's mailbox. It will be delivered
  704. // if it comes online before the switch goes down.
  705. //
  706. // NOTE: Dropping here prevents a flapping, incoming
  707. // link from failing a duplicate add while it is still
  708. // in the server's memory mailboxes.
  709. case !foundCircuit.LoadedFromDisk:
  710. drops = append(drops, circuit)
  711. // Otherwise, the in-mem packet has been lost due to a
  712. // restart. It is now safe to send back a failure along
  713. // the incoming link. The incoming link should be able
  714. // detect and ignore duplicate packets of this type.
  715. default:
  716. fails = append(fails, circuit)
  717. addFails = append(addFails, circuit)
  718. }
  719. continue
  720. }
  721. cm.pending[inKey] = circuit
  722. adds = append(adds, circuit)
  723. addFails = append(addFails, circuit)
  724. }
  725. cm.mtx.Unlock()
  726. // If all circuits are dropped or failed, we are done.
  727. if len(adds) == 0 {
  728. actions.Drops = drops
  729. actions.Fails = fails
  730. return actions, nil
  731. }
  732. // Now, optimistically serialize the circuits to add.
  733. var bs = make([]bytes.Buffer, len(adds))
  734. for i, circuit := range adds {
  735. if err := circuit.Encode(&bs[i]); err != nil {
  736. actions.Drops = drops
  737. actions.Fails = addFails
  738. return actions, err
  739. }
  740. }
  741. // Write the entire batch of circuits to the persistent circuit bucket
  742. // using bolt's Batch write. This method must be called from multiple,
  743. // distinct goroutines to have any impact on performance.
  744. err := kvdb.Batch(cm.cfg.DB, func(tx kvdb.RwTx) error {
  745. circuitBkt := tx.ReadWriteBucket(circuitAddKey)
  746. if circuitBkt == nil {
  747. return ErrCorruptedCircuitMap
  748. }
  749. for i, circuit := range adds {
  750. inKeyBytes := circuit.InKey().Bytes()
  751. circuitBytes := bs[i].Bytes()
  752. err := circuitBkt.Put(inKeyBytes, circuitBytes)
  753. if err != nil {
  754. return err
  755. }
  756. }
  757. return nil
  758. })
  759. // Return if the write succeeded.
  760. if err == nil {
  761. actions.Adds = adds
  762. actions.Drops = drops
  763. actions.Fails = fails
  764. return actions, nil
  765. }
  766. // Otherwise, rollback the circuits added to the pending set if the
  767. // write failed.
  768. cm.mtx.Lock()
  769. for _, circuit := range adds {
  770. delete(cm.pending, circuit.InKey())
  771. }
  772. cm.mtx.Unlock()
  773. // Since our write failed, we will return the dropped packets and mark
  774. // all other circuits as failed.
  775. actions.Drops = drops
  776. actions.Fails = addFails
  777. return actions, err
  778. }
  779. // Keystone is a tuple binding an incoming and outgoing CircuitKey. Keystones
  780. // are preemptively written by an outgoing link before signing a new commitment
  781. // state, and cements which HTLCs we are awaiting a response from a remote
  782. // peer.
  783. type Keystone struct {
  784. InKey CircuitKey
  785. OutKey CircuitKey
  786. }
  787. // String returns a human readable description of the Keystone.
  788. func (k Keystone) String() string {
  789. return fmt.Sprintf("%s --> %s", k.InKey, k.OutKey)
  790. }
  791. // OpenCircuits sets the outgoing circuit key for the circuit identified by
  792. // inKey, persistently marking the circuit as opened. After the changes have
  793. // been persisted, the circuit map's in-memory indexes are updated so that this
  794. // circuit can be queried using LookupByKeystone or LookupByPaymentHash.
  795. func (cm *circuitMap) OpenCircuits(keystones ...Keystone) error {
  796. if len(keystones) == 0 {
  797. return nil
  798. }
  799. log.Tracef("Opening finalized circuits: %v", newLogClosure(func() string {
  800. return spew.Sdump(keystones)
  801. }))
  802. // Check that all keystones correspond to committed-but-unopened
  803. // circuits.
  804. cm.mtx.RLock()
  805. openedCircuits := make([]*PaymentCircuit, 0, len(keystones))
  806. for _, ks := range keystones {
  807. if _, ok := cm.opened[ks.OutKey]; ok {
  808. cm.mtx.RUnlock()
  809. return ErrDuplicateKeystone
  810. }
  811. circuit, ok := cm.pending[ks.InKey]
  812. if !ok {
  813. cm.mtx.RUnlock()
  814. return ErrUnknownCircuit
  815. }
  816. openedCircuits = append(openedCircuits, circuit)
  817. }
  818. cm.mtx.RUnlock()
  819. err := kvdb.Update(cm.cfg.DB, func(tx kvdb.RwTx) error {
  820. // Now, load the circuit bucket to which we will write the
  821. // already serialized circuit.
  822. keystoneBkt := tx.ReadWriteBucket(circuitKeystoneKey)
  823. if keystoneBkt == nil {
  824. return ErrCorruptedCircuitMap
  825. }
  826. for _, ks := range keystones {
  827. outBytes := ks.OutKey.Bytes()
  828. inBytes := ks.InKey.Bytes()
  829. err := keystoneBkt.Put(outBytes, inBytes)
  830. if err != nil {
  831. return err
  832. }
  833. }
  834. return nil
  835. }, func() {})
  836. if err != nil {
  837. return err
  838. }
  839. cm.mtx.Lock()
  840. for i, circuit := range openedCircuits {
  841. ks := keystones[i]
  842. // Since our persistent operation was successful, we can now
  843. // modify the in memory representations. Set the outgoing
  844. // circuit key on our pending circuit, add the same circuit to
  845. // set of opened circuits, and add this circuit to the hash
  846. // index.
  847. circuit.Outgoing = &CircuitKey{}
  848. *circuit.Outgoing = ks.OutKey
  849. cm.opened[ks.OutKey] = circuit
  850. cm.addCircuitToHashIndex(circuit)
  851. }
  852. cm.mtx.Unlock()
  853. return nil
  854. }
  855. // addCirciutToHashIndex inserts a circuit into the circuit map's hash index, so
  856. // that it can be queried using LookupByPaymentHash.
  857. func (cm *circuitMap) addCircuitToHashIndex(c *PaymentCircuit) {
  858. if _, ok := cm.hashIndex[c.PaymentHash]; !ok {
  859. cm.hashIndex[c.PaymentHash] = make(map[CircuitKey]struct{})
  860. }
  861. cm.hashIndex[c.PaymentHash][c.OutKey()] = struct{}{}
  862. }
  863. // FailCircuit marks the circuit identified by `inKey` as closing in-memory,
  864. // which prevents duplicate settles/fails from completing an open circuit twice.
  865. func (cm *circuitMap) FailCircuit(inKey CircuitKey) (*PaymentCircuit, error) {
  866. cm.mtx.Lock()
  867. defer cm.mtx.Unlock()
  868. circuit, ok := cm.pending[inKey]
  869. if !ok {
  870. return nil, ErrUnknownCircuit
  871. }
  872. _, ok = cm.closed[inKey]
  873. if ok {
  874. return nil, ErrCircuitClosing
  875. }
  876. cm.closed[inKey] = struct{}{}
  877. return circuit, nil
  878. }
  879. // CloseCircuit marks the circuit identified by `outKey` as closing in-memory,
  880. // which prevents duplicate settles/fails from completing an open
  881. // circuit twice.
  882. func (cm *circuitMap) CloseCircuit(outKey CircuitKey) (*PaymentCircuit, error) {
  883. cm.mtx.Lock()
  884. defer cm.mtx.Unlock()
  885. circuit, ok := cm.opened[outKey]
  886. if !ok {
  887. return nil, ErrUnknownCircuit
  888. }
  889. _, ok = cm.closed[circuit.Incoming]
  890. if ok {
  891. return nil, ErrCircuitClosing
  892. }
  893. cm.closed[circuit.Incoming] = struct{}{}
  894. return circuit, nil
  895. }
  896. // DeleteCircuits destroys the target circuits by removing them from the circuit
  897. // map, additionally removing the circuits' keystones if any HTLCs were
  898. // forwarded through an outgoing link. The circuits should be identified by its
  899. // incoming circuit key. If a given circuit is not found in the circuit map, it
  900. // will be ignored from the query. This would typically indicate that the
  901. // circuit was already cleaned up at a different point in time.
  902. func (cm *circuitMap) DeleteCircuits(inKeys ...CircuitKey) error {
  903. log.Tracef("Deleting resolved circuits: %v", newLogClosure(func() string {
  904. return spew.Sdump(inKeys)
  905. }))
  906. var (
  907. closingCircuits = make(map[CircuitKey]struct{})
  908. removedCircuits = make(map[CircuitKey]*PaymentCircuit)
  909. )
  910. cm.mtx.Lock()
  911. // Remove any references to the circuits from memory, keeping track of
  912. // which circuits were removed, and which ones had been marked closed.
  913. // This can be used to restore these entries later if the persistent
  914. // removal fails.
  915. for _, inKey := range inKeys {
  916. circuit, ok := cm.pending[inKey]
  917. if !ok {
  918. continue
  919. }
  920. delete(cm.pending, inKey)
  921. if _, ok := cm.closed[inKey]; ok {
  922. closingCircuits[inKey] = struct{}{}
  923. delete(cm.closed, inKey)
  924. }
  925. if circuit.HasKeystone() {
  926. delete(cm.opened, circuit.OutKey())
  927. cm.removeCircuitFromHashIndex(circuit)
  928. }
  929. removedCircuits[inKey] = circuit
  930. }
  931. cm.mtx.Unlock()
  932. err := kvdb.Batch(cm.cfg.DB, func(tx kvdb.RwTx) error {
  933. for _, circuit := range removedCircuits {
  934. // If this htlc made it to an outgoing link, load the
  935. // keystone bucket from which we will remove the
  936. // outgoing circuit key.
  937. if circuit.HasKeystone() {
  938. keystoneBkt := tx.ReadWriteBucket(circuitKeystoneKey)
  939. if keystoneBkt == nil {
  940. return ErrCorruptedCircuitMap
  941. }
  942. outKey := circuit.OutKey()
  943. err := keystoneBkt.Delete(outKey.Bytes())
  944. if err != nil {
  945. return err
  946. }
  947. }
  948. // Remove the circuit itself based on the incoming
  949. // circuit key.
  950. circuitBkt := tx.ReadWriteBucket(circuitAddKey)
  951. if circuitBkt == nil {
  952. return ErrCorruptedCircuitMap
  953. }
  954. inKey := circuit.InKey()
  955. if err := circuitBkt.Delete(inKey.Bytes()); err != nil {
  956. return err
  957. }
  958. }
  959. return nil
  960. })
  961. // Return if the write succeeded.
  962. if err == nil {
  963. return nil
  964. }
  965. // If the persistent changes failed, restore the circuit map to it's
  966. // previous state.
  967. cm.mtx.Lock()
  968. for inKey, circuit := range removedCircuits {
  969. cm.pending[inKey] = circuit
  970. if _, ok := closingCircuits[inKey]; ok {
  971. cm.closed[inKey] = struct{}{}
  972. }
  973. if circuit.HasKeystone() {
  974. cm.opened[circuit.OutKey()] = circuit
  975. cm.addCircuitToHashIndex(circuit)
  976. }
  977. }
  978. cm.mtx.Unlock()
  979. return err
  980. }
  981. // removeCircuitFromHashIndex removes the given circuit from the hash index,
  982. // pruning any unnecessary memory optimistically.
  983. func (cm *circuitMap) removeCircuitFromHashIndex(c *PaymentCircuit) {
  984. // Locate bucket containing this circuit's payment hashes.
  985. circuitsWithHash, ok := cm.hashIndex[c.PaymentHash]
  986. if !ok {
  987. return
  988. }
  989. outKey := c.OutKey()
  990. // Remove this circuit from the set of circuitsWithHash.
  991. delete(circuitsWithHash, outKey)
  992. // Prune the payment hash bucket if no other entries remain.
  993. if len(circuitsWithHash) == 0 {
  994. delete(cm.hashIndex, c.PaymentHash)
  995. }
  996. }
  997. // NumPending returns the number of active circuits added to the circuit map.
  998. func (cm *circuitMap) NumPending() int {
  999. cm.mtx.RLock()
  1000. defer cm.mtx.RUnlock()
  1001. return len(cm.pending)
  1002. }
  1003. // NumOpen returns the number of circuits that have been opened by way of
  1004. // setting their keystones. This is the number of HTLCs that are waiting for a
  1005. // settle/fail response from a remote peer.
  1006. func (cm *circuitMap) NumOpen() int {
  1007. cm.mtx.RLock()
  1008. defer cm.mtx.RUnlock()
  1009. return len(cm.opened)
  1010. }