agent.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876
  1. package autopilot
  2. import (
  3. "bytes"
  4. "fmt"
  5. "math/rand"
  6. "net"
  7. "sync"
  8. "time"
  9. "github.com/btcsuite/btcd/btcec/v2"
  10. "github.com/btcsuite/btcd/btcutil"
  11. "github.com/davecgh/go-spew/spew"
  12. "github.com/lightningnetwork/lnd/lnwire"
  13. )
// Config couples all the items that an autopilot agent needs to function.
// All items within the struct MUST be populated for the Agent to be able to
// carry out its duties.
type Config struct {
	// Self is the identity public key of the Lightning Network node that
	// is being driven by the agent. This is used to ensure that we don't
	// accidentally attempt to open a channel with ourselves.
	Self *btcec.PublicKey

	// Heuristic is an attachment heuristic which will govern to whom we
	// open channels to, and also what those channels look like in terms of
	// desired capacity. The Heuristic will take into account the current
	// state of the graph, our set of open channels, and the amount of
	// available funds when determining how channels are to be opened.
	// Additionally, a heuristic may also factor in extra-graph
	// information in order to make more pertinent recommendations.
	Heuristic AttachmentHeuristic

	// ChanController is an interface that is able to directly manage the
	// creation, closing and update of channels within the network.
	ChanController ChannelController

	// ConnectToPeer attempts to connect to the peer using one of its
	// advertised addresses. The boolean returned signals whether the peer
	// was already connected.
	ConnectToPeer func(*btcec.PublicKey, []net.Addr) (bool, error)

	// DisconnectPeer attempts to disconnect the peer with the given public
	// key.
	DisconnectPeer func(*btcec.PublicKey) error

	// WalletBalance is a function closure that should return the current
	// available balance of the backing wallet.
	WalletBalance func() (btcutil.Amount, error)

	// Graph is an abstract channel graph that the Heuristic and the Agent
	// will use to make decisions w.r.t channel allocation and placement
	// within the graph.
	Graph ChannelGraph

	// Constraints is the set of constraints the autopilot must adhere to
	// when opening channels.
	Constraints AgentConstraints

	// TODO(roasbeef): add additional signals from fee rates and revenue of
	// currently opened channels
}
// channelState is a type that represents the set of active channels of the
// backing LN node that the Agent should be aware of, keyed by the short
// channel ID of each channel. This type contains a few helper utility
// methods.
type channelState map[lnwire.ShortChannelID]LocalChannel
  57. // Channels returns a slice of all the active channels.
  58. func (c channelState) Channels() []LocalChannel {
  59. chans := make([]LocalChannel, 0, len(c))
  60. for _, channel := range c {
  61. chans = append(chans, channel)
  62. }
  63. return chans
  64. }
  65. // ConnectedNodes returns the set of nodes we currently have a channel with.
  66. // This information is needed as we want to avoid making repeated channels with
  67. // any node.
  68. func (c channelState) ConnectedNodes() map[NodeID]struct{} {
  69. nodes := make(map[NodeID]struct{})
  70. for _, channels := range c {
  71. nodes[channels.Node] = struct{}{}
  72. }
  73. // TODO(roasbeef): add outgoing, nodes, allow incoming and outgoing to
  74. // per node
  75. // * only add node is chan as funding amt set
  76. return nodes
  77. }
