ring_test.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. package redis_test
  2. import (
  3. "crypto/rand"
  4. "fmt"
  5. "time"
  6. "github.com/go-redis/redis"
  7. . "github.com/onsi/ginkgo"
  8. . "github.com/onsi/gomega"
  9. )
  10. var _ = Describe("Redis Ring", func() {
  11. const heartbeat = 100 * time.Millisecond
  12. var ring *redis.Ring
  13. setRingKeys := func() {
  14. for i := 0; i < 100; i++ {
  15. err := ring.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
  16. Expect(err).NotTo(HaveOccurred())
  17. }
  18. }
  19. BeforeEach(func() {
  20. opt := redisRingOptions()
  21. opt.HeartbeatFrequency = heartbeat
  22. ring = redis.NewRing(opt)
  23. err := ring.ForEachShard(func(cl *redis.Client) error {
  24. return cl.FlushDB().Err()
  25. })
  26. Expect(err).NotTo(HaveOccurred())
  27. })
  28. AfterEach(func() {
  29. Expect(ring.Close()).NotTo(HaveOccurred())
  30. })
  31. It("distributes keys", func() {
  32. setRingKeys()
  33. // Both shards should have some keys now.
  34. Expect(ringShard1.Info().Val()).To(ContainSubstring("keys=57"))
  35. Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43"))
  36. })
  37. It("distributes keys when using EVAL", func() {
  38. script := redis.NewScript(`
  39. local r = redis.call('SET', KEYS[1], ARGV[1])
  40. return r
  41. `)
  42. var key string
  43. for i := 0; i < 100; i++ {
  44. key = fmt.Sprintf("key%d", i)
  45. err := script.Run(ring, []string{key}, "value").Err()
  46. Expect(err).NotTo(HaveOccurred())
  47. }
  48. Expect(ringShard1.Info().Val()).To(ContainSubstring("keys=57"))
  49. Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43"))
  50. })
  51. It("uses single shard when one of the shards is down", func() {
  52. // Stop ringShard2.
  53. Expect(ringShard2.Close()).NotTo(HaveOccurred())
  54. // Ring needs 3 * heartbeat time to detect that node is down.
  55. // Give it more to be sure.
  56. time.Sleep(2 * 3 * heartbeat)
  57. setRingKeys()
  58. // RingShard1 should have all keys.
  59. Expect(ringShard1.Info().Val()).To(ContainSubstring("keys=100"))
  60. // Start ringShard2.
  61. var err error
  62. ringShard2, err = startRedis(ringShard2Port)
  63. Expect(err).NotTo(HaveOccurred())
  64. // Wait for ringShard2 to come up.
  65. Eventually(func() error {
  66. return ringShard2.Ping().Err()
  67. }, "1s").ShouldNot(HaveOccurred())
  68. // Ring needs heartbeat time to detect that node is up.
  69. // Give it more to be sure.
  70. time.Sleep(heartbeat + heartbeat)
  71. setRingKeys()
  72. // RingShard2 should have its keys.
  73. Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43"))
  74. })
  75. It("supports hash tags", func() {
  76. for i := 0; i < 100; i++ {
  77. err := ring.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
  78. Expect(err).NotTo(HaveOccurred())
  79. }
  80. Expect(ringShard1.Info().Val()).ToNot(ContainSubstring("keys="))
  81. Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=100"))
  82. })
  83. Describe("pipeline", func() {
  84. It("distributes keys", func() {
  85. pipe := ring.Pipeline()
  86. for i := 0; i < 100; i++ {
  87. err := pipe.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
  88. Expect(err).NotTo(HaveOccurred())
  89. }
  90. cmds, err := pipe.Exec()
  91. Expect(err).NotTo(HaveOccurred())
  92. Expect(cmds).To(HaveLen(100))
  93. Expect(pipe.Close()).NotTo(HaveOccurred())
  94. for _, cmd := range cmds {
  95. Expect(cmd.Err()).NotTo(HaveOccurred())
  96. Expect(cmd.(*redis.StatusCmd).Val()).To(Equal("OK"))
  97. }
  98. // Both shards should have some keys now.
  99. Expect(ringShard1.Info().Val()).To(ContainSubstring("keys=57"))
  100. Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43"))
  101. })
  102. It("is consistent with ring", func() {
  103. var keys []string
  104. for i := 0; i < 100; i++ {
  105. key := make([]byte, 64)
  106. _, err := rand.Read(key)
  107. Expect(err).NotTo(HaveOccurred())
  108. keys = append(keys, string(key))
  109. }
  110. _, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
  111. for _, key := range keys {
  112. pipe.Set(key, "value", 0).Err()
  113. }
  114. return nil
  115. })
  116. Expect(err).NotTo(HaveOccurred())
  117. for _, key := range keys {
  118. val, err := ring.Get(key).Result()
  119. Expect(err).NotTo(HaveOccurred())
  120. Expect(val).To(Equal("value"))
  121. }
  122. })
  123. It("supports hash tags", func() {
  124. _, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
  125. for i := 0; i < 100; i++ {
  126. pipe.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
  127. }
  128. return nil
  129. })
  130. Expect(err).NotTo(HaveOccurred())
  131. Expect(ringShard1.Info().Val()).ToNot(ContainSubstring("keys="))
  132. Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=100"))
  133. })
  134. })
  135. })
  136. var _ = Describe("empty Redis Ring", func() {
  137. var ring *redis.Ring
  138. BeforeEach(func() {
  139. ring = redis.NewRing(&redis.RingOptions{})
  140. })
  141. AfterEach(func() {
  142. Expect(ring.Close()).NotTo(HaveOccurred())
  143. })
  144. It("returns an error", func() {
  145. err := ring.Ping().Err()
  146. Expect(err).To(MatchError("redis: all ring shards are down"))
  147. })
  148. It("pipeline returns an error", func() {
  149. _, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
  150. pipe.Ping()
  151. return nil
  152. })
  153. Expect(err).To(MatchError("redis: all ring shards are down"))
  154. })
  155. })