123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686 |
- package discovery
- import (
- "fmt"
- "reflect"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "github.com/btcsuite/btcd/btcec/v2"
- "github.com/davecgh/go-spew/spew"
- "github.com/lightningnetwork/lnd/lntest/wait"
- "github.com/lightningnetwork/lnd/lnwire"
- "github.com/lightningnetwork/lnd/routing/route"
- "github.com/lightningnetwork/lnd/ticker"
- "github.com/stretchr/testify/require"
- )
- // randPeer creates a random peer.
- func randPeer(t *testing.T, quit chan struct{}) *mockPeer {
- t.Helper()
- pk := randPubKey(t)
- return peerWithPubkey(pk, quit)
- }
- func peerWithPubkey(pk *btcec.PublicKey, quit chan struct{}) *mockPeer {
- return &mockPeer{
- pk: pk,
- sentMsgs: make(chan lnwire.Message),
- quit: quit,
- }
- }
- // newTestSyncManager creates a new test SyncManager using mock implementations
- // of its dependencies.
- func newTestSyncManager(numActiveSyncers int) *SyncManager {
- return newPinnedTestSyncManager(numActiveSyncers, nil)
- }
- // newTestSyncManager creates a new test SyncManager with a set of pinned
- // syncers using mock implementations of its dependencies.
- func newPinnedTestSyncManager(numActiveSyncers int,
- pinnedSyncers PinnedSyncers) *SyncManager {
- hID := lnwire.ShortChannelID{BlockHeight: latestKnownHeight}
- return newSyncManager(&SyncManagerCfg{
- ChanSeries: newMockChannelGraphTimeSeries(hID),
- RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval),
- HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval),
- NumActiveSyncers: numActiveSyncers,
- BestHeight: func() uint32 {
- return latestKnownHeight
- },
- PinnedSyncers: pinnedSyncers,
- })
- }
- // TestSyncManagerNumActiveSyncers ensures that we are unable to have more than
- // NumActiveSyncers active syncers.
- func TestSyncManagerNumActiveSyncers(t *testing.T) {
- t.Parallel()
- // We'll start by creating our test sync manager which will hold up to
- // 3 active syncers.
- const numActiveSyncers = 3
- const numPinnedSyncers = 3
- const numInactiveSyncers = 1
- pinnedSyncers := make(PinnedSyncers)
- pinnedPubkeys := make(map[route.Vertex]*btcec.PublicKey)
- for i := 0; i < numPinnedSyncers; i++ {
- pubkey := randPubKey(t)
- vertex := route.NewVertex(pubkey)
- pinnedSyncers[vertex] = struct{}{}
- pinnedPubkeys[vertex] = pubkey
- }
- syncMgr := newPinnedTestSyncManager(numActiveSyncers, pinnedSyncers)
- syncMgr.Start()
- defer syncMgr.Stop()
- // First we'll start by adding the pinned syncers. These should
- // immediately be assigned PinnedSync.
- for _, pubkey := range pinnedPubkeys {
- peer := peerWithPubkey(pubkey, syncMgr.quit)
- err := syncMgr.InitSyncState(peer)
- require.NoError(t, err)
- s := assertSyncerExistence(t, syncMgr, peer)
- assertTransitionToChansSynced(t, s, peer)
- assertActiveGossipTimestampRange(t, peer)
- assertSyncerStatus(t, s, chansSynced, PinnedSync)
- }
- // We'll go ahead and create our syncers. We'll gather the ones which
- // should be active and passive to check them later on. The pinned peers
- // added above should not influence the active syncer count.
- for i := 0; i < numActiveSyncers; i++ {
- peer := randPeer(t, syncMgr.quit)
- err := syncMgr.InitSyncState(peer)
- require.NoError(t, err)
- s := assertSyncerExistence(t, syncMgr, peer)
- // The first syncer registered always attempts a historical
- // sync.
- if i == 0 {
- assertTransitionToChansSynced(t, s, peer)
- }
- assertActiveGossipTimestampRange(t, peer)
- assertSyncerStatus(t, s, chansSynced, ActiveSync)
- }
- for i := 0; i < numInactiveSyncers; i++ {
- peer := randPeer(t, syncMgr.quit)
- err := syncMgr.InitSyncState(peer)
- require.NoError(t, err)
- s := assertSyncerExistence(t, syncMgr, peer)
- assertSyncerStatus(t, s, chansSynced, PassiveSync)
- }
- }
- // TestSyncManagerNewActiveSyncerAfterDisconnect ensures that we can regain an
- // active syncer after losing one due to the peer disconnecting.
- func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) {
- t.Parallel()
- // We'll create our test sync manager to have two active syncers.
- syncMgr := newTestSyncManager(2)
- syncMgr.Start()
- defer syncMgr.Stop()
- // The first will be an active syncer that performs a historical sync
- // since it is the first one registered with the SyncManager.
- historicalSyncPeer := randPeer(t, syncMgr.quit)
- syncMgr.InitSyncState(historicalSyncPeer)
- historicalSyncer := assertSyncerExistence(t, syncMgr, historicalSyncPeer)
- assertTransitionToChansSynced(t, historicalSyncer, historicalSyncPeer)
- assertActiveGossipTimestampRange(t, historicalSyncPeer)
- assertSyncerStatus(t, historicalSyncer, chansSynced, ActiveSync)
- // Then, we'll create the second active syncer, which is the one we'll
- // disconnect.
- activeSyncPeer := randPeer(t, syncMgr.quit)
- syncMgr.InitSyncState(activeSyncPeer)
- activeSyncer := assertSyncerExistence(t, syncMgr, activeSyncPeer)
- assertActiveGossipTimestampRange(t, activeSyncPeer)
- assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync)
- // It will then be torn down to simulate a disconnection. Since there
- // are no other candidate syncers available, the active syncer won't be
- // replaced.
- syncMgr.PruneSyncState(activeSyncPeer.PubKey())
- // Then, we'll start our active syncer again, but this time we'll also
- // have a passive syncer available to replace the active syncer after
- // the peer disconnects.
- syncMgr.InitSyncState(activeSyncPeer)
- activeSyncer = assertSyncerExistence(t, syncMgr, activeSyncPeer)
- assertActiveGossipTimestampRange(t, activeSyncPeer)
- assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync)
- // Create our second peer, which should be initialized as a passive
- // syncer.
- newActiveSyncPeer := randPeer(t, syncMgr.quit)
- syncMgr.InitSyncState(newActiveSyncPeer)
- newActiveSyncer := assertSyncerExistence(t, syncMgr, newActiveSyncPeer)
- assertSyncerStatus(t, newActiveSyncer, chansSynced, PassiveSync)
- // Disconnect our active syncer, which should trigger the SyncManager to
- // replace it with our passive syncer.
- go syncMgr.PruneSyncState(activeSyncPeer.PubKey())
- assertPassiveSyncerTransition(t, newActiveSyncer, newActiveSyncPeer)
- }
- // TestSyncManagerRotateActiveSyncerCandidate tests that we can successfully
- // rotate our active syncers after a certain interval.
- func TestSyncManagerRotateActiveSyncerCandidate(t *testing.T) {
- t.Parallel()
- // We'll create our sync manager with three active syncers.
- syncMgr := newTestSyncManager(1)
- syncMgr.Start()
- defer syncMgr.Stop()
- // The first syncer registered always performs a historical sync.
- activeSyncPeer := randPeer(t, syncMgr.quit)
- syncMgr.InitSyncState(activeSyncPeer)
- activeSyncer := assertSyncerExistence(t, syncMgr, activeSyncPeer)
- assertTransitionToChansSynced(t, activeSyncer, activeSyncPeer)
- assertActiveGossipTimestampRange(t, activeSyncPeer)
- assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync)
- // We'll send a tick to force a rotation. Since there aren't any
- // candidates, none of the active syncers will be rotated.
- syncMgr.cfg.RotateTicker.(*ticker.Force).Force <- time.Time{}
- assertNoMsgSent(t, activeSyncPeer)
- assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync)
- // We'll then go ahead and add a passive syncer.
- passiveSyncPeer := randPeer(t, syncMgr.quit)
- syncMgr.InitSyncState(passiveSyncPeer)
- passiveSyncer := assertSyncerExistence(t, syncMgr, passiveSyncPeer)
- assertSyncerStatus(t, passiveSyncer, chansSynced, PassiveSync)
- // We'll force another rotation - this time, since we have a passive
- // syncer available, they should be rotated.
- syncMgr.cfg.RotateTicker.(*ticker.Force).Force <- time.Time{}
- // The transition from an active syncer to a passive syncer causes the
- // peer to send out a new GossipTimestampRange in the past so that they
- // don't receive new graph updates.
- assertActiveSyncerTransition(t, activeSyncer, activeSyncPeer)
- // The transition from a passive syncer to an active syncer causes the
- // peer to send a new GossipTimestampRange with the current timestamp to
- // signal that they would like to receive new graph updates from their
- // peers. This will also cause the gossip syncer to redo its state
- // machine, starting from its initial syncingChans state. We'll then
- // need to transition it to its final chansSynced state to ensure the
- // next syncer is properly started in the round-robin.
- assertPassiveSyncerTransition(t, passiveSyncer, passiveSyncPeer)
- }
- // TestSyncManagerNoInitialHistoricalSync ensures no initial sync is attempted
- // when NumActiveSyncers is set to 0.
- func TestSyncManagerNoInitialHistoricalSync(t *testing.T) {
- t.Parallel()
- syncMgr := newTestSyncManager(0)
- syncMgr.Start()
- defer syncMgr.Stop()
- // We should not expect any messages from the peer.
- peer := randPeer(t, syncMgr.quit)
- err := syncMgr.InitSyncState(peer)
- require.NoError(t, err)
- assertNoMsgSent(t, peer)
- // Force the historical syncer to tick. This shouldn't happen normally
- // since the ticker is never started. However, we will test that even if
- // this were to occur that a historical sync does not progress.
- syncMgr.cfg.HistoricalSyncTicker.(*ticker.Force).Force <- time.Time{}
- assertNoMsgSent(t, peer)
- s := assertSyncerExistence(t, syncMgr, peer)
- assertSyncerStatus(t, s, chansSynced, PassiveSync)
- }
- // TestSyncManagerInitialHistoricalSync ensures that we only attempt a single
- // historical sync during the SyncManager's startup. If the peer corresponding
- // to the initial historical syncer disconnects, we should attempt to find a
- // replacement.
- func TestSyncManagerInitialHistoricalSync(t *testing.T) {
- t.Parallel()
- syncMgr := newTestSyncManager(1)
- // The graph should not be considered as synced since the sync manager
- // has yet to start.
- if syncMgr.IsGraphSynced() {
- t.Fatal("expected graph to not be considered as synced")
- }
- syncMgr.Start()
- defer syncMgr.Stop()
- // We should expect to see a QueryChannelRange message with a
- // FirstBlockHeight of the genesis block, signaling that an initial
- // historical sync is being attempted.
- peer := randPeer(t, syncMgr.quit)
- syncMgr.InitSyncState(peer)
- assertMsgSent(t, peer, &lnwire.QueryChannelRange{
- FirstBlockHeight: 0,
- NumBlocks: latestKnownHeight,
- QueryOptions: lnwire.NewTimestampQueryOption(),
- })
- // The graph should not be considered as synced since the initial
- // historical sync has not finished.
- if syncMgr.IsGraphSynced() {
- t.Fatal("expected graph to not be considered as synced")
- }
- // If an additional peer connects, then another historical sync should
- // not be attempted.
- finalHistoricalPeer := randPeer(t, syncMgr.quit)
- syncMgr.InitSyncState(finalHistoricalPeer)
- finalHistoricalSyncer := assertSyncerExistence(t, syncMgr, finalHistoricalPeer)
- assertNoMsgSent(t, finalHistoricalPeer)
- // If we disconnect the peer performing the initial historical sync, a
- // new one should be chosen.
- syncMgr.PruneSyncState(peer.PubKey())
- // Complete the initial historical sync by transitionining the syncer to
- // its final chansSynced state. The graph should be considered as synced
- // after the fact.
- assertTransitionToChansSynced(t, finalHistoricalSyncer, finalHistoricalPeer)
- if !syncMgr.IsGraphSynced() {
- t.Fatal("expected graph to be considered as synced")
- }
- // The historical syncer should be active after the sync completes.
- assertActiveGossipTimestampRange(t, finalHistoricalPeer)
- // Once the initial historical sync has succeeded, another one should
- // not be attempted by disconnecting the peer who performed it.
- extraPeer := randPeer(t, syncMgr.quit)
- syncMgr.InitSyncState(extraPeer)
- // Pruning the first peer will cause the passive peer to send an active
- // gossip timestamp msg, which we must consume asynchronously for the
- // call to return.
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- defer wg.Done()
- assertActiveGossipTimestampRange(t, extraPeer)
- }()
- syncMgr.PruneSyncState(finalHistoricalPeer.PubKey())
- wg.Wait()
- // No further messages should be sent.
- assertNoMsgSent(t, extraPeer)
- }
- // TestSyncManagerHistoricalSyncOnReconnect tests that the sync manager will
- // re-trigger a historical sync when a new peer connects after a historical
- // sync has completed, but we have lost all peers.
- func TestSyncManagerHistoricalSyncOnReconnect(t *testing.T) {
- t.Parallel()
- syncMgr := newTestSyncManager(2)
- syncMgr.Start()
- defer syncMgr.Stop()
- // We should expect to see a QueryChannelRange message with a
- // FirstBlockHeight of the genesis block, signaling that an initial
- // historical sync is being attempted.
- peer := randPeer(t, syncMgr.quit)
- syncMgr.InitSyncState(peer)
- s := assertSyncerExistence(t, syncMgr, peer)
- assertTransitionToChansSynced(t, s, peer)
- assertActiveGossipTimestampRange(t, peer)
- assertSyncerStatus(t, s, chansSynced, ActiveSync)
- // Now that the historical sync is completed, we prune the syncer,
- // simulating all peers having disconnected.
- syncMgr.PruneSyncState(peer.PubKey())
- // If a new peer now connects, then another historical sync should
- // be attempted. This is to ensure we get an up-to-date graph if we
- // haven't had any peers for a time.
- nextPeer := randPeer(t, syncMgr.quit)
- syncMgr.InitSyncState(nextPeer)
- s1 := assertSyncerExistence(t, syncMgr, nextPeer)
- assertTransitionToChansSynced(t, s1, nextPeer)
- assertActiveGossipTimestampRange(t, nextPeer)
- assertSyncerStatus(t, s1, chansSynced, ActiveSync)
- }
- // TestSyncManagerForceHistoricalSync ensures that we can perform routine
- // historical syncs whenever the HistoricalSyncTicker fires.
- func TestSyncManagerForceHistoricalSync(t *testing.T) {
- t.Parallel()
- syncMgr := newTestSyncManager(1)
- syncMgr.Start()
- defer syncMgr.Stop()
- // We should expect to see a QueryChannelRange message with a
- // FirstBlockHeight of the genesis block, signaling that a historical
- // sync is being attempted.
- peer := randPeer(t, syncMgr.quit)
- syncMgr.InitSyncState(peer)
- assertMsgSent(t, peer, &lnwire.QueryChannelRange{
- FirstBlockHeight: 0,
- NumBlocks: latestKnownHeight,
- QueryOptions: lnwire.NewTimestampQueryOption(),
- })
- // If an additional peer connects, then a historical sync should not be
- // attempted again.
- extraPeer := randPeer(t, syncMgr.quit)
- syncMgr.InitSyncState(extraPeer)
- assertNoMsgSent(t, extraPeer)
- // Then, we'll send a tick to force a historical sync. This should
- // trigger the extra peer to also perform a historical sync since the
- // first peer is not eligible due to not being in a chansSynced state.
- syncMgr.cfg.HistoricalSyncTicker.(*ticker.Force).Force <- time.Time{}
- assertMsgSent(t, extraPeer, &lnwire.QueryChannelRange{
- FirstBlockHeight: 0,
- NumBlocks: latestKnownHeight,
- QueryOptions: lnwire.NewTimestampQueryOption(),
- })
- }
- // TestSyncManagerGraphSyncedAfterHistoricalSyncReplacement ensures that the
- // sync manager properly marks the graph as synced given that our initial
- // historical sync has stalled, but a replacement has fully completed.
- func TestSyncManagerGraphSyncedAfterHistoricalSyncReplacement(t *testing.T) {
- t.Parallel()
- syncMgr := newTestSyncManager(1)
- syncMgr.Start()
- defer syncMgr.Stop()
- // We should expect to see a QueryChannelRange message with a
- // FirstBlockHeight of the genesis block, signaling that an initial
- // historical sync is being attempted.
- peer := randPeer(t, syncMgr.quit)
- syncMgr.InitSyncState(peer)
- assertMsgSent(t, peer, &lnwire.QueryChannelRange{
- FirstBlockHeight: 0,
- NumBlocks: latestKnownHeight,
- QueryOptions: lnwire.NewTimestampQueryOption(),
- })
- // The graph should not be considered as synced since the initial
- // historical sync has not finished.
- if syncMgr.IsGraphSynced() {
- t.Fatal("expected graph to not be considered as synced")
- }
- // If an additional peer connects, then another historical sync should
- // not be attempted.
- finalHistoricalPeer := randPeer(t, syncMgr.quit)
- syncMgr.InitSyncState(finalHistoricalPeer)
- finalHistoricalSyncer := assertSyncerExistence(t, syncMgr, finalHistoricalPeer)
- assertNoMsgSent(t, finalHistoricalPeer)
- // To simulate that our initial historical sync has stalled, we'll force
- // a historical sync with the new peer to ensure it is replaced.
- syncMgr.cfg.HistoricalSyncTicker.(*ticker.Force).Force <- time.Time{}
- // The graph should still not be considered as synced since the
- // replacement historical sync has not finished.
- if syncMgr.IsGraphSynced() {
- t.Fatal("expected graph to not be considered as synced")
- }
- // Complete the replacement historical sync by transitioning the syncer
- // to its final chansSynced state. The graph should be considered as
- // synced after the fact.
- assertTransitionToChansSynced(t, finalHistoricalSyncer, finalHistoricalPeer)
- if !syncMgr.IsGraphSynced() {
- t.Fatal("expected graph to be considered as synced")
- }
- }
- // TestSyncManagerWaitUntilInitialHistoricalSync ensures that no GossipSyncers
- // are initialized as ActiveSync until the initial historical sync has been
- // completed. Once it does, the pending GossipSyncers should be transitioned to
- // ActiveSync.
- func TestSyncManagerWaitUntilInitialHistoricalSync(t *testing.T) {
- t.Parallel()
- const numActiveSyncers = 2
- // We'll start by creating our test sync manager which will hold up to
- // 2 active syncers.
- syncMgr := newTestSyncManager(numActiveSyncers)
- syncMgr.Start()
- defer syncMgr.Stop()
- // We'll go ahead and create our syncers.
- peers := make([]*mockPeer, 0, numActiveSyncers)
- syncers := make([]*GossipSyncer, 0, numActiveSyncers)
- for i := 0; i < numActiveSyncers; i++ {
- peer := randPeer(t, syncMgr.quit)
- peers = append(peers, peer)
- syncMgr.InitSyncState(peer)
- s := assertSyncerExistence(t, syncMgr, peer)
- syncers = append(syncers, s)
- // The first one always attempts a historical sync. We won't
- // transition it to chansSynced to ensure the remaining syncers
- // aren't started as active.
- if i == 0 {
- assertSyncerStatus(t, s, syncingChans, PassiveSync)
- continue
- }
- // The rest should remain in a passive and chansSynced state,
- // and they should be queued to transition to active once the
- // initial historical sync is completed.
- assertNoMsgSent(t, peer)
- assertSyncerStatus(t, s, chansSynced, PassiveSync)
- }
- // To ensure we don't transition any pending active syncers that have
- // previously disconnected, we'll disconnect the last one.
- stalePeer := peers[numActiveSyncers-1]
- syncMgr.PruneSyncState(stalePeer.PubKey())
- // Then, we'll complete the initial historical sync by transitioning the
- // historical syncer to its final chansSynced state. This should trigger
- // all of the pending active syncers to transition, except for the one
- // we disconnected.
- assertTransitionToChansSynced(t, syncers[0], peers[0])
- for i, s := range syncers {
- if i == numActiveSyncers-1 {
- assertNoMsgSent(t, peers[i])
- continue
- }
- assertPassiveSyncerTransition(t, s, peers[i])
- }
- }
- // assertNoMsgSent is a helper function that ensures a peer hasn't sent any
- // messages.
- func assertNoMsgSent(t *testing.T, peer *mockPeer) {
- t.Helper()
- select {
- case msg := <-peer.sentMsgs:
- t.Fatalf("peer %x sent unexpected message %v", peer.PubKey(),
- spew.Sdump(msg))
- case <-time.After(time.Second):
- }
- }
- // assertMsgSent asserts that the peer has sent the given message.
- func assertMsgSent(t *testing.T, peer *mockPeer, msg lnwire.Message) {
- t.Helper()
- var msgSent lnwire.Message
- select {
- case msgSent = <-peer.sentMsgs:
- case <-time.After(time.Second):
- t.Fatalf("expected peer %x to send %T message", peer.PubKey(),
- msg)
- }
- if !reflect.DeepEqual(msgSent, msg) {
- t.Fatalf("expected peer %x to send message: %v\ngot: %v",
- peer.PubKey(), spew.Sdump(msg), spew.Sdump(msgSent))
- }
- }
- // assertActiveGossipTimestampRange is a helper function that ensures a peer has
- // sent a lnwire.GossipTimestampRange message indicating that it would like to
- // receive new graph updates.
- func assertActiveGossipTimestampRange(t *testing.T, peer *mockPeer) {
- t.Helper()
- var msgSent lnwire.Message
- select {
- case msgSent = <-peer.sentMsgs:
- case <-time.After(2 * time.Second):
- t.Fatalf("expected peer %x to send lnwire.GossipTimestampRange "+
- "message", peer.PubKey())
- }
- msg, ok := msgSent.(*lnwire.GossipTimestampRange)
- if !ok {
- t.Fatalf("expected peer %x to send %T message", peer.PubKey(),
- msg)
- }
- if msg.FirstTimestamp == 0 {
- t.Fatalf("expected *lnwire.GossipTimestampRange message with " +
- "non-zero FirstTimestamp")
- }
- if msg.TimestampRange == 0 {
- t.Fatalf("expected *lnwire.GossipTimestampRange message with " +
- "non-zero TimestampRange")
- }
- }
- // assertSyncerExistence asserts that a GossipSyncer exists for the given peer.
- func assertSyncerExistence(t *testing.T, syncMgr *SyncManager,
- peer *mockPeer) *GossipSyncer {
- t.Helper()
- s, ok := syncMgr.GossipSyncer(peer.PubKey())
- if !ok {
- t.Fatalf("gossip syncer for peer %x not found", peer.PubKey())
- }
- return s
- }
- // assertSyncerStatus asserts that the gossip syncer for the given peer matches
- // the expected sync state and type.
- func assertSyncerStatus(t *testing.T, s *GossipSyncer, syncState syncerState,
- syncType SyncerType) {
- t.Helper()
- // We'll check the status of our syncer within a WaitPredicate as some
- // sync transitions might cause this to be racy.
- err := wait.NoError(func() error {
- state := s.syncState()
- if s.syncState() != syncState {
- return fmt.Errorf("expected syncState %v for peer "+
- "%x, got %v", syncState, s.cfg.peerPub, state)
- }
- typ := s.SyncType()
- if s.SyncType() != syncType {
- return fmt.Errorf("expected syncType %v for peer "+
- "%x, got %v", syncType, s.cfg.peerPub, typ)
- }
- return nil
- }, time.Second)
- if err != nil {
- t.Fatal(err)
- }
- }
- // assertTransitionToChansSynced asserts the transition of an ActiveSync
- // GossipSyncer to its final chansSynced state.
- func assertTransitionToChansSynced(t *testing.T, s *GossipSyncer, peer *mockPeer) {
- t.Helper()
- query := &lnwire.QueryChannelRange{
- FirstBlockHeight: 0,
- NumBlocks: latestKnownHeight,
- QueryOptions: lnwire.NewTimestampQueryOption(),
- }
- assertMsgSent(t, peer, query)
- require.Eventually(t, func() bool {
- return s.syncState() == waitingQueryRangeReply
- }, time.Second, 500*time.Millisecond)
- require.NoError(t, s.ProcessQueryMsg(&lnwire.ReplyChannelRange{
- ChainHash: query.ChainHash,
- FirstBlockHeight: query.FirstBlockHeight,
- NumBlocks: query.NumBlocks,
- Complete: 1,
- }, nil))
- chanSeries := s.cfg.channelSeries.(*mockChannelGraphTimeSeries)
- select {
- case <-chanSeries.filterReq:
- chanSeries.filterResp <- nil
- case <-time.After(2 * time.Second):
- t.Fatal("expected to receive FilterKnownChanIDs request")
- }
- err := wait.NoError(func() error {
- state := syncerState(atomic.LoadUint32(&s.state))
- if state != chansSynced {
- return fmt.Errorf("expected syncerState %v, got %v",
- chansSynced, state)
- }
- return nil
- }, time.Second)
- if err != nil {
- t.Fatal(err)
- }
- }
- // assertPassiveSyncerTransition asserts that a gossip syncer goes through all
- // of its expected steps when transitioning from passive to active.
- func assertPassiveSyncerTransition(t *testing.T, s *GossipSyncer, peer *mockPeer) {
- t.Helper()
- assertActiveGossipTimestampRange(t, peer)
- assertSyncerStatus(t, s, chansSynced, ActiveSync)
- }
- // assertActiveSyncerTransition asserts that a gossip syncer goes through all of
- // its expected steps when transitioning from active to passive.
- func assertActiveSyncerTransition(t *testing.T, s *GossipSyncer, peer *mockPeer) {
- t.Helper()
- assertMsgSent(t, peer, &lnwire.GossipTimestampRange{
- FirstTimestamp: uint32(zeroTimestamp.Unix()),
- TimestampRange: 0,
- })
- assertSyncerStatus(t, s, chansSynced, PassiveSync)
- }
|