1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213 |
- package htlcswitch
- import (
- "bytes"
- "fmt"
- "sync"
- "github.com/davecgh/go-spew/spew"
- "github.com/go-errors/errors"
- "github.com/lightningnetwork/lnd/channeldb"
- "github.com/lightningnetwork/lnd/htlcswitch/hop"
- "github.com/lightningnetwork/lnd/kvdb"
- "github.com/lightningnetwork/lnd/lnwire"
- )
- var (
- // ErrCorruptedCircuitMap indicates that the on-disk bucketing structure
- // has altered since the circuit map instance was initialized.
- ErrCorruptedCircuitMap = errors.New("circuit map has been corrupted")
- // ErrCircuitNotInHashIndex indicates that a particular circuit did not
- // appear in the in-memory hash index.
- ErrCircuitNotInHashIndex = errors.New("payment circuit not found in " +
- "hash index")
- // ErrUnknownCircuit signals that circuit could not be removed from the
- // map because it was not found.
- ErrUnknownCircuit = errors.New("unknown payment circuit")
- // ErrCircuitClosing signals that an htlc has already closed this
- // circuit in-memory.
- ErrCircuitClosing = errors.New("circuit has already been closed")
- // ErrDuplicateCircuit signals that this circuit was previously
- // added.
- ErrDuplicateCircuit = errors.New("duplicate circuit add")
- // ErrUnknownKeystone signals that no circuit was found using the
- // outgoing circuit key.
- ErrUnknownKeystone = errors.New("unknown circuit keystone")
- // ErrDuplicateKeystone signals that this circuit was previously
- // assigned a keystone.
- ErrDuplicateKeystone = errors.New("cannot add duplicate keystone")
- )
- // CircuitModifier is a common interface used by channel links to modify the
- // contents of the circuit map maintained by the switch.
- type CircuitModifier interface {
- // OpenCircuits preemptively records a batch keystones that will mark
- // currently pending circuits as open. These changes can be rolled back
- // on restart if the outgoing Adds do not make it into a commitment
- // txn.
- OpenCircuits(...Keystone) error
- // TrimOpenCircuits removes a channel's open channels with htlc indexes
- // above `start`.
- TrimOpenCircuits(chanID lnwire.ShortChannelID, start uint64) error
- // DeleteCircuits removes the incoming circuit key to remove all
- // persistent references to a circuit. Returns a ErrUnknownCircuit if
- // any of the incoming keys are not known.
- DeleteCircuits(inKeys ...CircuitKey) error
- }
- // CircuitLookup is a common interface used to lookup information that is stored
- // in the circuit map.
- type CircuitLookup interface {
- // LookupCircuit queries the circuit map for the circuit identified by
- // inKey.
- LookupCircuit(inKey CircuitKey) *PaymentCircuit
- // LookupOpenCircuit queries the circuit map for a circuit identified
- // by its outgoing circuit key.
- LookupOpenCircuit(outKey CircuitKey) *PaymentCircuit
- }
- // CircuitFwdActions represents the forwarding decision made by the circuit
- // map, and is returned from CommitCircuits. The sequence of circuits provided
- // to CommitCircuits is split into three sub-sequences, allowing the caller to
- // do an in-order scan, comparing the head of each subsequence, to determine
- // the decision made by the circuit map.
- type CircuitFwdActions struct {
- // Adds is the subsequence of circuits that were successfully committed
- // in the circuit map.
- Adds []*PaymentCircuit
- // Drops is the subsequence of circuits for which no action should be
- // done.
- Drops []*PaymentCircuit
- // Fails is the subsequence of circuits that should be failed back by
- // the calling link.
- Fails []*PaymentCircuit
- }
- // CircuitMap is an interface for managing the construction and teardown of
- // payment circuits used by the switch.
- type CircuitMap interface {
- CircuitModifier
- CircuitLookup
- // CommitCircuits attempts to add the given circuits to the circuit
- // map. The list of circuits is split into three distinct
- // sub-sequences, corresponding to adds, drops, and fails. Adds should
- // be forwarded to the switch, while fails should be failed back
- // locally within the calling link.
- CommitCircuits(circuit ...*PaymentCircuit) (*CircuitFwdActions, error)
- // CloseCircuit marks the circuit identified by `outKey` as closing
- // in-memory, which prevents duplicate settles/fails from completing an
- // open circuit twice.
- CloseCircuit(outKey CircuitKey) (*PaymentCircuit, error)
- // FailCircuit is used by locally failed HTLCs to mark the circuit
- // identified by `inKey` as closing in-memory, which prevents duplicate
- // settles/fails from being accepted for the same circuit.
- FailCircuit(inKey CircuitKey) (*PaymentCircuit, error)
- // LookupByPaymentHash queries the circuit map and returns all open
- // circuits that use the given payment hash.
- LookupByPaymentHash(hash [32]byte) []*PaymentCircuit
- // NumPending returns the total number of active circuits added by
- // CommitCircuits.
- NumPending() int
- // NumOpen returns the number of circuits with HTLCs that have been
- // forwarded via an outgoing link.
- NumOpen() int
- }
- var (
- // circuitAddKey is the key used to retrieve the bucket containing
- // payment circuits. A circuit records information about how to return
- // a packet to the source link, potentially including an error
- // encrypter for applying this hop's encryption to the payload in the
- // reverse direction.
- //
- // Bucket hierarchy:
- //
- // circuitAddKey(root-bucket)
- // |
- // |-- <incoming-circuit-key>: <encoded bytes of PaymentCircuit>
- // |-- <incoming-circuit-key>: <encoded bytes of PaymentCircuit>
- // |
- // ...
- //
- circuitAddKey = []byte("circuit-adds")
- // circuitKeystoneKey is used to retrieve the bucket containing circuit
- // keystones, which are set in place once a forwarded packet is
- // assigned an index on an outgoing commitment txn.
- //
- // Bucket hierarchy:
- //
- // circuitKeystoneKey(root-bucket)
- // |
- // |-- <outgoing-circuit-key>: <incoming-circuit-key>
- // |-- <outgoing-circuit-key>: <incoming-circuit-key>
- // |
- // ...
- //
- circuitKeystoneKey = []byte("circuit-keystones")
- )
- // circuitMap is a data structure that implements thread safe, persistent
- // storage of circuit routing information. The switch consults a circuit map to
- // determine where to forward returning HTLC update messages. Circuits are
- // always identifiable by their incoming CircuitKey, in addition to their
- // outgoing CircuitKey if the circuit is fully-opened.
- type circuitMap struct {
- cfg *CircuitMapConfig
- mtx sync.RWMutex
- // pending is an in-memory mapping of all half payment circuits, and is
- // kept in sync with the on-disk contents of the circuit map.
- pending map[CircuitKey]*PaymentCircuit
- // opened is an in-memory mapping of all full payment circuits, which
- // is also synchronized with the persistent state of the circuit map.
- opened map[CircuitKey]*PaymentCircuit
- // closed is an in-memory set of circuits for which the switch has
- // received a settle or fail. This precedes the actual deletion of a
- // circuit from disk.
- closed map[CircuitKey]struct{}
- // hashIndex is a volatile index that facilitates fast queries by
- // payment hash against the contents of circuits. This index can be
- // reconstructed entirely from the set of persisted full circuits on
- // startup.
- hashIndex map[[32]byte]map[CircuitKey]struct{}
- }
- // CircuitMapConfig houses the critical interfaces and references necessary to
- // parameterize an instance of circuitMap.
- type CircuitMapConfig struct {
- // DB provides the persistent storage engine for the circuit map.
- DB kvdb.Backend
- // FetchAllOpenChannels is a function that fetches all currently open
- // channels from the channel database.
- FetchAllOpenChannels func() ([]*channeldb.OpenChannel, error)
- // FetchClosedChannels is a function that fetches all closed channels
- // from the channel database.
- FetchClosedChannels func(
- pendingOnly bool) ([]*channeldb.ChannelCloseSummary, error)
- // ExtractErrorEncrypter derives the shared secret used to encrypt
- // errors from the obfuscator's ephemeral public key.
- ExtractErrorEncrypter hop.ErrorEncrypterExtracter
- // CheckResolutionMsg checks whether a given resolution message exists
- // for the passed CircuitKey.
- CheckResolutionMsg func(outKey *CircuitKey) error
- }
- // NewCircuitMap creates a new instance of the circuitMap.
- func NewCircuitMap(cfg *CircuitMapConfig) (CircuitMap, error) {
- cm := &circuitMap{
- cfg: cfg,
- }
- // Initialize the on-disk buckets used by the circuit map.
- if err := cm.initBuckets(); err != nil {
- return nil, err
- }
- // Delete old circuits and keystones of closed channels.
- if err := cm.cleanClosedChannels(); err != nil {
- return nil, err
- }
- // Load any previously persisted circuit into back into memory.
- if err := cm.restoreMemState(); err != nil {
- return nil, err
- }
- // Trim any keystones that were not committed in an outgoing commit txn.
- //
- // NOTE: This operation will be applied to the persistent state of all
- // active channels. Therefore, it must be called before any links are
- // created to avoid interfering with normal operation.
- if err := cm.trimAllOpenCircuits(); err != nil {
- return nil, err
- }
- return cm, nil
- }
- // initBuckets ensures that the primary buckets used by the circuit are
- // initialized so that we can assume their existence after startup.
- func (cm *circuitMap) initBuckets() error {
- return kvdb.Update(cm.cfg.DB, func(tx kvdb.RwTx) error {
- _, err := tx.CreateTopLevelBucket(circuitKeystoneKey)
- if err != nil {
- return err
- }
- _, err = tx.CreateTopLevelBucket(circuitAddKey)
- return err
- }, func() {})
- }
- // cleanClosedChannels deletes all circuits and keystones related to closed
- // channels. It first reads all the closed channels and caches the ShortChanIDs
- // into a map for fast lookup. Then it iterates the circuit bucket and keystone
- // bucket and deletes items whose ChanID matches the ShortChanID.
- //
- // NOTE: this operation can also be built into restoreMemState since the latter
- // already opens and iterates the two root buckets, circuitAddKey and
- // circuitKeystoneKey. Depending on the size of the buckets, this marginal gain
- // may be worth investigating. Atm, for clarity, this operation is wrapped into
- // its own function.
- func (cm *circuitMap) cleanClosedChannels() error {
- log.Infof("Cleaning circuits from disk for closed channels")
- // closedChanIDSet stores the short channel IDs for closed channels.
- closedChanIDSet := make(map[lnwire.ShortChannelID]struct{})
- // circuitKeySet stores the incoming circuit keys of the payment
- // circuits that need to be deleted.
- circuitKeySet := make(map[CircuitKey]struct{})
- // keystoneKeySet stores the outgoing keys of the keystones that need
- // to be deleted.
- keystoneKeySet := make(map[CircuitKey]struct{})
- // isClosedChannel is a helper closure that returns a bool indicating
- // the chanID belongs to a closed channel.
- isClosedChannel := func(chanID lnwire.ShortChannelID) bool {
- // Skip if the channel ID is zero value. This has the effect
- // that a zero value incoming or outgoing key will never be
- // matched and its corresponding circuits or keystones are not
- // deleted.
- if chanID.ToUint64() == 0 {
- return false
- }
- _, ok := closedChanIDSet[chanID]
- return ok
- }
- // Find closed channels and cache their ShortChannelIDs into a map.
- // This map will be used for looking up relative circuits and keystones.
- closedChannels, err := cm.cfg.FetchClosedChannels(false)
- if err != nil {
- return err
- }
- for _, closedChannel := range closedChannels {
- // Skip if the channel close is pending.
- if closedChannel.IsPending {
- continue
- }
- closedChanIDSet[closedChannel.ShortChanID] = struct{}{}
- }
- log.Debugf("Found %v closed channels", len(closedChanIDSet))
- // Exit early if there are no closed channels.
- if len(closedChanIDSet) == 0 {
- log.Infof("Finished cleaning: no closed channels found, " +
- "no actions taken.",
- )
- return nil
- }
- // Find the payment circuits and keystones that need to be deleted.
- if err := kvdb.View(cm.cfg.DB, func(tx kvdb.RTx) error {
- circuitBkt := tx.ReadBucket(circuitAddKey)
- if circuitBkt == nil {
- return ErrCorruptedCircuitMap
- }
- keystoneBkt := tx.ReadBucket(circuitKeystoneKey)
- if keystoneBkt == nil {
- return ErrCorruptedCircuitMap
- }
- // If a circuit's incoming/outgoing key prefix matches the
- // ShortChanID, it will be deleted. However, if the ShortChanID
- // of the incoming key is zero, the circuit will be kept as it
- // indicates a locally initiated payment.
- if err := circuitBkt.ForEach(func(_, v []byte) error {
- circuit, err := cm.decodeCircuit(v)
- if err != nil {
- return err
- }
- // Check if the incoming channel ID can be found in the
- // closed channel ID map.
- if !isClosedChannel(circuit.Incoming.ChanID) {
- return nil
- }
- circuitKeySet[circuit.Incoming] = struct{}{}
- return nil
- }); err != nil {
- return err
- }
- // If a keystone's InKey or OutKey matches the short channel id
- // in the closed channel ID map, it will be deleted.
- err := keystoneBkt.ForEach(func(k, v []byte) error {
- var (
- inKey CircuitKey
- outKey CircuitKey
- )
- // Decode the incoming and outgoing circuit keys.
- if err := inKey.SetBytes(v); err != nil {
- return err
- }
- if err := outKey.SetBytes(k); err != nil {
- return err
- }
- // Check if the incoming channel ID can be found in the
- // closed channel ID map.
- if isClosedChannel(inKey.ChanID) {
- // If the incoming channel is closed, we can
- // skip checking on outgoing channel ID because
- // this keystone will be deleted.
- keystoneKeySet[outKey] = struct{}{}
- // Technically the incoming keys found in
- // keystone bucket should be a subset of
- // circuit bucket. So a previous loop should
- // have this inKey put inside circuitAddKey map
- // already. We do this again to be sure the
- // circuits are properly cleaned. Even this
- // inKey doesn't exist in circuit bucket, we
- // are fine as db deletion is a noop.
- circuitKeySet[inKey] = struct{}{}
- return nil
- }
- // Check if the outgoing channel ID can be found in the
- // closed channel ID map. Notice that we need to store
- // the outgoing key because it's used for db query.
- //
- // NOTE: We skip this if a resolution message can be
- // found under the outKey. This means that there is an
- // existing resolution message(s) that need to get to
- // the incoming links.
- if isClosedChannel(outKey.ChanID) {
- // Check the resolution message store. A return
- // value of nil means we need to skip deleting
- // these circuits.
- if cm.cfg.CheckResolutionMsg(&outKey) == nil {
- return nil
- }
- keystoneKeySet[outKey] = struct{}{}
- // Also update circuitKeySet to mark the
- // payment circuit needs to be deleted.
- circuitKeySet[inKey] = struct{}{}
- }
- return nil
- })
- return err
- }, func() {
- // Reset the sets.
- circuitKeySet = make(map[CircuitKey]struct{})
- keystoneKeySet = make(map[CircuitKey]struct{})
- }); err != nil {
- return err
- }
- log.Debugf("To be deleted: num_circuits=%v, num_keystones=%v",
- len(circuitKeySet), len(keystoneKeySet),
- )
- numCircuitsDeleted := 0
- numKeystonesDeleted := 0
- // Delete all the circuits and keystones for closed channels.
- if err := kvdb.Update(cm.cfg.DB, func(tx kvdb.RwTx) error {
- circuitBkt := tx.ReadWriteBucket(circuitAddKey)
- if circuitBkt == nil {
- return ErrCorruptedCircuitMap
- }
- keystoneBkt := tx.ReadWriteBucket(circuitKeystoneKey)
- if keystoneBkt == nil {
- return ErrCorruptedCircuitMap
- }
- // Delete the circuit.
- for inKey := range circuitKeySet {
- if err := circuitBkt.Delete(inKey.Bytes()); err != nil {
- return err
- }
- numCircuitsDeleted++
- }
- // Delete the keystone using the outgoing key.
- for outKey := range keystoneKeySet {
- err := keystoneBkt.Delete(outKey.Bytes())
- if err != nil {
- return err
- }
- numKeystonesDeleted++
- }
- return nil
- }, func() {}); err != nil {
- numCircuitsDeleted = 0
- numKeystonesDeleted = 0
- return err
- }
- log.Infof("Finished cleaning: num_closed_channel=%v, "+
- "num_circuits=%v, num_keystone=%v",
- len(closedChannels), numCircuitsDeleted, numKeystonesDeleted,
- )
- return nil
- }
- // restoreMemState loads the contents of the half circuit and full circuit
- // buckets from disk and reconstructs the in-memory representation of the
- // circuit map. Afterwards, the state of the hash index is reconstructed using
- // the recovered set of full circuits. This method will also remove any stray
- // keystones, which are those that appear fully-opened, but have no pending
- // circuit related to the intended incoming link.
- func (cm *circuitMap) restoreMemState() error {
- log.Infof("Restoring in-memory circuit state from disk")
- var (
- opened map[CircuitKey]*PaymentCircuit
- pending map[CircuitKey]*PaymentCircuit
- )
- if err := kvdb.Update(cm.cfg.DB, func(tx kvdb.RwTx) error {
- // Restore any of the circuits persisted in the circuit bucket
- // back into memory.
- circuitBkt := tx.ReadWriteBucket(circuitAddKey)
- if circuitBkt == nil {
- return ErrCorruptedCircuitMap
- }
- if err := circuitBkt.ForEach(func(_, v []byte) error {
- circuit, err := cm.decodeCircuit(v)
- if err != nil {
- return err
- }
- circuit.LoadedFromDisk = true
- pending[circuit.Incoming] = circuit
- return nil
- }); err != nil {
- return err
- }
- // Furthermore, load the keystone bucket and resurrect the
- // keystones used in any open circuits.
- keystoneBkt := tx.ReadWriteBucket(circuitKeystoneKey)
- if keystoneBkt == nil {
- return ErrCorruptedCircuitMap
- }
- var strayKeystones []Keystone
- if err := keystoneBkt.ForEach(func(k, v []byte) error {
- var (
- inKey CircuitKey
- outKey = &CircuitKey{}
- )
- // Decode the incoming and outgoing circuit keys.
- if err := inKey.SetBytes(v); err != nil {
- return err
- }
- if err := outKey.SetBytes(k); err != nil {
- return err
- }
- // Retrieve the pending circuit, set its keystone, then
- // add it to the opened map.
- circuit, ok := pending[inKey]
- if ok {
- circuit.Outgoing = outKey
- opened[*outKey] = circuit
- } else {
- strayKeystones = append(strayKeystones, Keystone{
- InKey: inKey,
- OutKey: *outKey,
- })
- }
- return nil
- }); err != nil {
- return err
- }
- // If any stray keystones were found, we'll proceed to prune
- // them from the circuit map's persistent storage. This may
- // manifest on older nodes that had updated channels before
- // their short channel id was set properly. We believe this
- // issue has been fixed, though this will allow older nodes to
- // recover without additional intervention.
- for _, strayKeystone := range strayKeystones {
- // As a precaution, we will only cleanup keystones
- // related to locally-initiated payments. If a
- // documented case of stray keystones emerges for
- // forwarded payments, this check should be removed, but
- // with extreme caution.
- if strayKeystone.OutKey.ChanID != hop.Source {
- continue
- }
- log.Infof("Removing stray keystone: %v", strayKeystone)
- err := keystoneBkt.Delete(strayKeystone.OutKey.Bytes())
- if err != nil {
- return err
- }
- }
- return nil
- }, func() {
- opened = make(map[CircuitKey]*PaymentCircuit)
- pending = make(map[CircuitKey]*PaymentCircuit)
- }); err != nil {
- return err
- }
- cm.pending = pending
- cm.opened = opened
- cm.closed = make(map[CircuitKey]struct{})
- log.Infof("Payment circuits loaded: num_pending=%v, num_open=%v",
- len(pending), len(opened))
- // Finally, reconstruct the hash index by running through our set of
- // open circuits.
- cm.hashIndex = make(map[[32]byte]map[CircuitKey]struct{})
- for _, circuit := range opened {
- cm.addCircuitToHashIndex(circuit)
- }
- return nil
- }
- // decodeCircuit reconstructs an in-memory payment circuit from a byte slice.
- // The byte slice is assumed to have been generated by the circuit's Encode
- // method. If the decoding is successful, the onion obfuscator will be
- // reextracted, since it is not stored in plaintext on disk.
- func (cm *circuitMap) decodeCircuit(v []byte) (*PaymentCircuit, error) {
- var circuit = &PaymentCircuit{}
- circuitReader := bytes.NewReader(v)
- if err := circuit.Decode(circuitReader); err != nil {
- return nil, err
- }
- // If the error encrypter is nil, this is locally-source payment so
- // there is no encrypter.
- if circuit.ErrorEncrypter == nil {
- return circuit, nil
- }
- // Otherwise, we need to reextract the encrypter, so that the shared
- // secret is rederived from what was decoded.
- err := circuit.ErrorEncrypter.Reextract(
- cm.cfg.ExtractErrorEncrypter,
- )
- if err != nil {
- return nil, err
- }
- return circuit, nil
- }
- // trimAllOpenCircuits reads the set of active channels from disk and trims
- // keystones for any non-pending channels using the next unallocated htlc index.
- // This method is intended to be called on startup. Each link will also trim
- // it's own circuits upon startup.
- //
- // NOTE: This operation will be applied to the persistent state of all active
- // channels. Therefore, it must be called before any links are created to avoid
- // interfering with normal operation.
- func (cm *circuitMap) trimAllOpenCircuits() error {
- activeChannels, err := cm.cfg.FetchAllOpenChannels()
- if err != nil {
- return err
- }
- for _, activeChannel := range activeChannels {
- if activeChannel.IsPending {
- continue
- }
- // First, skip any channels that have not been assigned their
- // final channel identifier, otherwise we would try to trim
- // htlcs belonging to the all-zero, hop.Source ID.
- chanID := activeChannel.ShortChanID()
- if chanID == hop.Source {
- continue
- }
- // Next, retrieve the next unallocated htlc index, which bounds
- // the cutoff of confirmed htlc indexes.
- start, err := activeChannel.NextLocalHtlcIndex()
- if err != nil {
- return err
- }
- // Finally, remove all pending circuits above at or above the
- // next unallocated local htlc indexes. This has the effect of
- // reverting any circuits that have either not been locked in,
- // or had not been included in a pending commitment.
- err = cm.TrimOpenCircuits(chanID, start)
- if err != nil {
- return err
- }
- }
- return nil
- }
- // TrimOpenCircuits removes a channel's keystones above the short chan id's
- // highest committed htlc index. This has the effect of returning those
- // circuits to a half-open state. Since opening of circuits is done in advance
- // of actually committing the Add htlcs into a commitment txn, this allows
- // circuits to be opened preemptively, since we can roll them back after any
- // failures.
- func (cm *circuitMap) TrimOpenCircuits(chanID lnwire.ShortChannelID,
- start uint64) error {
- log.Infof("Trimming open circuits for chan_id=%v, start_htlc_id=%v",
- chanID, start)
- var trimmedOutKeys []CircuitKey
- // Scan forward from the last unacked htlc id, stopping as soon as we
- // don't find any more. Outgoing htlc id's must be assigned in order,
- // so there should never be disjoint segments of keystones to trim.
- cm.mtx.Lock()
- for i := start; ; i++ {
- outKey := CircuitKey{
- ChanID: chanID,
- HtlcID: i,
- }
- circuit, ok := cm.opened[outKey]
- if !ok {
- break
- }
- circuit.Outgoing = nil
- delete(cm.opened, outKey)
- trimmedOutKeys = append(trimmedOutKeys, outKey)
- cm.removeCircuitFromHashIndex(circuit)
- }
- cm.mtx.Unlock()
- if len(trimmedOutKeys) == 0 {
- return nil
- }
- return kvdb.Update(cm.cfg.DB, func(tx kvdb.RwTx) error {
- keystoneBkt := tx.ReadWriteBucket(circuitKeystoneKey)
- if keystoneBkt == nil {
- return ErrCorruptedCircuitMap
- }
- for _, outKey := range trimmedOutKeys {
- err := keystoneBkt.Delete(outKey.Bytes())
- if err != nil {
- return err
- }
- }
- return nil
- }, func() {})
- }
- // LookupCircuit queries the circuit map for the circuit identified by its
- // incoming circuit key. Returns nil if there is no such circuit.
- func (cm *circuitMap) LookupCircuit(inKey CircuitKey) *PaymentCircuit {
- cm.mtx.RLock()
- defer cm.mtx.RUnlock()
- return cm.pending[inKey]
- }
- // LookupOpenCircuit searches for the circuit identified by its outgoing circuit
- // key.
- func (cm *circuitMap) LookupOpenCircuit(outKey CircuitKey) *PaymentCircuit {
- cm.mtx.RLock()
- defer cm.mtx.RUnlock()
- return cm.opened[outKey]
- }
- // LookupByPaymentHash looks up and returns any payment circuits with a given
- // payment hash.
- func (cm *circuitMap) LookupByPaymentHash(hash [32]byte) []*PaymentCircuit {
- cm.mtx.RLock()
- defer cm.mtx.RUnlock()
- var circuits []*PaymentCircuit
- if circuitSet, ok := cm.hashIndex[hash]; ok {
- // Iterate over the outgoing circuit keys found with this hash,
- // and retrieve the circuit from the opened map.
- circuits = make([]*PaymentCircuit, 0, len(circuitSet))
- for key := range circuitSet {
- if circuit, ok := cm.opened[key]; ok {
- circuits = append(circuits, circuit)
- }
- }
- }
- return circuits
- }
- // CommitCircuits accepts any number of circuits and persistently adds them to
- // the switch's circuit map. The method returns a list of circuits that had not
- // been seen prior by the switch. A link should only forward HTLCs corresponding
- // to the returned circuits to the switch.
- //
- // NOTE: This method uses batched writes to improve performance, gains will only
- // be realized if it is called concurrently from separate goroutines.
- func (cm *circuitMap) CommitCircuits(circuits ...*PaymentCircuit) (
- *CircuitFwdActions, error) {
- inKeys := make([]CircuitKey, 0, len(circuits))
- for _, circuit := range circuits {
- inKeys = append(inKeys, circuit.Incoming)
- }
- log.Tracef("Committing fresh circuits: %v", newLogClosure(func() string {
- return spew.Sdump(inKeys)
- }))
- actions := &CircuitFwdActions{}
- // If an empty list was passed, return early to avoid grabbing the lock.
- if len(circuits) == 0 {
- return actions, nil
- }
- // First, we reconcile the provided circuits with our set of pending
- // circuits to construct a set of new circuits that need to be written
- // to disk. The circuit's pointer is stored so that we only permit this
- // exact circuit to be forwarded through the switch. If a circuit is
- // already pending, the htlc will be reforwarded by the switch.
- //
- // NOTE: We track an additional addFails subsequence, which permits us
- // to fail back all packets that weren't dropped if we encounter an
- // error when committing the circuits.
- cm.mtx.Lock()
- var adds, drops, fails, addFails []*PaymentCircuit
- for _, circuit := range circuits {
- inKey := circuit.InKey()
- if foundCircuit, ok := cm.pending[inKey]; ok {
- switch {
- // This circuit has a keystone, it's waiting for a
- // response from the remote peer on the outgoing link.
- // Drop it like it's hot, ensure duplicates get caught.
- case foundCircuit.HasKeystone():
- drops = append(drops, circuit)
- // If no keystone is set and the switch has not been
- // restarted, the corresponding packet should still be
- // in the outgoing link's mailbox. It will be delivered
- // if it comes online before the switch goes down.
- //
- // NOTE: Dropping here prevents a flapping, incoming
- // link from failing a duplicate add while it is still
- // in the server's memory mailboxes.
- case !foundCircuit.LoadedFromDisk:
- drops = append(drops, circuit)
- // Otherwise, the in-mem packet has been lost due to a
- // restart. It is now safe to send back a failure along
- // the incoming link. The incoming link should be able
- // detect and ignore duplicate packets of this type.
- default:
- fails = append(fails, circuit)
- addFails = append(addFails, circuit)
- }
- continue
- }
- cm.pending[inKey] = circuit
- adds = append(adds, circuit)
- addFails = append(addFails, circuit)
- }
- cm.mtx.Unlock()
- // If all circuits are dropped or failed, we are done.
- if len(adds) == 0 {
- actions.Drops = drops
- actions.Fails = fails
- return actions, nil
- }
- // Now, optimistically serialize the circuits to add.
- var bs = make([]bytes.Buffer, len(adds))
- for i, circuit := range adds {
- if err := circuit.Encode(&bs[i]); err != nil {
- actions.Drops = drops
- actions.Fails = addFails
- return actions, err
- }
- }
- // Write the entire batch of circuits to the persistent circuit bucket
- // using bolt's Batch write. This method must be called from multiple,
- // distinct goroutines to have any impact on performance.
- err := kvdb.Batch(cm.cfg.DB, func(tx kvdb.RwTx) error {
- circuitBkt := tx.ReadWriteBucket(circuitAddKey)
- if circuitBkt == nil {
- return ErrCorruptedCircuitMap
- }
- for i, circuit := range adds {
- inKeyBytes := circuit.InKey().Bytes()
- circuitBytes := bs[i].Bytes()
- err := circuitBkt.Put(inKeyBytes, circuitBytes)
- if err != nil {
- return err
- }
- }
- return nil
- })
- // Return if the write succeeded.
- if err == nil {
- actions.Adds = adds
- actions.Drops = drops
- actions.Fails = fails
- return actions, nil
- }
- // Otherwise, rollback the circuits added to the pending set if the
- // write failed.
- cm.mtx.Lock()
- for _, circuit := range adds {
- delete(cm.pending, circuit.InKey())
- }
- cm.mtx.Unlock()
- // Since our write failed, we will return the dropped packets and mark
- // all other circuits as failed.
- actions.Drops = drops
- actions.Fails = addFails
- return actions, err
- }
- // Keystone is a tuple binding an incoming and outgoing CircuitKey. Keystones
- // are preemptively written by an outgoing link before signing a new commitment
- // state, and cements which HTLCs we are awaiting a response from a remote
- // peer.
- type Keystone struct {
- InKey CircuitKey
- OutKey CircuitKey
- }
- // String returns a human readable description of the Keystone.
- func (k Keystone) String() string {
- return fmt.Sprintf("%s --> %s", k.InKey, k.OutKey)
- }
- // OpenCircuits sets the outgoing circuit key for the circuit identified by
- // inKey, persistently marking the circuit as opened. After the changes have
- // been persisted, the circuit map's in-memory indexes are updated so that this
- // circuit can be queried using LookupByKeystone or LookupByPaymentHash.
- func (cm *circuitMap) OpenCircuits(keystones ...Keystone) error {
- if len(keystones) == 0 {
- return nil
- }
- log.Tracef("Opening finalized circuits: %v", newLogClosure(func() string {
- return spew.Sdump(keystones)
- }))
- // Check that all keystones correspond to committed-but-unopened
- // circuits.
- cm.mtx.RLock()
- openedCircuits := make([]*PaymentCircuit, 0, len(keystones))
- for _, ks := range keystones {
- if _, ok := cm.opened[ks.OutKey]; ok {
- cm.mtx.RUnlock()
- return ErrDuplicateKeystone
- }
- circuit, ok := cm.pending[ks.InKey]
- if !ok {
- cm.mtx.RUnlock()
- return ErrUnknownCircuit
- }
- openedCircuits = append(openedCircuits, circuit)
- }
- cm.mtx.RUnlock()
- err := kvdb.Update(cm.cfg.DB, func(tx kvdb.RwTx) error {
- // Now, load the circuit bucket to which we will write the
- // already serialized circuit.
- keystoneBkt := tx.ReadWriteBucket(circuitKeystoneKey)
- if keystoneBkt == nil {
- return ErrCorruptedCircuitMap
- }
- for _, ks := range keystones {
- outBytes := ks.OutKey.Bytes()
- inBytes := ks.InKey.Bytes()
- err := keystoneBkt.Put(outBytes, inBytes)
- if err != nil {
- return err
- }
- }
- return nil
- }, func() {})
- if err != nil {
- return err
- }
- cm.mtx.Lock()
- for i, circuit := range openedCircuits {
- ks := keystones[i]
- // Since our persistent operation was successful, we can now
- // modify the in memory representations. Set the outgoing
- // circuit key on our pending circuit, add the same circuit to
- // set of opened circuits, and add this circuit to the hash
- // index.
- circuit.Outgoing = &CircuitKey{}
- *circuit.Outgoing = ks.OutKey
- cm.opened[ks.OutKey] = circuit
- cm.addCircuitToHashIndex(circuit)
- }
- cm.mtx.Unlock()
- return nil
- }
- // addCirciutToHashIndex inserts a circuit into the circuit map's hash index, so
- // that it can be queried using LookupByPaymentHash.
- func (cm *circuitMap) addCircuitToHashIndex(c *PaymentCircuit) {
- if _, ok := cm.hashIndex[c.PaymentHash]; !ok {
- cm.hashIndex[c.PaymentHash] = make(map[CircuitKey]struct{})
- }
- cm.hashIndex[c.PaymentHash][c.OutKey()] = struct{}{}
- }
- // FailCircuit marks the circuit identified by `inKey` as closing in-memory,
- // which prevents duplicate settles/fails from completing an open circuit twice.
- func (cm *circuitMap) FailCircuit(inKey CircuitKey) (*PaymentCircuit, error) {
- cm.mtx.Lock()
- defer cm.mtx.Unlock()
- circuit, ok := cm.pending[inKey]
- if !ok {
- return nil, ErrUnknownCircuit
- }
- _, ok = cm.closed[inKey]
- if ok {
- return nil, ErrCircuitClosing
- }
- cm.closed[inKey] = struct{}{}
- return circuit, nil
- }
- // CloseCircuit marks the circuit identified by `outKey` as closing in-memory,
- // which prevents duplicate settles/fails from completing an open
- // circuit twice.
- func (cm *circuitMap) CloseCircuit(outKey CircuitKey) (*PaymentCircuit, error) {
- cm.mtx.Lock()
- defer cm.mtx.Unlock()
- circuit, ok := cm.opened[outKey]
- if !ok {
- return nil, ErrUnknownCircuit
- }
- _, ok = cm.closed[circuit.Incoming]
- if ok {
- return nil, ErrCircuitClosing
- }
- cm.closed[circuit.Incoming] = struct{}{}
- return circuit, nil
- }
- // DeleteCircuits destroys the target circuits by removing them from the circuit
- // map, additionally removing the circuits' keystones if any HTLCs were
- // forwarded through an outgoing link. The circuits should be identified by its
- // incoming circuit key. If a given circuit is not found in the circuit map, it
- // will be ignored from the query. This would typically indicate that the
- // circuit was already cleaned up at a different point in time.
- func (cm *circuitMap) DeleteCircuits(inKeys ...CircuitKey) error {
- log.Tracef("Deleting resolved circuits: %v", newLogClosure(func() string {
- return spew.Sdump(inKeys)
- }))
- var (
- closingCircuits = make(map[CircuitKey]struct{})
- removedCircuits = make(map[CircuitKey]*PaymentCircuit)
- )
- cm.mtx.Lock()
- // Remove any references to the circuits from memory, keeping track of
- // which circuits were removed, and which ones had been marked closed.
- // This can be used to restore these entries later if the persistent
- // removal fails.
- for _, inKey := range inKeys {
- circuit, ok := cm.pending[inKey]
- if !ok {
- continue
- }
- delete(cm.pending, inKey)
- if _, ok := cm.closed[inKey]; ok {
- closingCircuits[inKey] = struct{}{}
- delete(cm.closed, inKey)
- }
- if circuit.HasKeystone() {
- delete(cm.opened, circuit.OutKey())
- cm.removeCircuitFromHashIndex(circuit)
- }
- removedCircuits[inKey] = circuit
- }
- cm.mtx.Unlock()
- err := kvdb.Batch(cm.cfg.DB, func(tx kvdb.RwTx) error {
- for _, circuit := range removedCircuits {
- // If this htlc made it to an outgoing link, load the
- // keystone bucket from which we will remove the
- // outgoing circuit key.
- if circuit.HasKeystone() {
- keystoneBkt := tx.ReadWriteBucket(circuitKeystoneKey)
- if keystoneBkt == nil {
- return ErrCorruptedCircuitMap
- }
- outKey := circuit.OutKey()
- err := keystoneBkt.Delete(outKey.Bytes())
- if err != nil {
- return err
- }
- }
- // Remove the circuit itself based on the incoming
- // circuit key.
- circuitBkt := tx.ReadWriteBucket(circuitAddKey)
- if circuitBkt == nil {
- return ErrCorruptedCircuitMap
- }
- inKey := circuit.InKey()
- if err := circuitBkt.Delete(inKey.Bytes()); err != nil {
- return err
- }
- }
- return nil
- })
- // Return if the write succeeded.
- if err == nil {
- return nil
- }
- // If the persistent changes failed, restore the circuit map to it's
- // previous state.
- cm.mtx.Lock()
- for inKey, circuit := range removedCircuits {
- cm.pending[inKey] = circuit
- if _, ok := closingCircuits[inKey]; ok {
- cm.closed[inKey] = struct{}{}
- }
- if circuit.HasKeystone() {
- cm.opened[circuit.OutKey()] = circuit
- cm.addCircuitToHashIndex(circuit)
- }
- }
- cm.mtx.Unlock()
- return err
- }
- // removeCircuitFromHashIndex removes the given circuit from the hash index,
- // pruning any unnecessary memory optimistically.
- func (cm *circuitMap) removeCircuitFromHashIndex(c *PaymentCircuit) {
- // Locate bucket containing this circuit's payment hashes.
- circuitsWithHash, ok := cm.hashIndex[c.PaymentHash]
- if !ok {
- return
- }
- outKey := c.OutKey()
- // Remove this circuit from the set of circuitsWithHash.
- delete(circuitsWithHash, outKey)
- // Prune the payment hash bucket if no other entries remain.
- if len(circuitsWithHash) == 0 {
- delete(cm.hashIndex, c.PaymentHash)
- }
- }
- // NumPending returns the number of active circuits added to the circuit map.
- func (cm *circuitMap) NumPending() int {
- cm.mtx.RLock()
- defer cm.mtx.RUnlock()
- return len(cm.pending)
- }
- // NumOpen returns the number of circuits that have been opened by way of
- // setting their keystones. This is the number of HTLCs that are waiting for a
- // settle/fail response from a remote peer.
- func (cm *circuitMap) NumOpen() int {
- cm.mtx.RLock()
- defer cm.mtx.RUnlock()
- return len(cm.opened)
- }
|