// Agent implements a closed-loop control system which seeks to autonomously
// optimize the allocation of satoshis within channels throughout the network's
// channel graph. An agent is configurable by swapping out different
// AttachmentHeuristic strategies. The agent uses external signals such as the
// wallet balance changing, or new channels being opened/closed for the local
// node as an indicator to re-examine its internal state, and the amount of
// available funds in order to make updated decisions w.r.t the channel graph.
// The Agent will automatically open, close, and splice in/out channels as
// necessary for it to step closer to its optimal state.
//
// TODO(roasbeef): prob re-word
type Agent struct {
	// started and stopped ensure that the start and stop routines are
	// each executed at most once.
	started sync.Once
	stopped sync.Once

	// cfg houses the configuration state of the Agent.
	cfg Config

	// chanState tracks the current set of open channels. Access is
	// guarded by chanStateMtx.
	chanState    channelState
	chanStateMtx sync.Mutex

	// stateUpdates is a channel that any external state updates that may
	// affect the heuristics of the agent will be sent over.
	stateUpdates chan interface{}

	// balanceUpdates is a channel where notifications about updates to the
	// wallet's balance will be sent. This channel will be buffered to
	// ensure we have at most one pending update of this type to handle at
	// a given time.
	balanceUpdates chan *balanceUpdate

	// nodeUpdates is a channel that changes to the graph node landscape
	// will be sent over. This channel will be buffered to ensure we have
	// at most one pending update of this type to handle at a given time.
	nodeUpdates chan *nodeUpdates

	// pendingOpenUpdates is a channel where updates about channel pending
	// opening will be sent. This channel will be buffered to ensure we
	// have at most one pending update of this type to handle at a given
	// time.
	pendingOpenUpdates chan *chanPendingOpenUpdate

	// chanOpenFailures is a channel where updates about channel open
	// failures will be sent. This channel will be buffered to ensure we
	// have at most one pending update of this type to handle at a given
	// time.
	chanOpenFailures chan *chanOpenFailureUpdate

	// heuristicUpdates is a channel where updates from active heuristics
	// will be sent.
	heuristicUpdates chan *heuristicUpdate

	// totalBalance is the total number of satoshis the backing wallet is
	// known to control at any given instance. This value will be updated
	// when the agent receives external balance update signals.
	totalBalance btcutil.Amount

	// failedNodes lists nodes that we've previously attempted to initiate
	// channels with, but didn't succeed. Access is guarded by pendingMtx.
	failedNodes map[NodeID]struct{}

	// pendingConns tracks the nodes that we are attempting to make
	// connections to. This prevents us from making duplicate connection
	// requests to the same node. Access is guarded by pendingMtx.
	pendingConns map[NodeID]struct{}

	// pendingOpens tracks the channels that we've requested to be
	// initiated, but haven't yet been confirmed as being fully opened.
	// This state is required as otherwise, we may go over our allotted
	// channel limit, or open multiple channels to the same node. Access is
	// guarded by pendingMtx.
	pendingOpens map[NodeID]LocalChannel
	pendingMtx   sync.Mutex

	// quit is closed when the agent is stopped, and wg tracks the
	// goroutines that the stop routine waits on.
	quit chan struct{}
	wg   sync.WaitGroup
}
  142. // New creates a new instance of the Agent instantiated using the passed
  143. // configuration and initial channel state. The initial channel state slice
  144. // should be populated with the set of Channels that are currently opened by
  145. // the backing Lightning Node.
  146. func New(cfg Config, initialState []LocalChannel) (*Agent, error) {
  147. a := &Agent{
  148. cfg: cfg,
  149. chanState: make(map[lnwire.ShortChannelID]LocalChannel),
  150. quit: make(chan struct{}),
  151. stateUpdates: make(chan interface{}),
  152. balanceUpdates: make(chan *balanceUpdate, 1),
  153. nodeUpdates: make(chan *nodeUpdates, 1),
  154. chanOpenFailures: make(chan *chanOpenFailureUpdate, 1),
  155. heuristicUpdates: make(chan *heuristicUpdate, 1),
  156. pendingOpenUpdates: make(chan *chanPendingOpenUpdate, 1),
  157. failedNodes: make(map[NodeID]struct{}),
  158. pendingConns: make(map[NodeID]struct{}),
  159. pendingOpens: make(map[NodeID]LocalChannel),
  160. }
  161. for _, c := range initialState {
  162. a.chanState[c.ChanID] = c
  163. }
  164. return a, nil
  165. }
  166. // Start starts the agent along with any goroutines it needs to perform its
  167. // normal duties.
  168. func (a *Agent) Start() error {
  169. var err error
  170. a.started.Do(func() {
  171. err = a.start()
  172. })
  173. return err
  174. }
  175. func (a *Agent) start() error {
  176. rand.Seed(time.Now().Unix())
  177. log.Infof("Autopilot Agent starting")
  178. a.wg.Add(1)
  179. go a.controller()
  180. return nil
  181. }
  182. // Stop signals the Agent to gracefully shutdown. This function will block
  183. // until all goroutines have exited.
  184. func (a *Agent) Stop() error {
  185. var err error
  186. a.stopped.Do(func() {
  187. err = a.stop()
  188. })
  189. return err
  190. }
  191. func (a *Agent) stop() error {
  192. log.Infof("Autopilot Agent stopping")
  193. close(a.quit)
  194. a.wg.Wait()
  195. return nil
  196. }
