cluster_test.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946
  1. package redis_test
  2. import (
  3. "bytes"
  4. "fmt"
  5. "net"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "testing"
  10. "time"
  11. "github.com/go-redis/redis"
  12. "github.com/go-redis/redis/internal/hashtag"
  13. . "github.com/onsi/ginkgo"
  14. . "github.com/onsi/gomega"
  15. )
  16. type clusterScenario struct {
  17. ports []string
  18. nodeIds []string
  19. processes map[string]*redisProcess
  20. clients map[string]*redis.Client
  21. }
  22. func (s *clusterScenario) masters() []*redis.Client {
  23. result := make([]*redis.Client, 3)
  24. for pos, port := range s.ports[:3] {
  25. result[pos] = s.clients[port]
  26. }
  27. return result
  28. }
  29. func (s *clusterScenario) slaves() []*redis.Client {
  30. result := make([]*redis.Client, 3)
  31. for pos, port := range s.ports[3:] {
  32. result[pos] = s.clients[port]
  33. }
  34. return result
  35. }
  36. func (s *clusterScenario) addrs() []string {
  37. addrs := make([]string, len(s.ports))
  38. for i, port := range s.ports {
  39. addrs[i] = net.JoinHostPort("127.0.0.1", port)
  40. }
  41. return addrs
  42. }
  43. func (s *clusterScenario) clusterClient(opt *redis.ClusterOptions) *redis.ClusterClient {
  44. opt.Addrs = s.addrs()
  45. client := redis.NewClusterClient(opt)
  46. Eventually(func() bool {
  47. state, err := client.GetState()
  48. if err != nil {
  49. return false
  50. }
  51. return state.IsConsistent()
  52. }, 30*time.Second).Should(BeTrue())
  53. return client
  54. }
  55. func startCluster(scenario *clusterScenario) error {
  56. // Start processes and collect node ids
  57. for pos, port := range scenario.ports {
  58. process, err := startRedis(port, "--cluster-enabled", "yes")
  59. if err != nil {
  60. return err
  61. }
  62. client := redis.NewClient(&redis.Options{
  63. Addr: ":" + port,
  64. })
  65. info, err := client.ClusterNodes().Result()
  66. if err != nil {
  67. return err
  68. }
  69. scenario.processes[port] = process
  70. scenario.clients[port] = client
  71. scenario.nodeIds[pos] = info[:40]
  72. }
  73. // Meet cluster nodes.
  74. for _, client := range scenario.clients {
  75. err := client.ClusterMeet("127.0.0.1", scenario.ports[0]).Err()
  76. if err != nil {
  77. return err
  78. }
  79. }
  80. // Bootstrap masters.
  81. slots := []int{0, 5000, 10000, 16384}
  82. for pos, master := range scenario.masters() {
  83. err := master.ClusterAddSlotsRange(slots[pos], slots[pos+1]-1).Err()
  84. if err != nil {
  85. return err
  86. }
  87. }
  88. // Bootstrap slaves.
  89. for idx, slave := range scenario.slaves() {
  90. masterId := scenario.nodeIds[idx]
  91. // Wait until master is available
  92. err := eventually(func() error {
  93. s := slave.ClusterNodes().Val()
  94. wanted := masterId
  95. if !strings.Contains(s, wanted) {
  96. return fmt.Errorf("%q does not contain %q", s, wanted)
  97. }
  98. return nil
  99. }, 10*time.Second)
  100. if err != nil {
  101. return err
  102. }
  103. err = slave.ClusterReplicate(masterId).Err()
  104. if err != nil {
  105. return err
  106. }
  107. }
  108. // Wait until all nodes have consistent info.
  109. wanted := []redis.ClusterSlot{{
  110. Start: 0,
  111. End: 4999,
  112. Nodes: []redis.ClusterNode{{
  113. Id: "",
  114. Addr: "127.0.0.1:8220",
  115. }, {
  116. Id: "",
  117. Addr: "127.0.0.1:8223",
  118. }},
  119. }, {
  120. Start: 5000,
  121. End: 9999,
  122. Nodes: []redis.ClusterNode{{
  123. Id: "",
  124. Addr: "127.0.0.1:8221",
  125. }, {
  126. Id: "",
  127. Addr: "127.0.0.1:8224",
  128. }},
  129. }, {
  130. Start: 10000,
  131. End: 16383,
  132. Nodes: []redis.ClusterNode{{
  133. Id: "",
  134. Addr: "127.0.0.1:8222",
  135. }, {
  136. Id: "",
  137. Addr: "127.0.0.1:8225",
  138. }},
  139. }}
  140. for _, client := range scenario.clients {
  141. err := eventually(func() error {
  142. res, err := client.ClusterSlots().Result()
  143. if err != nil {
  144. return err
  145. }
  146. return assertSlotsEqual(res, wanted)
  147. }, 30*time.Second)
  148. if err != nil {
  149. return err
  150. }
  151. }
  152. return nil
  153. }
  154. func assertSlotsEqual(slots, wanted []redis.ClusterSlot) error {
  155. outer_loop:
  156. for _, s2 := range wanted {
  157. for _, s1 := range slots {
  158. if slotEqual(s1, s2) {
  159. continue outer_loop
  160. }
  161. }
  162. return fmt.Errorf("%v not found in %v", s2, slots)
  163. }
  164. return nil
  165. }
  166. func slotEqual(s1, s2 redis.ClusterSlot) bool {
  167. if s1.Start != s2.Start {
  168. return false
  169. }
  170. if s1.End != s2.End {
  171. return false
  172. }
  173. if len(s1.Nodes) != len(s2.Nodes) {
  174. return false
  175. }
  176. for i, n1 := range s1.Nodes {
  177. if n1.Addr != s2.Nodes[i].Addr {
  178. return false
  179. }
  180. }
  181. return true
  182. }
  183. func stopCluster(scenario *clusterScenario) error {
  184. for _, client := range scenario.clients {
  185. if err := client.Close(); err != nil {
  186. return err
  187. }
  188. }
  189. for _, process := range scenario.processes {
  190. if err := process.Close(); err != nil {
  191. return err
  192. }
  193. }
  194. return nil
  195. }
  196. //------------------------------------------------------------------------------
  197. var _ = Describe("ClusterClient", func() {
  198. var failover bool
  199. var opt *redis.ClusterOptions
  200. var client *redis.ClusterClient
  201. assertClusterClient := func() {
  202. It("should GET/SET/DEL", func() {
  203. err := client.Get("A").Err()
  204. Expect(err).To(Equal(redis.Nil))
  205. err = client.Set("A", "VALUE", 0).Err()
  206. Expect(err).NotTo(HaveOccurred())
  207. Eventually(func() string {
  208. return client.Get("A").Val()
  209. }, 30*time.Second).Should(Equal("VALUE"))
  210. cnt, err := client.Del("A").Result()
  211. Expect(err).NotTo(HaveOccurred())
  212. Expect(cnt).To(Equal(int64(1)))
  213. })
  214. It("GET follows redirects", func() {
  215. err := client.Set("A", "VALUE", 0).Err()
  216. Expect(err).NotTo(HaveOccurred())
  217. if !failover {
  218. Eventually(func() int64 {
  219. nodes, err := client.Nodes("A")
  220. if err != nil {
  221. return 0
  222. }
  223. return nodes[1].Client.DBSize().Val()
  224. }, 30*time.Second).Should(Equal(int64(1)))
  225. Eventually(func() error {
  226. return client.SwapNodes("A")
  227. }, 30*time.Second).ShouldNot(HaveOccurred())
  228. }
  229. v, err := client.Get("A").Result()
  230. Expect(err).NotTo(HaveOccurred())
  231. Expect(v).To(Equal("VALUE"))
  232. })
  233. It("SET follows redirects", func() {
  234. if !failover {
  235. Eventually(func() error {
  236. return client.SwapNodes("A")
  237. }, 30*time.Second).ShouldNot(HaveOccurred())
  238. }
  239. err := client.Set("A", "VALUE", 0).Err()
  240. Expect(err).NotTo(HaveOccurred())
  241. v, err := client.Get("A").Result()
  242. Expect(err).NotTo(HaveOccurred())
  243. Expect(v).To(Equal("VALUE"))
  244. })
  245. It("distributes keys", func() {
  246. for i := 0; i < 100; i++ {
  247. err := client.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
  248. Expect(err).NotTo(HaveOccurred())
  249. }
  250. client.ForEachMaster(func(master *redis.Client) error {
  251. defer GinkgoRecover()
  252. Eventually(func() string {
  253. return master.Info("keyspace").Val()
  254. }, 30*time.Second).Should(Or(
  255. ContainSubstring("keys=31"),
  256. ContainSubstring("keys=29"),
  257. ContainSubstring("keys=40"),
  258. ))
  259. return nil
  260. })
  261. })
  262. It("distributes keys when using EVAL", func() {
  263. script := redis.NewScript(`
  264. local r = redis.call('SET', KEYS[1], ARGV[1])
  265. return r
  266. `)
  267. var key string
  268. for i := 0; i < 100; i++ {
  269. key = fmt.Sprintf("key%d", i)
  270. err := script.Run(client, []string{key}, "value").Err()
  271. Expect(err).NotTo(HaveOccurred())
  272. }
  273. for _, master := range cluster.masters() {
  274. Eventually(func() string {
  275. return master.Info("keyspace").Val()
  276. }, 30*time.Second).Should(Or(
  277. ContainSubstring("keys=31"),
  278. ContainSubstring("keys=29"),
  279. ContainSubstring("keys=40"),
  280. ))
  281. }
  282. })
  283. It("supports Watch", func() {
  284. var incr func(string) error
  285. // Transactionally increments key using GET and SET commands.
  286. incr = func(key string) error {
  287. err := client.Watch(func(tx *redis.Tx) error {
  288. n, err := tx.Get(key).Int64()
  289. if err != nil && err != redis.Nil {
  290. return err
  291. }
  292. _, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
  293. pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
  294. return nil
  295. })
  296. return err
  297. }, key)
  298. if err == redis.TxFailedErr {
  299. return incr(key)
  300. }
  301. return err
  302. }
  303. var wg sync.WaitGroup
  304. for i := 0; i < 100; i++ {
  305. wg.Add(1)
  306. go func() {
  307. defer GinkgoRecover()
  308. defer wg.Done()
  309. err := incr("key")
  310. Expect(err).NotTo(HaveOccurred())
  311. }()
  312. }
  313. wg.Wait()
  314. Eventually(func() string {
  315. return client.Get("key").Val()
  316. }, 30*time.Second).Should(Equal("100"))
  317. })
  318. Describe("pipelining", func() {
  319. var pipe *redis.Pipeline
  320. assertPipeline := func() {
  321. keys := []string{"A", "B", "C", "D", "E", "F", "G"}
  322. It("follows redirects", func() {
  323. if !failover {
  324. for _, key := range keys {
  325. Eventually(func() error {
  326. return client.SwapNodes(key)
  327. }, 30*time.Second).ShouldNot(HaveOccurred())
  328. }
  329. }
  330. for i, key := range keys {
  331. pipe.Set(key, key+"_value", 0)
  332. pipe.Expire(key, time.Duration(i+1)*time.Hour)
  333. }
  334. cmds, err := pipe.Exec()
  335. Expect(err).NotTo(HaveOccurred())
  336. Expect(cmds).To(HaveLen(14))
  337. _ = client.ForEachNode(func(node *redis.Client) error {
  338. defer GinkgoRecover()
  339. Eventually(func() int64 {
  340. return node.DBSize().Val()
  341. }, 30*time.Second).ShouldNot(BeZero())
  342. return nil
  343. })
  344. if !failover {
  345. for _, key := range keys {
  346. Eventually(func() error {
  347. return client.SwapNodes(key)
  348. }, 30*time.Second).ShouldNot(HaveOccurred())
  349. }
  350. }
  351. for _, key := range keys {
  352. pipe.Get(key)
  353. pipe.TTL(key)
  354. }
  355. cmds, err = pipe.Exec()
  356. Expect(err).NotTo(HaveOccurred())
  357. Expect(cmds).To(HaveLen(14))
  358. for i, key := range keys {
  359. get := cmds[i*2].(*redis.StringCmd)
  360. Expect(get.Val()).To(Equal(key + "_value"))
  361. ttl := cmds[(i*2)+1].(*redis.DurationCmd)
  362. dur := time.Duration(i+1) * time.Hour
  363. Expect(ttl.Val()).To(BeNumerically("~", dur, 10*time.Second))
  364. }
  365. })
  366. It("works with missing keys", func() {
  367. pipe.Set("A", "A_value", 0)
  368. pipe.Set("C", "C_value", 0)
  369. _, err := pipe.Exec()
  370. Expect(err).NotTo(HaveOccurred())
  371. a := pipe.Get("A")
  372. b := pipe.Get("B")
  373. c := pipe.Get("C")
  374. cmds, err := pipe.Exec()
  375. Expect(err).To(Equal(redis.Nil))
  376. Expect(cmds).To(HaveLen(3))
  377. Expect(a.Err()).NotTo(HaveOccurred())
  378. Expect(a.Val()).To(Equal("A_value"))
  379. Expect(b.Err()).To(Equal(redis.Nil))
  380. Expect(b.Val()).To(Equal(""))
  381. Expect(c.Err()).NotTo(HaveOccurred())
  382. Expect(c.Val()).To(Equal("C_value"))
  383. })
  384. }
  385. Describe("with Pipeline", func() {
  386. BeforeEach(func() {
  387. pipe = client.Pipeline().(*redis.Pipeline)
  388. })
  389. AfterEach(func() {
  390. Expect(pipe.Close()).NotTo(HaveOccurred())
  391. })
  392. assertPipeline()
  393. })
  394. Describe("with TxPipeline", func() {
  395. BeforeEach(func() {
  396. pipe = client.TxPipeline().(*redis.Pipeline)
  397. })
  398. AfterEach(func() {
  399. Expect(pipe.Close()).NotTo(HaveOccurred())
  400. })
  401. assertPipeline()
  402. })
  403. })
  404. It("supports PubSub", func() {
  405. pubsub := client.Subscribe("mychannel")
  406. defer pubsub.Close()
  407. Eventually(func() error {
  408. _, err := client.Publish("mychannel", "hello").Result()
  409. if err != nil {
  410. return err
  411. }
  412. msg, err := pubsub.ReceiveTimeout(time.Second)
  413. if err != nil {
  414. return err
  415. }
  416. _, ok := msg.(*redis.Message)
  417. if !ok {
  418. return fmt.Errorf("got %T, wanted *redis.Message", msg)
  419. }
  420. return nil
  421. }, 30*time.Second).ShouldNot(HaveOccurred())
  422. })
  423. }
  424. Describe("ClusterClient", func() {
  425. BeforeEach(func() {
  426. opt = redisClusterOptions()
  427. client = cluster.clusterClient(opt)
  428. err := client.ForEachMaster(func(master *redis.Client) error {
  429. return master.FlushDB().Err()
  430. })
  431. Expect(err).NotTo(HaveOccurred())
  432. })
  433. AfterEach(func() {
  434. _ = client.ForEachMaster(func(master *redis.Client) error {
  435. return master.FlushDB().Err()
  436. })
  437. Expect(client.Close()).NotTo(HaveOccurred())
  438. })
  439. It("returns pool stats", func() {
  440. stats := client.PoolStats()
  441. Expect(stats).To(BeAssignableToTypeOf(&redis.PoolStats{}))
  442. })
  443. It("removes idle connections", func() {
  444. stats := client.PoolStats()
  445. Expect(stats.TotalConns).NotTo(BeZero())
  446. Expect(stats.FreeConns).NotTo(BeZero())
  447. time.Sleep(2 * time.Second)
  448. stats = client.PoolStats()
  449. Expect(stats.TotalConns).To(BeZero())
  450. Expect(stats.FreeConns).To(BeZero())
  451. })
  452. It("returns an error when there are no attempts left", func() {
  453. opt := redisClusterOptions()
  454. opt.MaxRedirects = -1
  455. client := cluster.clusterClient(opt)
  456. Eventually(func() error {
  457. return client.SwapNodes("A")
  458. }, 30*time.Second).ShouldNot(HaveOccurred())
  459. err := client.Get("A").Err()
  460. Expect(err).To(HaveOccurred())
  461. Expect(err.Error()).To(ContainSubstring("MOVED"))
  462. Expect(client.Close()).NotTo(HaveOccurred())
  463. })
  464. It("calls fn for every master node", func() {
  465. for i := 0; i < 10; i++ {
  466. Expect(client.Set(strconv.Itoa(i), "", 0).Err()).NotTo(HaveOccurred())
  467. }
  468. err := client.ForEachMaster(func(master *redis.Client) error {
  469. return master.FlushDB().Err()
  470. })
  471. Expect(err).NotTo(HaveOccurred())
  472. size, err := client.DBSize().Result()
  473. Expect(err).NotTo(HaveOccurred())
  474. Expect(size).To(Equal(int64(0)))
  475. })
  476. It("should CLUSTER SLOTS", func() {
  477. res, err := client.ClusterSlots().Result()
  478. Expect(err).NotTo(HaveOccurred())
  479. Expect(res).To(HaveLen(3))
  480. wanted := []redis.ClusterSlot{{
  481. Start: 0,
  482. End: 4999,
  483. Nodes: []redis.ClusterNode{{
  484. Id: "",
  485. Addr: "127.0.0.1:8220",
  486. }, {
  487. Id: "",
  488. Addr: "127.0.0.1:8223",
  489. }},
  490. }, {
  491. Start: 5000,
  492. End: 9999,
  493. Nodes: []redis.ClusterNode{{
  494. Id: "",
  495. Addr: "127.0.0.1:8221",
  496. }, {
  497. Id: "",
  498. Addr: "127.0.0.1:8224",
  499. }},
  500. }, {
  501. Start: 10000,
  502. End: 16383,
  503. Nodes: []redis.ClusterNode{{
  504. Id: "",
  505. Addr: "127.0.0.1:8222",
  506. }, {
  507. Id: "",
  508. Addr: "127.0.0.1:8225",
  509. }},
  510. }}
  511. Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
  512. })
  513. It("should CLUSTER NODES", func() {
  514. res, err := client.ClusterNodes().Result()
  515. Expect(err).NotTo(HaveOccurred())
  516. Expect(len(res)).To(BeNumerically(">", 400))
  517. })
  518. It("should CLUSTER INFO", func() {
  519. res, err := client.ClusterInfo().Result()
  520. Expect(err).NotTo(HaveOccurred())
  521. Expect(res).To(ContainSubstring("cluster_known_nodes:6"))
  522. })
  523. It("should CLUSTER KEYSLOT", func() {
  524. hashSlot, err := client.ClusterKeySlot("somekey").Result()
  525. Expect(err).NotTo(HaveOccurred())
  526. Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey"))))
  527. })
  528. It("should CLUSTER COUNT-FAILURE-REPORTS", func() {
  529. n, err := client.ClusterCountFailureReports(cluster.nodeIds[0]).Result()
  530. Expect(err).NotTo(HaveOccurred())
  531. Expect(n).To(Equal(int64(0)))
  532. })
  533. It("should CLUSTER COUNTKEYSINSLOT", func() {
  534. n, err := client.ClusterCountKeysInSlot(10).Result()
  535. Expect(err).NotTo(HaveOccurred())
  536. Expect(n).To(Equal(int64(0)))
  537. })
  538. It("should CLUSTER SAVECONFIG", func() {
  539. res, err := client.ClusterSaveConfig().Result()
  540. Expect(err).NotTo(HaveOccurred())
  541. Expect(res).To(Equal("OK"))
  542. })
  543. It("should CLUSTER SLAVES", func() {
  544. nodesList, err := client.ClusterSlaves(cluster.nodeIds[0]).Result()
  545. Expect(err).NotTo(HaveOccurred())
  546. Expect(nodesList).Should(ContainElement(ContainSubstring("slave")))
  547. Expect(nodesList).Should(HaveLen(1))
  548. })
  549. It("should RANDOMKEY", func() {
  550. const nkeys = 100
  551. for i := 0; i < nkeys; i++ {
  552. err := client.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
  553. Expect(err).NotTo(HaveOccurred())
  554. }
  555. var keys []string
  556. addKey := func(key string) {
  557. for _, k := range keys {
  558. if k == key {
  559. return
  560. }
  561. }
  562. keys = append(keys, key)
  563. }
  564. for i := 0; i < nkeys*10; i++ {
  565. key := client.RandomKey().Val()
  566. addKey(key)
  567. }
  568. Expect(len(keys)).To(BeNumerically("~", nkeys, nkeys/10))
  569. })
  570. assertClusterClient()
  571. })
  572. Describe("ClusterClient failover", func() {
  573. BeforeEach(func() {
  574. failover = true
  575. opt = redisClusterOptions()
  576. opt.MinRetryBackoff = 250 * time.Millisecond
  577. opt.MaxRetryBackoff = time.Second
  578. client = cluster.clusterClient(opt)
  579. err := client.ForEachMaster(func(master *redis.Client) error {
  580. return master.FlushDB().Err()
  581. })
  582. Expect(err).NotTo(HaveOccurred())
  583. err = client.ForEachSlave(func(slave *redis.Client) error {
  584. defer GinkgoRecover()
  585. Eventually(func() int64 {
  586. return slave.DBSize().Val()
  587. }, 30*time.Second).Should(Equal(int64(0)))
  588. return nil
  589. })
  590. Expect(err).NotTo(HaveOccurred())
  591. state, err := client.GetState()
  592. Expect(err).NotTo(HaveOccurred())
  593. Expect(state.IsConsistent()).To(BeTrue())
  594. for _, slave := range state.Slaves {
  595. err = slave.Client.ClusterFailover().Err()
  596. Expect(err).NotTo(HaveOccurred())
  597. Eventually(func() bool {
  598. state, _ := client.LoadState()
  599. return state.IsConsistent()
  600. }, 30*time.Second).Should(BeTrue())
  601. }
  602. })
  603. AfterEach(func() {
  604. failover = false
  605. Expect(client.Close()).NotTo(HaveOccurred())
  606. })
  607. assertClusterClient()
  608. })
  609. Describe("ClusterClient with RouteByLatency", func() {
  610. BeforeEach(func() {
  611. opt = redisClusterOptions()
  612. opt.RouteByLatency = true
  613. client = cluster.clusterClient(opt)
  614. err := client.ForEachMaster(func(master *redis.Client) error {
  615. return master.FlushDB().Err()
  616. })
  617. Expect(err).NotTo(HaveOccurred())
  618. err = client.ForEachSlave(func(slave *redis.Client) error {
  619. Eventually(func() int64 {
  620. return client.DBSize().Val()
  621. }, 30*time.Second).Should(Equal(int64(0)))
  622. return nil
  623. })
  624. Expect(err).NotTo(HaveOccurred())
  625. })
  626. AfterEach(func() {
  627. err := client.ForEachSlave(func(slave *redis.Client) error {
  628. return slave.ReadWrite().Err()
  629. })
  630. Expect(err).NotTo(HaveOccurred())
  631. err = client.Close()
  632. Expect(err).NotTo(HaveOccurred())
  633. })
  634. assertClusterClient()
  635. })
  636. })
  637. var _ = Describe("ClusterClient without nodes", func() {
  638. var client *redis.ClusterClient
  639. BeforeEach(func() {
  640. client = redis.NewClusterClient(&redis.ClusterOptions{})
  641. })
  642. AfterEach(func() {
  643. Expect(client.Close()).NotTo(HaveOccurred())
  644. })
  645. It("Ping returns an error", func() {
  646. err := client.Ping().Err()
  647. Expect(err).To(MatchError("redis: cluster has no nodes"))
  648. })
  649. It("pipeline returns an error", func() {
  650. _, err := client.Pipelined(func(pipe redis.Pipeliner) error {
  651. pipe.Ping()
  652. return nil
  653. })
  654. Expect(err).To(MatchError("redis: cluster has no nodes"))
  655. })
  656. })
  657. var _ = Describe("ClusterClient without valid nodes", func() {
  658. var client *redis.ClusterClient
  659. BeforeEach(func() {
  660. client = redis.NewClusterClient(&redis.ClusterOptions{
  661. Addrs: []string{redisAddr},
  662. })
  663. })
  664. AfterEach(func() {
  665. Expect(client.Close()).NotTo(HaveOccurred())
  666. })
  667. It("returns an error", func() {
  668. err := client.Ping().Err()
  669. Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
  670. })
  671. It("pipeline returns an error", func() {
  672. _, err := client.Pipelined(func(pipe redis.Pipeliner) error {
  673. pipe.Ping()
  674. return nil
  675. })
  676. Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
  677. })
  678. })
  679. var _ = Describe("ClusterClient timeout", func() {
  680. var client *redis.ClusterClient
  681. AfterEach(func() {
  682. _ = client.Close()
  683. })
  684. testTimeout := func() {
  685. It("Ping timeouts", func() {
  686. err := client.Ping().Err()
  687. Expect(err).To(HaveOccurred())
  688. Expect(err.(net.Error).Timeout()).To(BeTrue())
  689. })
  690. It("Pipeline timeouts", func() {
  691. _, err := client.Pipelined(func(pipe redis.Pipeliner) error {
  692. pipe.Ping()
  693. return nil
  694. })
  695. Expect(err).To(HaveOccurred())
  696. Expect(err.(net.Error).Timeout()).To(BeTrue())
  697. })
  698. It("Tx timeouts", func() {
  699. err := client.Watch(func(tx *redis.Tx) error {
  700. return tx.Ping().Err()
  701. }, "foo")
  702. Expect(err).To(HaveOccurred())
  703. Expect(err.(net.Error).Timeout()).To(BeTrue())
  704. })
  705. It("Tx Pipeline timeouts", func() {
  706. err := client.Watch(func(tx *redis.Tx) error {
  707. _, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
  708. pipe.Ping()
  709. return nil
  710. })
  711. return err
  712. }, "foo")
  713. Expect(err).To(HaveOccurred())
  714. Expect(err.(net.Error).Timeout()).To(BeTrue())
  715. })
  716. }
  717. const pause = 3 * time.Second
  718. Context("read/write timeout", func() {
  719. BeforeEach(func() {
  720. opt := redisClusterOptions()
  721. opt.ReadTimeout = 300 * time.Millisecond
  722. opt.WriteTimeout = 300 * time.Millisecond
  723. opt.MaxRedirects = 1
  724. client = cluster.clusterClient(opt)
  725. err := client.ForEachNode(func(client *redis.Client) error {
  726. return client.ClientPause(pause).Err()
  727. })
  728. Expect(err).NotTo(HaveOccurred())
  729. })
  730. AfterEach(func() {
  731. _ = client.ForEachNode(func(client *redis.Client) error {
  732. defer GinkgoRecover()
  733. Eventually(func() error {
  734. return client.Ping().Err()
  735. }, 2*pause).ShouldNot(HaveOccurred())
  736. return nil
  737. })
  738. })
  739. testTimeout()
  740. })
  741. })
  742. //------------------------------------------------------------------------------
  743. func BenchmarkRedisClusterPing(b *testing.B) {
  744. if testing.Short() {
  745. b.Skip("skipping in short mode")
  746. }
  747. cluster := &clusterScenario{
  748. ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"},
  749. nodeIds: make([]string, 6),
  750. processes: make(map[string]*redisProcess, 6),
  751. clients: make(map[string]*redis.Client, 6),
  752. }
  753. if err := startCluster(cluster); err != nil {
  754. b.Fatal(err)
  755. }
  756. defer stopCluster(cluster)
  757. client := cluster.clusterClient(redisClusterOptions())
  758. defer client.Close()
  759. b.ResetTimer()
  760. b.RunParallel(func(pb *testing.PB) {
  761. for pb.Next() {
  762. if err := client.Ping().Err(); err != nil {
  763. b.Fatal(err)
  764. }
  765. }
  766. })
  767. }
  768. func BenchmarkRedisClusterSetString(b *testing.B) {
  769. if testing.Short() {
  770. b.Skip("skipping in short mode")
  771. }
  772. cluster := &clusterScenario{
  773. ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"},
  774. nodeIds: make([]string, 6),
  775. processes: make(map[string]*redisProcess, 6),
  776. clients: make(map[string]*redis.Client, 6),
  777. }
  778. if err := startCluster(cluster); err != nil {
  779. b.Fatal(err)
  780. }
  781. defer stopCluster(cluster)
  782. client := cluster.clusterClient(redisClusterOptions())
  783. defer client.Close()
  784. value := string(bytes.Repeat([]byte{'1'}, 10000))
  785. b.ResetTimer()
  786. b.RunParallel(func(pb *testing.PB) {
  787. for pb.Next() {
  788. if err := client.Set("key", value, 0).Err(); err != nil {
  789. b.Fatal(err)
  790. }
  791. }
  792. })
  793. }