package htlcswitch

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"

	sphinx "github.com/lightningnetwork/lightning-onion"
	"github.com/lightningnetwork/lnd/chainntnfs"
	"github.com/lightningnetwork/lnd/kvdb"
)

const (
	// defaultDbDirectory is the default directory where our decayed log
	// will store our (sharedHash, CLTV) key-value pairs.
	defaultDbDirectory = "sharedhashes"
)

var (
	// sharedHashBucket is a bucket which houses the first HashPrefixSize
	// bytes of a received HTLC's hashed shared secret as the key and the
	// HTLC's CLTV expiry as the value.
	sharedHashBucket = []byte("shared-hash")

	// batchReplayBucket is a bucket that maps batch identifiers to
	// serialized ReplaySets. This is used to give idempotency in the
	// event that a batch is processed more than once.
	batchReplayBucket = []byte("batch-replay")
)

var (
	// ErrDecayedLogInit is used to indicate a decayed log failed to
	// create the proper bucketing structure on startup.
	ErrDecayedLogInit = errors.New("unable to initialize decayed log")

	// ErrDecayedLogCorrupted signals that the anticipated bucketing
	// structure has diverged since initialization.
	ErrDecayedLogCorrupted = errors.New("decayed log structure corrupted")
)

// NewBoltBackendCreator returns a function that creates a new bbolt backend
// for the decayed logs database.
func NewBoltBackendCreator(dbPath,
	dbFileName string) func(boltCfg *kvdb.BoltConfig) (kvdb.Backend, error) {

	return func(boltCfg *kvdb.BoltConfig) (kvdb.Backend, error) {
		cfg := &kvdb.BoltBackendConfig{
			DBPath:            dbPath,
			DBFileName:        dbFileName,
			NoFreelistSync:    boltCfg.NoFreelistSync,
			AutoCompact:       boltCfg.AutoCompact,
			AutoCompactMinAge: boltCfg.AutoCompactMinAge,
			DBTimeout:         boltCfg.DBTimeout,
		}

		// Use the default path for the log database.
		if dbPath == "" {
			cfg.DBPath = defaultDbDirectory
		}

		db, err := kvdb.GetBoltBackend(cfg)
		if err != nil {
			return nil, fmt.Errorf("could not open boltdb: %w",
				err)
		}

		return db, nil
	}
}

// DecayedLog implements the sphinx.ReplayLog interface. It stores the first
// HashPrefixSize bytes of a sha256-hashed shared secret along with a node's
// CLTV value. It is a decaying log, meaning a garbage collector removes
// entries that are expired according to their stored CLTV value and the
// current block height. DecayedLog wraps boltdb for simplicity and batches
// writes to the database to decrease write contention.
type DecayedLog struct {
	started int32 // To be used atomically.
	stopped int32 // To be used atomically.

	db kvdb.Backend

	notifier chainntnfs.ChainNotifier

	wg   sync.WaitGroup
	quit chan struct{}
}

// NewDecayedLog creates a new DecayedLog, which caches recently seen hashed
// shared secrets. Entries are evicted as their CLTV expires, using block
// epochs from the given notifier.
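//
// A minimal lifecycle sketch (assuming a kvdb.Backend and a
// chainntnfs.ChainNotifier were constructed elsewhere):
//
//	dlog := NewDecayedLog(db, notifier)
//	if err := dlog.Start(); err != nil {
//		// Handle startup failure.
//	}
//	defer dlog.Stop()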
func NewDecayedLog(db kvdb.Backend,
	notifier chainntnfs.ChainNotifier) *DecayedLog {

	return &DecayedLog{
		db:       db,
		notifier: notifier,
		quit:     make(chan struct{}),
	}
}

// Start opens the database we will be using to store hashed shared secrets.
// It also starts the garbage collector in a goroutine to remove stale
// database entries.
func (d *DecayedLog) Start() error {
	if !atomic.CompareAndSwapInt32(&d.started, 0, 1) {
		return nil
	}

	// Initialize the primary buckets used by the decayed log.
	if err := d.initBuckets(); err != nil {
		return err
	}

	// Start the garbage collector.
	if d.notifier != nil {
		epochClient, err := d.notifier.RegisterBlockEpochNtfn(nil)
		if err != nil {
			return fmt.Errorf("unable to register for epoch "+
				"notifications: %w", err)
		}

		d.wg.Add(1)
		go d.garbageCollector(epochClient)
	}

	return nil
}

// initBuckets initializes the primary buckets used by the decayed log,
// namely the shared hash bucket and the batch replay bucket.
func (d *DecayedLog) initBuckets() error {
	return kvdb.Update(d.db, func(tx kvdb.RwTx) error {
		_, err := tx.CreateTopLevelBucket(sharedHashBucket)
		if err != nil {
			return ErrDecayedLogInit
		}

		_, err = tx.CreateTopLevelBucket(batchReplayBucket)
		if err != nil {
			return ErrDecayedLogInit
		}

		return nil
	}, func() {})
}

// Stop halts the garbage collector. Note that the database backend is owned
// by the caller and is not closed here.
func (d *DecayedLog) Stop() error {
	if !atomic.CompareAndSwapInt32(&d.stopped, 0, 1) {
		return nil
	}

	// Stop the garbage collector.
	close(d.quit)

	d.wg.Wait()

	return nil
}

// garbageCollector deletes entries from sharedHashBucket whose expiry height
// has already passed. This function MUST be run as a goroutine.
func (d *DecayedLog) garbageCollector(epochClient *chainntnfs.BlockEpochEvent) {
	defer d.wg.Done()
	defer epochClient.Cancel()

	for {
		select {
		case epoch, ok := <-epochClient.Epochs:
			if !ok {
				// Block epoch was canceled, shutting down.
				log.Infof("Block epoch canceled, " +
					"decaying hash log shutting down")
				return
			}

			// Perform a bout of garbage collection using the
			// epoch's block height.
			height := uint32(epoch.Height)
			numExpired, err := d.gcExpiredHashes(height)
			if err != nil {
				log.Errorf("unable to expire hashes at "+
					"height=%d: %v", height, err)
			}

			if numExpired > 0 {
				log.Infof("Garbage collected %v shared "+
					"secret hashes at height=%v",
					numExpired, height)
			}

		case <-d.quit:
			// Received shutdown request.
			log.Infof("Decaying hash log received " +
				"shutdown request")
			return
		}
	}
}

// gcExpiredHashes purges the decaying log of all entries whose CLTV expires
// below the provided height.
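//
// The comparison is strict: an entry whose stored CLTV equals the provided
// height is retained, while one whose CLTV falls below it is deleted. For
// example, at height 700000 an entry stored with CLTV 699999 is purged,
// whereas one stored with CLTV 700000 survives.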
func (d *DecayedLog) gcExpiredHashes(height uint32) (uint32, error) {
	var numExpiredHashes uint32

	err := kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
		numExpiredHashes = 0

		// Grab the shared hash bucket.
		sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
		if sharedHashes == nil {
			return fmt.Errorf("sharedHashBucket is nil")
		}

		var expiredCltv [][]byte
		if err := sharedHashes.ForEach(func(k, v []byte) error {
			// Deserialize the CLTV value for this entry.
			cltv := binary.BigEndian.Uint32(v)

			if cltv < height {
				// This CLTV is expired. Collect its key so
				// that every expired hash can be deleted once
				// the iteration completes.
				expiredCltv = append(expiredCltv, k)
				numExpiredHashes++
			}

			return nil
		}); err != nil {
			return err
		}

		// Delete every expired entry. This must be done explicitly
		// outside of the ForEach closure, as deleting keys while
		// iterating over them is unsafe.
		for _, hash := range expiredCltv {
			err := sharedHashes.Delete(hash)
			if err != nil {
				return err
			}
		}

		return nil
	})
	if err != nil {
		return 0, err
	}

	return numExpiredHashes, nil
}

// Delete removes a <shared secret hash, CLTV> key-pair from the
// sharedHashBucket.
func (d *DecayedLog) Delete(hash *sphinx.HashPrefix) error {
	return kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
		sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
		if sharedHashes == nil {
			return ErrDecayedLogCorrupted
		}

		return sharedHashes.Delete(hash[:])
	})
}

// Get retrieves the CLTV of a processed HTLC given the first 20 bytes of the
// SHA-256 hash of the shared secret.
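//
// Callers typically distinguish a missing entry from a storage failure
// (sketch; hashPrefix is a placeholder):
//
//	cltv, err := d.Get(hashPrefix)
//	switch {
//	case errors.Is(err, sphinx.ErrLogEntryNotFound):
//		// The hash prefix has not been seen before.
//	case err != nil:
//		// Underlying database failure.
//	}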
func (d *DecayedLog) Get(hash *sphinx.HashPrefix) (uint32, error) {
	var value uint32

	err := kvdb.View(d.db, func(tx kvdb.RTx) error {
		// Grab the shared hash bucket which stores the mapping from
		// truncated sha-256 hashes of shared secrets to CLTVs.
		sharedHashes := tx.ReadBucket(sharedHashBucket)
		if sharedHashes == nil {
			return fmt.Errorf("sharedHashes is nil, could " +
				"not retrieve CLTV value")
		}

		// Retrieve the bytes representing the CLTV.
		valueBytes := sharedHashes.Get(hash[:])
		if valueBytes == nil {
			return sphinx.ErrLogEntryNotFound
		}

		// The first 4 bytes represent the CLTV; store it in value.
		value = binary.BigEndian.Uint32(valueBytes)

		return nil
	}, func() {
		value = 0
	})
	if err != nil {
		return value, err
	}

	return value, nil
}

// Put stores a shared secret hash as the key and the CLTV as the value.
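//
// If the hash prefix is already present, the packet is a replay and
// sphinx.ErrReplayedPacket is returned (sketch; hashPrefix and cltv are
// placeholders):
//
//	if err := d.Put(hashPrefix, cltv); err != nil {
//		// Handle unexpected failure on first insert.
//	}
//	err := d.Put(hashPrefix, cltv)
//	// err == sphinx.ErrReplayedPacket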
func (d *DecayedLog) Put(hash *sphinx.HashPrefix, cltv uint32) error {
	// Optimistically serialize the cltv value into the scratch buffer.
	var scratch [4]byte
	binary.BigEndian.PutUint32(scratch[:], cltv)

	return kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
		sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
		if sharedHashes == nil {
			return ErrDecayedLogCorrupted
		}

		// Check to see if this hash prefix has been recorded before.
		// If a value is found, this packet is being replayed.
		valueBytes := sharedHashes.Get(hash[:])
		if valueBytes != nil {
			return sphinx.ErrReplayedPacket
		}

		return sharedHashes.Put(hash[:], scratch[:])
	})
}

// PutBatch accepts a pending batch of hashed secret entries to write to
// disk. Each hashed secret is inserted with a corresponding time value,
// dictating when the entry will be evicted from the log.
//
// NOTE: This method enforces idempotency by writing the replay set obtained
// from the first attempt for a particular batch ID, and returning it to
// subsequent calls. For the indices of the replay set to remain aligned, the
// batch MUST be constructed identically to the first attempt; pruning
// entries will cause the indices to become invalid.
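//
// A sketch of the idempotent behavior (batch is a placeholder, rebuilt
// identically across attempts):
//
//	rs1, err := d.PutBatch(batch)
//	// ... process restarts, the same batch is reconstructed ...
//	rs2, err := d.PutBatch(batch)
//	// rs2 is decoded from the batch replay bucket and matches rs1.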
func (d *DecayedLog) PutBatch(b *sphinx.Batch) (*sphinx.ReplaySet, error) {
	// Since batched boltdb txns may be executed multiple times before
	// succeeding, we will create a new replay set for each invocation to
	// avoid any side-effects. If the txn is successful, this replay set
	// will be merged with the replay set computed during batch
	// construction to generate the complete replay set. If this batch was
	// previously processed, the replay set will be deserialized from
	// disk.
	var replays *sphinx.ReplaySet
	if err := kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
		sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
		if sharedHashes == nil {
			return ErrDecayedLogCorrupted
		}

		// Load the batch replay bucket, which will be used to either
		// retrieve the result of previously processing this batch, or
		// to write the result of this operation.
		batchReplayBkt := tx.ReadWriteBucket(batchReplayBucket)
		if batchReplayBkt == nil {
			return ErrDecayedLogCorrupted
		}

		// Check for the existence of this batch's id in the replay
		// bucket. If a non-nil value is found, this indicates that we
		// have already processed this batch before. We deserialize
		// the resulting replay set and return it to ensure calls to
		// PutBatch are idempotent.
		replayBytes := batchReplayBkt.Get(b.ID)
		if replayBytes != nil {
			replays = sphinx.NewReplaySet()
			return replays.Decode(bytes.NewReader(replayBytes))
		}

		// The CLTV will be serialized into scratch and then stored in
		// the sharedHashBucket.
		var scratch [4]byte

		replays = sphinx.NewReplaySet()
		err := b.ForEach(func(seqNum uint16,
			hashPrefix *sphinx.HashPrefix, cltv uint32) error {

			// If an entry already exists for this hash prefix,
			// the add is a replay; record its sequence number.
			valueBytes := sharedHashes.Get(hashPrefix[:])
			if valueBytes != nil {
				replays.Add(seqNum)
				return nil
			}

			// Serialize the cltv value and write an entry keyed
			// by the hash prefix.
			binary.BigEndian.PutUint32(scratch[:], cltv)
			return sharedHashes.Put(hashPrefix[:], scratch[:])
		})
		if err != nil {
			return err
		}

		// Merge the replay set computed from checking the on-disk
		// entries with the in-batch replays computed during this
		// batch's construction.
		replays.Merge(b.ReplaySet)

		// Write the replay set under the batch identifier to the
		// batch replays bucket. This can be used during recovery to
		// test (1) that a particular batch was successfully processed
		// and (2) recover the indexes of the adds that were rejected
		// as replays.
		var replayBuf bytes.Buffer
		if err := replays.Encode(&replayBuf); err != nil {
			return err
		}

		return batchReplayBkt.Put(b.ID, replayBuf.Bytes())
	}); err != nil {
		return nil, err
	}

	b.ReplaySet = replays
	b.IsCommitted = true

	return replays, nil
}

// A compile time check to ensure DecayedLog adheres to the sphinx.ReplayLog
// interface.
var _ sphinx.ReplayLog = (*DecayedLog)(nil)