sweeper_test.go 29 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063
  1. package sweep
  2. import (
  3. "errors"
  4. "testing"
  5. "time"
  6. "github.com/btcsuite/btcd/btcec/v2"
  7. "github.com/btcsuite/btcd/btcutil"
  8. "github.com/btcsuite/btcd/chaincfg/chainhash"
  9. "github.com/btcsuite/btcd/wire"
  10. "github.com/lightningnetwork/lnd/chainntnfs"
  11. "github.com/lightningnetwork/lnd/fn"
  12. "github.com/lightningnetwork/lnd/input"
  13. "github.com/lightningnetwork/lnd/lnwallet/chainfee"
  14. "github.com/stretchr/testify/mock"
  15. "github.com/stretchr/testify/require"
  16. )
  17. var (
  18. errDummy = errors.New("dummy error")
  19. testPubKey, _ = btcec.ParsePubKey([]byte{
  20. 0x04, 0x11, 0xdb, 0x93, 0xe1, 0xdc, 0xdb, 0x8a,
  21. 0x01, 0x6b, 0x49, 0x84, 0x0f, 0x8c, 0x53, 0xbc, 0x1e,
  22. 0xb6, 0x8a, 0x38, 0x2e, 0x97, 0xb1, 0x48, 0x2e, 0xca,
  23. 0xd7, 0xb1, 0x48, 0xa6, 0x90, 0x9a, 0x5c, 0xb2, 0xe0,
  24. 0xea, 0xdd, 0xfb, 0x84, 0xcc, 0xf9, 0x74, 0x44, 0x64,
  25. 0xf8, 0x2e, 0x16, 0x0b, 0xfa, 0x9b, 0x8b, 0x64, 0xf9,
  26. 0xd4, 0xc0, 0x3f, 0x99, 0x9b, 0x86, 0x43, 0xf6, 0x56,
  27. 0xb4, 0x12, 0xa3,
  28. })
  29. )
  30. // TestMarkInputsPendingPublish checks that given a list of inputs with
  31. // different states, only the non-terminal state will be marked as `Published`.
  32. func TestMarkInputsPendingPublish(t *testing.T) {
  33. t.Parallel()
  34. require := require.New(t)
  35. // Create a test sweeper.
  36. s := New(&UtxoSweeperConfig{})
  37. // Create a mock input set.
  38. set := &MockInputSet{}
  39. defer set.AssertExpectations(t)
  40. // Create three testing inputs.
  41. //
  42. // inputNotExist specifies an input that's not found in the sweeper's
  43. // `pendingInputs` map.
  44. inputNotExist := &input.MockInput{}
  45. defer inputNotExist.AssertExpectations(t)
  46. inputNotExist.On("OutPoint").Return(wire.OutPoint{Index: 0})
  47. // inputInit specifies a newly created input.
  48. inputInit := &input.MockInput{}
  49. defer inputInit.AssertExpectations(t)
  50. inputInit.On("OutPoint").Return(wire.OutPoint{Index: 1})
  51. s.inputs[inputInit.OutPoint()] = &SweeperInput{
  52. state: Init,
  53. }
  54. // inputPendingPublish specifies an input that's about to be published.
  55. inputPendingPublish := &input.MockInput{}
  56. defer inputPendingPublish.AssertExpectations(t)
  57. inputPendingPublish.On("OutPoint").Return(wire.OutPoint{Index: 2})
  58. s.inputs[inputPendingPublish.OutPoint()] = &SweeperInput{
  59. state: PendingPublish,
  60. }
  61. // inputTerminated specifies an input that's terminated.
  62. inputTerminated := &input.MockInput{}
  63. defer inputTerminated.AssertExpectations(t)
  64. inputTerminated.On("OutPoint").Return(wire.OutPoint{Index: 3})
  65. s.inputs[inputTerminated.OutPoint()] = &SweeperInput{
  66. state: Excluded,
  67. }
  68. // Mark the test inputs. We expect the non-exist input and the
  69. // inputTerminated to be skipped, and the rest to be marked as pending
  70. // publish.
  71. set.On("Inputs").Return([]input.Input{
  72. inputNotExist, inputInit, inputPendingPublish, inputTerminated,
  73. })
  74. s.markInputsPendingPublish(set)
  75. // We expect unchanged number of pending inputs.
  76. require.Len(s.inputs, 3)
  77. // We expect the init input's state to become pending publish.
  78. require.Equal(PendingPublish, s.inputs[inputInit.OutPoint()].state)
  79. // We expect the pending-publish to stay unchanged.
  80. require.Equal(PendingPublish,
  81. s.inputs[inputPendingPublish.OutPoint()].state)
  82. // We expect the terminated to stay unchanged.
  83. require.Equal(Excluded, s.inputs[inputTerminated.OutPoint()].state)
  84. }
  85. // TestMarkInputsPublished checks that given a list of inputs with different
  86. // states, only the state `PendingPublish` will be marked as `Published`.
  87. func TestMarkInputsPublished(t *testing.T) {
  88. t.Parallel()
  89. require := require.New(t)
  90. // Create a mock sweeper store.
  91. mockStore := NewMockSweeperStore()
  92. // Create a test TxRecord and a dummy error.
  93. dummyTR := &TxRecord{}
  94. dummyErr := errors.New("dummy error")
  95. // Create a test sweeper.
  96. s := New(&UtxoSweeperConfig{
  97. Store: mockStore,
  98. })
  99. // Create three testing inputs.
  100. //
  101. // inputNotExist specifies an input that's not found in the sweeper's
  102. // `inputs` map.
  103. inputNotExist := &wire.TxIn{
  104. PreviousOutPoint: wire.OutPoint{Index: 1},
  105. }
  106. // inputInit specifies a newly created input. When marking this as
  107. // published, we should see an error log as this input hasn't been
  108. // published yet.
  109. inputInit := &wire.TxIn{
  110. PreviousOutPoint: wire.OutPoint{Index: 2},
  111. }
  112. s.inputs[inputInit.PreviousOutPoint] = &SweeperInput{
  113. state: Init,
  114. }
  115. // inputPendingPublish specifies an input that's about to be published.
  116. inputPendingPublish := &wire.TxIn{
  117. PreviousOutPoint: wire.OutPoint{Index: 3},
  118. }
  119. s.inputs[inputPendingPublish.PreviousOutPoint] = &SweeperInput{
  120. state: PendingPublish,
  121. }
  122. // First, check that when an error is returned from db, it's properly
  123. // returned here.
  124. mockStore.On("StoreTx", dummyTR).Return(dummyErr).Once()
  125. err := s.markInputsPublished(dummyTR, nil)
  126. require.ErrorIs(err, dummyErr)
  127. // We also expect the record has been marked as published.
  128. require.True(dummyTR.Published)
  129. // Then, check that the target input has will be correctly marked as
  130. // published.
  131. //
  132. // Mock the store to return nil
  133. mockStore.On("StoreTx", dummyTR).Return(nil).Once()
  134. // Mark the test inputs. We expect the non-exist input and the
  135. // inputInit to be skipped, and the final input to be marked as
  136. // published.
  137. err = s.markInputsPublished(dummyTR, []*wire.TxIn{
  138. inputNotExist, inputInit, inputPendingPublish,
  139. })
  140. require.NoError(err)
  141. // We expect unchanged number of pending inputs.
  142. require.Len(s.inputs, 2)
  143. // We expect the init input's state to stay unchanged.
  144. require.Equal(Init,
  145. s.inputs[inputInit.PreviousOutPoint].state)
  146. // We expect the pending-publish input's is now marked as published.
  147. require.Equal(Published,
  148. s.inputs[inputPendingPublish.PreviousOutPoint].state)
  149. // Assert mocked statements are executed as expected.
  150. mockStore.AssertExpectations(t)
  151. }
  152. // TestMarkInputsPublishFailed checks that given a list of inputs with
  153. // different states, only the state `PendingPublish` and `Published` will be
  154. // marked as `PublishFailed`.
  155. func TestMarkInputsPublishFailed(t *testing.T) {
  156. t.Parallel()
  157. require := require.New(t)
  158. // Create a mock sweeper store.
  159. mockStore := NewMockSweeperStore()
  160. // Create a test sweeper.
  161. s := New(&UtxoSweeperConfig{
  162. Store: mockStore,
  163. })
  164. // Create three testing inputs.
  165. //
  166. // inputNotExist specifies an input that's not found in the sweeper's
  167. // `inputs` map.
  168. inputNotExist := &wire.TxIn{
  169. PreviousOutPoint: wire.OutPoint{Index: 1},
  170. }
  171. // inputInit specifies a newly created input. When marking this as
  172. // published, we should see an error log as this input hasn't been
  173. // published yet.
  174. inputInit := &wire.TxIn{
  175. PreviousOutPoint: wire.OutPoint{Index: 2},
  176. }
  177. s.inputs[inputInit.PreviousOutPoint] = &SweeperInput{
  178. state: Init,
  179. }
  180. // inputPendingPublish specifies an input that's about to be published.
  181. inputPendingPublish := &wire.TxIn{
  182. PreviousOutPoint: wire.OutPoint{Index: 3},
  183. }
  184. s.inputs[inputPendingPublish.PreviousOutPoint] = &SweeperInput{
  185. state: PendingPublish,
  186. }
  187. // inputPublished specifies an input that's published.
  188. inputPublished := &wire.TxIn{
  189. PreviousOutPoint: wire.OutPoint{Index: 4},
  190. }
  191. s.inputs[inputPublished.PreviousOutPoint] = &SweeperInput{
  192. state: Published,
  193. }
  194. // Mark the test inputs. We expect the non-exist input and the
  195. // inputInit to be skipped, and the final input to be marked as
  196. // published.
  197. s.markInputsPublishFailed([]wire.OutPoint{
  198. inputNotExist.PreviousOutPoint,
  199. inputInit.PreviousOutPoint,
  200. inputPendingPublish.PreviousOutPoint,
  201. inputPublished.PreviousOutPoint,
  202. })
  203. // We expect unchanged number of pending inputs.
  204. require.Len(s.inputs, 3)
  205. // We expect the init input's state to stay unchanged.
  206. require.Equal(Init,
  207. s.inputs[inputInit.PreviousOutPoint].state)
  208. // We expect the pending-publish input's is now marked as publish
  209. // failed.
  210. require.Equal(PublishFailed,
  211. s.inputs[inputPendingPublish.PreviousOutPoint].state)
  212. // We expect the published input's is now marked as publish failed.
  213. require.Equal(PublishFailed,
  214. s.inputs[inputPublished.PreviousOutPoint].state)
  215. // Assert mocked statements are executed as expected.
  216. mockStore.AssertExpectations(t)
  217. }
  218. // TestMarkInputsSwept checks that given a list of inputs with different
  219. // states, only the non-terminal state will be marked as `Swept`.
  220. func TestMarkInputsSwept(t *testing.T) {
  221. t.Parallel()
  222. require := require.New(t)
  223. // Create a mock input.
  224. mockInput := &input.MockInput{}
  225. defer mockInput.AssertExpectations(t)
  226. // Mock the `OutPoint` to return a dummy outpoint.
  227. mockInput.On("OutPoint").Return(wire.OutPoint{Hash: chainhash.Hash{1}})
  228. // Create a test sweeper.
  229. s := New(&UtxoSweeperConfig{})
  230. // Create three testing inputs.
  231. //
  232. // inputNotExist specifies an input that's not found in the sweeper's
  233. // `inputs` map.
  234. inputNotExist := &wire.TxIn{
  235. PreviousOutPoint: wire.OutPoint{Index: 1},
  236. }
  237. // inputInit specifies a newly created input.
  238. inputInit := &wire.TxIn{
  239. PreviousOutPoint: wire.OutPoint{Index: 2},
  240. }
  241. s.inputs[inputInit.PreviousOutPoint] = &SweeperInput{
  242. state: Init,
  243. Input: mockInput,
  244. }
  245. // inputPendingPublish specifies an input that's about to be published.
  246. inputPendingPublish := &wire.TxIn{
  247. PreviousOutPoint: wire.OutPoint{Index: 3},
  248. }
  249. s.inputs[inputPendingPublish.PreviousOutPoint] = &SweeperInput{
  250. state: PendingPublish,
  251. Input: mockInput,
  252. }
  253. // inputTerminated specifies an input that's terminated.
  254. inputTerminated := &wire.TxIn{
  255. PreviousOutPoint: wire.OutPoint{Index: 4},
  256. }
  257. s.inputs[inputTerminated.PreviousOutPoint] = &SweeperInput{
  258. state: Excluded,
  259. Input: mockInput,
  260. }
  261. tx := &wire.MsgTx{
  262. TxIn: []*wire.TxIn{
  263. inputNotExist, inputInit,
  264. inputPendingPublish, inputTerminated,
  265. },
  266. }
  267. // Mark the test inputs. We expect the inputTerminated to be skipped,
  268. // and the rest to be marked as swept.
  269. s.markInputsSwept(tx, true)
  270. // We expect unchanged number of pending inputs.
  271. require.Len(s.inputs, 3)
  272. // We expect the init input's state to become swept.
  273. require.Equal(Swept,
  274. s.inputs[inputInit.PreviousOutPoint].state)
  275. // We expect the pending-publish becomes swept.
  276. require.Equal(Swept,
  277. s.inputs[inputPendingPublish.PreviousOutPoint].state)
  278. // We expect the terminated to stay unchanged.
  279. require.Equal(Excluded,
  280. s.inputs[inputTerminated.PreviousOutPoint].state)
  281. }
  282. // TestMempoolLookup checks that the method `mempoolLookup` works as expected.
  283. func TestMempoolLookup(t *testing.T) {
  284. t.Parallel()
  285. require := require.New(t)
  286. // Create a test outpoint.
  287. op := wire.OutPoint{Index: 1}
  288. // Create a mock mempool watcher.
  289. mockMempool := chainntnfs.NewMockMempoolWatcher()
  290. defer mockMempool.AssertExpectations(t)
  291. // Create a test sweeper without a mempool.
  292. s := New(&UtxoSweeperConfig{})
  293. // Since we don't have a mempool, we expect the call to return a
  294. // fn.None indicating it's not found.
  295. tx := s.mempoolLookup(op)
  296. require.True(tx.IsNone())
  297. // Re-create the sweeper with the mocked mempool watcher.
  298. s = New(&UtxoSweeperConfig{
  299. Mempool: mockMempool,
  300. })
  301. // Mock the mempool watcher to return not found.
  302. mockMempool.On("LookupInputMempoolSpend", op).Return(
  303. fn.None[wire.MsgTx]()).Once()
  304. // We expect a fn.None tx to be returned.
  305. tx = s.mempoolLookup(op)
  306. require.True(tx.IsNone())
  307. // Mock the mempool to return a spending tx.
  308. dummyTx := wire.MsgTx{}
  309. mockMempool.On("LookupInputMempoolSpend", op).Return(
  310. fn.Some(dummyTx)).Once()
  311. // Calling the loopup again, we expect the dummyTx to be returned.
  312. tx = s.mempoolLookup(op)
  313. require.False(tx.IsNone())
  314. require.Equal(dummyTx, tx.UnsafeFromSome())
  315. }
  316. // TestUpdateSweeperInputs checks that the method `updateSweeperInputs` will
  317. // properly update the inputs based on their states.
  318. func TestUpdateSweeperInputs(t *testing.T) {
  319. t.Parallel()
  320. require := require.New(t)
  321. // Create a test sweeper.
  322. s := New(nil)
  323. // Create mock inputs.
  324. inp1 := &input.MockInput{}
  325. defer inp1.AssertExpectations(t)
  326. inp2 := &input.MockInput{}
  327. defer inp2.AssertExpectations(t)
  328. inp3 := &input.MockInput{}
  329. defer inp3.AssertExpectations(t)
  330. // Create a list of inputs using all the states.
  331. //
  332. // Mock the input to have a locktime that's matured so it will be
  333. // returned.
  334. inp1.On("RequiredLockTime").Return(
  335. uint32(s.currentHeight), false).Once()
  336. inp1.On("BlocksToMaturity").Return(uint32(0)).Once()
  337. inp1.On("HeightHint").Return(uint32(s.currentHeight)).Once()
  338. input0 := &SweeperInput{state: Init, Input: inp1}
  339. // These inputs won't hit RequiredLockTime so we won't mock.
  340. input1 := &SweeperInput{state: PendingPublish, Input: inp1}
  341. input2 := &SweeperInput{state: Published, Input: inp1}
  342. // Mock the input to have a locktime that's matured so it will be
  343. // returned.
  344. inp1.On("RequiredLockTime").Return(
  345. uint32(s.currentHeight), false).Once()
  346. inp1.On("BlocksToMaturity").Return(uint32(0)).Once()
  347. inp1.On("HeightHint").Return(uint32(s.currentHeight)).Once()
  348. input3 := &SweeperInput{state: PublishFailed, Input: inp1}
  349. // These inputs won't hit RequiredLockTime so we won't mock.
  350. input4 := &SweeperInput{state: Swept, Input: inp1}
  351. input5 := &SweeperInput{state: Excluded, Input: inp1}
  352. input6 := &SweeperInput{state: Failed, Input: inp1}
  353. // Mock the input to have a locktime in the future so it will NOT be
  354. // returned.
  355. inp2.On("RequiredLockTime").Return(
  356. uint32(s.currentHeight+1), true).Once()
  357. input7 := &SweeperInput{state: Init, Input: inp2}
  358. // Mock the input to have a CSV expiry in the future so it will NOT be
  359. // returned.
  360. inp3.On("RequiredLockTime").Return(
  361. uint32(s.currentHeight), false).Once()
  362. inp3.On("BlocksToMaturity").Return(uint32(2)).Once()
  363. inp3.On("HeightHint").Return(uint32(s.currentHeight)).Once()
  364. input8 := &SweeperInput{state: Init, Input: inp3}
  365. // Add the inputs to the sweeper. After the update, we should see the
  366. // terminated inputs being removed.
  367. s.inputs = map[wire.OutPoint]*SweeperInput{
  368. {Index: 0}: input0,
  369. {Index: 1}: input1,
  370. {Index: 2}: input2,
  371. {Index: 3}: input3,
  372. {Index: 4}: input4,
  373. {Index: 5}: input5,
  374. {Index: 6}: input6,
  375. {Index: 7}: input7,
  376. {Index: 8}: input8,
  377. }
  378. // We expect the inputs with `Swept`, `Excluded`, and `Failed` to be
  379. // removed.
  380. expectedInputs := map[wire.OutPoint]*SweeperInput{
  381. {Index: 0}: input0,
  382. {Index: 1}: input1,
  383. {Index: 2}: input2,
  384. {Index: 3}: input3,
  385. {Index: 7}: input7,
  386. {Index: 8}: input8,
  387. }
  388. // We expect only the inputs with `Init` and `PublishFailed` to be
  389. // returned.
  390. expectedReturn := map[wire.OutPoint]*SweeperInput{
  391. {Index: 0}: input0,
  392. {Index: 3}: input3,
  393. }
  394. // Update the sweeper inputs.
  395. inputs := s.updateSweeperInputs()
  396. // Assert the returned inputs are as expected.
  397. require.Equal(expectedReturn, inputs)
  398. // Assert the sweeper inputs are as expected.
  399. require.Equal(expectedInputs, s.inputs)
  400. }
  401. // TestDecideStateAndRBFInfo checks that the expected state and RBFInfo are
  402. // returned based on whether this input can be found both in mempool and the
  403. // sweeper store.
  404. func TestDecideStateAndRBFInfo(t *testing.T) {
  405. t.Parallel()
  406. require := require.New(t)
  407. // Create a test outpoint.
  408. op := wire.OutPoint{Index: 1}
  409. // Create a mock mempool watcher and a mock sweeper store.
  410. mockMempool := chainntnfs.NewMockMempoolWatcher()
  411. defer mockMempool.AssertExpectations(t)
  412. mockStore := NewMockSweeperStore()
  413. defer mockStore.AssertExpectations(t)
  414. // Create a test sweeper.
  415. s := New(&UtxoSweeperConfig{
  416. Store: mockStore,
  417. Mempool: mockMempool,
  418. })
  419. // First, mock the mempool to return false.
  420. mockMempool.On("LookupInputMempoolSpend", op).Return(
  421. fn.None[wire.MsgTx]()).Once()
  422. // Since the mempool lookup failed, we exepect state Init and no
  423. // RBFInfo.
  424. state, rbf := s.decideStateAndRBFInfo(op)
  425. require.True(rbf.IsNone())
  426. require.Equal(Init, state)
  427. // Mock the mempool lookup to return a tx three times as we are calling
  428. // attachAvailableRBFInfo three times.
  429. tx := wire.MsgTx{}
  430. mockMempool.On("LookupInputMempoolSpend", op).Return(
  431. fn.Some(tx)).Times(3)
  432. // Mock the store to return an error saying the tx cannot be found.
  433. mockStore.On("GetTx", tx.TxHash()).Return(nil, ErrTxNotFound).Once()
  434. // Although the db lookup failed, we expect the state to be Published.
  435. state, rbf = s.decideStateAndRBFInfo(op)
  436. require.True(rbf.IsNone())
  437. require.Equal(Published, state)
  438. // Mock the store to return a db error.
  439. dummyErr := errors.New("dummy error")
  440. mockStore.On("GetTx", tx.TxHash()).Return(nil, dummyErr).Once()
  441. // Although the db lookup failed, we expect the state to be Published.
  442. state, rbf = s.decideStateAndRBFInfo(op)
  443. require.True(rbf.IsNone())
  444. require.Equal(Published, state)
  445. // Mock the store to return a record.
  446. tr := &TxRecord{
  447. Fee: 100,
  448. FeeRate: 100,
  449. }
  450. mockStore.On("GetTx", tx.TxHash()).Return(tr, nil).Once()
  451. // Call the method again.
  452. state, rbf = s.decideStateAndRBFInfo(op)
  453. // Assert that the RBF info is returned.
  454. rbfInfo := fn.Some(RBFInfo{
  455. Txid: tx.TxHash(),
  456. Fee: btcutil.Amount(tr.Fee),
  457. FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
  458. })
  459. require.Equal(rbfInfo, rbf)
  460. // Assert the state is updated.
  461. require.Equal(Published, state)
  462. }
  463. // TestMarkInputFailed checks that the input is marked as failed as expected.
  464. func TestMarkInputFailed(t *testing.T) {
  465. t.Parallel()
  466. // Create a mock input.
  467. mockInput := &input.MockInput{}
  468. defer mockInput.AssertExpectations(t)
  469. // Mock the `OutPoint` to return a dummy outpoint.
  470. mockInput.On("OutPoint").Return(wire.OutPoint{Hash: chainhash.Hash{1}})
  471. // Create a test sweeper.
  472. s := New(&UtxoSweeperConfig{})
  473. // Create a testing pending input.
  474. pi := &SweeperInput{
  475. state: Init,
  476. Input: mockInput,
  477. }
  478. // Call the method under test.
  479. s.markInputFailed(pi, errors.New("dummy error"))
  480. // Assert the state is updated.
  481. require.Equal(t, Failed, pi.state)
  482. }
  483. // TestSweepPendingInputs checks that `sweepPendingInputs` correctly executes
  484. // its workflow based on the returned values from the interfaces.
  485. func TestSweepPendingInputs(t *testing.T) {
  486. t.Parallel()
  487. // Create a mock wallet and aggregator.
  488. wallet := &MockWallet{}
  489. defer wallet.AssertExpectations(t)
  490. aggregator := &mockUtxoAggregator{}
  491. defer aggregator.AssertExpectations(t)
  492. publisher := &MockBumper{}
  493. defer publisher.AssertExpectations(t)
  494. // Create a test sweeper.
  495. s := New(&UtxoSweeperConfig{
  496. Wallet: wallet,
  497. Aggregator: aggregator,
  498. Publisher: publisher,
  499. GenSweepScript: func() ([]byte, error) {
  500. return testPubKey.SerializeCompressed(), nil
  501. },
  502. NoDeadlineConfTarget: uint32(DefaultDeadlineDelta),
  503. })
  504. // Set a current height to test the deadline override.
  505. s.currentHeight = testHeight
  506. // Create an input set that needs wallet inputs.
  507. setNeedWallet := &MockInputSet{}
  508. defer setNeedWallet.AssertExpectations(t)
  509. // Mock this set to ask for wallet input.
  510. setNeedWallet.On("NeedWalletInput").Return(true).Once()
  511. setNeedWallet.On("AddWalletInputs", wallet).Return(nil).Once()
  512. // Mock the wallet to require the lock once.
  513. wallet.On("WithCoinSelectLock", mock.Anything).Return(nil).Once()
  514. // Create an input set that doesn't need wallet inputs.
  515. normalSet := &MockInputSet{}
  516. defer normalSet.AssertExpectations(t)
  517. normalSet.On("NeedWalletInput").Return(false).Once()
  518. // Mock the methods used in `sweep`. This is not important for this
  519. // unit test.
  520. setNeedWallet.On("Inputs").Return(nil).Times(4)
  521. setNeedWallet.On("DeadlineHeight").Return(testHeight).Once()
  522. setNeedWallet.On("Budget").Return(btcutil.Amount(1)).Once()
  523. setNeedWallet.On("StartingFeeRate").Return(
  524. fn.None[chainfee.SatPerKWeight]()).Once()
  525. normalSet.On("Inputs").Return(nil).Times(4)
  526. normalSet.On("DeadlineHeight").Return(testHeight).Once()
  527. normalSet.On("Budget").Return(btcutil.Amount(1)).Once()
  528. normalSet.On("StartingFeeRate").Return(
  529. fn.None[chainfee.SatPerKWeight]()).Once()
  530. // Make pending inputs for testing. We don't need real values here as
  531. // the returned clusters are mocked.
  532. pis := make(InputsMap)
  533. // Mock the aggregator to return the mocked input sets.
  534. aggregator.On("ClusterInputs", pis).Return([]InputSet{
  535. setNeedWallet, normalSet,
  536. })
  537. // Mock `Broadcast` to return an error. This should cause the
  538. // `createSweepTx` inside `sweep` to fail. This is done so we can
  539. // terminate the method early as we are only interested in testing the
  540. // workflow in `sweepPendingInputs`. We don't need to test `sweep` here
  541. // as it should be tested in its own unit test.
  542. dummyErr := errors.New("dummy error")
  543. publisher.On("Broadcast", mock.Anything).Return(nil, dummyErr).Twice()
  544. // Call the method under test.
  545. s.sweepPendingInputs(pis)
  546. }
  547. // TestHandleBumpEventTxFailed checks that the sweeper correctly handles the
  548. // case where the bump event tx fails to be published.
  549. func TestHandleBumpEventTxFailed(t *testing.T) {
  550. t.Parallel()
  551. // Create a test sweeper.
  552. s := New(&UtxoSweeperConfig{})
  553. var (
  554. // Create four testing outpoints.
  555. op1 = wire.OutPoint{Hash: chainhash.Hash{1}}
  556. op2 = wire.OutPoint{Hash: chainhash.Hash{2}}
  557. op3 = wire.OutPoint{Hash: chainhash.Hash{3}}
  558. opNotExist = wire.OutPoint{Hash: chainhash.Hash{4}}
  559. )
  560. // Create three mock inputs.
  561. input1 := &input.MockInput{}
  562. defer input1.AssertExpectations(t)
  563. input2 := &input.MockInput{}
  564. defer input2.AssertExpectations(t)
  565. input3 := &input.MockInput{}
  566. defer input3.AssertExpectations(t)
  567. // Construct the initial state for the sweeper.
  568. s.inputs = InputsMap{
  569. op1: &SweeperInput{Input: input1, state: PendingPublish},
  570. op2: &SweeperInput{Input: input2, state: PendingPublish},
  571. op3: &SweeperInput{Input: input3, state: PendingPublish},
  572. }
  573. // Create a testing tx that spends the first two inputs.
  574. tx := &wire.MsgTx{
  575. TxIn: []*wire.TxIn{
  576. {PreviousOutPoint: op1},
  577. {PreviousOutPoint: op2},
  578. {PreviousOutPoint: opNotExist},
  579. },
  580. }
  581. // Create a testing bump result.
  582. br := &BumpResult{
  583. Tx: tx,
  584. Event: TxFailed,
  585. Err: errDummy,
  586. }
  587. // Call the method under test.
  588. err := s.handleBumpEvent(br)
  589. require.ErrorIs(t, err, errDummy)
  590. // Assert the states of the first two inputs are updated.
  591. require.Equal(t, PublishFailed, s.inputs[op1].state)
  592. require.Equal(t, PublishFailed, s.inputs[op2].state)
  593. // Assert the state of the third input is not updated.
  594. require.Equal(t, PendingPublish, s.inputs[op3].state)
  595. // Assert the non-existing input is not added to the pending inputs.
  596. require.NotContains(t, s.inputs, opNotExist)
  597. }
  598. // TestHandleBumpEventTxReplaced checks that the sweeper correctly handles the
  599. // case where the bump event tx is replaced.
  600. func TestHandleBumpEventTxReplaced(t *testing.T) {
  601. t.Parallel()
  602. // Create a mock store.
  603. store := &MockSweeperStore{}
  604. defer store.AssertExpectations(t)
  605. // Create a mock wallet.
  606. wallet := &MockWallet{}
  607. defer wallet.AssertExpectations(t)
  608. // Create a test sweeper.
  609. s := New(&UtxoSweeperConfig{
  610. Store: store,
  611. Wallet: wallet,
  612. })
  613. // Create a testing outpoint.
  614. op := wire.OutPoint{Hash: chainhash.Hash{1}}
  615. // Create a mock input.
  616. inp := &input.MockInput{}
  617. defer inp.AssertExpectations(t)
  618. // Construct the initial state for the sweeper.
  619. s.inputs = InputsMap{
  620. op: &SweeperInput{Input: inp, state: PendingPublish},
  621. }
  622. // Create a testing tx that spends the input.
  623. tx := &wire.MsgTx{
  624. LockTime: 1,
  625. TxIn: []*wire.TxIn{
  626. {PreviousOutPoint: op},
  627. },
  628. }
  629. // Create a replacement tx.
  630. replacementTx := &wire.MsgTx{
  631. LockTime: 2,
  632. TxIn: []*wire.TxIn{
  633. {PreviousOutPoint: op},
  634. },
  635. }
  636. // Create a testing bump result.
  637. br := &BumpResult{
  638. Tx: replacementTx,
  639. ReplacedTx: tx,
  640. Event: TxReplaced,
  641. }
  642. // Mock the store to return an error.
  643. dummyErr := errors.New("dummy error")
  644. store.On("GetTx", tx.TxHash()).Return(nil, dummyErr).Once()
  645. // Call the method under test and assert the error is returned.
  646. err := s.handleBumpEventTxReplaced(br)
  647. require.ErrorIs(t, err, dummyErr)
  648. // Mock the store to return the old tx record.
  649. store.On("GetTx", tx.TxHash()).Return(&TxRecord{
  650. Txid: tx.TxHash(),
  651. }, nil).Once()
  652. // We expect to cancel rebroadcasting the replaced tx.
  653. wallet.On("CancelRebroadcast", tx.TxHash()).Once()
  654. // Mock an error returned when deleting the old tx record.
  655. store.On("DeleteTx", tx.TxHash()).Return(dummyErr).Once()
  656. // Call the method under test and assert the error is returned.
  657. err = s.handleBumpEventTxReplaced(br)
  658. require.ErrorIs(t, err, dummyErr)
  659. // Mock the store to return the old tx record and delete it without
  660. // error.
  661. store.On("GetTx", tx.TxHash()).Return(&TxRecord{
  662. Txid: tx.TxHash(),
  663. }, nil).Once()
  664. store.On("DeleteTx", tx.TxHash()).Return(nil).Once()
  665. // Mock the store to save the new tx record.
  666. store.On("StoreTx", &TxRecord{
  667. Txid: replacementTx.TxHash(),
  668. Published: true,
  669. }).Return(nil).Once()
  670. // We expect to cancel rebroadcasting the replaced tx.
  671. wallet.On("CancelRebroadcast", tx.TxHash()).Once()
  672. // Call the method under test.
  673. err = s.handleBumpEventTxReplaced(br)
  674. require.NoError(t, err)
  675. // Assert the state of the input is updated.
  676. require.Equal(t, Published, s.inputs[op].state)
  677. }
  678. // TestHandleBumpEventTxPublished checks that the sweeper correctly handles the
  679. // case where the bump event tx is published.
  680. func TestHandleBumpEventTxPublished(t *testing.T) {
  681. t.Parallel()
  682. // Create a mock store.
  683. store := &MockSweeperStore{}
  684. defer store.AssertExpectations(t)
  685. // Create a test sweeper.
  686. s := New(&UtxoSweeperConfig{
  687. Store: store,
  688. })
  689. // Create a testing outpoint.
  690. op := wire.OutPoint{Hash: chainhash.Hash{1}}
  691. // Create a mock input.
  692. inp := &input.MockInput{}
  693. defer inp.AssertExpectations(t)
  694. // Construct the initial state for the sweeper.
  695. s.inputs = InputsMap{
  696. op: &SweeperInput{Input: inp, state: PendingPublish},
  697. }
  698. // Create a testing tx that spends the input.
  699. tx := &wire.MsgTx{
  700. LockTime: 1,
  701. TxIn: []*wire.TxIn{
  702. {PreviousOutPoint: op},
  703. },
  704. }
  705. // Create a testing bump result.
  706. br := &BumpResult{
  707. Tx: tx,
  708. Event: TxPublished,
  709. }
  710. // Mock the store to save the new tx record.
  711. store.On("StoreTx", &TxRecord{
  712. Txid: tx.TxHash(),
  713. Published: true,
  714. }).Return(nil).Once()
  715. // Call the method under test.
  716. err := s.handleBumpEventTxPublished(br)
  717. require.NoError(t, err)
  718. // Assert the state of the input is updated.
  719. require.Equal(t, Published, s.inputs[op].state)
  720. }
  721. // TestMonitorFeeBumpResult checks that the fee bump monitor loop correctly
  722. // exits when the sweeper is stopped, the tx is confirmed or failed.
  723. func TestMonitorFeeBumpResult(t *testing.T) {
  724. // Create a mock store.
  725. store := &MockSweeperStore{}
  726. defer store.AssertExpectations(t)
  727. // Create a mock wallet.
  728. wallet := &MockWallet{}
  729. defer wallet.AssertExpectations(t)
  730. // Create a test sweeper.
  731. s := New(&UtxoSweeperConfig{
  732. Store: store,
  733. Wallet: wallet,
  734. })
  735. // Create a testing outpoint.
  736. op := wire.OutPoint{Hash: chainhash.Hash{1}}
  737. // Create a mock input.
  738. inp := &input.MockInput{}
  739. defer inp.AssertExpectations(t)
  740. // Construct the initial state for the sweeper.
  741. s.inputs = InputsMap{
  742. op: &SweeperInput{Input: inp, state: PendingPublish},
  743. }
  744. // Create a testing tx that spends the input.
  745. tx := &wire.MsgTx{
  746. LockTime: 1,
  747. TxIn: []*wire.TxIn{
  748. {PreviousOutPoint: op},
  749. },
  750. }
  751. testCases := []struct {
  752. name string
  753. setupResultChan func() <-chan *BumpResult
  754. shouldExit bool
  755. }{
  756. {
  757. // When a tx confirmed event is received, we expect to
  758. // exit the monitor loop.
  759. name: "tx confirmed",
  760. // We send a result with TxConfirmed event to the
  761. // result channel.
  762. setupResultChan: func() <-chan *BumpResult {
  763. // Create a result chan.
  764. resultChan := make(chan *BumpResult, 1)
  765. resultChan <- &BumpResult{
  766. Tx: tx,
  767. Event: TxConfirmed,
  768. Fee: 10000,
  769. FeeRate: 100,
  770. }
  771. // We expect to cancel rebroadcasting the tx
  772. // once confirmed.
  773. wallet.On("CancelRebroadcast",
  774. tx.TxHash()).Once()
  775. return resultChan
  776. },
  777. shouldExit: true,
  778. },
  779. {
  780. // When a tx failed event is received, we expect to
  781. // exit the monitor loop.
  782. name: "tx failed",
  783. // We send a result with TxConfirmed event to the
  784. // result channel.
  785. setupResultChan: func() <-chan *BumpResult {
  786. // Create a result chan.
  787. resultChan := make(chan *BumpResult, 1)
  788. resultChan <- &BumpResult{
  789. Tx: tx,
  790. Event: TxFailed,
  791. Err: errDummy,
  792. }
  793. // We expect to cancel rebroadcasting the tx
  794. // once failed.
  795. wallet.On("CancelRebroadcast",
  796. tx.TxHash()).Once()
  797. return resultChan
  798. },
  799. shouldExit: true,
  800. },
  801. {
  802. // When processing non-confirmed events, the monitor
  803. // should not exit.
  804. name: "no exit on normal event",
  805. // We send a result with TxPublished and mock the
  806. // method `StoreTx` to return nil.
  807. setupResultChan: func() <-chan *BumpResult {
  808. // Create a result chan.
  809. resultChan := make(chan *BumpResult, 1)
  810. resultChan <- &BumpResult{
  811. Tx: tx,
  812. Event: TxPublished,
  813. }
  814. return resultChan
  815. },
  816. shouldExit: false,
  817. }, {
  818. // When the sweeper is shutting down, the monitor loop
  819. // should exit.
  820. name: "exit on sweeper shutdown",
  821. // We don't send anything but quit the sweeper.
  822. setupResultChan: func() <-chan *BumpResult {
  823. close(s.quit)
  824. return nil
  825. },
  826. shouldExit: true,
  827. },
  828. }
  829. for _, tc := range testCases {
  830. tc := tc
  831. t.Run(tc.name, func(t *testing.T) {
  832. // Setup the testing result channel.
  833. resultChan := tc.setupResultChan()
  834. // Create a done chan that's used to signal the monitor
  835. // has exited.
  836. done := make(chan struct{})
  837. s.wg.Add(1)
  838. go func() {
  839. s.monitorFeeBumpResult(resultChan)
  840. close(done)
  841. }()
  842. // The monitor is expected to exit, we check it's done
  843. // in one second or fail.
  844. if tc.shouldExit {
  845. select {
  846. case <-done:
  847. case <-time.After(1 * time.Second):
  848. require.Fail(t, "monitor not exited")
  849. }
  850. return
  851. }
  852. // The monitor should not exit, check it doesn't close
  853. // the `done` channel within one second.
  854. select {
  855. case <-done:
  856. require.Fail(t, "monitor exited")
  857. case <-time.After(1 * time.Second):
  858. }
  859. })
  860. }
  861. }