sync_manager_test.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686
  1. package discovery
  2. import (
  3. "fmt"
  4. "reflect"
  5. "sync"
  6. "sync/atomic"
  7. "testing"
  8. "time"
  9. "github.com/btcsuite/btcd/btcec/v2"
  10. "github.com/davecgh/go-spew/spew"
  11. "github.com/lightningnetwork/lnd/lntest/wait"
  12. "github.com/lightningnetwork/lnd/lnwire"
  13. "github.com/lightningnetwork/lnd/routing/route"
  14. "github.com/lightningnetwork/lnd/ticker"
  15. "github.com/stretchr/testify/require"
  16. )
  17. // randPeer creates a random peer.
  18. func randPeer(t *testing.T, quit chan struct{}) *mockPeer {
  19. t.Helper()
  20. pk := randPubKey(t)
  21. return peerWithPubkey(pk, quit)
  22. }
  23. func peerWithPubkey(pk *btcec.PublicKey, quit chan struct{}) *mockPeer {
  24. return &mockPeer{
  25. pk: pk,
  26. sentMsgs: make(chan lnwire.Message),
  27. quit: quit,
  28. }
  29. }
  30. // newTestSyncManager creates a new test SyncManager using mock implementations
  31. // of its dependencies.
  32. func newTestSyncManager(numActiveSyncers int) *SyncManager {
  33. return newPinnedTestSyncManager(numActiveSyncers, nil)
  34. }
  35. // newTestSyncManager creates a new test SyncManager with a set of pinned
  36. // syncers using mock implementations of its dependencies.
  37. func newPinnedTestSyncManager(numActiveSyncers int,
  38. pinnedSyncers PinnedSyncers) *SyncManager {
  39. hID := lnwire.ShortChannelID{BlockHeight: latestKnownHeight}
  40. return newSyncManager(&SyncManagerCfg{
  41. ChanSeries: newMockChannelGraphTimeSeries(hID),
  42. RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval),
  43. HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval),
  44. NumActiveSyncers: numActiveSyncers,
  45. BestHeight: func() uint32 {
  46. return latestKnownHeight
  47. },
  48. PinnedSyncers: pinnedSyncers,
  49. })
  50. }
  51. // TestSyncManagerNumActiveSyncers ensures that we are unable to have more than
  52. // NumActiveSyncers active syncers.
  53. func TestSyncManagerNumActiveSyncers(t *testing.T) {
  54. t.Parallel()
  55. // We'll start by creating our test sync manager which will hold up to
  56. // 3 active syncers.
  57. const numActiveSyncers = 3
  58. const numPinnedSyncers = 3
  59. const numInactiveSyncers = 1
  60. pinnedSyncers := make(PinnedSyncers)
  61. pinnedPubkeys := make(map[route.Vertex]*btcec.PublicKey)
  62. for i := 0; i < numPinnedSyncers; i++ {
  63. pubkey := randPubKey(t)
  64. vertex := route.NewVertex(pubkey)
  65. pinnedSyncers[vertex] = struct{}{}
  66. pinnedPubkeys[vertex] = pubkey
  67. }
  68. syncMgr := newPinnedTestSyncManager(numActiveSyncers, pinnedSyncers)
  69. syncMgr.Start()
  70. defer syncMgr.Stop()
  71. // First we'll start by adding the pinned syncers. These should
  72. // immediately be assigned PinnedSync.
  73. for _, pubkey := range pinnedPubkeys {
  74. peer := peerWithPubkey(pubkey, syncMgr.quit)
  75. err := syncMgr.InitSyncState(peer)
  76. require.NoError(t, err)
  77. s := assertSyncerExistence(t, syncMgr, peer)
  78. assertTransitionToChansSynced(t, s, peer)
  79. assertActiveGossipTimestampRange(t, peer)
  80. assertSyncerStatus(t, s, chansSynced, PinnedSync)
  81. }
  82. // We'll go ahead and create our syncers. We'll gather the ones which
  83. // should be active and passive to check them later on. The pinned peers
  84. // added above should not influence the active syncer count.
  85. for i := 0; i < numActiveSyncers; i++ {
  86. peer := randPeer(t, syncMgr.quit)
  87. err := syncMgr.InitSyncState(peer)
  88. require.NoError(t, err)
  89. s := assertSyncerExistence(t, syncMgr, peer)
  90. // The first syncer registered always attempts a historical
  91. // sync.
  92. if i == 0 {
  93. assertTransitionToChansSynced(t, s, peer)
  94. }
  95. assertActiveGossipTimestampRange(t, peer)
  96. assertSyncerStatus(t, s, chansSynced, ActiveSync)
  97. }
  98. for i := 0; i < numInactiveSyncers; i++ {
  99. peer := randPeer(t, syncMgr.quit)
  100. err := syncMgr.InitSyncState(peer)
  101. require.NoError(t, err)
  102. s := assertSyncerExistence(t, syncMgr, peer)
  103. assertSyncerStatus(t, s, chansSynced, PassiveSync)
  104. }
  105. }
  106. // TestSyncManagerNewActiveSyncerAfterDisconnect ensures that we can regain an
  107. // active syncer after losing one due to the peer disconnecting.
  108. func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) {
  109. t.Parallel()
  110. // We'll create our test sync manager to have two active syncers.
  111. syncMgr := newTestSyncManager(2)
  112. syncMgr.Start()
  113. defer syncMgr.Stop()
  114. // The first will be an active syncer that performs a historical sync
  115. // since it is the first one registered with the SyncManager.
  116. historicalSyncPeer := randPeer(t, syncMgr.quit)
  117. syncMgr.InitSyncState(historicalSyncPeer)
  118. historicalSyncer := assertSyncerExistence(t, syncMgr, historicalSyncPeer)
  119. assertTransitionToChansSynced(t, historicalSyncer, historicalSyncPeer)
  120. assertActiveGossipTimestampRange(t, historicalSyncPeer)
  121. assertSyncerStatus(t, historicalSyncer, chansSynced, ActiveSync)
  122. // Then, we'll create the second active syncer, which is the one we'll
  123. // disconnect.
  124. activeSyncPeer := randPeer(t, syncMgr.quit)
  125. syncMgr.InitSyncState(activeSyncPeer)
  126. activeSyncer := assertSyncerExistence(t, syncMgr, activeSyncPeer)
  127. assertActiveGossipTimestampRange(t, activeSyncPeer)
  128. assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync)
  129. // It will then be torn down to simulate a disconnection. Since there
  130. // are no other candidate syncers available, the active syncer won't be
  131. // replaced.
  132. syncMgr.PruneSyncState(activeSyncPeer.PubKey())
  133. // Then, we'll start our active syncer again, but this time we'll also
  134. // have a passive syncer available to replace the active syncer after
  135. // the peer disconnects.
  136. syncMgr.InitSyncState(activeSyncPeer)
  137. activeSyncer = assertSyncerExistence(t, syncMgr, activeSyncPeer)
  138. assertActiveGossipTimestampRange(t, activeSyncPeer)
  139. assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync)
  140. // Create our second peer, which should be initialized as a passive
  141. // syncer.
  142. newActiveSyncPeer := randPeer(t, syncMgr.quit)
  143. syncMgr.InitSyncState(newActiveSyncPeer)
  144. newActiveSyncer := assertSyncerExistence(t, syncMgr, newActiveSyncPeer)
  145. assertSyncerStatus(t, newActiveSyncer, chansSynced, PassiveSync)
  146. // Disconnect our active syncer, which should trigger the SyncManager to
  147. // replace it with our passive syncer.
  148. go syncMgr.PruneSyncState(activeSyncPeer.PubKey())
  149. assertPassiveSyncerTransition(t, newActiveSyncer, newActiveSyncPeer)
  150. }
  151. // TestSyncManagerRotateActiveSyncerCandidate tests that we can successfully
  152. // rotate our active syncers after a certain interval.
  153. func TestSyncManagerRotateActiveSyncerCandidate(t *testing.T) {
  154. t.Parallel()
  155. // We'll create our sync manager with three active syncers.
  156. syncMgr := newTestSyncManager(1)
  157. syncMgr.Start()
  158. defer syncMgr.Stop()
  159. // The first syncer registered always performs a historical sync.
  160. activeSyncPeer := randPeer(t, syncMgr.quit)
  161. syncMgr.InitSyncState(activeSyncPeer)
  162. activeSyncer := assertSyncerExistence(t, syncMgr, activeSyncPeer)
  163. assertTransitionToChansSynced(t, activeSyncer, activeSyncPeer)
  164. assertActiveGossipTimestampRange(t, activeSyncPeer)
  165. assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync)
  166. // We'll send a tick to force a rotation. Since there aren't any
  167. // candidates, none of the active syncers will be rotated.
  168. syncMgr.cfg.RotateTicker.(*ticker.Force).Force <- time.Time{}
  169. assertNoMsgSent(t, activeSyncPeer)
  170. assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync)
  171. // We'll then go ahead and add a passive syncer.
  172. passiveSyncPeer := randPeer(t, syncMgr.quit)
  173. syncMgr.InitSyncState(passiveSyncPeer)
  174. passiveSyncer := assertSyncerExistence(t, syncMgr, passiveSyncPeer)
  175. assertSyncerStatus(t, passiveSyncer, chansSynced, PassiveSync)
  176. // We'll force another rotation - this time, since we have a passive
  177. // syncer available, they should be rotated.
  178. syncMgr.cfg.RotateTicker.(*ticker.Force).Force <- time.Time{}
  179. // The transition from an active syncer to a passive syncer causes the
  180. // peer to send out a new GossipTimestampRange in the past so that they
  181. // don't receive new graph updates.
  182. assertActiveSyncerTransition(t, activeSyncer, activeSyncPeer)
  183. // The transition from a passive syncer to an active syncer causes the
  184. // peer to send a new GossipTimestampRange with the current timestamp to
  185. // signal that they would like to receive new graph updates from their
  186. // peers. This will also cause the gossip syncer to redo its state
  187. // machine, starting from its initial syncingChans state. We'll then
  188. // need to transition it to its final chansSynced state to ensure the
  189. // next syncer is properly started in the round-robin.
  190. assertPassiveSyncerTransition(t, passiveSyncer, passiveSyncPeer)
  191. }
  192. // TestSyncManagerNoInitialHistoricalSync ensures no initial sync is attempted
  193. // when NumActiveSyncers is set to 0.
  194. func TestSyncManagerNoInitialHistoricalSync(t *testing.T) {
  195. t.Parallel()
  196. syncMgr := newTestSyncManager(0)
  197. syncMgr.Start()
  198. defer syncMgr.Stop()
  199. // We should not expect any messages from the peer.
  200. peer := randPeer(t, syncMgr.quit)
  201. err := syncMgr.InitSyncState(peer)
  202. require.NoError(t, err)
  203. assertNoMsgSent(t, peer)
  204. // Force the historical syncer to tick. This shouldn't happen normally
  205. // since the ticker is never started. However, we will test that even if
  206. // this were to occur that a historical sync does not progress.
  207. syncMgr.cfg.HistoricalSyncTicker.(*ticker.Force).Force <- time.Time{}
  208. assertNoMsgSent(t, peer)
  209. s := assertSyncerExistence(t, syncMgr, peer)
  210. assertSyncerStatus(t, s, chansSynced, PassiveSync)
  211. }
  212. // TestSyncManagerInitialHistoricalSync ensures that we only attempt a single
  213. // historical sync during the SyncManager's startup. If the peer corresponding
  214. // to the initial historical syncer disconnects, we should attempt to find a
  215. // replacement.
  216. func TestSyncManagerInitialHistoricalSync(t *testing.T) {
  217. t.Parallel()
  218. syncMgr := newTestSyncManager(1)
  219. // The graph should not be considered as synced since the sync manager
  220. // has yet to start.
  221. if syncMgr.IsGraphSynced() {
  222. t.Fatal("expected graph to not be considered as synced")
  223. }
  224. syncMgr.Start()
  225. defer syncMgr.Stop()
  226. // We should expect to see a QueryChannelRange message with a
  227. // FirstBlockHeight of the genesis block, signaling that an initial
  228. // historical sync is being attempted.
  229. peer := randPeer(t, syncMgr.quit)
  230. syncMgr.InitSyncState(peer)
  231. assertMsgSent(t, peer, &lnwire.QueryChannelRange{
  232. FirstBlockHeight: 0,
  233. NumBlocks: latestKnownHeight,
  234. QueryOptions: lnwire.NewTimestampQueryOption(),
  235. })
  236. // The graph should not be considered as synced since the initial
  237. // historical sync has not finished.
  238. if syncMgr.IsGraphSynced() {
  239. t.Fatal("expected graph to not be considered as synced")
  240. }
  241. // If an additional peer connects, then another historical sync should
  242. // not be attempted.
  243. finalHistoricalPeer := randPeer(t, syncMgr.quit)
  244. syncMgr.InitSyncState(finalHistoricalPeer)
  245. finalHistoricalSyncer := assertSyncerExistence(t, syncMgr, finalHistoricalPeer)
  246. assertNoMsgSent(t, finalHistoricalPeer)
  247. // If we disconnect the peer performing the initial historical sync, a
  248. // new one should be chosen.
  249. syncMgr.PruneSyncState(peer.PubKey())
  250. // Complete the initial historical sync by transitionining the syncer to
  251. // its final chansSynced state. The graph should be considered as synced
  252. // after the fact.
  253. assertTransitionToChansSynced(t, finalHistoricalSyncer, finalHistoricalPeer)
  254. if !syncMgr.IsGraphSynced() {
  255. t.Fatal("expected graph to be considered as synced")
  256. }
  257. // The historical syncer should be active after the sync completes.
  258. assertActiveGossipTimestampRange(t, finalHistoricalPeer)
  259. // Once the initial historical sync has succeeded, another one should
  260. // not be attempted by disconnecting the peer who performed it.
  261. extraPeer := randPeer(t, syncMgr.quit)
  262. syncMgr.InitSyncState(extraPeer)
  263. // Pruning the first peer will cause the passive peer to send an active
  264. // gossip timestamp msg, which we must consume asynchronously for the
  265. // call to return.
  266. var wg sync.WaitGroup
  267. wg.Add(1)
  268. go func() {
  269. defer wg.Done()
  270. assertActiveGossipTimestampRange(t, extraPeer)
  271. }()
  272. syncMgr.PruneSyncState(finalHistoricalPeer.PubKey())
  273. wg.Wait()
  274. // No further messages should be sent.
  275. assertNoMsgSent(t, extraPeer)
  276. }
  277. // TestSyncManagerHistoricalSyncOnReconnect tests that the sync manager will
  278. // re-trigger a historical sync when a new peer connects after a historical
  279. // sync has completed, but we have lost all peers.
  280. func TestSyncManagerHistoricalSyncOnReconnect(t *testing.T) {
  281. t.Parallel()
  282. syncMgr := newTestSyncManager(2)
  283. syncMgr.Start()
  284. defer syncMgr.Stop()
  285. // We should expect to see a QueryChannelRange message with a
  286. // FirstBlockHeight of the genesis block, signaling that an initial
  287. // historical sync is being attempted.
  288. peer := randPeer(t, syncMgr.quit)
  289. syncMgr.InitSyncState(peer)
  290. s := assertSyncerExistence(t, syncMgr, peer)
  291. assertTransitionToChansSynced(t, s, peer)
  292. assertActiveGossipTimestampRange(t, peer)
  293. assertSyncerStatus(t, s, chansSynced, ActiveSync)
  294. // Now that the historical sync is completed, we prune the syncer,
  295. // simulating all peers having disconnected.
  296. syncMgr.PruneSyncState(peer.PubKey())
  297. // If a new peer now connects, then another historical sync should
  298. // be attempted. This is to ensure we get an up-to-date graph if we
  299. // haven't had any peers for a time.
  300. nextPeer := randPeer(t, syncMgr.quit)
  301. syncMgr.InitSyncState(nextPeer)
  302. s1 := assertSyncerExistence(t, syncMgr, nextPeer)
  303. assertTransitionToChansSynced(t, s1, nextPeer)
  304. assertActiveGossipTimestampRange(t, nextPeer)
  305. assertSyncerStatus(t, s1, chansSynced, ActiveSync)
  306. }
  307. // TestSyncManagerForceHistoricalSync ensures that we can perform routine
  308. // historical syncs whenever the HistoricalSyncTicker fires.
  309. func TestSyncManagerForceHistoricalSync(t *testing.T) {
  310. t.Parallel()
  311. syncMgr := newTestSyncManager(1)
  312. syncMgr.Start()
  313. defer syncMgr.Stop()
  314. // We should expect to see a QueryChannelRange message with a
  315. // FirstBlockHeight of the genesis block, signaling that a historical
  316. // sync is being attempted.
  317. peer := randPeer(t, syncMgr.quit)
  318. syncMgr.InitSyncState(peer)
  319. assertMsgSent(t, peer, &lnwire.QueryChannelRange{
  320. FirstBlockHeight: 0,
  321. NumBlocks: latestKnownHeight,
  322. QueryOptions: lnwire.NewTimestampQueryOption(),
  323. })
  324. // If an additional peer connects, then a historical sync should not be
  325. // attempted again.
  326. extraPeer := randPeer(t, syncMgr.quit)
  327. syncMgr.InitSyncState(extraPeer)
  328. assertNoMsgSent(t, extraPeer)
  329. // Then, we'll send a tick to force a historical sync. This should
  330. // trigger the extra peer to also perform a historical sync since the
  331. // first peer is not eligible due to not being in a chansSynced state.
  332. syncMgr.cfg.HistoricalSyncTicker.(*ticker.Force).Force <- time.Time{}
  333. assertMsgSent(t, extraPeer, &lnwire.QueryChannelRange{
  334. FirstBlockHeight: 0,
  335. NumBlocks: latestKnownHeight,
  336. QueryOptions: lnwire.NewTimestampQueryOption(),
  337. })
  338. }
  339. // TestSyncManagerGraphSyncedAfterHistoricalSyncReplacement ensures that the
  340. // sync manager properly marks the graph as synced given that our initial
  341. // historical sync has stalled, but a replacement has fully completed.
  342. func TestSyncManagerGraphSyncedAfterHistoricalSyncReplacement(t *testing.T) {
  343. t.Parallel()
  344. syncMgr := newTestSyncManager(1)
  345. syncMgr.Start()
  346. defer syncMgr.Stop()
  347. // We should expect to see a QueryChannelRange message with a
  348. // FirstBlockHeight of the genesis block, signaling that an initial
  349. // historical sync is being attempted.
  350. peer := randPeer(t, syncMgr.quit)
  351. syncMgr.InitSyncState(peer)
  352. assertMsgSent(t, peer, &lnwire.QueryChannelRange{
  353. FirstBlockHeight: 0,
  354. NumBlocks: latestKnownHeight,
  355. QueryOptions: lnwire.NewTimestampQueryOption(),
  356. })
  357. // The graph should not be considered as synced since the initial
  358. // historical sync has not finished.
  359. if syncMgr.IsGraphSynced() {
  360. t.Fatal("expected graph to not be considered as synced")
  361. }
  362. // If an additional peer connects, then another historical sync should
  363. // not be attempted.
  364. finalHistoricalPeer := randPeer(t, syncMgr.quit)
  365. syncMgr.InitSyncState(finalHistoricalPeer)
  366. finalHistoricalSyncer := assertSyncerExistence(t, syncMgr, finalHistoricalPeer)
  367. assertNoMsgSent(t, finalHistoricalPeer)
  368. // To simulate that our initial historical sync has stalled, we'll force
  369. // a historical sync with the new peer to ensure it is replaced.
  370. syncMgr.cfg.HistoricalSyncTicker.(*ticker.Force).Force <- time.Time{}
  371. // The graph should still not be considered as synced since the
  372. // replacement historical sync has not finished.
  373. if syncMgr.IsGraphSynced() {
  374. t.Fatal("expected graph to not be considered as synced")
  375. }
  376. // Complete the replacement historical sync by transitioning the syncer
  377. // to its final chansSynced state. The graph should be considered as
  378. // synced after the fact.
  379. assertTransitionToChansSynced(t, finalHistoricalSyncer, finalHistoricalPeer)
  380. if !syncMgr.IsGraphSynced() {
  381. t.Fatal("expected graph to be considered as synced")
  382. }
  383. }
  384. // TestSyncManagerWaitUntilInitialHistoricalSync ensures that no GossipSyncers
  385. // are initialized as ActiveSync until the initial historical sync has been
  386. // completed. Once it does, the pending GossipSyncers should be transitioned to
  387. // ActiveSync.
  388. func TestSyncManagerWaitUntilInitialHistoricalSync(t *testing.T) {
  389. t.Parallel()
  390. const numActiveSyncers = 2
  391. // We'll start by creating our test sync manager which will hold up to
  392. // 2 active syncers.
  393. syncMgr := newTestSyncManager(numActiveSyncers)
  394. syncMgr.Start()
  395. defer syncMgr.Stop()
  396. // We'll go ahead and create our syncers.
  397. peers := make([]*mockPeer, 0, numActiveSyncers)
  398. syncers := make([]*GossipSyncer, 0, numActiveSyncers)
  399. for i := 0; i < numActiveSyncers; i++ {
  400. peer := randPeer(t, syncMgr.quit)
  401. peers = append(peers, peer)
  402. syncMgr.InitSyncState(peer)
  403. s := assertSyncerExistence(t, syncMgr, peer)
  404. syncers = append(syncers, s)
  405. // The first one always attempts a historical sync. We won't
  406. // transition it to chansSynced to ensure the remaining syncers
  407. // aren't started as active.
  408. if i == 0 {
  409. assertSyncerStatus(t, s, syncingChans, PassiveSync)
  410. continue
  411. }
  412. // The rest should remain in a passive and chansSynced state,
  413. // and they should be queued to transition to active once the
  414. // initial historical sync is completed.
  415. assertNoMsgSent(t, peer)
  416. assertSyncerStatus(t, s, chansSynced, PassiveSync)
  417. }
  418. // To ensure we don't transition any pending active syncers that have
  419. // previously disconnected, we'll disconnect the last one.
  420. stalePeer := peers[numActiveSyncers-1]
  421. syncMgr.PruneSyncState(stalePeer.PubKey())
  422. // Then, we'll complete the initial historical sync by transitioning the
  423. // historical syncer to its final chansSynced state. This should trigger
  424. // all of the pending active syncers to transition, except for the one
  425. // we disconnected.
  426. assertTransitionToChansSynced(t, syncers[0], peers[0])
  427. for i, s := range syncers {
  428. if i == numActiveSyncers-1 {
  429. assertNoMsgSent(t, peers[i])
  430. continue
  431. }
  432. assertPassiveSyncerTransition(t, s, peers[i])
  433. }
  434. }
  435. // assertNoMsgSent is a helper function that ensures a peer hasn't sent any
  436. // messages.
  437. func assertNoMsgSent(t *testing.T, peer *mockPeer) {
  438. t.Helper()
  439. select {
  440. case msg := <-peer.sentMsgs:
  441. t.Fatalf("peer %x sent unexpected message %v", peer.PubKey(),
  442. spew.Sdump(msg))
  443. case <-time.After(time.Second):
  444. }
  445. }
  446. // assertMsgSent asserts that the peer has sent the given message.
  447. func assertMsgSent(t *testing.T, peer *mockPeer, msg lnwire.Message) {
  448. t.Helper()
  449. var msgSent lnwire.Message
  450. select {
  451. case msgSent = <-peer.sentMsgs:
  452. case <-time.After(time.Second):
  453. t.Fatalf("expected peer %x to send %T message", peer.PubKey(),
  454. msg)
  455. }
  456. if !reflect.DeepEqual(msgSent, msg) {
  457. t.Fatalf("expected peer %x to send message: %v\ngot: %v",
  458. peer.PubKey(), spew.Sdump(msg), spew.Sdump(msgSent))
  459. }
  460. }
  461. // assertActiveGossipTimestampRange is a helper function that ensures a peer has
  462. // sent a lnwire.GossipTimestampRange message indicating that it would like to
  463. // receive new graph updates.
  464. func assertActiveGossipTimestampRange(t *testing.T, peer *mockPeer) {
  465. t.Helper()
  466. var msgSent lnwire.Message
  467. select {
  468. case msgSent = <-peer.sentMsgs:
  469. case <-time.After(2 * time.Second):
  470. t.Fatalf("expected peer %x to send lnwire.GossipTimestampRange "+
  471. "message", peer.PubKey())
  472. }
  473. msg, ok := msgSent.(*lnwire.GossipTimestampRange)
  474. if !ok {
  475. t.Fatalf("expected peer %x to send %T message", peer.PubKey(),
  476. msg)
  477. }
  478. if msg.FirstTimestamp == 0 {
  479. t.Fatalf("expected *lnwire.GossipTimestampRange message with " +
  480. "non-zero FirstTimestamp")
  481. }
  482. if msg.TimestampRange == 0 {
  483. t.Fatalf("expected *lnwire.GossipTimestampRange message with " +
  484. "non-zero TimestampRange")
  485. }
  486. }
  487. // assertSyncerExistence asserts that a GossipSyncer exists for the given peer.
  488. func assertSyncerExistence(t *testing.T, syncMgr *SyncManager,
  489. peer *mockPeer) *GossipSyncer {
  490. t.Helper()
  491. s, ok := syncMgr.GossipSyncer(peer.PubKey())
  492. if !ok {
  493. t.Fatalf("gossip syncer for peer %x not found", peer.PubKey())
  494. }
  495. return s
  496. }
  497. // assertSyncerStatus asserts that the gossip syncer for the given peer matches
  498. // the expected sync state and type.
  499. func assertSyncerStatus(t *testing.T, s *GossipSyncer, syncState syncerState,
  500. syncType SyncerType) {
  501. t.Helper()
  502. // We'll check the status of our syncer within a WaitPredicate as some
  503. // sync transitions might cause this to be racy.
  504. err := wait.NoError(func() error {
  505. state := s.syncState()
  506. if s.syncState() != syncState {
  507. return fmt.Errorf("expected syncState %v for peer "+
  508. "%x, got %v", syncState, s.cfg.peerPub, state)
  509. }
  510. typ := s.SyncType()
  511. if s.SyncType() != syncType {
  512. return fmt.Errorf("expected syncType %v for peer "+
  513. "%x, got %v", syncType, s.cfg.peerPub, typ)
  514. }
  515. return nil
  516. }, time.Second)
  517. if err != nil {
  518. t.Fatal(err)
  519. }
  520. }
  521. // assertTransitionToChansSynced asserts the transition of an ActiveSync
  522. // GossipSyncer to its final chansSynced state.
  523. func assertTransitionToChansSynced(t *testing.T, s *GossipSyncer, peer *mockPeer) {
  524. t.Helper()
  525. query := &lnwire.QueryChannelRange{
  526. FirstBlockHeight: 0,
  527. NumBlocks: latestKnownHeight,
  528. QueryOptions: lnwire.NewTimestampQueryOption(),
  529. }
  530. assertMsgSent(t, peer, query)
  531. require.Eventually(t, func() bool {
  532. return s.syncState() == waitingQueryRangeReply
  533. }, time.Second, 500*time.Millisecond)
  534. require.NoError(t, s.ProcessQueryMsg(&lnwire.ReplyChannelRange{
  535. ChainHash: query.ChainHash,
  536. FirstBlockHeight: query.FirstBlockHeight,
  537. NumBlocks: query.NumBlocks,
  538. Complete: 1,
  539. }, nil))
  540. chanSeries := s.cfg.channelSeries.(*mockChannelGraphTimeSeries)
  541. select {
  542. case <-chanSeries.filterReq:
  543. chanSeries.filterResp <- nil
  544. case <-time.After(2 * time.Second):
  545. t.Fatal("expected to receive FilterKnownChanIDs request")
  546. }
  547. err := wait.NoError(func() error {
  548. state := syncerState(atomic.LoadUint32(&s.state))
  549. if state != chansSynced {
  550. return fmt.Errorf("expected syncerState %v, got %v",
  551. chansSynced, state)
  552. }
  553. return nil
  554. }, time.Second)
  555. if err != nil {
  556. t.Fatal(err)
  557. }
  558. }
  559. // assertPassiveSyncerTransition asserts that a gossip syncer goes through all
  560. // of its expected steps when transitioning from passive to active.
  561. func assertPassiveSyncerTransition(t *testing.T, s *GossipSyncer, peer *mockPeer) {
  562. t.Helper()
  563. assertActiveGossipTimestampRange(t, peer)
  564. assertSyncerStatus(t, s, chansSynced, ActiveSync)
  565. }
  566. // assertActiveSyncerTransition asserts that a gossip syncer goes through all of
  567. // its expected steps when transitioning from active to passive.
  568. func assertActiveSyncerTransition(t *testing.T, s *GossipSyncer, peer *mockPeer) {
  569. t.Helper()
  570. assertMsgSent(t, peer, &lnwire.GossipTimestampRange{
  571. FirstTimestamp: uint32(zeroTimestamp.Unix()),
  572. TimestampRange: 0,
  573. })
  574. assertSyncerStatus(t, s, chansSynced, PassiveSync)
  575. }