main_test.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. package redis_test
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "os"
  7. "os/exec"
  8. "path/filepath"
  9. "sync"
  10. "sync/atomic"
  11. "testing"
  12. "time"
  13. "github.com/go-redis/redis"
  14. . "github.com/onsi/ginkgo"
  15. . "github.com/onsi/gomega"
  16. )
  17. const (
  18. redisPort = "6380"
  19. redisAddr = ":" + redisPort
  20. redisSecondaryPort = "6381"
  21. )
  22. const (
  23. ringShard1Port = "6390"
  24. ringShard2Port = "6391"
  25. )
  26. const (
  27. sentinelName = "mymaster"
  28. sentinelMasterPort = "8123"
  29. sentinelSlave1Port = "8124"
  30. sentinelSlave2Port = "8125"
  31. sentinelPort = "8126"
  32. )
  33. var (
  34. redisMain *redisProcess
  35. ringShard1, ringShard2 *redisProcess
  36. sentinelMaster, sentinelSlave1, sentinelSlave2, sentinel *redisProcess
  37. )
  38. var cluster = &clusterScenario{
  39. ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"},
  40. nodeIds: make([]string, 6),
  41. processes: make(map[string]*redisProcess, 6),
  42. clients: make(map[string]*redis.Client, 6),
  43. }
  44. var _ = BeforeSuite(func() {
  45. var err error
  46. redisMain, err = startRedis(redisPort)
  47. Expect(err).NotTo(HaveOccurred())
  48. ringShard1, err = startRedis(ringShard1Port)
  49. Expect(err).NotTo(HaveOccurred())
  50. ringShard2, err = startRedis(ringShard2Port)
  51. Expect(err).NotTo(HaveOccurred())
  52. sentinelMaster, err = startRedis(sentinelMasterPort)
  53. Expect(err).NotTo(HaveOccurred())
  54. sentinel, err = startSentinel(sentinelPort, sentinelName, sentinelMasterPort)
  55. Expect(err).NotTo(HaveOccurred())
  56. sentinelSlave1, err = startRedis(
  57. sentinelSlave1Port, "--slaveof", "127.0.0.1", sentinelMasterPort)
  58. Expect(err).NotTo(HaveOccurred())
  59. sentinelSlave2, err = startRedis(
  60. sentinelSlave2Port, "--slaveof", "127.0.0.1", sentinelMasterPort)
  61. Expect(err).NotTo(HaveOccurred())
  62. Expect(startCluster(cluster)).NotTo(HaveOccurred())
  63. })
  64. var _ = AfterSuite(func() {
  65. Expect(redisMain.Close()).NotTo(HaveOccurred())
  66. Expect(ringShard1.Close()).NotTo(HaveOccurred())
  67. Expect(ringShard2.Close()).NotTo(HaveOccurred())
  68. Expect(sentinel.Close()).NotTo(HaveOccurred())
  69. Expect(sentinelSlave1.Close()).NotTo(HaveOccurred())
  70. Expect(sentinelSlave2.Close()).NotTo(HaveOccurred())
  71. Expect(sentinelMaster.Close()).NotTo(HaveOccurred())
  72. Expect(stopCluster(cluster)).NotTo(HaveOccurred())
  73. })
  74. func TestGinkgoSuite(t *testing.T) {
  75. RegisterFailHandler(Fail)
  76. RunSpecs(t, "go-redis")
  77. }
  78. //------------------------------------------------------------------------------
  79. func redisOptions() *redis.Options {
  80. return &redis.Options{
  81. Addr: redisAddr,
  82. DB: 15,
  83. DialTimeout: 10 * time.Second,
  84. ReadTimeout: 30 * time.Second,
  85. WriteTimeout: 30 * time.Second,
  86. PoolSize: 10,
  87. PoolTimeout: 30 * time.Second,
  88. IdleTimeout: 500 * time.Millisecond,
  89. IdleCheckFrequency: 500 * time.Millisecond,
  90. }
  91. }
  92. func redisClusterOptions() *redis.ClusterOptions {
  93. return &redis.ClusterOptions{
  94. DialTimeout: 10 * time.Second,
  95. ReadTimeout: 30 * time.Second,
  96. WriteTimeout: 30 * time.Second,
  97. PoolSize: 10,
  98. PoolTimeout: 30 * time.Second,
  99. IdleTimeout: 500 * time.Millisecond,
  100. IdleCheckFrequency: 500 * time.Millisecond,
  101. }
  102. }
  103. func redisRingOptions() *redis.RingOptions {
  104. return &redis.RingOptions{
  105. Addrs: map[string]string{
  106. "ringShardOne": ":" + ringShard1Port,
  107. "ringShardTwo": ":" + ringShard2Port,
  108. },
  109. DialTimeout: 10 * time.Second,
  110. ReadTimeout: 30 * time.Second,
  111. WriteTimeout: 30 * time.Second,
  112. PoolSize: 10,
  113. PoolTimeout: 30 * time.Second,
  114. IdleTimeout: 500 * time.Millisecond,
  115. IdleCheckFrequency: 500 * time.Millisecond,
  116. }
  117. }
  118. func performAsync(n int, cbs ...func(int)) *sync.WaitGroup {
  119. var wg sync.WaitGroup
  120. for _, cb := range cbs {
  121. for i := 0; i < n; i++ {
  122. wg.Add(1)
  123. go func(cb func(int), i int) {
  124. defer GinkgoRecover()
  125. defer wg.Done()
  126. cb(i)
  127. }(cb, i)
  128. }
  129. }
  130. return &wg
  131. }
  132. func perform(n int, cbs ...func(int)) {
  133. wg := performAsync(n, cbs...)
  134. wg.Wait()
  135. }
  136. func eventually(fn func() error, timeout time.Duration) error {
  137. var exit int32
  138. errCh := make(chan error)
  139. done := make(chan struct{})
  140. go func() {
  141. defer GinkgoRecover()
  142. for atomic.LoadInt32(&exit) == 0 {
  143. err := fn()
  144. if err == nil {
  145. close(done)
  146. return
  147. }
  148. select {
  149. case errCh <- err:
  150. default:
  151. }
  152. time.Sleep(timeout / 100)
  153. }
  154. }()
  155. select {
  156. case <-done:
  157. return nil
  158. case <-time.After(timeout):
  159. atomic.StoreInt32(&exit, 1)
  160. select {
  161. case err := <-errCh:
  162. return err
  163. default:
  164. return fmt.Errorf("timeout after %s", timeout)
  165. }
  166. }
  167. }
  168. func execCmd(name string, args ...string) (*os.Process, error) {
  169. cmd := exec.Command(name, args...)
  170. if testing.Verbose() {
  171. cmd.Stdout = os.Stdout
  172. cmd.Stderr = os.Stderr
  173. }
  174. return cmd.Process, cmd.Start()
  175. }
  176. func connectTo(port string) (*redis.Client, error) {
  177. client := redis.NewClient(&redis.Options{
  178. Addr: ":" + port,
  179. })
  180. err := eventually(func() error {
  181. return client.Ping().Err()
  182. }, 30*time.Second)
  183. if err != nil {
  184. return nil, err
  185. }
  186. return client, nil
  187. }
  188. type redisProcess struct {
  189. *os.Process
  190. *redis.Client
  191. }
  192. func (p *redisProcess) Close() error {
  193. if err := p.Kill(); err != nil {
  194. return err
  195. }
  196. err := eventually(func() error {
  197. if err := p.Client.Ping().Err(); err != nil {
  198. return nil
  199. }
  200. return errors.New("client is not shutdown")
  201. }, 10*time.Second)
  202. if err != nil {
  203. return err
  204. }
  205. p.Client.Close()
  206. return nil
  207. }
  208. var (
  209. redisServerBin, _ = filepath.Abs(filepath.Join("testdata", "redis", "src", "redis-server"))
  210. redisServerConf, _ = filepath.Abs(filepath.Join("testdata", "redis.conf"))
  211. )
  212. func redisDir(port string) (string, error) {
  213. dir, err := filepath.Abs(filepath.Join("testdata", "instances", port))
  214. if err != nil {
  215. return "", err
  216. }
  217. if err := os.RemoveAll(dir); err != nil {
  218. return "", err
  219. }
  220. if err := os.MkdirAll(dir, 0775); err != nil {
  221. return "", err
  222. }
  223. return dir, nil
  224. }
  225. func startRedis(port string, args ...string) (*redisProcess, error) {
  226. dir, err := redisDir(port)
  227. if err != nil {
  228. return nil, err
  229. }
  230. if err = exec.Command("cp", "-f", redisServerConf, dir).Run(); err != nil {
  231. return nil, err
  232. }
  233. baseArgs := []string{filepath.Join(dir, "redis.conf"), "--port", port, "--dir", dir}
  234. process, err := execCmd(redisServerBin, append(baseArgs, args...)...)
  235. if err != nil {
  236. return nil, err
  237. }
  238. client, err := connectTo(port)
  239. if err != nil {
  240. process.Kill()
  241. return nil, err
  242. }
  243. return &redisProcess{process, client}, err
  244. }
  245. func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
  246. dir, err := redisDir(port)
  247. if err != nil {
  248. return nil, err
  249. }
  250. process, err := execCmd(redisServerBin, os.DevNull, "--sentinel", "--port", port, "--dir", dir)
  251. if err != nil {
  252. return nil, err
  253. }
  254. client, err := connectTo(port)
  255. if err != nil {
  256. process.Kill()
  257. return nil, err
  258. }
  259. for _, cmd := range []*redis.StatusCmd{
  260. redis.NewStatusCmd("SENTINEL", "MONITOR", masterName, "127.0.0.1", masterPort, "1"),
  261. redis.NewStatusCmd("SENTINEL", "SET", masterName, "down-after-milliseconds", "500"),
  262. redis.NewStatusCmd("SENTINEL", "SET", masterName, "failover-timeout", "1000"),
  263. redis.NewStatusCmd("SENTINEL", "SET", masterName, "parallel-syncs", "1"),
  264. } {
  265. client.Process(cmd)
  266. if err := cmd.Err(); err != nil {
  267. process.Kill()
  268. return nil, err
  269. }
  270. }
  271. return &redisProcess{process, client}, nil
  272. }
  273. //------------------------------------------------------------------------------
  274. type badConnError string
  275. func (e badConnError) Error() string { return string(e) }
  276. func (e badConnError) Timeout() bool { return false }
  277. func (e badConnError) Temporary() bool { return false }
  278. type badConn struct {
  279. net.TCPConn
  280. readDelay, writeDelay time.Duration
  281. readErr, writeErr error
  282. }
  283. var _ net.Conn = &badConn{}
  284. func (cn *badConn) Read([]byte) (int, error) {
  285. if cn.readDelay != 0 {
  286. time.Sleep(cn.readDelay)
  287. }
  288. if cn.readErr != nil {
  289. return 0, cn.readErr
  290. }
  291. return 0, badConnError("bad connection")
  292. }
  293. func (cn *badConn) Write([]byte) (int, error) {
  294. if cn.writeDelay != 0 {
  295. time.Sleep(cn.writeDelay)
  296. }
  297. if cn.writeErr != nil {
  298. return 0, cn.writeErr
  299. }
  300. return 0, badConnError("bad connection")
  301. }