payment_result.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. package htlcswitch
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "io"
  7. "sync"
  8. "github.com/lightningnetwork/lnd/channeldb"
  9. "github.com/lightningnetwork/lnd/kvdb"
  10. "github.com/lightningnetwork/lnd/lnwire"
  11. "github.com/lightningnetwork/lnd/multimutex"
  12. )
  13. var (
  14. // networkResultStoreBucketKey is used for the root level bucket that
  15. // stores the network result for each payment ID.
  16. networkResultStoreBucketKey = []byte("network-result-store-bucket")
  17. // ErrPaymentIDNotFound is an error returned if the given paymentID is
  18. // not found.
  19. ErrPaymentIDNotFound = errors.New("paymentID not found")
  20. // ErrPaymentIDAlreadyExists is returned if we try to write a pending
  21. // payment whose paymentID already exists.
  22. ErrPaymentIDAlreadyExists = errors.New("paymentID already exists")
  23. )
  24. // PaymentResult wraps a decoded result received from the network after a
  25. // payment attempt was made. This is what is eventually handed to the router
  26. // for processing.
  27. type PaymentResult struct {
  28. // Preimage is set by the switch in case a sent HTLC was settled.
  29. Preimage [32]byte
  30. // Error is non-nil in case a HTLC send failed, and the HTLC is now
  31. // irrevocably canceled. If the payment failed during forwarding, this
  32. // error will be a *ForwardingError.
  33. Error error
  34. }
  35. // networkResult is the raw result received from the network after a payment
  36. // attempt has been made. Since the switch doesn't always have the necessary
  37. // data to decode the raw message, we store it together with some meta data,
  38. // and decode it when the router query for the final result.
  39. type networkResult struct {
  40. // msg is the received result. This should be of type UpdateFulfillHTLC
  41. // or UpdateFailHTLC.
  42. msg lnwire.Message
  43. // unencrypted indicates whether the failure encoded in the message is
  44. // unencrypted, and hence doesn't need to be decrypted.
  45. unencrypted bool
  46. // isResolution indicates whether this is a resolution message, in
  47. // which the failure reason might not be included.
  48. isResolution bool
  49. }
  50. // serializeNetworkResult serializes the networkResult.
  51. func serializeNetworkResult(w io.Writer, n *networkResult) error {
  52. return channeldb.WriteElements(w, n.msg, n.unencrypted, n.isResolution)
  53. }
  54. // deserializeNetworkResult deserializes the networkResult.
  55. func deserializeNetworkResult(r io.Reader) (*networkResult, error) {
  56. n := &networkResult{}
  57. if err := channeldb.ReadElements(r,
  58. &n.msg, &n.unencrypted, &n.isResolution,
  59. ); err != nil {
  60. return nil, err
  61. }
  62. return n, nil
  63. }
  64. // networkResultStore is a persistent store that stores any results of HTLCs in
  65. // flight on the network. Since payment results are inherently asynchronous, it
  66. // is used as a common access point for senders of HTLCs, to know when a result
  67. // is back. The Switch will checkpoint any received result to the store, and
  68. // the store will keep results and notify the callers about them.
  69. type networkResultStore struct {
  70. backend kvdb.Backend
  71. // results is a map from paymentIDs to channels where subscribers to
  72. // payment results will be notified.
  73. results map[uint64][]chan *networkResult
  74. resultsMtx sync.Mutex
  75. // attemptIDMtx is a multimutex used to make sure the database and
  76. // result subscribers map is consistent for each attempt ID in case of
  77. // concurrent callers.
  78. attemptIDMtx *multimutex.Mutex[uint64]
  79. }
  80. func newNetworkResultStore(db kvdb.Backend) *networkResultStore {
  81. return &networkResultStore{
  82. backend: db,
  83. results: make(map[uint64][]chan *networkResult),
  84. attemptIDMtx: multimutex.NewMutex[uint64](),
  85. }
  86. }
  87. // storeResult stores the networkResult for the given attemptID, and notifies
  88. // any subscribers.
  89. func (store *networkResultStore) storeResult(attemptID uint64,
  90. result *networkResult) error {
  91. // We get a mutex for this attempt ID. This is needed to ensure
  92. // consistency between the database state and the subscribers in case
  93. // of concurrent calls.
  94. store.attemptIDMtx.Lock(attemptID)
  95. defer store.attemptIDMtx.Unlock(attemptID)
  96. log.Debugf("Storing result for attemptID=%v", attemptID)
  97. // Serialize the payment result.
  98. var b bytes.Buffer
  99. if err := serializeNetworkResult(&b, result); err != nil {
  100. return err
  101. }
  102. var attemptIDBytes [8]byte
  103. binary.BigEndian.PutUint64(attemptIDBytes[:], attemptID)
  104. err := kvdb.Batch(store.backend, func(tx kvdb.RwTx) error {
  105. networkResults, err := tx.CreateTopLevelBucket(
  106. networkResultStoreBucketKey,
  107. )
  108. if err != nil {
  109. return err
  110. }
  111. return networkResults.Put(attemptIDBytes[:], b.Bytes())
  112. })
  113. if err != nil {
  114. return err
  115. }
  116. // Now that the result is stored in the database, we can notify any
  117. // active subscribers.
  118. store.resultsMtx.Lock()
  119. for _, res := range store.results[attemptID] {
  120. res <- result
  121. }
  122. delete(store.results, attemptID)
  123. store.resultsMtx.Unlock()
  124. return nil
  125. }
  126. // subscribeResult is used to get the HTLC attempt result for the given attempt
  127. // ID. It returns a channel on which the result will be delivered when ready.
  128. func (store *networkResultStore) subscribeResult(attemptID uint64) (
  129. <-chan *networkResult, error) {
  130. // We get a mutex for this payment ID. This is needed to ensure
  131. // consistency between the database state and the subscribers in case
  132. // of concurrent calls.
  133. store.attemptIDMtx.Lock(attemptID)
  134. defer store.attemptIDMtx.Unlock(attemptID)
  135. log.Debugf("Subscribing to result for attemptID=%v", attemptID)
  136. var (
  137. result *networkResult
  138. resultChan = make(chan *networkResult, 1)
  139. )
  140. err := kvdb.View(store.backend, func(tx kvdb.RTx) error {
  141. var err error
  142. result, err = fetchResult(tx, attemptID)
  143. switch {
  144. // Result not yet available, we will notify once a result is
  145. // available.
  146. case err == ErrPaymentIDNotFound:
  147. return nil
  148. case err != nil:
  149. return err
  150. // The result was found, and will be returned immediately.
  151. default:
  152. return nil
  153. }
  154. }, func() {
  155. result = nil
  156. })
  157. if err != nil {
  158. return nil, err
  159. }
  160. // If the result was found, we can send it on the result channel
  161. // imemdiately.
  162. if result != nil {
  163. resultChan <- result
  164. return resultChan, nil
  165. }
  166. // Otherwise we store the result channel for when the result is
  167. // available.
  168. store.resultsMtx.Lock()
  169. store.results[attemptID] = append(
  170. store.results[attemptID], resultChan,
  171. )
  172. store.resultsMtx.Unlock()
  173. return resultChan, nil
  174. }
  175. // getResult attempts to immediately fetch the result for the given pid from
  176. // the store. If no result is available, ErrPaymentIDNotFound is returned.
  177. func (store *networkResultStore) getResult(pid uint64) (
  178. *networkResult, error) {
  179. var result *networkResult
  180. err := kvdb.View(store.backend, func(tx kvdb.RTx) error {
  181. var err error
  182. result, err = fetchResult(tx, pid)
  183. return err
  184. }, func() {
  185. result = nil
  186. })
  187. if err != nil {
  188. return nil, err
  189. }
  190. return result, nil
  191. }
  192. func fetchResult(tx kvdb.RTx, pid uint64) (*networkResult, error) {
  193. var attemptIDBytes [8]byte
  194. binary.BigEndian.PutUint64(attemptIDBytes[:], pid)
  195. networkResults := tx.ReadBucket(networkResultStoreBucketKey)
  196. if networkResults == nil {
  197. return nil, ErrPaymentIDNotFound
  198. }
  199. // Check whether a result is already available.
  200. resultBytes := networkResults.Get(attemptIDBytes[:])
  201. if resultBytes == nil {
  202. return nil, ErrPaymentIDNotFound
  203. }
  204. // Decode the result we found.
  205. r := bytes.NewReader(resultBytes)
  206. return deserializeNetworkResult(r)
  207. }
  208. // cleanStore removes all entries from the store, except the payment IDs given.
  209. // NOTE: Since every result not listed in the keep map will be deleted, care
  210. // should be taken to ensure no new payment attempts are being made
  211. // concurrently while this process is ongoing, as its result might end up being
  212. // deleted.
  213. func (store *networkResultStore) cleanStore(keep map[uint64]struct{}) error {
  214. return kvdb.Update(store.backend, func(tx kvdb.RwTx) error {
  215. networkResults, err := tx.CreateTopLevelBucket(
  216. networkResultStoreBucketKey,
  217. )
  218. if err != nil {
  219. return err
  220. }
  221. // Iterate through the bucket, deleting all items not in the
  222. // keep map.
  223. var toClean [][]byte
  224. if err := networkResults.ForEach(func(k, _ []byte) error {
  225. pid := binary.BigEndian.Uint64(k)
  226. if _, ok := keep[pid]; ok {
  227. return nil
  228. }
  229. toClean = append(toClean, k)
  230. return nil
  231. }); err != nil {
  232. return err
  233. }
  234. for _, k := range toClean {
  235. err := networkResults.Delete(k)
  236. if err != nil {
  237. return err
  238. }
  239. }
  240. if len(toClean) > 0 {
  241. log.Infof("Removed %d stale entries from network "+
  242. "result store", len(toClean))
  243. }
  244. return nil
  245. }, func() {})
  246. }