sigpool.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package lnwallet
  2. import (
  3. "fmt"
  4. "sync"
  5. "github.com/btcsuite/btcd/btcec/v2"
  6. "github.com/btcsuite/btcd/wire"
  7. "github.com/lightningnetwork/lnd/input"
  8. "github.com/lightningnetwork/lnd/lnwire"
  9. )
  10. const (
  11. // jobBuffer is a constant the represents the buffer of jobs in the two
  12. // main queues. This allows clients avoid necessarily blocking when
  13. // submitting jobs into the queue.
  14. jobBuffer = 100
  15. // TODO(roasbeef): job buffer pool?
  16. )
  17. // VerifyJob is a job sent to the sigPool sig pool to verify a signature
  18. // on a transaction. The items contained in the struct are necessary and
  19. // sufficient to verify the full signature. The passed sigHash closure function
  20. // should be set to a function that generates the relevant sighash.
  21. //
  22. // TODO(roasbeef): when we move to ecschnorr, make into batch signature
  23. // verification using bos-coster (or pip?).
  24. type VerifyJob struct {
  25. // PubKey is the public key that was used to generate the purported
  26. // valid signature. Note that with the current channel construction,
  27. // this public key will likely have been tweaked using the current per
  28. // commitment point for a particular commitment transactions.
  29. PubKey *btcec.PublicKey
  30. // Sig is the raw signature generated using the above public key. This
  31. // is the signature to be verified.
  32. Sig input.Signature
  33. // SigHash is a function closure generates the sighashes that the
  34. // passed signature is known to have signed.
  35. SigHash func() ([]byte, error)
  36. // HtlcIndex is the index of the HTLC from the PoV of the remote
  37. // party's update log.
  38. HtlcIndex uint64
  39. // Cancel is a channel that should be closed if the caller wishes to
  40. // cancel all pending verification jobs part of a single batch. This
  41. // channel is to be closed in the case that a single signature in a
  42. // batch has been returned as invalid, as there is no need to verify
  43. // the remainder of the signatures.
  44. Cancel chan struct{}
  45. // ErrResp is the channel that the result of the signature verification
  46. // is to be sent over. In the see that the signature is valid, a nil
  47. // error will be passed. Otherwise, a concrete error detailing the
  48. // issue will be passed.
  49. ErrResp chan *HtlcIndexErr
  50. }
  51. // HtlcIndexErr is a special type of error that also includes a pointer to the
  52. // original validation job. This error message allows us to craft more detailed
  53. // errors at upper layers.
  54. type HtlcIndexErr struct {
  55. error
  56. *VerifyJob
  57. }
  58. // SignJob is a job sent to the sigPool sig pool to generate a valid
  59. // signature according to the passed SignDescriptor for the passed transaction.
  60. // Jobs are intended to be sent in batches in order to parallelize the job of
  61. // generating signatures for a new commitment transaction.
  62. type SignJob struct {
  63. // SignDesc is intended to be a full populated SignDescriptor which
  64. // encodes the necessary material (keys, witness script, etc) required
  65. // to generate a valid signature for the specified input.
  66. SignDesc input.SignDescriptor
  67. // Tx is the transaction to be signed. This is required to generate the
  68. // proper sighash for the input to be signed.
  69. Tx *wire.MsgTx
  70. // OutputIndex is the output index of the HTLC on the commitment
  71. // transaction being signed.
  72. OutputIndex int32
  73. // Cancel is a channel that should be closed if the caller wishes to
  74. // abandon all pending sign jobs part of a single batch.
  75. Cancel chan struct{}
  76. // Resp is the channel that the response to this particular SignJob
  77. // will be sent over.
  78. //
  79. // TODO(roasbeef): actually need to allow caller to set, need to retain
  80. // order mark commit sig as special
  81. Resp chan SignJobResp
  82. }
  83. // SignJobResp is the response to a sign job. Both channels are to be read in
  84. // order to ensure no unnecessary goroutine blocking occurs. Additionally, both
  85. // channels should be buffered.
  86. type SignJobResp struct {
  87. // Sig is the generated signature for a particular SignJob In the case
  88. // of an error during signature generation, then this value sent will
  89. // be nil.
  90. Sig lnwire.Sig
  91. // Err is the error that occurred when executing the specified
  92. // signature job. In the case that no error occurred, this value will
  93. // be nil.
  94. Err error
  95. }
  96. // SigPool is a struct that is meant to allow the current channel state
  97. // machine to parallelize all signature generation and verification. This
  98. // struct is needed as _each_ HTLC when creating a commitment transaction
  99. // requires a signature, and similarly a receiver of a new commitment must
  100. // verify all the HTLC signatures included within the CommitSig message. A pool
  101. // of workers will be maintained by the sigPool. Batches of jobs (either
  102. // to sign or verify) can be sent to the pool of workers which will
  103. // asynchronously perform the specified job.
  104. type SigPool struct {
  105. started sync.Once
  106. stopped sync.Once
  107. signer input.Signer
  108. verifyJobs chan VerifyJob
  109. signJobs chan SignJob
  110. wg sync.WaitGroup
  111. quit chan struct{}
  112. numWorkers int
  113. }
  114. // NewSigPool creates a new signature pool with the specified number of
  115. // workers. The recommended parameter for the number of works is the number of
  116. // physical CPU cores available on the target machine.
  117. func NewSigPool(numWorkers int, signer input.Signer) *SigPool {
  118. return &SigPool{
  119. signer: signer,
  120. numWorkers: numWorkers,
  121. verifyJobs: make(chan VerifyJob, jobBuffer),
  122. signJobs: make(chan SignJob, jobBuffer),
  123. quit: make(chan struct{}),
  124. }
  125. }
  126. // Start starts of all goroutines that the sigPool sig pool needs to
  127. // carry out its duties.
  128. func (s *SigPool) Start() error {
  129. s.started.Do(func() {
  130. walletLog.Info("SigPool starting")
  131. for i := 0; i < s.numWorkers; i++ {
  132. s.wg.Add(1)
  133. go s.poolWorker()
  134. }
  135. })
  136. return nil
  137. }
  138. // Stop signals any active workers carrying out jobs to exit so the sigPool can
  139. // gracefully shutdown.
  140. func (s *SigPool) Stop() error {
  141. s.stopped.Do(func() {
  142. close(s.quit)
  143. s.wg.Wait()
  144. })
  145. return nil
  146. }
  147. // poolWorker is the main worker goroutine within the sigPool sig pool.
  148. // Individual batches are distributed amongst each of the active workers. The
  149. // workers then execute the task based on the type of job, and return the
  150. // result back to caller.
  151. func (s *SigPool) poolWorker() {
  152. defer s.wg.Done()
  153. for {
  154. select {
  155. // We've just received a new signature job. Given the items
  156. // contained within the message, we'll craft a signature and
  157. // send the result along with a possible error back to the
  158. // caller.
  159. case sigMsg := <-s.signJobs:
  160. rawSig, err := s.signer.SignOutputRaw(
  161. sigMsg.Tx, &sigMsg.SignDesc,
  162. )
  163. if err != nil {
  164. select {
  165. case sigMsg.Resp <- SignJobResp{
  166. Sig: lnwire.Sig{},
  167. Err: err,
  168. }:
  169. continue
  170. case <-sigMsg.Cancel:
  171. continue
  172. case <-s.quit:
  173. return
  174. }
  175. }
  176. // Use the sig mapper to go from the input.Signature
  177. // into the serialized lnwire.Sig that we'll send
  178. // across the wire.
  179. sig, err := lnwire.NewSigFromSignature(rawSig)
  180. select {
  181. case sigMsg.Resp <- SignJobResp{
  182. Sig: sig,
  183. Err: err,
  184. }:
  185. case <-sigMsg.Cancel:
  186. continue
  187. case <-s.quit:
  188. return
  189. }
  190. // We've just received a new verification job from the outside
  191. // world. We'll attempt to construct the sighash, parse the
  192. // signature, and finally verify the signature.
  193. case verifyMsg := <-s.verifyJobs:
  194. sigHash, err := verifyMsg.SigHash()
  195. if err != nil {
  196. select {
  197. case verifyMsg.ErrResp <- &HtlcIndexErr{
  198. error: err,
  199. VerifyJob: &verifyMsg,
  200. }:
  201. continue
  202. case <-verifyMsg.Cancel:
  203. continue
  204. }
  205. }
  206. rawSig := verifyMsg.Sig
  207. if !rawSig.Verify(sigHash, verifyMsg.PubKey) {
  208. err := fmt.Errorf("invalid signature "+
  209. "sighash: %x, sig: %x", sigHash,
  210. rawSig.Serialize())
  211. select {
  212. case verifyMsg.ErrResp <- &HtlcIndexErr{
  213. error: err,
  214. VerifyJob: &verifyMsg,
  215. }:
  216. case <-verifyMsg.Cancel:
  217. case <-s.quit:
  218. return
  219. }
  220. } else {
  221. select {
  222. case verifyMsg.ErrResp <- nil:
  223. case <-verifyMsg.Cancel:
  224. case <-s.quit:
  225. return
  226. }
  227. }
  228. // The sigPool sig pool is exiting, so we will as well.
  229. case <-s.quit:
  230. return
  231. }
  232. }
  233. }
  234. // SubmitSignBatch submits a batch of signature jobs to the sigPool. The
  235. // response and cancel channels for each of the SignJob's are expected to be
  236. // fully populated, as the response for each job will be sent over the
  237. // response channel within the job itself.
  238. func (s *SigPool) SubmitSignBatch(signJobs []SignJob) {
  239. for _, job := range signJobs {
  240. select {
  241. case s.signJobs <- job:
  242. case <-job.Cancel:
  243. // TODO(roasbeef): return error?
  244. case <-s.quit:
  245. return
  246. }
  247. }
  248. }
  249. // SubmitVerifyBatch submits a batch of verification jobs to the sigPool. For
  250. // each job submitted, an error will be passed into the returned channel
  251. // denoting if signature verification was valid or not. The passed cancelChan
  252. // allows the caller to cancel all pending jobs in the case that they wish to
  253. // bail early.
  254. func (s *SigPool) SubmitVerifyBatch(verifyJobs []VerifyJob,
  255. cancelChan chan struct{}) <-chan *HtlcIndexErr {
  256. errChan := make(chan *HtlcIndexErr, len(verifyJobs))
  257. for _, job := range verifyJobs {
  258. job.Cancel = cancelChan
  259. job.ErrResp = errChan
  260. select {
  261. case s.verifyJobs <- job:
  262. case <-job.Cancel:
  263. return errChan
  264. }
  265. }
  266. return errChan
  267. }