package redis

import (
	"context"
	"crypto/tls"
	"errors"
	"fmt"
	"math"
	"math/rand"
	"net"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/go-redis/redis/internal"
	"github.com/go-redis/redis/internal/hashtag"
	"github.com/go-redis/redis/internal/pool"
	"github.com/go-redis/redis/internal/proto"
	"github.com/go-redis/redis/internal/singleflight"
)

var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")

// ClusterOptions are used to configure a cluster client and should be
// passed to NewClusterClient.
type ClusterOptions struct {
	// A seed list of host:port addresses of cluster nodes.
	Addrs []string

	// The maximum number of retries before giving up. Commands are retried
	// on network errors and MOVED/ASK redirects.
	// Default is 8.
	MaxRedirects int

	// Enables read-only commands on slave nodes.
	ReadOnly bool
	// Allows routing read-only commands to the closest master or slave node.
	// It automatically enables ReadOnly.
	RouteByLatency bool
	// Allows routing read-only commands to a random master or slave node.
	RouteRandomly bool

	// The following options are copied from the Options struct.

	OnConnect func(*Conn) error

	MaxRetries      int
	MinRetryBackoff time.Duration
	MaxRetryBackoff time.Duration
	Password        string

	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration

	// PoolSize applies per cluster node and not for the whole cluster.
	PoolSize           int
	PoolTimeout        time.Duration
	IdleTimeout        time.Duration
	IdleCheckFrequency time.Duration

	TLSConfig *tls.Config
}

// init fills in defaults. By convention, -1 disables a setting and 0
// selects the default value.
func (opt *ClusterOptions) init() {
	if opt.MaxRedirects == -1 {
		opt.MaxRedirects = 0
	} else if opt.MaxRedirects == 0 {
		opt.MaxRedirects = 8
	}

	if opt.RouteByLatency {
		opt.ReadOnly = true
	}

	switch opt.ReadTimeout {
	case -1:
		opt.ReadTimeout = 0
	case 0:
		opt.ReadTimeout = 3 * time.Second
	}
	switch opt.WriteTimeout {
	case -1:
		opt.WriteTimeout = 0
	case 0:
		opt.WriteTimeout = opt.ReadTimeout
	}

	switch opt.MinRetryBackoff {
	case -1:
		opt.MinRetryBackoff = 0
	case 0:
		opt.MinRetryBackoff = 8 * time.Millisecond
	}
	switch opt.MaxRetryBackoff {
	case -1:
		opt.MaxRetryBackoff = 0
	case 0:
		opt.MaxRetryBackoff = 512 * time.Millisecond
	}
}
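
// A minimal sketch of the sentinel convention above (NewClusterClient
// calls init for you; the values here are illustrative, not
// recommendations):
//
//	opt := &ClusterOptions{
//		ReadTimeout:  -1, // -1 disables the read timeout entirely
//		MaxRedirects: 0,  // 0 falls back to the default of 8
//	}
//	opt.init() // now opt.ReadTimeout == 0 and opt.MaxRedirects == 8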

func (opt *ClusterOptions) clientOptions() *Options {
	const disableIdleCheck = -1

	return &Options{
		OnConnect: opt.OnConnect,

		MaxRetries:      opt.MaxRetries,
		MinRetryBackoff: opt.MinRetryBackoff,
		MaxRetryBackoff: opt.MaxRetryBackoff,
		Password:        opt.Password,
		readOnly:        opt.ReadOnly,

		DialTimeout:  opt.DialTimeout,
		ReadTimeout:  opt.ReadTimeout,
		WriteTimeout: opt.WriteTimeout,

		PoolSize:    opt.PoolSize,
		PoolTimeout: opt.PoolTimeout,
		IdleTimeout: opt.IdleTimeout,

		// ClusterClient runs its own reaper, so per-node idle checks
		// are disabled.
		IdleCheckFrequency: disableIdleCheck,

		TLSConfig: opt.TLSConfig,
	}
}

//------------------------------------------------------------------------------

type clusterNode struct {
	Client *Client

	latency    uint32 // atomic
	generation uint32 // atomic
	loading    uint32 // atomic
}

func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
	opt := clOpt.clientOptions()
	opt.Addr = addr
	node := clusterNode{
		Client: NewClient(opt),
	}

	node.latency = math.MaxUint32
	if clOpt.RouteByLatency {
		go node.updateLatency()
	}

	return &node
}

func (n *clusterNode) String() string {
	return n.Client.String()
}

func (n *clusterNode) Close() error {
	return n.Client.Close()
}

func (n *clusterNode) Test() error {
	return n.Client.ClusterInfo().Err()
}

// updateLatency pings the node several times and stores a moving
// average of the probe durations, in microseconds.
func (n *clusterNode) updateLatency() {
	const probes = 10

	var latency uint32
	for i := 0; i < probes; i++ {
		start := time.Now()
		n.Client.Ping()
		probe := uint32(time.Since(start) / time.Microsecond)
		latency = (latency + probe) / 2
	}
	atomic.StoreUint32(&n.latency, latency)
}

func (n *clusterNode) Latency() time.Duration {
	latency := atomic.LoadUint32(&n.latency)
	return time.Duration(latency) * time.Microsecond
}

func (n *clusterNode) MarkAsLoading() {
	atomic.StoreUint32(&n.loading, uint32(time.Now().Unix()))
}

// Loading reports whether the node was recently marked as loading.
// The mark expires after a minute.
func (n *clusterNode) Loading() bool {
	const minute = int64(time.Minute / time.Second)

	loading := atomic.LoadUint32(&n.loading)
	if loading == 0 {
		return false
	}
	if time.Now().Unix()-int64(loading) < minute {
		return true
	}
	atomic.StoreUint32(&n.loading, 0)
	return false
}

func (n *clusterNode) Generation() uint32 {
	return atomic.LoadUint32(&n.generation)
}

// SetGeneration raises the node's generation, never lowering it, using
// a compare-and-swap loop.
func (n *clusterNode) SetGeneration(gen uint32) {
	for {
		v := atomic.LoadUint32(&n.generation)
		if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) {
			break
		}
	}
}

//------------------------------------------------------------------------------

type clusterNodes struct {
	opt *ClusterOptions

	mu           sync.RWMutex
	allAddrs     []string
	allNodes     map[string]*clusterNode
	clusterAddrs []string
	closed       bool

	nodeCreateGroup singleflight.Group

	_generation uint32 // atomic
}

func newClusterNodes(opt *ClusterOptions) *clusterNodes {
	return &clusterNodes{
		opt: opt,

		allAddrs: opt.Addrs,
		allNodes: make(map[string]*clusterNode),
	}
}

func (c *clusterNodes) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil
	}
	c.closed = true

	var firstErr error
	for _, node := range c.allNodes {
		if err := node.Client.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	c.allNodes = nil
	c.clusterAddrs = nil

	return firstErr
}

func (c *clusterNodes) Addrs() ([]string, error) {
	var addrs []string
	c.mu.RLock()
	closed := c.closed
	if !closed {
		if len(c.clusterAddrs) > 0 {
			addrs = c.clusterAddrs
		} else {
			addrs = c.allAddrs
		}
	}
	c.mu.RUnlock()

	if closed {
		return nil, pool.ErrClosed
	}
	if len(addrs) == 0 {
		return nil, errClusterNoNodes
	}
	return addrs, nil
}

func (c *clusterNodes) NextGeneration() uint32 {
	return atomic.AddUint32(&c._generation, 1)
}

// GC removes unused nodes.
func (c *clusterNodes) GC(generation uint32) {
	var collected []*clusterNode
	c.mu.Lock()
	for addr, node := range c.allNodes {
		if node.Generation() >= generation {
			continue
		}

		c.clusterAddrs = remove(c.clusterAddrs, addr)
		delete(c.allNodes, addr)
		collected = append(collected, node)
	}
	c.mu.Unlock()

	for _, node := range collected {
		_ = node.Client.Close()
	}
}
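
// A minimal sketch of the generation scheme: each state reload bumps
// the generation, nodes seen in the new state are stamped with it, and
// GC closes everything older (values are illustrative):
//
//	gen := nodes.NextGeneration() // e.g. 7
//	node.SetGeneration(gen)       // this node survives...
//	nodes.GC(gen)                 // ...anything below 7 is closed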

func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
	var node *clusterNode
	var err error
	c.mu.RLock()
	if c.closed {
		err = pool.ErrClosed
	} else {
		node = c.allNodes[addr]
	}
	c.mu.RUnlock()
	return node, err
}

// GetOrCreate returns the node for addr, creating and testing it at
// most once even under concurrent calls, thanks to the singleflight
// group keyed by address.
func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
	node, err := c.Get(addr)
	if err != nil {
		return nil, err
	}
	if node != nil {
		return node, nil
	}

	v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) {
		node := newClusterNode(c.opt, addr)
		return node, node.Test()
	})

	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	node, ok := c.allNodes[addr]
	if ok {
		_ = v.(*clusterNode).Close()
		return node, err
	}
	node = v.(*clusterNode)

	c.allAddrs = appendIfNotExists(c.allAddrs, addr)
	if err == nil {
		c.clusterAddrs = append(c.clusterAddrs, addr)
	}
	c.allNodes[addr] = node

	return node, err
}

func (c *clusterNodes) All() ([]*clusterNode, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	cp := make([]*clusterNode, 0, len(c.allNodes))
	for _, node := range c.allNodes {
		cp = append(cp, node)
	}
	return cp, nil
}

func (c *clusterNodes) Random() (*clusterNode, error) {
	addrs, err := c.Addrs()
	if err != nil {
		return nil, err
	}

	n := rand.Intn(len(addrs))
	return c.GetOrCreate(addrs[n])
}

//------------------------------------------------------------------------------

type clusterState struct {
	nodes   *clusterNodes
	Masters []*clusterNode
	Slaves  []*clusterNode

	slots [][]*clusterNode

	generation uint32
	createdAt  time.Time
}

func newClusterState(
	nodes *clusterNodes, slots []ClusterSlot, origin string,
) (*clusterState, error) {
	c := clusterState{
		nodes: nodes,

		slots: make([][]*clusterNode, hashtag.SlotNumber),

		generation: nodes.NextGeneration(),
		createdAt:  time.Now(),
	}

	isLoopbackOrigin := isLoopbackAddr(origin)
	for _, slot := range slots {
		var nodes []*clusterNode
		for i, slotNode := range slot.Nodes {
			addr := slotNode.Addr
			if !isLoopbackOrigin && useOriginAddr(origin, addr) {
				addr = origin
			}

			node, err := c.nodes.GetOrCreate(addr)
			if err != nil {
				return nil, err
			}

			node.SetGeneration(c.generation)
			nodes = append(nodes, node)

			if i == 0 {
				c.Masters = appendUniqueNode(c.Masters, node)
			} else {
				c.Slaves = appendUniqueNode(c.Slaves, node)
			}
		}

		for i := slot.Start; i <= slot.End; i++ {
			c.slots[i] = nodes
		}
	}

	time.AfterFunc(time.Minute, func() {
		nodes.GC(c.generation)
	})

	return &c, nil
}

func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) > 0 {
		return nodes[0], nil
	}
	return c.nodes.Random()
}

func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	switch len(nodes) {
	case 0:
		return c.nodes.Random()
	case 1:
		return nodes[0], nil
	case 2:
		if slave := nodes[1]; !slave.Loading() {
			return slave, nil
		}
		return nodes[0], nil
	default:
		// Pick a random slave that is not loading.
		var slave *clusterNode
		for i := 0; i < 10; i++ {
			n := rand.Intn(len(nodes)-1) + 1
			slave = nodes[n]
			if !slave.Loading() {
				break
			}
		}
		return slave, nil
	}
}

func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
	const threshold = time.Millisecond

	nodes := c.slotNodes(slot)
	if len(nodes) == 0 {
		return c.nodes.Random()
	}

	var node *clusterNode
	for _, n := range nodes {
		if n.Loading() {
			continue
		}
		if node == nil || node.Latency()-n.Latency() > threshold {
			node = n
		}
	}
	return node, nil
}

func (c *clusterState) slotRandomNode(slot int) *clusterNode {
	nodes := c.slotNodes(slot)
	n := rand.Intn(len(nodes))
	return nodes[n]
}

func (c *clusterState) slotNodes(slot int) []*clusterNode {
	if slot >= 0 && slot < len(c.slots) {
		return c.slots[slot]
	}
	return nil
}

// IsConsistent reports whether every known master actually has the
// master role and every known slave has the slave role. A state with
// more masters than slaves is also treated as inconsistent.
func (c *clusterState) IsConsistent() bool {
	if len(c.Masters) > len(c.Slaves) {
		return false
	}

	for _, master := range c.Masters {
		s := master.Client.Info("replication").Val()
		if !strings.Contains(s, "role:master") {
			return false
		}
	}

	for _, slave := range c.Slaves {
		s := slave.Client.Info("replication").Val()
		if !strings.Contains(s, "role:slave") {
			return false
		}
	}

	return true
}

//------------------------------------------------------------------------------

type clusterStateHolder struct {
	load func() (*clusterState, error)

	state atomic.Value

	firstErrMu sync.RWMutex
	firstErr   error

	reloading uint32 // atomic
}

func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder {
	return &clusterStateHolder{
		load: fn,
	}
}

func (c *clusterStateHolder) Reload() (*clusterState, error) {
	state, err := c.reload()
	if err != nil {
		return nil, err
	}
	if !state.IsConsistent() {
		c.LazyReload()
	}
	return state, nil
}

func (c *clusterStateHolder) reload() (*clusterState, error) {
	state, err := c.load()
	if err != nil {
		c.firstErrMu.Lock()
		if c.firstErr == nil {
			c.firstErr = err
		}
		c.firstErrMu.Unlock()
		return nil, err
	}
	c.state.Store(state)
	return state, nil
}

// LazyReload reloads the cluster state in a background goroutine,
// retrying until the loaded state is consistent. Concurrent calls
// collapse into a single reload.
func (c *clusterStateHolder) LazyReload() {
	if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
		return
	}
	go func() {
		defer atomic.StoreUint32(&c.reloading, 0)

		for {
			state, err := c.reload()
			if err != nil {
				return
			}
			time.Sleep(100 * time.Millisecond)
			if state.IsConsistent() {
				return
			}
		}
	}()
}

func (c *clusterStateHolder) Get() (*clusterState, error) {
	v := c.state.Load()
	if v != nil {
		state := v.(*clusterState)
		if time.Since(state.createdAt) > time.Minute {
			c.LazyReload()
		}
		return state, nil
	}

	c.firstErrMu.RLock()
	err := c.firstErr
	c.firstErrMu.RUnlock()
	if err != nil {
		return nil, err
	}

	return nil, errors.New("redis: cluster has no state")
}

//------------------------------------------------------------------------------

// ClusterClient is a Redis Cluster client representing a pool of zero
// or more underlying connections. It's safe for concurrent use by
// multiple goroutines.
type ClusterClient struct {
	cmdable

	ctx context.Context

	opt           *ClusterOptions
	nodes         *clusterNodes
	state         *clusterStateHolder
	cmdsInfoCache *cmdsInfoCache

	process           func(Cmder) error
	processPipeline   func([]Cmder) error
	processTxPipeline func([]Cmder) error
}

// NewClusterClient returns a Redis Cluster client as described in
// http://redis.io/topics/cluster-spec.
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
	opt.init()

	c := &ClusterClient{
		opt:   opt,
		nodes: newClusterNodes(opt),
	}
	c.state = newClusterStateHolder(c.loadState)
	c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)

	c.process = c.defaultProcess
	c.processPipeline = c.defaultProcessPipeline
	c.processTxPipeline = c.defaultProcessTxPipeline

	c.init()

	_, _ = c.state.Reload()
	if opt.IdleCheckFrequency > 0 {
		go c.reaper(opt.IdleCheckFrequency)
	}

	return c
}
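
// A minimal usage sketch, from an importing package's perspective
// (the addresses are placeholders for real cluster seed nodes):
//
//	clusterClient := redis.NewClusterClient(&redis.ClusterOptions{
//		Addrs: []string{":7000", ":7001", ":7002"},
//	})
//	defer clusterClient.Close()
//
//	if err := clusterClient.Ping().Err(); err != nil {
//		// handle error
//	}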

func (c *ClusterClient) init() {
	c.cmdable.setProcessor(c.Process)
}

func (c *ClusterClient) Context() context.Context {
	if c.ctx != nil {
		return c.ctx
	}
	return context.Background()
}

func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient {
	if ctx == nil {
		panic("nil context")
	}
	c2 := c.copy()
	c2.ctx = ctx
	return c2
}

func (c *ClusterClient) copy() *ClusterClient {
	cp := *c
	cp.init()
	return &cp
}

// Options returns read-only Options that were used to create the client.
func (c *ClusterClient) Options() *ClusterOptions {
	return c.opt
}

func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}

// cmdsInfo fetches COMMAND metadata from the first reachable node.
func (c *ClusterClient) cmdsInfo() (map[string]*CommandInfo, error) {
	addrs, err := c.nodes.Addrs()
	if err != nil {
		return nil, err
	}

	var firstErr error
	for _, addr := range addrs {
		node, err := c.nodes.Get(addr)
		if err != nil {
			return nil, err
		}
		if node == nil {
			continue
		}

		info, err := node.Client.Command().Result()
		if err == nil {
			return info, nil
		}
		if firstErr == nil {
			firstErr = err
		}
	}
	return nil, firstErr
}

func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
	cmdsInfo, err := c.cmdsInfoCache.Get()
	if err != nil {
		return nil
	}

	info := cmdsInfo[name]
	if info == nil {
		internal.Logf("info for cmd=%s not found", name)
	}
	return info
}

func cmdSlot(cmd Cmder, pos int) int {
	if pos == 0 {
		return hashtag.RandomSlot()
	}
	firstKey := cmd.stringArg(pos)
	return hashtag.Slot(firstKey)
}
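
// Hash tags let callers pin related keys to one slot: per the cluster
// spec, only the substring between the first '{' and the following '}'
// is hashed. A sketch (key names are illustrative):
//
//	hashtag.Slot("{user1000}.name") == hashtag.Slot("{user1000}.age")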

func (c *ClusterClient) cmdSlot(cmd Cmder) int {
	cmdInfo := c.cmdInfo(cmd.Name())
	return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
}

func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) {
	state, err := c.state.Get()
	if err != nil {
		return 0, nil, err
	}

	cmdInfo := c.cmdInfo(cmd.Name())
	slot := cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))

	if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly {
		if c.opt.RouteByLatency {
			node, err := state.slotClosestNode(slot)
			return slot, node, err
		}

		if c.opt.RouteRandomly {
			node := state.slotRandomNode(slot)
			return slot, node, nil
		}

		node, err := state.slotSlaveNode(slot)
		return slot, node, err
	}

	node, err := state.slotMasterNode(slot)
	return slot, node, err
}

func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) {
	state, err := c.state.Get()
	if err != nil {
		return nil, err
	}

	nodes := state.slotNodes(slot)
	if len(nodes) > 0 {
		return nodes[0], nil
	}
	return c.nodes.Random()
}

func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
	if len(keys) == 0 {
		return fmt.Errorf("redis: Watch requires at least one key")
	}

	slot := hashtag.Slot(keys[0])
	for _, key := range keys[1:] {
		if hashtag.Slot(key) != slot {
			return fmt.Errorf("redis: Watch requires all keys to be in the same slot")
		}
	}

	node, err := c.slotMasterNode(slot)
	if err != nil {
		return err
	}

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			time.Sleep(c.retryBackoff(attempt))
		}

		err = node.Client.Watch(fn, keys...)
		if err == nil {
			break
		}

		if internal.IsRetryableError(err, true) {
			continue
		}

		moved, ask, addr := internal.IsMovedError(err)
		if moved || ask {
			c.state.LazyReload()
			node, err = c.nodes.GetOrCreate(addr)
			if err != nil {
				return err
			}
			continue
		}

		if err == pool.ErrClosed {
			node, err = c.slotMasterNode(slot)
			if err != nil {
				return err
			}
			continue
		}

		return err
	}

	return err
}
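
// A minimal sketch of an optimistic transaction over keys that share a
// hash tag, so they map to one slot (key names are illustrative):
//
//	err := clusterClient.Watch(func(tx *redis.Tx) error {
//		n, err := tx.Get("{user1000}.counter").Int64()
//		if err != nil && err != redis.Nil {
//			return err
//		}
//		_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
//			pipe.Set("{user1000}.counter", n+1, 0)
//			return nil
//		})
//		return err
//	}, "{user1000}.counter")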

// Close closes the cluster client, releasing any open resources.
//
// It is rare to Close a ClusterClient, as the ClusterClient is meant
// to be long-lived and shared between many goroutines.
func (c *ClusterClient) Close() error {
	return c.nodes.Close()
}

func (c *ClusterClient) WrapProcess(
	fn func(oldProcess func(Cmder) error) func(Cmder) error,
) {
	c.process = fn(c.process)
}

func (c *ClusterClient) Process(cmd Cmder) error {
	return c.process(cmd)
}

func (c *ClusterClient) defaultProcess(cmd Cmder) error {
	var node *clusterNode
	var ask bool
	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			time.Sleep(c.retryBackoff(attempt))
		}

		if node == nil {
			var err error
			_, node, err = c.cmdSlotAndNode(cmd)
			if err != nil {
				cmd.setErr(err)
				break
			}
		}

		var err error
		if ask {
			pipe := node.Client.Pipeline()
			_ = pipe.Process(NewCmd("ASKING"))
			_ = pipe.Process(cmd)
			_, err = pipe.Exec()
			_ = pipe.Close()
			ask = false
		} else {
			err = node.Client.Process(cmd)
		}

		// If there is no error, we are done.
		if err == nil {
			break
		}

		// If a slave is loading, mark it and retry.
		if c.opt.ReadOnly && internal.IsLoadingError(err) {
			node.MarkAsLoading()
			continue
		}

		if internal.IsRetryableError(err, true) {
			// First, retry the same node.
			if attempt == 0 {
				continue
			}

			// Then try a random node.
			node, err = c.nodes.Random()
			if err != nil {
				break
			}
			continue
		}

		var moved bool
		var addr string
		moved, ask, addr = internal.IsMovedError(err)
		if moved || ask {
			c.state.LazyReload()
			node, err = c.nodes.GetOrCreate(addr)
			if err != nil {
				break
			}
			continue
		}

		if err == pool.ErrClosed {
			node = nil
			continue
		}

		break
	}

	return cmd.Err()
}

// ForEachMaster concurrently calls fn on each master node in the cluster.
// It returns the first error, if any.
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
	state, err := c.state.Reload()
	if err != nil {
		state, err = c.state.Get()
		if err != nil {
			return err
		}
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)
	for _, master := range state.Masters {
		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(master)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}
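
// A minimal sketch: run a command on every master. fn is called from
// one goroutine per master, so it must be safe for concurrent use:
//
//	err := clusterClient.ForEachMaster(func(master *redis.Client) error {
//		return master.FlushDB().Err()
//	})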

// ForEachSlave concurrently calls fn on each slave node in the cluster.
// It returns the first error, if any.
func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
	state, err := c.state.Reload()
	if err != nil {
		state, err = c.state.Get()
		if err != nil {
			return err
		}
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)
	for _, slave := range state.Slaves {
		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(slave)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// ForEachNode concurrently calls fn on each known node in the cluster.
// It returns the first error, if any.
func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
	state, err := c.state.Reload()
	if err != nil {
		state, err = c.state.Get()
		if err != nil {
			return err
		}
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)
	worker := func(node *clusterNode) {
		defer wg.Done()
		err := fn(node.Client)
		if err != nil {
			select {
			case errCh <- err:
			default:
			}
		}
	}

	for _, node := range state.Masters {
		wg.Add(1)
		go worker(node)
	}
	for _, node := range state.Slaves {
		wg.Add(1)
		go worker(node)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// PoolStats returns accumulated connection pool stats.
func (c *ClusterClient) PoolStats() *PoolStats {
	var acc PoolStats

	state, _ := c.state.Get()
	if state == nil {
		return &acc
	}

	for _, node := range state.Masters {
		s := node.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.FreeConns += s.FreeConns
		acc.StaleConns += s.StaleConns
	}

	for _, node := range state.Slaves {
		s := node.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.FreeConns += s.FreeConns
		acc.StaleConns += s.StaleConns
	}

	return &acc
}

// loadState fetches CLUSTER SLOTS from the first reachable node and
// builds a new cluster state from the reply.
func (c *ClusterClient) loadState() (*clusterState, error) {
	addrs, err := c.nodes.Addrs()
	if err != nil {
		return nil, err
	}

	var firstErr error
	for _, addr := range addrs {
		node, err := c.nodes.GetOrCreate(addr)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		slots, err := node.Client.ClusterSlots().Result()
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		return newClusterState(c.nodes, slots, node.Client.opt.Addr)
	}
	return nil, firstErr
}

// reaper closes idle connections to the cluster.
func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
	ticker := time.NewTicker(idleCheckFrequency)
	defer ticker.Stop()

	for range ticker.C {
		nodes, err := c.nodes.All()
		if err != nil {
			break
		}

		for _, node := range nodes {
			_, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
			if err != nil {
				internal.Logf("ReapStaleConns failed: %s", err)
			}
		}
	}
}

func (c *ClusterClient) Pipeline() Pipeliner {
	pipe := Pipeline{
		exec: c.processPipeline,
	}
	pipe.statefulCmdable.setProcessor(pipe.Process)
	return &pipe
}

func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
	return c.Pipeline().Pipelined(fn)
}
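
// A minimal pipelining sketch; queued commands are grouped by node and
// each group is written in a single round trip (key names are
// illustrative):
//
//	cmds, err := clusterClient.Pipelined(func(pipe redis.Pipeliner) error {
//		pipe.Incr("counter")
//		pipe.Expire("counter", time.Hour)
//		return nil
//	})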

func (c *ClusterClient) WrapProcessPipeline(
	fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
) {
	c.processPipeline = fn(c.processPipeline)
}

func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
	cmdsMap, err := c.mapCmdsByNode(cmds)
	if err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			time.Sleep(c.retryBackoff(attempt))
		}

		failedCmds := make(map[*clusterNode][]Cmder)

		for node, cmds := range cmdsMap {
			cn, err := node.Client.getConn()
			if err != nil {
				if err == pool.ErrClosed {
					c.remapCmds(cmds, failedCmds)
				} else {
					setCmdsErr(cmds, err)
				}
				continue
			}

			err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
			if err == nil || internal.IsRedisError(err) {
				node.Client.connPool.Put(cn)
			} else {
				node.Client.connPool.Remove(cn)
			}
		}

		if len(failedCmds) == 0 {
			break
		}
		cmdsMap = failedCmds
	}

	return firstCmdsErr(cmds)
}

func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) {
	state, err := c.state.Get()
	if err != nil {
		setCmdsErr(cmds, err)
		return nil, err
	}

	cmdsMap := make(map[*clusterNode][]Cmder)
	cmdsAreReadOnly := c.cmdsAreReadOnly(cmds)
	for _, cmd := range cmds {
		var node *clusterNode
		var err error
		if cmdsAreReadOnly {
			_, node, err = c.cmdSlotAndNode(cmd)
		} else {
			slot := c.cmdSlot(cmd)
			node, err = state.slotMasterNode(slot)
		}
		if err != nil {
			return nil, err
		}
		cmdsMap[node] = append(cmdsMap[node], cmd)
	}
	return cmdsMap, nil
}

func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
	for _, cmd := range cmds {
		cmdInfo := c.cmdInfo(cmd.Name())
		if cmdInfo == nil || !cmdInfo.ReadOnly {
			return false
		}
	}
	return true
}

func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder) {
	remappedCmds, err := c.mapCmdsByNode(cmds)
	if err != nil {
		setCmdsErr(cmds, err)
		return
	}

	for node, cmds := range remappedCmds {
		failedCmds[node] = cmds
	}
}

func (c *ClusterClient) pipelineProcessCmds(
	node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
	_ = cn.SetWriteTimeout(c.opt.WriteTimeout)

	err := writeCmd(cn, cmds...)
	if err != nil {
		setCmdsErr(cmds, err)
		failedCmds[node] = cmds
		return err
	}

	// Set read timeout for all commands.
	_ = cn.SetReadTimeout(c.opt.ReadTimeout)

	return c.pipelineReadCmds(cn, cmds, failedCmds)
}

func (c *ClusterClient) pipelineReadCmds(
	cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
	for _, cmd := range cmds {
		err := cmd.readReply(cn)
		if err == nil {
			continue
		}

		if c.checkMovedErr(cmd, err, failedCmds) {
			continue
		}

		if internal.IsRedisError(err) {
			continue
		}

		return err
	}
	return nil
}

// checkMovedErr queues the command for a retry against the node named
// in a MOVED or ASK reply. It reports whether the error was a redirect.
func (c *ClusterClient) checkMovedErr(
	cmd Cmder, err error, failedCmds map[*clusterNode][]Cmder,
) bool {
	moved, ask, addr := internal.IsMovedError(err)

	if moved {
		c.state.LazyReload()

		node, err := c.nodes.GetOrCreate(addr)
		if err != nil {
			return false
		}

		failedCmds[node] = append(failedCmds[node], cmd)
		return true
	}

	if ask {
		node, err := c.nodes.GetOrCreate(addr)
		if err != nil {
			return false
		}

		failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd)
		return true
	}

	return false
}

// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *ClusterClient) TxPipeline() Pipeliner {
	pipe := Pipeline{
		exec: c.processTxPipeline,
	}
	pipe.statefulCmdable.setProcessor(pipe.Process)
	return &pipe
}

func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
	return c.TxPipeline().Pipelined(fn)
}
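
// A minimal transactional pipelining sketch; the keys share a hash tag,
// so the whole MULTI/EXEC block maps to a single slot (key names are
// illustrative):
//
//	cmds, err := clusterClient.TxPipelined(func(pipe redis.Pipeliner) error {
//		pipe.Set("{user1000}.name", "alice", 0)
//		pipe.Set("{user1000}.age", "30", 0)
//		return nil
//	})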

func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
	state, err := c.state.Get()
	if err != nil {
		return err
	}

	cmdsMap := c.mapCmdsBySlot(cmds)
	for slot, cmds := range cmdsMap {
		node, err := state.slotMasterNode(slot)
		if err != nil {
			setCmdsErr(cmds, err)
			continue
		}

		cmdsMap := map[*clusterNode][]Cmder{node: cmds}
		for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
			if attempt > 0 {
				time.Sleep(c.retryBackoff(attempt))
			}

			failedCmds := make(map[*clusterNode][]Cmder)

			for node, cmds := range cmdsMap {
				cn, err := node.Client.getConn()
				if err != nil {
					if err == pool.ErrClosed {
						c.remapCmds(cmds, failedCmds)
					} else {
						setCmdsErr(cmds, err)
					}
					continue
				}

				err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
				if err == nil || internal.IsRedisError(err) {
					node.Client.connPool.Put(cn)
				} else {
					node.Client.connPool.Remove(cn)
				}
			}

			if len(failedCmds) == 0 {
				break
			}
			cmdsMap = failedCmds
		}
	}

	return firstCmdsErr(cmds)
}

func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
	cmdsMap := make(map[int][]Cmder)
	for _, cmd := range cmds {
		slot := c.cmdSlot(cmd)
		cmdsMap[slot] = append(cmdsMap[slot], cmd)
	}
	return cmdsMap
}

func (c *ClusterClient) txPipelineProcessCmds(
	node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
	_ = cn.SetWriteTimeout(c.opt.WriteTimeout)
	if err := txPipelineWriteMulti(cn, cmds); err != nil {
		setCmdsErr(cmds, err)
		failedCmds[node] = cmds
		return err
	}

	// Set read timeout for all commands.
	_ = cn.SetReadTimeout(c.opt.ReadTimeout)

	if err := c.txPipelineReadQueued(cn, cmds, failedCmds); err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	return pipelineReadCmds(cn, cmds)
}

func (c *ClusterClient) txPipelineReadQueued(
	cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
	// Parse the MULTI reply.
	var statusCmd StatusCmd
	if err := statusCmd.readReply(cn); err != nil {
		return err
	}

	// Parse the +QUEUED replies.
	for _, cmd := range cmds {
		err := statusCmd.readReply(cn)
		if err == nil {
			continue
		}

		if c.checkMovedErr(cmd, err, failedCmds) || internal.IsRedisError(err) {
			continue
		}

		return err
	}

	// Parse the number of replies.
	line, err := cn.Rd.ReadLine()
	if err != nil {
		if err == Nil {
			err = TxFailedErr
		}
		return err
	}

	switch line[0] {
	case proto.ErrorReply:
		err := proto.ParseErrorReply(line)
		for _, cmd := range cmds {
			if !c.checkMovedErr(cmd, err, failedCmds) {
				break
			}
		}
		return err
	case proto.ArrayReply:
		// ok
	default:
		return fmt.Errorf("redis: expected '*', but got line %q", line)
	}

	return nil
}

func (c *ClusterClient) pubSub(channels []string) *PubSub {
	opt := c.opt.clientOptions()

	var node *clusterNode
	return &PubSub{
		opt: opt,

		newConn: func(channels []string) (*pool.Conn, error) {
			if node == nil {
				var slot int
				if len(channels) > 0 {
					slot = hashtag.Slot(channels[0])
				} else {
					slot = -1
				}

				masterNode, err := c.slotMasterNode(slot)
				if err != nil {
					return nil, err
				}
				node = masterNode
			}
			return node.Client.newConn()
		},
		closeConn: func(cn *pool.Conn) error {
			return node.Client.connPool.CloseConn(cn)
		},
	}
}

// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create an empty subscription.
func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
	pubsub := c.pubSub(channels)
	if len(channels) > 0 {
		_ = pubsub.Subscribe(channels...)
	}
	return pubsub
}

// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create an empty subscription.
func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
	pubsub := c.pubSub(channels)
	if len(channels) > 0 {
		_ = pubsub.PSubscribe(channels...)
	}
	return pubsub
}
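
// A minimal pub/sub sketch (the channel name is illustrative):
//
//	pubsub := clusterClient.Subscribe("mychannel")
//	defer pubsub.Close()
//
//	msg, err := pubsub.ReceiveMessage()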

// useOriginAddr reports whether nodeAddr is a loopback address that
// should be replaced with the origin address the client dialed.
func useOriginAddr(originAddr, nodeAddr string) bool {
	nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
	if err != nil {
		return false
	}

	nodeIP := net.ParseIP(nodeHost)
	if nodeIP == nil {
		return false
	}

	if !nodeIP.IsLoopback() {
		return false
	}

	_, originPort, err := net.SplitHostPort(originAddr)
	if err != nil {
		return false
	}

	return nodePort == originPort
}

func isLoopbackAddr(addr string) bool {
	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		return false
	}

	ip := net.ParseIP(host)
	if ip == nil {
		return false
	}

	return ip.IsLoopback()
}

func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
	for _, n := range nodes {
		if n == node {
			return nodes
		}
	}
	return append(nodes, node)
}

func appendIfNotExists(ss []string, es ...string) []string {
loop:
	for _, e := range es {
		for _, s := range ss {
			if s == e {
				continue loop
			}
		}
		ss = append(ss, e)
	}
	return ss
}

// remove deletes the first occurrence of each element of es from ss.
// With no elements it empties the slice.
func remove(ss []string, es ...string) []string {
	if len(es) == 0 {
		return ss[:0]
	}
	for _, e := range es {
		for i, s := range ss {
			if s == e {
				ss = append(ss[:i], ss[i+1:]...)
				break
			}
		}
	}
	return ss
}