// balanceUpdate is a type of external state update that reflects an
// increase/decrease in the funds currently available to the wallet. It carries
// no payload; the controller re-queries the wallet balance when it receives
// one.
type balanceUpdate struct {
}

// nodeUpdates is a type of external state update that reflects an addition or
// modification in channel graph node membership. It carries no payload and
// only serves as a wake-up signal.
type nodeUpdates struct{}

// chanOpenUpdate is a type of external state update that indicates a new
// channel has been opened, either by the Agent itself (within the main
// controller loop), or by an external user to the system.
type chanOpenUpdate struct {
	// newChan is the channel that was opened.
	newChan LocalChannel
}

// chanPendingOpenUpdate is a type of external state update that indicates a new
// channel has been opened, either by the agent itself or an external subsystem,
// but is still pending.
type chanPendingOpenUpdate struct{}

// chanOpenFailureUpdate is a type of external state update that indicates
// a previous channel open failed, and that it might be possible to try again.
type chanOpenFailureUpdate struct{}

// heuristicUpdate is an update sent when one of the autopilot heuristics has
// changed, and prompts the agent to make a new attempt at opening more
// channels.
type heuristicUpdate struct {
	// heuristic is the heuristic that was updated.
	heuristic AttachmentHeuristic
}

// chanCloseUpdate is a type of external state update that indicates that the
// backing Lightning Node has closed a previously open channel.
type chanCloseUpdate struct {
	// closedChans holds the short channel IDs of the channels that were
	// closed.
	closedChans []lnwire.ShortChannelID
}
  228. // OnBalanceChange is a callback that should be executed each time the balance
  229. // of the backing wallet changes.
  230. func (a *Agent) OnBalanceChange() {
  231. select {
  232. case a.balanceUpdates <- &balanceUpdate{}:
  233. default:
  234. }
  235. }
  236. // OnNodeUpdates is a callback that should be executed each time our channel
  237. // graph has new nodes or their node announcements are updated.
  238. func (a *Agent) OnNodeUpdates() {
  239. select {
  240. case a.nodeUpdates <- &nodeUpdates{}:
  241. default:
  242. }
  243. }
  244. // OnChannelOpen is a callback that should be executed each time a new channel
  245. // is manually opened by the user or any system outside the autopilot agent.
  246. func (a *Agent) OnChannelOpen(c LocalChannel) {
  247. a.wg.Add(1)
  248. go func() {
  249. defer a.wg.Done()
  250. select {
  251. case a.stateUpdates <- &chanOpenUpdate{newChan: c}:
  252. case <-a.quit:
  253. }
  254. }()
  255. }
  256. // OnChannelPendingOpen is a callback that should be executed each time a new
  257. // channel is opened, either by the agent or an external subsystems, but is
  258. // still pending.
  259. func (a *Agent) OnChannelPendingOpen() {
  260. select {
  261. case a.pendingOpenUpdates <- &chanPendingOpenUpdate{}:
  262. default:
  263. }
  264. }
  265. // OnChannelOpenFailure is a callback that should be executed when the
  266. // autopilot has attempted to open a channel, but failed. In this case we can
  267. // retry channel creation with a different node.
  268. func (a *Agent) OnChannelOpenFailure() {
  269. select {
  270. case a.chanOpenFailures <- &chanOpenFailureUpdate{}:
  271. default:
  272. }
  273. }
  274. // OnChannelClose is a callback that should be executed each time a prior
  275. // channel has been closed for any reason. This includes regular
  276. // closes, force closes, and channel breaches.
  277. func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) {
  278. a.wg.Add(1)
  279. go func() {
  280. defer a.wg.Done()
  281. select {
  282. case a.stateUpdates <- &chanCloseUpdate{closedChans: closedChans}:
  283. case <-a.quit:
  284. }
  285. }()
  286. }
  287. // OnHeuristicUpdate is a method called when a heuristic has been updated, to
  288. // trigger the agent to do a new state assessment.
  289. func (a *Agent) OnHeuristicUpdate(h AttachmentHeuristic) {
  290. select {
  291. case a.heuristicUpdates <- &heuristicUpdate{
  292. heuristic: h,
  293. }:
  294. default:
  295. }
  296. }
  297. // mergeNodeMaps merges the Agent's set of nodes that it already has active
  298. // channels open to, with the other sets of nodes that should be removed from
  299. // consideration during heuristic selection. This ensures that the Agent doesn't
  300. // attempt to open any "duplicate" channels to the same node.
  301. func mergeNodeMaps(c map[NodeID]LocalChannel,
  302. skips ...map[NodeID]struct{}) map[NodeID]struct{} {
  303. numNodes := len(c)
  304. for _, skip := range skips {
  305. numNodes += len(skip)
  306. }
  307. res := make(map[NodeID]struct{}, numNodes)
  308. for nodeID := range c {
  309. res[nodeID] = struct{}{}
  310. }
  311. for _, skip := range skips {
  312. for nodeID := range skip {
  313. res[nodeID] = struct{}{}
  314. }
  315. }
  316. return res
  317. }
  318. // mergeChanState merges the Agent's set of active channels, with the set of
  319. // channels awaiting confirmation. This ensures that the agent doesn't go over
  320. // the prescribed channel limit or fund allocation limit.
  321. func mergeChanState(pendingChans map[NodeID]LocalChannel,
  322. activeChans channelState) []LocalChannel {
  323. numChans := len(pendingChans) + len(activeChans)
  324. totalChans := make([]LocalChannel, 0, numChans)
  325. totalChans = append(totalChans, activeChans.Channels()...)
  326. for _, pendingChan := range pendingChans {
  327. totalChans = append(totalChans, pendingChan)
  328. }
  329. return totalChans
  330. }
