1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348 |
- package sweep
- import (
- "errors"
- "fmt"
- "sync"
- "sync/atomic"
- "github.com/btcsuite/btcd/btcutil"
- "github.com/btcsuite/btcd/chaincfg/chainhash"
- "github.com/btcsuite/btcd/rpcclient"
- "github.com/btcsuite/btcd/txscript"
- "github.com/btcsuite/btcd/wire"
- "github.com/btcsuite/btcwallet/chain"
- "github.com/davecgh/go-spew/spew"
- "github.com/lightningnetwork/lnd/chainntnfs"
- "github.com/lightningnetwork/lnd/fn"
- "github.com/lightningnetwork/lnd/input"
- "github.com/lightningnetwork/lnd/labels"
- "github.com/lightningnetwork/lnd/lnutils"
- "github.com/lightningnetwork/lnd/lnwallet"
- "github.com/lightningnetwork/lnd/lnwallet/chainfee"
- )
- var (
- // ErrInvalidBumpResult is returned when the bump result is invalid.
- ErrInvalidBumpResult = errors.New("invalid bump result")
- // ErrNotEnoughBudget is returned when the fee bumper decides the
- // current budget cannot cover the fee.
- ErrNotEnoughBudget = errors.New("not enough budget")
- // ErrLocktimeImmature is returned when sweeping an input whose
- // locktime is not reached.
- ErrLocktimeImmature = errors.New("immature input")
- // ErrTxNoOutput is returned when an output cannot be created during tx
- // preparation, usually due to the output being dust.
- ErrTxNoOutput = errors.New("tx has no output")
- // ErrThirdPartySpent is returned when a third party has spent the
- // input in the sweeping tx.
- ErrThirdPartySpent = errors.New("third party spent the output")
- )
- // Bumper defines an interface that can be used by other subsystems for fee
- // bumping.
- type Bumper interface {
- // Broadcast is used to publish the tx created from the given inputs
- // specified in the request. It handles the tx creation, broadcasts it,
- // and monitors its confirmation status for potential fee bumping. It
- // returns a chan that the caller can use to receive updates about the
- // broadcast result and potential RBF attempts.
- Broadcast(req *BumpRequest) (<-chan *BumpResult, error)
- }
- // BumpEvent represents the event of a fee bumping attempt.
- type BumpEvent uint8
- const (
- // TxPublished is sent when the broadcast attempt is finished.
- TxPublished BumpEvent = iota
- // TxFailed is sent when the broadcast attempt fails.
- TxFailed
- // TxReplaced is sent when the original tx is replaced by a new one.
- TxReplaced
- // TxConfirmed is sent when the tx is confirmed.
- TxConfirmed
- // sentinalEvent is used to check if an event is unknown.
- sentinalEvent
- )
- // String returns a human-readable string for the event.
- func (e BumpEvent) String() string {
- switch e {
- case TxPublished:
- return "Published"
- case TxFailed:
- return "Failed"
- case TxReplaced:
- return "Replaced"
- case TxConfirmed:
- return "Confirmed"
- default:
- return "Unknown"
- }
- }
- // Unknown returns true if the event is unknown.
- func (e BumpEvent) Unknown() bool {
- return e >= sentinalEvent
- }
- // BumpRequest is used by the caller to give the Bumper the necessary info to
- // create and manage potential fee bumps for a set of inputs.
- type BumpRequest struct {
- // Budget givens the total amount that can be used as fees by these
- // inputs.
- Budget btcutil.Amount
- // Inputs is the set of inputs to sweep.
- Inputs []input.Input
- // DeadlineHeight is the block height at which the tx should be
- // confirmed.
- DeadlineHeight int32
- // DeliveryAddress is the script to send the change output to.
- DeliveryAddress []byte
- // MaxFeeRate is the maximum fee rate that can be used for fee bumping.
- MaxFeeRate chainfee.SatPerKWeight
- // StartingFeeRate is an optional parameter that can be used to specify
- // the initial fee rate to use for the fee function.
- StartingFeeRate fn.Option[chainfee.SatPerKWeight]
- }
- // MaxFeeRateAllowed returns the maximum fee rate allowed for the given
- // request. It calculates the feerate using the supplied budget and the weight,
- // compares it with the specified MaxFeeRate, and returns the smaller of the
- // two.
- func (r *BumpRequest) MaxFeeRateAllowed() (chainfee.SatPerKWeight, error) {
- // Get the size of the sweep tx, which will be used to calculate the
- // budget fee rate.
- size, err := calcSweepTxWeight(r.Inputs, r.DeliveryAddress)
- if err != nil {
- return 0, err
- }
- // Use the budget and MaxFeeRate to decide the max allowed fee rate.
- // This is needed as, when the input has a large value and the user
- // sets the budget to be proportional to the input value, the fee rate
- // can be very high and we need to make sure it doesn't exceed the max
- // fee rate.
- maxFeeRateAllowed := chainfee.NewSatPerKWeight(r.Budget, size)
- if maxFeeRateAllowed > r.MaxFeeRate {
- log.Debugf("Budget feerate %v exceeds MaxFeeRate %v, use "+
- "MaxFeeRate instead, txWeight=%v", maxFeeRateAllowed,
- r.MaxFeeRate, size)
- return r.MaxFeeRate, nil
- }
- log.Debugf("Budget feerate %v below MaxFeeRate %v, use budget feerate "+
- "instead, txWeight=%v", maxFeeRateAllowed, r.MaxFeeRate, size)
- return maxFeeRateAllowed, nil
- }
- // calcSweepTxWeight calculates the weight of the sweep tx. It assumes a
- // sweeping tx always has a single output(change).
- func calcSweepTxWeight(inputs []input.Input,
- outputPkScript []byte) (uint64, error) {
- // Use a const fee rate as we only use the weight estimator to
- // calculate the size.
- const feeRate = 1
- // Initialize the tx weight estimator with,
- // - nil outputs as we only have one single change output.
- // - const fee rate as we don't care about the fees here.
- // - 0 maxfeerate as we don't care about fees here.
- //
- // TODO(yy): we should refactor the weight estimator to not require a
- // fee rate and max fee rate and make it a pure tx weight calculator.
- _, estimator, err := getWeightEstimate(
- inputs, nil, feeRate, 0, outputPkScript,
- )
- if err != nil {
- return 0, err
- }
- return uint64(estimator.weight()), nil
- }
- // BumpResult is used by the Bumper to send updates about the tx being
- // broadcast.
- type BumpResult struct {
- // Event is the type of event that the result is for.
- Event BumpEvent
- // Tx is the tx being broadcast.
- Tx *wire.MsgTx
- // ReplacedTx is the old, replaced tx if a fee bump is attempted.
- ReplacedTx *wire.MsgTx
- // FeeRate is the fee rate used for the new tx.
- FeeRate chainfee.SatPerKWeight
- // Fee is the fee paid by the new tx.
- Fee btcutil.Amount
- // Err is the error that occurred during the broadcast.
- Err error
- // requestID is the ID of the request that created this record.
- requestID uint64
- }
- // Validate validates the BumpResult so it's safe to use.
- func (b *BumpResult) Validate() error {
- // Every result must have a tx.
- if b.Tx == nil {
- return fmt.Errorf("%w: nil tx", ErrInvalidBumpResult)
- }
- // Every result must have a known event.
- if b.Event.Unknown() {
- return fmt.Errorf("%w: unknown event", ErrInvalidBumpResult)
- }
- // If it's a replacing event, it must have a replaced tx.
- if b.Event == TxReplaced && b.ReplacedTx == nil {
- return fmt.Errorf("%w: nil replacing tx", ErrInvalidBumpResult)
- }
- // If it's a failed event, it must have an error.
- if b.Event == TxFailed && b.Err == nil {
- return fmt.Errorf("%w: nil error", ErrInvalidBumpResult)
- }
- // If it's a confirmed event, it must have a fee rate and fee.
- if b.Event == TxConfirmed && (b.FeeRate == 0 || b.Fee == 0) {
- return fmt.Errorf("%w: missing fee rate or fee",
- ErrInvalidBumpResult)
- }
- return nil
- }
- // TxPublisherConfig is the config used to create a new TxPublisher.
- type TxPublisherConfig struct {
- // Signer is used to create the tx signature.
- Signer input.Signer
- // Wallet is used primarily to publish the tx.
- Wallet Wallet
- // Estimator is used to estimate the fee rate for the new tx based on
- // its deadline conf target.
- Estimator chainfee.Estimator
- // Notifier is used to monitor the confirmation status of the tx.
- Notifier chainntnfs.ChainNotifier
- }
- // TxPublisher is an implementation of the Bumper interface. It utilizes the
- // `testmempoolaccept` RPC to bump the fee of txns it created based on
- // different fee function selected or configed by the caller. Its purpose is to
- // take a list of inputs specified, and create a tx that spends them to a
- // specified output. It will then monitor the confirmation status of the tx,
- // and if it's not confirmed within a certain time frame, it will attempt to
- // bump the fee of the tx by creating a new tx that spends the same inputs to
- // the same output, but with a higher fee rate. It will continue to do this
- // until the tx is confirmed or the fee rate reaches the maximum fee rate
- // specified by the caller.
- type TxPublisher struct {
- wg sync.WaitGroup
- // cfg specifies the configuration of the TxPublisher.
- cfg *TxPublisherConfig
- // currentHeight is the current block height.
- currentHeight atomic.Int32
- // records is a map keyed by the requestCounter and the value is the tx
- // being monitored.
- records lnutils.SyncMap[uint64, *monitorRecord]
- // requestCounter is a monotonically increasing counter used to keep
- // track of how many requests have been made.
- requestCounter atomic.Uint64
- // subscriberChans is a map keyed by the requestCounter, each item is
- // the chan that the publisher sends the fee bump result to.
- subscriberChans lnutils.SyncMap[uint64, chan *BumpResult]
- // quit is used to signal the publisher to stop.
- quit chan struct{}
- }
- // Compile-time constraint to ensure TxPublisher implements Bumper.
- var _ Bumper = (*TxPublisher)(nil)
- // NewTxPublisher creates a new TxPublisher.
- func NewTxPublisher(cfg TxPublisherConfig) *TxPublisher {
- return &TxPublisher{
- cfg: &cfg,
- records: lnutils.SyncMap[uint64, *monitorRecord]{},
- subscriberChans: lnutils.SyncMap[uint64, chan *BumpResult]{},
- quit: make(chan struct{}),
- }
- }
- // isNeutrinoBackend checks if the wallet backend is neutrino.
- func (t *TxPublisher) isNeutrinoBackend() bool {
- return t.cfg.Wallet.BackEnd() == "neutrino"
- }
- // Broadcast is used to publish the tx created from the given inputs. It will,
- // 1. init a fee function based on the given strategy.
- // 2. create an RBF-compliant tx and monitor it for confirmation.
- // 3. notify the initial broadcast result back to the caller.
- // The initial broadcast is guaranteed to be RBF-compliant unless the budget
- // specified cannot cover the fee.
- //
- // NOTE: part of the Bumper interface.
- func (t *TxPublisher) Broadcast(req *BumpRequest) (<-chan *BumpResult, error) {
- log.Tracef("Received broadcast request: %s", newLogClosure(
- func() string {
- return spew.Sdump(req)
- })())
- // Attempt an initial broadcast which is guaranteed to comply with the
- // RBF rules.
- result, err := t.initialBroadcast(req)
- if err != nil {
- log.Errorf("Initial broadcast failed: %v", err)
- return nil, err
- }
- // Create a chan to send the result to the caller.
- subscriber := make(chan *BumpResult, 1)
- t.subscriberChans.Store(result.requestID, subscriber)
- // Send the initial broadcast result to the caller.
- t.handleResult(result)
- return subscriber, nil
- }
- // initialBroadcast initializes a fee function, creates an RBF-compliant tx and
- // broadcasts it.
- func (t *TxPublisher) initialBroadcast(req *BumpRequest) (*BumpResult, error) {
- // Create a fee bumping algorithm to be used for future RBF.
- feeAlgo, err := t.initializeFeeFunction(req)
- if err != nil {
- return nil, fmt.Errorf("init fee function: %w", err)
- }
- // Create the initial tx to be broadcasted. This tx is guaranteed to
- // comply with the RBF restrictions.
- requestID, err := t.createRBFCompliantTx(req, feeAlgo)
- if err != nil {
- return nil, fmt.Errorf("create RBF-compliant tx: %w", err)
- }
- // Broadcast the tx and return the monitored record.
- result, err := t.broadcast(requestID)
- if err != nil {
- return nil, fmt.Errorf("broadcast sweep tx: %w", err)
- }
- return result, nil
- }
- // initializeFeeFunction initializes a fee function to be used for this request
- // for future fee bumping.
- func (t *TxPublisher) initializeFeeFunction(
- req *BumpRequest) (FeeFunction, error) {
- // Get the max allowed feerate.
- maxFeeRateAllowed, err := req.MaxFeeRateAllowed()
- if err != nil {
- return nil, err
- }
- // Get the initial conf target.
- confTarget := calcCurrentConfTarget(
- t.currentHeight.Load(), req.DeadlineHeight,
- )
- log.Debugf("Initializing fee function with conf target=%v, budget=%v, "+
- "maxFeeRateAllowed=%v", confTarget, req.Budget,
- maxFeeRateAllowed)
- // Initialize the fee function and return it.
- //
- // TODO(yy): return based on differet req.Strategy?
- return NewLinearFeeFunction(
- maxFeeRateAllowed, confTarget, t.cfg.Estimator,
- req.StartingFeeRate,
- )
- }
- // createRBFCompliantTx creates a tx that is compliant with RBF rules. It does
- // so by creating a tx, validate it using `TestMempoolAccept`, and bump its fee
- // and redo the process until the tx is valid, or return an error when non-RBF
- // related errors occur or the budget has been used up.
- func (t *TxPublisher) createRBFCompliantTx(req *BumpRequest,
- f FeeFunction) (uint64, error) {
- for {
- // Create a new tx with the given fee rate and check its
- // mempool acceptance.
- tx, fee, err := t.createAndCheckTx(req, f)
- switch {
- case err == nil:
- // The tx is valid, return the request ID.
- requestID := t.storeRecord(tx, req, f, fee)
- log.Infof("Created tx %v for %v inputs: feerate=%v, "+
- "fee=%v, inputs=%v", tx.TxHash(),
- len(req.Inputs), f.FeeRate(), fee,
- inputTypeSummary(req.Inputs))
- return requestID, nil
- // If the error indicates the fees paid is not enough, we will
- // ask the fee function to increase the fee rate and retry.
- case errors.Is(err, lnwallet.ErrMempoolFee):
- // We should at least start with a feerate above the
- // mempool min feerate, so if we get this error, it
- // means something is wrong earlier in the pipeline.
- log.Errorf("Current fee=%v, feerate=%v, %v", fee,
- f.FeeRate(), err)
- fallthrough
- // We are not paying enough fees so we increase it.
- case errors.Is(err, rpcclient.ErrInsufficientFee):
- increased := false
- // Keep calling the fee function until the fee rate is
- // increased or maxed out.
- for !increased {
- log.Debugf("Increasing fee for next round, "+
- "current fee=%v, feerate=%v", fee,
- f.FeeRate())
- // If the fee function tells us that we have
- // used up the budget, we will return an error
- // indicating this tx cannot be made. The
- // sweeper should handle this error and try to
- // cluster these inputs differetly.
- increased, err = f.Increment()
- if err != nil {
- return 0, err
- }
- }
- // TODO(yy): suppose there's only one bad input, we can do a
- // binary search to find out which input is causing this error
- // by recreating a tx using half of the inputs and check its
- // mempool acceptance.
- default:
- log.Debugf("Failed to create RBF-compliant tx: %v", err)
- return 0, err
- }
- }
- }
- // storeRecord stores the given record in the records map.
- func (t *TxPublisher) storeRecord(tx *wire.MsgTx, req *BumpRequest,
- f FeeFunction, fee btcutil.Amount) uint64 {
- // Increase the request counter.
- //
- // NOTE: this is the only place where we increase the
- // counter.
- requestID := t.requestCounter.Add(1)
- // Register the record.
- t.records.Store(requestID, &monitorRecord{
- tx: tx,
- req: req,
- feeFunction: f,
- fee: fee,
- })
- return requestID
- }
- // createAndCheckTx creates a tx based on the given inputs, change output
- // script, and the fee rate. In addition, it validates the tx's mempool
- // acceptance before returning a tx that can be published directly, along with
- // its fee.
- func (t *TxPublisher) createAndCheckTx(req *BumpRequest, f FeeFunction) (
- *wire.MsgTx, btcutil.Amount, error) {
- // Create the sweep tx with max fee rate of 0 as the fee function
- // guarantees the fee rate used here won't exceed the max fee rate.
- tx, fee, err := t.createSweepTx(
- req.Inputs, req.DeliveryAddress, f.FeeRate(),
- )
- if err != nil {
- return nil, fee, fmt.Errorf("create sweep tx: %w", err)
- }
- // Sanity check the budget still covers the fee.
- if fee > req.Budget {
- return nil, fee, fmt.Errorf("%w: budget=%v, fee=%v",
- ErrNotEnoughBudget, req.Budget, fee)
- }
- // Validate the tx's mempool acceptance.
- err = t.cfg.Wallet.CheckMempoolAcceptance(tx)
- // Exit early if the tx is valid.
- if err == nil {
- return tx, fee, nil
- }
- // Print an error log if the chain backend doesn't support the mempool
- // acceptance test RPC.
- if errors.Is(err, rpcclient.ErrBackendVersion) {
- log.Errorf("TestMempoolAccept not supported by backend, " +
- "consider upgrading it to a newer version")
- return tx, fee, nil
- }
- // We are running on a backend that doesn't implement the RPC
- // testmempoolaccept, eg, neutrino, so we'll skip the check.
- if errors.Is(err, chain.ErrUnimplemented) {
- log.Debug("Skipped testmempoolaccept due to not implemented")
- return tx, fee, nil
- }
- return nil, fee, fmt.Errorf("tx=%v failed mempool check: %w",
- tx.TxHash(), err)
- }
- // broadcast takes a monitored tx and publishes it to the network. Prior to the
- // broadcast, it will subscribe the tx's confirmation notification and attach
- // the event channel to the record. Any broadcast-related errors will not be
- // returned here, instead, they will be put inside the `BumpResult` and
- // returned to the caller.
- func (t *TxPublisher) broadcast(requestID uint64) (*BumpResult, error) {
- // Get the record being monitored.
- record, ok := t.records.Load(requestID)
- if !ok {
- return nil, fmt.Errorf("tx record %v not found", requestID)
- }
- txid := record.tx.TxHash()
- tx := record.tx
- log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v",
- txid, len(tx.TxIn), t.currentHeight.Load())
- // Set the event, and change it to TxFailed if the wallet fails to
- // publish it.
- event := TxPublished
- // Publish the sweeping tx with customized label. If the publish fails,
- // this error will be saved in the `BumpResult` and it will be removed
- // from being monitored.
- err := t.cfg.Wallet.PublishTransaction(
- tx, labels.MakeLabel(labels.LabelTypeSweepTransaction, nil),
- )
- if err != nil {
- // NOTE: we decide to attach this error to the result instead
- // of returning it here because by the time the tx reaches
- // here, it should have passed the mempool acceptance check. If
- // it still fails to be broadcast, it's likely a non-RBF
- // related error happened. So we send this error back to the
- // caller so that it can handle it properly.
- //
- // TODO(yy): find out which input is causing the failure.
- log.Errorf("Failed to publish tx %v: %v", txid, err)
- event = TxFailed
- }
- result := &BumpResult{
- Event: event,
- Tx: record.tx,
- Fee: record.fee,
- FeeRate: record.feeFunction.FeeRate(),
- Err: err,
- requestID: requestID,
- }
- return result, nil
- }
- // notifyResult sends the result to the resultChan specified by the requestID.
- // This channel is expected to be read by the caller.
- func (t *TxPublisher) notifyResult(result *BumpResult) {
- id := result.requestID
- subscriber, ok := t.subscriberChans.Load(id)
- if !ok {
- log.Errorf("Result chan for id=%v not found", id)
- return
- }
- log.Debugf("Sending result for requestID=%v, tx=%v", id,
- result.Tx.TxHash())
- select {
- // Send the result to the subscriber.
- //
- // TODO(yy): Add timeout in case it's blocking?
- case subscriber <- result:
- case <-t.quit:
- log.Debug("Fee bumper stopped")
- }
- }
- // removeResult removes the tracking of the result if the result contains a
- // non-nil error, or the tx is confirmed, the record will be removed from the
- // maps.
- func (t *TxPublisher) removeResult(result *BumpResult) {
- id := result.requestID
- // Remove the record from the maps if there's an error. This means this
- // tx has failed its broadcast and cannot be retried. There are two
- // cases,
- // - when the budget cannot cover the fee.
- // - when a non-RBF related error occurs.
- switch result.Event {
- case TxFailed:
- log.Errorf("Removing monitor record=%v, tx=%v, due to err: %v",
- id, result.Tx.TxHash(), result.Err)
- case TxConfirmed:
- // Remove the record is the tx is confirmed.
- log.Debugf("Removing confirmed monitor record=%v, tx=%v", id,
- result.Tx.TxHash())
- // Do nothing if it's neither failed or confirmed.
- default:
- log.Tracef("Skipping record removal for id=%v, event=%v", id,
- result.Event)
- return
- }
- t.records.Delete(id)
- t.subscriberChans.Delete(id)
- }
- // handleResult handles the result of a tx broadcast. It will notify the
- // subscriber and remove the record if the tx is confirmed or failed to be
- // broadcast.
- func (t *TxPublisher) handleResult(result *BumpResult) {
- // Notify the subscriber.
- t.notifyResult(result)
- // Remove the record if it's failed or confirmed.
- t.removeResult(result)
- }
- // monitorRecord is used to keep track of the tx being monitored by the
- // publisher internally.
- type monitorRecord struct {
- // tx is the tx being monitored.
- tx *wire.MsgTx
- // req is the original request.
- req *BumpRequest
- // feeFunction is the fee bumping algorithm used by the publisher.
- feeFunction FeeFunction
- // fee is the fee paid by the tx.
- fee btcutil.Amount
- }
- // Start starts the publisher by subscribing to block epoch updates and kicking
- // off the monitor loop.
- func (t *TxPublisher) Start() error {
- log.Info("TxPublisher starting...")
- defer log.Debugf("TxPublisher started")
- blockEvent, err := t.cfg.Notifier.RegisterBlockEpochNtfn(nil)
- if err != nil {
- return fmt.Errorf("register block epoch ntfn: %w", err)
- }
- t.wg.Add(1)
- go t.monitor(blockEvent)
- return nil
- }
- // Stop stops the publisher and waits for the monitor loop to exit.
- func (t *TxPublisher) Stop() {
- log.Info("TxPublisher stopping...")
- defer log.Debugf("TxPublisher stopped")
- close(t.quit)
- t.wg.Wait()
- }
- // monitor is the main loop driven by new blocks. Whevenr a new block arrives,
- // it will examine all the txns being monitored, and check if any of them needs
- // to be bumped. If so, it will attempt to bump the fee of the tx.
- //
- // NOTE: Must be run as a goroutine.
- func (t *TxPublisher) monitor(blockEvent *chainntnfs.BlockEpochEvent) {
- defer blockEvent.Cancel()
- defer t.wg.Done()
- for {
- select {
- case epoch, ok := <-blockEvent.Epochs:
- if !ok {
- // We should stop the publisher before stopping
- // the chain service. Otherwise it indicates an
- // error.
- log.Error("Block epoch channel closed, exit " +
- "monitor")
- return
- }
- log.Debugf("TxPublisher received new block: %v",
- epoch.Height)
- // Update the best known height for the publisher.
- t.currentHeight.Store(epoch.Height)
- // Check all monitored txns to see if any of them needs
- // to be bumped.
- t.processRecords()
- case <-t.quit:
- log.Debug("Fee bumper stopped, exit monitor")
- return
- }
- }
- }
- // processRecords checks all the txns being monitored, and checks if any of
- // them needs to be bumped. If so, it will attempt to bump the fee of the tx.
- func (t *TxPublisher) processRecords() {
- // confirmedRecords stores a map of the records which have been
- // confirmed.
- confirmedRecords := make(map[uint64]*monitorRecord)
- // feeBumpRecords stores a map of the records which need to be bumped.
- feeBumpRecords := make(map[uint64]*monitorRecord)
- // failedRecords stores a map of the records which has inputs being
- // spent by a third party.
- //
- // NOTE: this is only used for neutrino backend.
- failedRecords := make(map[uint64]*monitorRecord)
- // visitor is a helper closure that visits each record and divides them
- // into two groups.
- visitor := func(requestID uint64, r *monitorRecord) error {
- log.Tracef("Checking monitor recordID=%v for tx=%v", requestID,
- r.tx.TxHash())
- // If the tx is already confirmed, we can stop monitoring it.
- if t.isConfirmed(r.tx.TxHash()) {
- confirmedRecords[requestID] = r
- // Move to the next record.
- return nil
- }
- // Check whether the inputs has been spent by a third party.
- //
- // NOTE: this check is only done for neutrino backend.
- if t.isThirdPartySpent(r.tx.TxHash(), r.req.Inputs) {
- failedRecords[requestID] = r
- // Move to the next record.
- return nil
- }
- feeBumpRecords[requestID] = r
- // Return nil to move to the next record.
- return nil
- }
- // Iterate through all the records and divide them into two groups.
- t.records.ForEach(visitor)
- // For records that are confirmed, we'll notify the caller about this
- // result.
- for requestID, r := range confirmedRecords {
- rec := r
- log.Debugf("Tx=%v is confirmed", r.tx.TxHash())
- t.wg.Add(1)
- go t.handleTxConfirmed(rec, requestID)
- }
- // Get the current height to be used in the following goroutines.
- currentHeight := t.currentHeight.Load()
- // For records that are not confirmed, we perform a fee bump if needed.
- for requestID, r := range feeBumpRecords {
- rec := r
- log.Debugf("Attempting to fee bump Tx=%v", r.tx.TxHash())
- t.wg.Add(1)
- go t.handleFeeBumpTx(requestID, rec, currentHeight)
- }
- // For records that are failed, we'll notify the caller about this
- // result.
- for requestID, r := range failedRecords {
- rec := r
- log.Debugf("Tx=%v has inputs been spent by a third party, "+
- "failing it now", r.tx.TxHash())
- t.wg.Add(1)
- go t.handleThirdPartySpent(rec, requestID)
- }
- }
- // handleTxConfirmed is called when a monitored tx is confirmed. It will
- // notify the subscriber then remove the record from the maps .
- //
- // NOTE: Must be run as a goroutine to avoid blocking on sending the result.
- func (t *TxPublisher) handleTxConfirmed(r *monitorRecord, requestID uint64) {
- defer t.wg.Done()
- // Create a result that will be sent to the resultChan which is
- // listened by the caller.
- result := &BumpResult{
- Event: TxConfirmed,
- Tx: r.tx,
- requestID: requestID,
- Fee: r.fee,
- FeeRate: r.feeFunction.FeeRate(),
- }
- // Notify that this tx is confirmed and remove the record from the map.
- t.handleResult(result)
- }
- // handleFeeBumpTx checks if the tx needs to be bumped, and if so, it will
- // attempt to bump the fee of the tx.
- //
- // NOTE: Must be run as a goroutine to avoid blocking on sending the result.
- func (t *TxPublisher) handleFeeBumpTx(requestID uint64, r *monitorRecord,
- currentHeight int32) {
- defer t.wg.Done()
- oldTxid := r.tx.TxHash()
- // Get the current conf target for this record.
- confTarget := calcCurrentConfTarget(currentHeight, r.req.DeadlineHeight)
- // Ask the fee function whether a bump is needed. We expect the fee
- // function to increase its returned fee rate after calling this
- // method.
- increased, err := r.feeFunction.IncreaseFeeRate(confTarget)
- if err != nil {
- // TODO(yy): send this error back to the sweeper so it can
- // re-group the inputs?
- log.Errorf("Failed to increase fee rate for tx %v at "+
- "height=%v: %v", oldTxid, t.currentHeight.Load(), err)
- return
- }
- // If the fee rate was not increased, there's no need to bump the fee.
- if !increased {
- log.Tracef("Skip bumping tx %v at height=%v", oldTxid,
- t.currentHeight.Load())
- return
- }
- // The fee function now has a new fee rate, we will use it to bump the
- // fee of the tx.
- resultOpt := t.createAndPublishTx(requestID, r)
- // If there's a result, we will notify the caller about the result.
- resultOpt.WhenSome(func(result BumpResult) {
- // Notify the new result.
- t.handleResult(&result)
- })
- }
- // handleThirdPartySpent is called when the inputs in an unconfirmed tx is
- // spent. It will notify the subscriber then remove the record from the maps
- // and send a TxFailed event to the subscriber.
- //
- // NOTE: Must be run as a goroutine to avoid blocking on sending the result.
- func (t *TxPublisher) handleThirdPartySpent(r *monitorRecord,
- requestID uint64) {
- defer t.wg.Done()
- // Create a result that will be sent to the resultChan which is
- // listened by the caller.
- //
- // TODO(yy): create a new state `TxThirdPartySpent` to notify the
- // sweeper to remove the input, hence moving the monitoring of inputs
- // spent inside the fee bumper.
- result := &BumpResult{
- Event: TxFailed,
- Tx: r.tx,
- requestID: requestID,
- Err: ErrThirdPartySpent,
- }
- // Notify that this tx is confirmed and remove the record from the map.
- t.handleResult(result)
- }
- // createAndPublishTx creates a new tx with a higher fee rate and publishes it
- // to the network. It will update the record with the new tx and fee rate if
- // successfully created, and return the result when published successfully.
- func (t *TxPublisher) createAndPublishTx(requestID uint64,
- r *monitorRecord) fn.Option[BumpResult] {
- // Fetch the old tx.
- oldTx := r.tx
- // Create a new tx with the new fee rate.
- //
- // NOTE: The fee function is expected to have increased its returned
- // fee rate after calling the SkipFeeBump method. So we can use it
- // directly here.
- tx, fee, err := t.createAndCheckTx(r.req, r.feeFunction)
- // If the error is fee related, we will return no error and let the fee
- // bumper retry it at next block.
- //
- // NOTE: we can check the RBF error here and ask the fee function to
- // recalculate the fee rate. However, this would defeat the purpose of
- // using a deadline based fee function:
- // - if the deadline is far away, there's no rush to RBF the tx.
- // - if the deadline is close, we expect the fee function to give us a
- // higher fee rate. If the fee rate cannot satisfy the RBF rules, it
- // means the budget is not enough.
- if errors.Is(err, rpcclient.ErrInsufficientFee) ||
- errors.Is(err, lnwallet.ErrMempoolFee) {
- log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
- return fn.None[BumpResult]()
- }
- // If the error is not fee related, we will return a `TxFailed` event
- // so this input can be retried.
- if err != nil {
- // If the tx doesn't not have enought budget, we will return a
- // result so the sweeper can handle it by re-clustering the
- // utxos.
- if errors.Is(err, ErrNotEnoughBudget) {
- log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(),
- err)
- } else {
- // Otherwise, an unexpected error occurred, we will
- // fail the tx and let the sweeper retry the whole
- // process.
- log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(),
- err)
- }
- return fn.Some(BumpResult{
- Event: TxFailed,
- Tx: oldTx,
- Err: err,
- requestID: requestID,
- })
- }
- // The tx has been created without any errors, we now register a new
- // record by overwriting the same requestID.
- t.records.Store(requestID, &monitorRecord{
- tx: tx,
- req: r.req,
- feeFunction: r.feeFunction,
- fee: fee,
- })
- // Attempt to broadcast this new tx.
- result, err := t.broadcast(requestID)
- if err != nil {
- log.Infof("Failed to broadcast replacement tx %v: %v",
- tx.TxHash(), err)
- return fn.None[BumpResult]()
- }
- // If the result error is fee related, we will return no error and let
- // the fee bumper retry it at next block.
- //
- // NOTE: we may get this error if we've bypassed the mempool check,
- // which means we are suing neutrino backend.
- if errors.Is(result.Err, rpcclient.ErrInsufficientFee) ||
- errors.Is(result.Err, lnwallet.ErrMempoolFee) {
- log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
- return fn.None[BumpResult]()
- }
- // A successful replacement tx is created, attach the old tx.
- result.ReplacedTx = oldTx
- // If the new tx failed to be published, we will return the result so
- // the caller can handle it.
- if result.Event == TxFailed {
- return fn.Some(*result)
- }
- log.Infof("Replaced tx=%v with new tx=%v", oldTx.TxHash(), tx.TxHash())
- // Otherwise, it's a successful RBF, set the event and return.
- result.Event = TxReplaced
- return fn.Some(*result)
- }
- // isConfirmed checks the btcwallet to see whether the tx is confirmed.
- func (t *TxPublisher) isConfirmed(txid chainhash.Hash) bool {
- details, err := t.cfg.Wallet.GetTransactionDetails(&txid)
- if err != nil {
- log.Warnf("Failed to get tx details for %v: %v", txid, err)
- return false
- }
- return details.NumConfirmations > 0
- }
- // isThirdPartySpent checks whether the inputs of the tx has already been spent
- // by a third party. When a tx is not confirmed, yet its inputs has been spent,
- // then it must be spent by a different tx other than the sweeping tx here.
- //
- // NOTE: this check is only performed for neutrino backend as it has no
- // reliable way to tell a tx has been replaced.
- func (t *TxPublisher) isThirdPartySpent(txid chainhash.Hash,
- inputs []input.Input) bool {
- // Skip this check for if this is not neutrino backend.
- if !t.isNeutrinoBackend() {
- return false
- }
- // Iterate all the inputs and check if they have been spent already.
- for _, inp := range inputs {
- op := inp.OutPoint()
- // For wallet utxos, the height hint is not set - we don't need
- // to monitor them for third party spend.
- heightHint := inp.HeightHint()
- if heightHint == 0 {
- log.Debugf("Skipped third party check for wallet "+
- "input %v", op)
- continue
- }
- // If the input has already been spent after the height hint, a
- // spend event is sent back immediately.
- spendEvent, err := t.cfg.Notifier.RegisterSpendNtfn(
- &op, inp.SignDesc().Output.PkScript, heightHint,
- )
- if err != nil {
- log.Criticalf("Failed to register spend ntfn for "+
- "input=%v: %v", op, err)
- return false
- }
- // Remove the subscription when exit.
- defer spendEvent.Cancel()
- // Do a non-blocking read to see if the output has been spent.
- select {
- case spend, ok := <-spendEvent.Spend:
- if !ok {
- log.Debugf("Spend ntfn for %v canceled", op)
- return false
- }
- spendingTxID := spend.SpendingTx.TxHash()
- // If the spending tx is the same as the sweeping tx
- // then we are good.
- if spendingTxID == txid {
- continue
- }
- log.Warnf("Detected third party spent of output=%v "+
- "in tx=%v", op, spend.SpendingTx.TxHash())
- return true
- // Move to the next input.
- default:
- }
- }
- return false
- }
- // calcCurrentConfTarget calculates the current confirmation target based on
- // the deadline height. The conf target is capped at 0 if the deadline has
- // already been past.
- func calcCurrentConfTarget(currentHeight, deadline int32) uint32 {
- var confTarget uint32
- // Calculate how many blocks left until the deadline.
- deadlineDelta := deadline - currentHeight
- // If we are already past the deadline, we will set the conf target to
- // be 1.
- if deadlineDelta < 0 {
- log.Warnf("Deadline is %d blocks behind current height %v",
- -deadlineDelta, currentHeight)
- confTarget = 0
- } else {
- confTarget = uint32(deadlineDelta)
- }
- return confTarget
- }
- // createSweepTx creates a sweeping tx based on the given inputs, change
- // address and fee rate.
- func (t *TxPublisher) createSweepTx(inputs []input.Input, changePkScript []byte,
- feeRate chainfee.SatPerKWeight) (*wire.MsgTx, btcutil.Amount, error) {
- // Validate and calculate the fee and change amount.
- txFee, changeAmtOpt, locktimeOpt, err := prepareSweepTx(
- inputs, changePkScript, feeRate, t.currentHeight.Load(),
- )
- if err != nil {
- return nil, 0, err
- }
- var (
- // Create the sweep transaction that we will be building. We
- // use version 2 as it is required for CSV.
- sweepTx = wire.NewMsgTx(2)
- // We'll add the inputs as we go so we know the final ordering
- // of inputs to sign.
- idxs []input.Input
- )
- // We start by adding all inputs that commit to an output. We do this
- // since the input and output index must stay the same for the
- // signatures to be valid.
- for _, o := range inputs {
- if o.RequiredTxOut() == nil {
- continue
- }
- idxs = append(idxs, o)
- sweepTx.AddTxIn(&wire.TxIn{
- PreviousOutPoint: o.OutPoint(),
- Sequence: o.BlocksToMaturity(),
- })
- sweepTx.AddTxOut(o.RequiredTxOut())
- }
- // Sum up the value contained in the remaining inputs, and add them to
- // the sweep transaction.
- for _, o := range inputs {
- if o.RequiredTxOut() != nil {
- continue
- }
- idxs = append(idxs, o)
- sweepTx.AddTxIn(&wire.TxIn{
- PreviousOutPoint: o.OutPoint(),
- Sequence: o.BlocksToMaturity(),
- })
- }
- // If there's a change amount, add it to the transaction.
- changeAmtOpt.WhenSome(func(changeAmt btcutil.Amount) {
- sweepTx.AddTxOut(&wire.TxOut{
- PkScript: changePkScript,
- Value: int64(changeAmt),
- })
- })
- // We'll default to using the current block height as locktime, if none
- // of the inputs commits to a different locktime.
- sweepTx.LockTime = uint32(locktimeOpt.UnwrapOr(t.currentHeight.Load()))
- prevInputFetcher, err := input.MultiPrevOutFetcher(inputs)
- if err != nil {
- return nil, 0, fmt.Errorf("error creating prev input fetcher "+
- "for hash cache: %v", err)
- }
- hashCache := txscript.NewTxSigHashes(sweepTx, prevInputFetcher)
- // With all the inputs in place, use each output's unique input script
- // function to generate the final witness required for spending.
- addInputScript := func(idx int, tso input.Input) error {
- inputScript, err := tso.CraftInputScript(
- t.cfg.Signer, sweepTx, hashCache, prevInputFetcher, idx,
- )
- if err != nil {
- return err
- }
- sweepTx.TxIn[idx].Witness = inputScript.Witness
- if len(inputScript.SigScript) == 0 {
- return nil
- }
- sweepTx.TxIn[idx].SignatureScript = inputScript.SigScript
- return nil
- }
- for idx, inp := range idxs {
- if err := addInputScript(idx, inp); err != nil {
- return nil, 0, err
- }
- }
- log.Debugf("Created sweep tx %v for %v inputs", sweepTx.TxHash(),
- len(inputs))
- return sweepTx, txFee, nil
- }
- // prepareSweepTx returns the tx fee, an optional change amount and an optional
- // locktime after a series of validations:
- // 1. check the locktime has been reached.
- // 2. check the locktimes are the same.
- // 3. check the inputs cover the outputs.
- //
- // NOTE: if the change amount is below dust, it will be added to the tx fee.
- func prepareSweepTx(inputs []input.Input, changePkScript []byte,
- feeRate chainfee.SatPerKWeight, currentHeight int32) (
- btcutil.Amount, fn.Option[btcutil.Amount], fn.Option[int32], error) {
- noChange := fn.None[btcutil.Amount]()
- noLocktime := fn.None[int32]()
- // Creating a weight estimator with nil outputs and zero max fee rate.
- // We don't allow adding customized outputs in the sweeping tx, and the
- // fee rate is already being managed before we get here.
- inputs, estimator, err := getWeightEstimate(
- inputs, nil, feeRate, 0, changePkScript,
- )
- if err != nil {
- return 0, noChange, noLocktime, err
- }
- txFee := estimator.fee()
- var (
- // Track whether any of the inputs require a certain locktime.
- locktime = int32(-1)
- // We keep track of total input amount, and required output
- // amount to use for calculating the change amount below.
- totalInput btcutil.Amount
- requiredOutput btcutil.Amount
- )
- // Go through each input and check if the required lock times have
- // reached and are the same.
- for _, o := range inputs {
- // If the input has a required output, we'll add it to the
- // required output amount.
- if o.RequiredTxOut() != nil {
- requiredOutput += btcutil.Amount(
- o.RequiredTxOut().Value,
- )
- }
- // Update the total input amount.
- totalInput += btcutil.Amount(o.SignDesc().Output.Value)
- lt, ok := o.RequiredLockTime()
- // Skip if the input doesn't require a lock time.
- if !ok {
- continue
- }
- // Check if the lock time has reached
- if lt > uint32(currentHeight) {
- return 0, noChange, noLocktime, ErrLocktimeImmature
- }
- // If another input commits to a different locktime, they
- // cannot be combined in the same transaction.
- if locktime != -1 && locktime != int32(lt) {
- return 0, noChange, noLocktime, ErrLocktimeConflict
- }
- // Update the locktime for next iteration.
- locktime = int32(lt)
- }
- // Make sure total output amount is less than total input amount.
- if requiredOutput+txFee > totalInput {
- return 0, noChange, noLocktime, fmt.Errorf("insufficient "+
- "input to create sweep tx: input_sum=%v, "+
- "output_sum=%v", totalInput, requiredOutput+txFee)
- }
- // The value remaining after the required output and fees is the
- // change output.
- changeAmt := totalInput - requiredOutput - txFee
- changeAmtOpt := fn.Some(changeAmt)
- // We'll calculate the dust limit for the given changePkScript since it
- // is variable.
- changeFloor := lnwallet.DustLimitForSize(len(changePkScript))
- // If the change amount is dust, we'll move it into the fees.
- if changeAmt < changeFloor {
- log.Infof("Change amt %v below dustlimit %v, not adding "+
- "change output", changeAmt, changeFloor)
- // If there's no required output, and the change output is a
- // dust, it means we are creating a tx without any outputs. In
- // this case we'll return an error. This could happen when
- // creating a tx that has an anchor as the only input.
- if requiredOutput == 0 {
- return 0, noChange, noLocktime, ErrTxNoOutput
- }
- // The dust amount is added to the fee.
- txFee += changeAmt
- // Set the change amount to none.
- changeAmtOpt = fn.None[btcutil.Amount]()
- }
- // Optionally set the locktime.
- locktimeOpt := fn.Some(locktime)
- if locktime == -1 {
- locktimeOpt = noLocktime
- }
- log.Debugf("Creating sweep tx for %v inputs (%s) using %v, "+
- "tx_weight=%v, tx_fee=%v, locktime=%v, parents_count=%v, "+
- "parents_fee=%v, parents_weight=%v, current_height=%v",
- len(inputs), inputTypeSummary(inputs), feeRate,
- estimator.weight(), txFee, locktimeOpt, len(estimator.parents),
- estimator.parentsFee, estimator.parentsWeight, currentHeight)
- return txFee, changeAmtOpt, locktimeOpt, nil
- }
|