decayedlog.go

package htlcswitch

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"

	sphinx "github.com/lightningnetwork/lightning-onion"
	"github.com/lightningnetwork/lnd/chainntnfs"
	"github.com/lightningnetwork/lnd/kvdb"
)

const (
	// defaultDbDirectory is the default directory where our decayed log
	// will store our (sharedHash, CLTV) key-value pairs.
	defaultDbDirectory = "sharedhashes"
)

var (
	// sharedHashBucket is a bucket which houses the first HashPrefixSize
	// bytes of a received HTLC's hashed shared secret as the key and the
	// HTLC's CLTV expiry as the value.
	sharedHashBucket = []byte("shared-hash")

	// batchReplayBucket is a bucket that maps batch identifiers to
	// serialized ReplaySets. This is used to provide idempotency in the
	// event that a batch is processed more than once.
	batchReplayBucket = []byte("batch-replay")
)

var (
	// ErrDecayedLogInit is used to indicate a decayed log failed to create
	// the proper bucketing structure on startup.
	ErrDecayedLogInit = errors.New("unable to initialize decayed log")

	// ErrDecayedLogCorrupted signals that the anticipated bucketing
	// structure has diverged since initialization.
	ErrDecayedLogCorrupted = errors.New("decayed log structure corrupted")
)

// NewBoltBackendCreator returns a function that creates a new bbolt backend
// for the decayed logs database.
func NewBoltBackendCreator(dbPath,
	dbFileName string) func(boltCfg *kvdb.BoltConfig) (kvdb.Backend, error) {

	return func(boltCfg *kvdb.BoltConfig) (kvdb.Backend, error) {
		cfg := &kvdb.BoltBackendConfig{
			DBPath:            dbPath,
			DBFileName:        dbFileName,
			NoFreelistSync:    boltCfg.NoFreelistSync,
			AutoCompact:       boltCfg.AutoCompact,
			AutoCompactMinAge: boltCfg.AutoCompactMinAge,
			DBTimeout:         boltCfg.DBTimeout,
		}

		// Use the default path for the log database if none is given.
		if dbPath == "" {
			cfg.DBPath = defaultDbDirectory
		}

		db, err := kvdb.GetBoltBackend(cfg)
		if err != nil {
			return nil, fmt.Errorf("could not open boltdb: %w",
				err)
		}

		return db, nil
	}
}
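
// boltBackendExample is an illustrative sketch and not part of the original
// file: it wires the closure returned by NewBoltBackendCreator to a
// kvdb.BoltConfig. The directory, file name, and config values here are
// assumptions for demonstration only.
func boltBackendExample() (kvdb.Backend, error) {
	// The closure fixes the path and file name while deferring the
	// tuning options to the BoltConfig passed at call time.
	createBackend := NewBoltBackendCreator(
		"/tmp/decayedlog", "sphinxreplay.db",
	)

	return createBackend(&kvdb.BoltConfig{
		NoFreelistSync: true,
	})
}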

// DecayedLog implements the sphinx.ReplayLog interface. It stores the first
// HashPrefixSize bytes of a sha256-hashed shared secret along with a node's
// CLTV value. It is a decaying log, meaning a garbage collector removes
// entries that have expired according to their stored CLTV value and the
// current block height. DecayedLog wraps boltdb for simplicity and batches
// writes to the database to decrease write contention.
type DecayedLog struct {
	started int32 // To be used atomically.
	stopped int32 // To be used atomically.

	db kvdb.Backend

	notifier chainntnfs.ChainNotifier

	wg   sync.WaitGroup
	quit chan struct{}
}

// NewDecayedLog creates a new DecayedLog, which caches recently seen hashed
// shared secrets. Entries are evicted as their CLTV expires, using block
// epochs from the given notifier.
func NewDecayedLog(db kvdb.Backend,
	notifier chainntnfs.ChainNotifier) *DecayedLog {

	return &DecayedLog{
		db:       db,
		notifier: notifier,
		quit:     make(chan struct{}),
	}
}
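
// decayedLogLifecycleExample is an illustrative sketch and not part of the
// original file: it shows the expected construct/Start/Stop lifecycle. The
// db and notifier are assumed to be provided by the caller.
func decayedLogLifecycleExample(db kvdb.Backend,
	notifier chainntnfs.ChainNotifier) error {

	// With a non-nil notifier, Start also launches the garbage collector.
	decayedLog := NewDecayedLog(db, notifier)
	if err := decayedLog.Start(); err != nil {
		return err
	}
	defer decayedLog.Stop()

	// The log is now ready to serve Put/Get/PutBatch calls.
	return nil
}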

// Start opens the database we will be using to store hashed shared secrets.
// It also starts the garbage collector in a goroutine to remove stale
// database entries.
func (d *DecayedLog) Start() error {
	if !atomic.CompareAndSwapInt32(&d.started, 0, 1) {
		return nil
	}

	// Initialize the primary buckets used by the decayed log.
	if err := d.initBuckets(); err != nil {
		return err
	}

	// Start the garbage collector.
	if d.notifier != nil {
		epochClient, err := d.notifier.RegisterBlockEpochNtfn(nil)
		if err != nil {
			return fmt.Errorf("unable to register for epoch "+
				"notifications: %v", err)
		}

		d.wg.Add(1)
		go d.garbageCollector(epochClient)
	}

	return nil
}

// initBuckets initializes the primary buckets used by the decayed log, namely
// the shared hash bucket and the batch replay bucket.
func (d *DecayedLog) initBuckets() error {
	return kvdb.Update(d.db, func(tx kvdb.RwTx) error {
		_, err := tx.CreateTopLevelBucket(sharedHashBucket)
		if err != nil {
			return ErrDecayedLogInit
		}

		_, err = tx.CreateTopLevelBucket(batchReplayBucket)
		if err != nil {
			return ErrDecayedLogInit
		}

		return nil
	}, func() {})
}

// Stop halts the garbage collector and closes boltdb.
func (d *DecayedLog) Stop() error {
	if !atomic.CompareAndSwapInt32(&d.stopped, 0, 1) {
		return nil
	}

	// Stop the garbage collector.
	close(d.quit)

	d.wg.Wait()

	return nil
}

// garbageCollector deletes entries from sharedHashBucket whose expiry height
// has already passed. This function MUST be run as a goroutine.
func (d *DecayedLog) garbageCollector(epochClient *chainntnfs.BlockEpochEvent) {
	defer d.wg.Done()
	defer epochClient.Cancel()

	for {
		select {
		case epoch, ok := <-epochClient.Epochs:
			if !ok {
				// Block epoch was canceled, shutting down.
				log.Infof("Block epoch canceled, " +
					"decaying hash log shutting down")
				return
			}

			// Perform a bout of garbage collection using the
			// epoch's block height.
			height := uint32(epoch.Height)
			numExpired, err := d.gcExpiredHashes(height)
			if err != nil {
				log.Errorf("unable to expire hashes at "+
					"height=%d: %v", height, err)
			}

			if numExpired > 0 {
				log.Infof("Garbage collected %v shared "+
					"secret hashes at height=%v",
					numExpired, height)
			}

		case <-d.quit:
			// Received shutdown request.
			log.Infof("Decaying hash log received " +
				"shutdown request")
			return
		}
	}
}

// gcExpiredHashes purges the decaying log of all entries whose CLTV expires
// below the provided height.
func (d *DecayedLog) gcExpiredHashes(height uint32) (uint32, error) {
	var numExpiredHashes uint32

	err := kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
		// The batched txn may be retried, so reset the count on each
		// invocation to avoid double counting.
		numExpiredHashes = 0

		// Grab the shared hash bucket.
		sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
		if sharedHashes == nil {
			return fmt.Errorf("sharedHashBucket is nil")
		}

		var expiredCltv [][]byte
		if err := sharedHashes.ForEach(func(k, v []byte) error {
			// Deserialize the CLTV value for this entry.
			cltv := binary.BigEndian.Uint32(v)
			if cltv < height {
				// This CLTV is expired. Collect its key so we
				// can delete every expired hash from the db
				// after the iteration completes.
				expiredCltv = append(expiredCltv, k)
				numExpiredHashes++
			}

			return nil
		}); err != nil {
			return err
		}

		// Delete every expired key. This must be done explicitly
		// outside of the ForEach function for safety reasons.
		for _, hash := range expiredCltv {
			err := sharedHashes.Delete(hash)
			if err != nil {
				return err
			}
		}

		return nil
	})
	if err != nil {
		return 0, err
	}

	return numExpiredHashes, nil
}
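
// gcExample is an illustrative sketch and not part of the original file: it
// drives a single garbage-collection pass by hand, as a test in this package
// might. The height value is an assumption.
func gcExample(d *DecayedLog) error {
	// Entries whose stored CLTV is strictly below this height are purged.
	const assumedHeight uint32 = 500000

	numExpired, err := d.gcExpiredHashes(assumedHeight)
	if err != nil {
		return err
	}

	log.Debugf("Purged %d expired shared secret hashes", numExpired)

	return nil
}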

// Delete removes a <shared secret hash, CLTV> key-pair from the
// sharedHashBucket.
func (d *DecayedLog) Delete(hash *sphinx.HashPrefix) error {
	return kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
		sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
		if sharedHashes == nil {
			return ErrDecayedLogCorrupted
		}

		return sharedHashes.Delete(hash[:])
	})
}

// Get retrieves the CLTV of a processed HTLC given the first 20 bytes of the
// sha256 hash of the shared secret.
func (d *DecayedLog) Get(hash *sphinx.HashPrefix) (uint32, error) {
	var value uint32

	err := kvdb.View(d.db, func(tx kvdb.RTx) error {
		// Grab the shared hash bucket which stores the mapping from
		// truncated sha256 hashes of shared secrets to CLTVs.
		sharedHashes := tx.ReadBucket(sharedHashBucket)
		if sharedHashes == nil {
			return fmt.Errorf("sharedHashes is nil, could " +
				"not retrieve CLTV value")
		}

		// Retrieve the bytes which represent the CLTV.
		valueBytes := sharedHashes.Get(hash[:])
		if valueBytes == nil {
			return sphinx.ErrLogEntryNotFound
		}

		// The first 4 bytes represent the CLTV, store it in value.
		value = binary.BigEndian.Uint32(valueBytes)

		return nil
	}, func() {
		value = 0
	})
	if err != nil {
		return value, err
	}

	return value, nil
}

// Put stores a shared secret hash as the key and the CLTV as the value.
func (d *DecayedLog) Put(hash *sphinx.HashPrefix, cltv uint32) error {
	// Optimistically serialize the cltv value into the scratch buffer.
	var scratch [4]byte
	binary.BigEndian.PutUint32(scratch[:], cltv)

	return kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
		sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
		if sharedHashes == nil {
			return ErrDecayedLogCorrupted
		}

		// Check to see if this hash prefix has been recorded before.
		// If a value is found, this packet is being replayed.
		valueBytes := sharedHashes.Get(hash[:])
		if valueBytes != nil {
			return sphinx.ErrReplayedPacket
		}

		return sharedHashes.Put(hash[:], scratch[:])
	})
}
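
// putGetExample is an illustrative sketch and not part of the original file:
// it demonstrates the replay-detection contract of Put and Get. The hash
// prefix and cltv are assumed inputs.
func putGetExample(d *DecayedLog, hashPrefix *sphinx.HashPrefix,
	cltv uint32) error {

	// The first insert for a hash prefix succeeds.
	if err := d.Put(hashPrefix, cltv); err != nil {
		return err
	}

	// Re-inserting the same prefix is reported as a replayed packet.
	if err := d.Put(hashPrefix, cltv); err != sphinx.ErrReplayedPacket {
		return fmt.Errorf("expected replay error, got: %v", err)
	}

	// Get returns the CLTV recorded by the first insert.
	storedCltv, err := d.Get(hashPrefix)
	if err != nil {
		return err
	}
	if storedCltv != cltv {
		return fmt.Errorf("stored CLTV %d != %d", storedCltv, cltv)
	}

	return nil
}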

// PutBatch accepts a pending batch of hashed secret entries to write to disk.
// Each hashed secret is inserted with a corresponding time value, dictating
// when the entry will be evicted from the log.
//
// NOTE: This method enforces idempotency by writing the replay set obtained
// from the first attempt for a particular batch ID, and decoding the return
// value on subsequent calls. For the indices of the replay set to be aligned
// properly, the batch MUST be constructed identically to the first attempt;
// pruning will cause the indices to become invalid.
func (d *DecayedLog) PutBatch(b *sphinx.Batch) (*sphinx.ReplaySet, error) {
	// Since batched boltdb txns may be executed multiple times before
	// succeeding, we will create a new replay set for each invocation to
	// avoid any side-effects. If the txn is successful, this replay set
	// will be merged with the replay set computed during batch
	// construction to generate the complete replay set. If this batch was
	// previously processed, the replay set will be deserialized from disk.
	var replays *sphinx.ReplaySet
	if err := kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
		sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
		if sharedHashes == nil {
			return ErrDecayedLogCorrupted
		}

		// Load the batch replay bucket, which will be used to either
		// retrieve the result of previously processing this batch, or
		// to write the result of this operation.
		batchReplayBkt := tx.ReadWriteBucket(batchReplayBucket)
		if batchReplayBkt == nil {
			return ErrDecayedLogCorrupted
		}

		// Check for the existence of this batch's id in the replay
		// bucket. If a non-nil value is found, this indicates that we
		// have already processed this batch before. We deserialize the
		// resulting replay set and return it to ensure calls to put
		// batch are idempotent.
		replayBytes := batchReplayBkt.Get(b.ID)
		if replayBytes != nil {
			replays = sphinx.NewReplaySet()
			return replays.Decode(bytes.NewReader(replayBytes))
		}

		// The CLTV will be serialized into scratch and then stored in
		// the sharedHashBucket.
		var scratch [4]byte

		replays = sphinx.NewReplaySet()
		err := b.ForEach(func(seqNum uint16,
			hashPrefix *sphinx.HashPrefix, cltv uint32) error {

			// Retrieve the bytes which represent the CLTV. If an
			// entry already exists, mark this index as a replay.
			valueBytes := sharedHashes.Get(hashPrefix[:])
			if valueBytes != nil {
				replays.Add(seqNum)
				return nil
			}

			// Serialize the cltv value and write an entry keyed by
			// the hash prefix.
			binary.BigEndian.PutUint32(scratch[:], cltv)
			return sharedHashes.Put(hashPrefix[:], scratch[:])
		})
		if err != nil {
			return err
		}

		// Merge the replay set computed from checking the on-disk
		// entries with the in-batch replays computed during this
		// batch's construction.
		replays.Merge(b.ReplaySet)

		// Write the replay set under the batch identifier to the batch
		// replays bucket. This can be used during recovery to test (1)
		// that a particular batch was successfully processed and (2)
		// recover the indexes of the adds that were rejected as
		// replays.
		var replayBuf bytes.Buffer
		if err := replays.Encode(&replayBuf); err != nil {
			return err
		}

		return batchReplayBkt.Put(b.ID, replayBuf.Bytes())
	}); err != nil {
		return nil, err
	}

	b.ReplaySet = replays
	b.IsCommitted = true

	return replays, nil
}
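
// putBatchIdempotencyExample is an illustrative sketch and not part of the
// original file: retrying PutBatch with an identically constructed batch
// returns the replay set persisted under the batch ID rather than
// reprocessing the entries.
func putBatchIdempotencyExample(d *DecayedLog, b *sphinx.Batch) error {
	replays, err := d.PutBatch(b)
	if err != nil {
		return err
	}

	// A retry with the same batch ID decodes the stored replay set, so
	// both calls observe the same rejected indices.
	replaysAgain, err := d.PutBatch(b)
	if err != nil {
		return err
	}

	log.Debugf("Replay set after first attempt: %v, after retry: %v",
		replays, replaysAgain)

	return nil
}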

// A compile time check to ensure DecayedLog adheres to the sphinx.ReplayLog
// interface.
var _ sphinx.ReplayLog = (*DecayedLog)(nil)