// controller implements the closed-loop control system of the Agent. The
// controller will make a decision w.r.t channel placement within the graph
// based on: its current internal state of the set of active channels open,
// and external state changes as a result of decisions it makes w.r.t channel
// allocation, or attributes affecting its control loop being updated by the
// backing Lightning Node.
//
// Each iteration blocks until one signal arrives, applies it to the agent's
// state, and then asks the configured constraints whether more channels may
// be opened. The loop exits once the quit channel is closed.
//
// NOTE: MUST be run as a goroutine; it calls wg.Done when it returns.
func (a *Agent) controller() {
	defer a.wg.Done()

	// We'll start off by assigning our starting balance, and injecting
	// that amount as an initial wake up to the main controller goroutine.
	a.OnBalanceChange()

	// TODO(roasbeef): do we in fact need to maintain order?
	//  * use sync.Cond if so

	// updateBalance refreshes totalBalance from the wallet. If the query
	// fails, a warning is logged and the previous value is kept.
	updateBalance := func() {
		newBalance, err := a.cfg.WalletBalance()
		if err != nil {
			log.Warnf("unable to update wallet balance: %v", err)
			return
		}

		a.totalBalance = newBalance
	}

	// TODO(roasbeef): add 10-minute wake up timer
	for {
		select {
		// A new external signal has arrived. We'll use this to update
		// our internal state, then determine if we should trigger a
		// channel state modification (open/close, splice in/out).
		case signal := <-a.stateUpdates:
			log.Infof("Processing new external signal")

			switch update := signal.(type) {
			// A new channel has been opened successfully. This was
			// either opened by the Agent, or an external system
			// that is able to drive the Lightning Node.
			case *chanOpenUpdate:
				log.Debugf("New channel successfully opened, "+
					"updating state with: %v",
					spew.Sdump(update.newChan))

				newChan := update.newChan
				a.chanStateMtx.Lock()
				a.chanState[newChan.ChanID] = newChan
				a.chanStateMtx.Unlock()

				// The channel is now tracked as open, so it no
				// longer counts as a pending open for its node.
				a.pendingMtx.Lock()
				delete(a.pendingOpens, newChan.Node)
				a.pendingMtx.Unlock()

				updateBalance()
			// A channel has been closed, this may free up an
			// available slot, triggering a new channel update.
			case *chanCloseUpdate:
				log.Debugf("Applying closed channel "+
					"updates: %v",
					spew.Sdump(update.closedChans))

				a.chanStateMtx.Lock()
				for _, closedChan := range update.closedChans {
					delete(a.chanState, closedChan)
				}
				a.chanStateMtx.Unlock()

				updateBalance()
			}

		// A new channel has been opened by the agent or an external
		// subsystem, but is still pending confirmation.
		case <-a.pendingOpenUpdates:
			updateBalance()

		// The balance of the backing wallet has changed, if more funds
		// are now available, we may attempt to open up an additional
		// channel, or splice in funds to an existing one.
		case <-a.balanceUpdates:
			log.Debug("Applying external balance state update")

			updateBalance()

		// The channel we tried to open previously failed for whatever
		// reason.
		case <-a.chanOpenFailures:
			log.Debug("Retrying after previous channel open " +
				"failure.")

			updateBalance()

		// New nodes have been added to the graph or their node
		// announcements have been updated. We will consider opening
		// channels to these nodes if we haven't stabilized.
		case <-a.nodeUpdates:
			log.Debugf("Node updates received, assessing " +
				"need for more channels")

		// Any of the deployed heuristics has been updated, check
		// whether we have new channel candidates available.
		case upd := <-a.heuristicUpdates:
			log.Debugf("Heuristic %v updated, assessing need for "+
				"more channels", upd.heuristic.Name())

		// The agent has been signalled to exit, so we'll bail out
		// immediately.
		case <-a.quit:
			return
		}

		a.pendingMtx.Lock()
		log.Debugf("Pending channels: %v", spew.Sdump(a.pendingOpens))
		a.pendingMtx.Unlock()

		// With all the updates applied, we'll obtain a set of the
		// current active channels (confirmed channels), and also
		// factor in our set of unconfirmed channels. Both mutexes are
		// held so the merged view is taken from a consistent snapshot.
		a.chanStateMtx.Lock()
		a.pendingMtx.Lock()
		totalChans := mergeChanState(a.pendingOpens, a.chanState)
		a.pendingMtx.Unlock()
		a.chanStateMtx.Unlock()

		// Now that we've updated our internal state, we'll consult our
		// channel attachment heuristic to determine if we can open
		// up any additional channels while staying within our
		// constraints.
		availableFunds, numChans := a.cfg.Constraints.ChannelBudget(
			totalChans, a.totalBalance,
		)
		switch {
		// No channel slots are available, so wait for the next signal.
		case numChans == 0:
			continue

		// If the amount is too small, we don't want to attempt opening
		// another channel.
		case availableFunds == 0:
			continue
		case availableFunds < a.cfg.Constraints.MinChanSize():
			continue
		}

		log.Infof("Triggering attachment directive dispatch, "+
			"total_funds=%v", a.totalBalance)

		// A failure here is only logged; the loop keeps running and
		// will try again on the next signal.
		err := a.openChans(availableFunds, numChans, totalChans)
		if err != nil {
			log.Errorf("Unable to open channels: %v", err)
		}
	}
}
  457. // openChans queries the agent's heuristic for a set of channel candidates, and
  458. // attempts to open channels to them.
  459. func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
  460. totalChans []LocalChannel) error {
  461. // As channel size we'll use the maximum channel size available.
  462. chanSize := a.cfg.Constraints.MaxChanSize()
  463. if availableFunds < chanSize {
  464. chanSize = availableFunds
  465. }
  466. if chanSize < a.cfg.Constraints.MinChanSize() {
  467. return fmt.Errorf("not enough funds available to open a " +
  468. "single channel")
  469. }
  470. // We're to attempt an attachment so we'll obtain the set of
  471. // nodes that we currently have channels with so we avoid
  472. // duplicate edges.
  473. a.chanStateMtx.Lock()
  474. connectedNodes := a.chanState.ConnectedNodes()
  475. a.chanStateMtx.Unlock()
  476. for nID := range connectedNodes {
  477. log.Tracef("Skipping node %x with open channel", nID[:])
  478. }
  479. a.pendingMtx.Lock()
  480. for nID := range a.pendingOpens {
  481. log.Tracef("Skipping node %x with pending channel open", nID[:])
  482. }
  483. for nID := range a.pendingConns {
  484. log.Tracef("Skipping node %x with pending connection", nID[:])
  485. }
  486. for nID := range a.failedNodes {
  487. log.Tracef("Skipping failed node %v", nID[:])
  488. }
  489. nodesToSkip := mergeNodeMaps(a.pendingOpens,
  490. a.pendingConns, connectedNodes, a.failedNodes,
  491. )
  492. a.pendingMtx.Unlock()
  493. // Gather the set of all nodes in the graph, except those we
  494. // want to skip.
  495. selfPubBytes := a.cfg.Self.SerializeCompressed()
  496. nodes := make(map[NodeID]struct{})
  497. addresses := make(map[NodeID][]net.Addr)
  498. if err := a.cfg.Graph.ForEachNode(func(node Node) error {
  499. nID := NodeID(node.PubKey())
  500. // If we come across ourselves, them we'll continue in
  501. // order to avoid attempting to make a channel with
  502. // ourselves.
  503. if bytes.Equal(nID[:], selfPubBytes) {
  504. log.Tracef("Skipping self node %x", nID[:])
  505. return nil
  506. }
  507. // If the node has no known addresses, we cannot connect to it,
  508. // so we'll skip it.
  509. addrs := node.Addrs()
  510. if len(addrs) == 0 {
  511. log.Tracef("Skipping node %x since no addresses known",
  512. nID[:])
  513. return nil
  514. }
  515. addresses[nID] = addrs
  516. // Additionally, if this node is in the blacklist, then
  517. // we'll skip it.
  518. if _, ok := nodesToSkip[nID]; ok {
  519. log.Tracef("Skipping blacklisted node %x", nID[:])
  520. return nil
  521. }
  522. nodes[nID] = struct{}{}
  523. return nil
  524. }); err != nil {
  525. return fmt.Errorf("unable to get graph nodes: %w", err)
  526. }
  527. // Use the heuristic to calculate a score for each node in the
  528. // graph.
  529. log.Debugf("Scoring %d nodes for chan_size=%v", len(nodes), chanSize)
  530. scores, err := a.cfg.Heuristic.NodeScores(
  531. a.cfg.Graph, totalChans, chanSize, nodes,
  532. )
  533. if err != nil {
  534. return fmt.Errorf("unable to calculate node scores : %w", err)
  535. }
  536. log.Debugf("Got scores for %d nodes", len(scores))
  537. // Now use the score to make a weighted choice which nodes to attempt
  538. // to open channels to.
  539. scores, err = chooseN(numChans, scores)
  540. if err != nil {
  541. return fmt.Errorf("unable to make weighted choice: %w",
  542. err)
  543. }
  544. chanCandidates := make(map[NodeID]*AttachmentDirective)
  545. for nID := range scores {
  546. log.Tracef("Creating attachment directive for chosen node %x",
  547. nID[:])
  548. // Track the available funds we have left.
  549. if availableFunds < chanSize {
  550. chanSize = availableFunds
  551. }
  552. availableFunds -= chanSize
  553. // If we run out of funds, we can break early.
  554. if chanSize < a.cfg.Constraints.MinChanSize() {
  555. log.Tracef("Chan size %v too small to satisfy min "+
  556. "channel size %v, breaking", chanSize,
  557. a.cfg.Constraints.MinChanSize())
  558. break
  559. }
  560. chanCandidates[nID] = &AttachmentDirective{
  561. NodeID: nID,
  562. ChanAmt: chanSize,
  563. Addrs: addresses[nID],
  564. }
  565. }
  566. if len(chanCandidates) == 0 {
  567. log.Infof("No eligible candidates to connect to")
  568. return nil
  569. }
  570. log.Infof("Attempting to execute channel attachment "+
  571. "directives: %v", spew.Sdump(chanCandidates))
  572. // Before proceeding, check to see if we have any slots
  573. // available to open channels. If there are any, we will attempt
  574. // to dispatch the retrieved directives since we can't be
  575. // certain which ones may actually succeed. If too many
  576. // connections succeed, they will be ignored and made
  577. // available to future heuristic selections.
  578. a.pendingMtx.Lock()
  579. defer a.pendingMtx.Unlock()
  580. if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens() {
  581. log.Debugf("Reached cap of %v pending "+
  582. "channel opens, will retry "+
  583. "after success/failure",
  584. a.cfg.Constraints.MaxPendingOpens())
  585. return nil
  586. }
  587. // For each recommended attachment directive, we'll launch a
  588. // new goroutine to attempt to carry out the directive. If any
  589. // of these succeed, then we'll receive a new state update,
  590. // taking us back to the top of our controller loop.
  591. for _, chanCandidate := range chanCandidates {
  592. // Skip candidates which we are already trying
  593. // to establish a connection with.
  594. nodeID := chanCandidate.NodeID
  595. if _, ok := a.pendingConns[nodeID]; ok {
  596. continue
  597. }
  598. a.pendingConns[nodeID] = struct{}{}
  599. a.wg.Add(1)
  600. go a.executeDirective(*chanCandidate)
  601. }
  602. return nil
  603. }
  604. // executeDirective attempts to connect to the channel candidate specified by
  605. // the given attachment directive, and open a channel of the given size.
  606. //
  607. // NOTE: MUST be run as a goroutine.
  608. func (a *Agent) executeDirective(directive AttachmentDirective) {
  609. defer a.wg.Done()
  610. // We'll start out by attempting to connect to the peer in order to
  611. // begin the funding workflow.
  612. nodeID := directive.NodeID
  613. pub, err := btcec.ParsePubKey(nodeID[:])
  614. if err != nil {
  615. log.Errorf("Unable to parse pubkey %x: %v", nodeID, err)
  616. return
  617. }
  618. connected := make(chan bool)
  619. errChan := make(chan error)
  620. // To ensure a call to ConnectToPeer doesn't block the agent from
  621. // shutting down, we'll launch it in a non-waitgrouped goroutine, that
  622. // will signal when a result is returned.
  623. // TODO(halseth): use DialContext to cancel on transport level.
  624. go func() {
  625. alreadyConnected, err := a.cfg.ConnectToPeer(
  626. pub, directive.Addrs,
  627. )
  628. if err != nil {
  629. select {
  630. case errChan <- err:
  631. case <-a.quit:
  632. }
  633. return
  634. }
  635. select {
  636. case connected <- alreadyConnected:
  637. case <-a.quit:
  638. return
  639. }
  640. }()
  641. var alreadyConnected bool
  642. select {
  643. case alreadyConnected = <-connected:
  644. case err = <-errChan:
  645. case <-a.quit:
  646. return
  647. }
  648. if err != nil {
  649. log.Warnf("Unable to connect to %x: %v",
  650. pub.SerializeCompressed(), err)
  651. // Since we failed to connect to them, we'll mark them as
  652. // failed so that we don't attempt to connect to them again.
  653. a.pendingMtx.Lock()
  654. delete(a.pendingConns, nodeID)
  655. a.failedNodes[nodeID] = struct{}{}
  656. a.pendingMtx.Unlock()
  657. // Finally, we'll trigger the agent to select new peers to
  658. // connect to.
  659. a.OnChannelOpenFailure()
  660. return
  661. }
  662. // The connection was successful, though before progressing we must
  663. // check that we have not already met our quota for max pending open
  664. // channels. This can happen if multiple directives were spawned but
  665. // fewer slots were available, and other successful attempts finished
  666. // first.
  667. a.pendingMtx.Lock()
  668. if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens() {
  669. // Since we've reached our max number of pending opens, we'll
  670. // disconnect this peer and exit. However, if we were
  671. // previously connected to them, then we'll make sure to
  672. // maintain the connection alive.
  673. if alreadyConnected {
  674. // Since we succeeded in connecting, we won't add this
  675. // peer to the failed nodes map, but we will remove it
  676. // from a.pendingConns so that it can be retried in the
  677. // future.
  678. delete(a.pendingConns, nodeID)
  679. a.pendingMtx.Unlock()
  680. return
  681. }
  682. err = a.cfg.DisconnectPeer(pub)
  683. if err != nil {
  684. log.Warnf("Unable to disconnect peer %x: %v",
  685. pub.SerializeCompressed(), err)
  686. }
  687. // Now that we have disconnected, we can remove this node from
  688. // our pending conns map, permitting subsequent connection
  689. // attempts.
  690. delete(a.pendingConns, nodeID)
  691. a.pendingMtx.Unlock()
  692. return
  693. }
  694. // If we were successful, we'll track this peer in our set of pending
  695. // opens. We do this here to ensure we don't stall on selecting new
  696. // peers if the connection attempt happens to take too long.
  697. delete(a.pendingConns, nodeID)
  698. a.pendingOpens[nodeID] = LocalChannel{
  699. Balance: directive.ChanAmt,
  700. Node: nodeID,
  701. }
  702. a.pendingMtx.Unlock()
  703. // We can then begin the funding workflow with this peer.
  704. err = a.cfg.ChanController.OpenChannel(pub, directive.ChanAmt)
  705. if err != nil {
  706. log.Warnf("Unable to open channel to %x of %v: %v",
  707. pub.SerializeCompressed(), directive.ChanAmt, err)
  708. // As the attempt failed, we'll clear the peer from the set of
  709. // pending opens and mark them as failed so we don't attempt to
  710. // open a channel to them again.
  711. a.pendingMtx.Lock()
  712. delete(a.pendingOpens, nodeID)
  713. a.failedNodes[nodeID] = struct{}{}
  714. a.pendingMtx.Unlock()
  715. // Trigger the agent to re-evaluate everything and possibly
  716. // retry with a different node.
  717. a.OnChannelOpenFailure()
  718. // Finally, we should also disconnect the peer if we weren't
  719. // already connected to them beforehand by an external
  720. // subsystem.
  721. if alreadyConnected {
  722. return
  723. }
  724. err = a.cfg.DisconnectPeer(pub)
  725. if err != nil {
  726. log.Warnf("Unable to disconnect peer %x: %v",
  727. pub.SerializeCompressed(), err)
  728. }
  729. }
  730. // Since the channel open was successful and is currently pending,
  731. // we'll trigger the autopilot agent to query for more peers.
  732. // TODO(halseth): this triggers a new loop before all the new channels
  733. // are added to the pending channels map. Should add before executing
  734. // directive in goroutine?
  735. a.OnChannelPendingOpen()
  736. }