sentinel.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. package redis
  2. import (
  3. "crypto/tls"
  4. "errors"
  5. "net"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/go-redis/redis/internal"
  10. "github.com/go-redis/redis/internal/pool"
  11. )
  12. //------------------------------------------------------------------------------
  13. // FailoverOptions are used to configure a failover client and should
  14. // be passed to NewFailoverClient.
  15. type FailoverOptions struct {
  16. // The master name.
  17. MasterName string
  18. // A seed list of host:port addresses of sentinel nodes.
  19. SentinelAddrs []string
  20. // Following options are copied from Options struct.
  21. OnConnect func(*Conn) error
  22. Password string
  23. DB int
  24. MaxRetries int
  25. DialTimeout time.Duration
  26. ReadTimeout time.Duration
  27. WriteTimeout time.Duration
  28. PoolSize int
  29. PoolTimeout time.Duration
  30. IdleTimeout time.Duration
  31. IdleCheckFrequency time.Duration
  32. TLSConfig *tls.Config
  33. }
  34. func (opt *FailoverOptions) options() *Options {
  35. return &Options{
  36. Addr: "FailoverClient",
  37. OnConnect: opt.OnConnect,
  38. DB: opt.DB,
  39. Password: opt.Password,
  40. MaxRetries: opt.MaxRetries,
  41. DialTimeout: opt.DialTimeout,
  42. ReadTimeout: opt.ReadTimeout,
  43. WriteTimeout: opt.WriteTimeout,
  44. PoolSize: opt.PoolSize,
  45. PoolTimeout: opt.PoolTimeout,
  46. IdleTimeout: opt.IdleTimeout,
  47. IdleCheckFrequency: opt.IdleCheckFrequency,
  48. TLSConfig: opt.TLSConfig,
  49. }
  50. }
  51. // NewFailoverClient returns a Redis client that uses Redis Sentinel
  52. // for automatic failover. It's safe for concurrent use by multiple
  53. // goroutines.
  54. func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
  55. opt := failoverOpt.options()
  56. opt.init()
  57. failover := &sentinelFailover{
  58. masterName: failoverOpt.MasterName,
  59. sentinelAddrs: failoverOpt.SentinelAddrs,
  60. opt: opt,
  61. }
  62. c := Client{
  63. baseClient: baseClient{
  64. opt: opt,
  65. connPool: failover.Pool(),
  66. onClose: func() error {
  67. return failover.Close()
  68. },
  69. },
  70. }
  71. c.baseClient.init()
  72. c.setProcessor(c.Process)
  73. return &c
  74. }
  75. //------------------------------------------------------------------------------
  76. type SentinelClient struct {
  77. baseClient
  78. }
  79. func NewSentinelClient(opt *Options) *SentinelClient {
  80. opt.init()
  81. c := &SentinelClient{
  82. baseClient: baseClient{
  83. opt: opt,
  84. connPool: newConnPool(opt),
  85. },
  86. }
  87. c.baseClient.init()
  88. return c
  89. }
  90. func (c *SentinelClient) PubSub() *PubSub {
  91. return &PubSub{
  92. opt: c.opt,
  93. newConn: func(channels []string) (*pool.Conn, error) {
  94. return c.newConn()
  95. },
  96. closeConn: c.connPool.CloseConn,
  97. }
  98. }
  99. func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
  100. cmd := NewStringSliceCmd("SENTINEL", "get-master-addr-by-name", name)
  101. c.Process(cmd)
  102. return cmd
  103. }
  104. func (c *SentinelClient) Sentinels(name string) *SliceCmd {
  105. cmd := NewSliceCmd("SENTINEL", "sentinels", name)
  106. c.Process(cmd)
  107. return cmd
  108. }
  109. type sentinelFailover struct {
  110. sentinelAddrs []string
  111. opt *Options
  112. pool *pool.ConnPool
  113. poolOnce sync.Once
  114. mu sync.RWMutex
  115. masterName string
  116. _masterAddr string
  117. sentinel *SentinelClient
  118. }
  119. func (d *sentinelFailover) Close() error {
  120. return d.resetSentinel()
  121. }
  122. func (d *sentinelFailover) Pool() *pool.ConnPool {
  123. d.poolOnce.Do(func() {
  124. d.opt.Dialer = d.dial
  125. d.pool = newConnPool(d.opt)
  126. })
  127. return d.pool
  128. }
  129. func (d *sentinelFailover) dial() (net.Conn, error) {
  130. addr, err := d.MasterAddr()
  131. if err != nil {
  132. return nil, err
  133. }
  134. return net.DialTimeout("tcp", addr, d.opt.DialTimeout)
  135. }
  136. func (d *sentinelFailover) MasterAddr() (string, error) {
  137. d.mu.Lock()
  138. defer d.mu.Unlock()
  139. addr, err := d.masterAddr()
  140. if err != nil {
  141. return "", err
  142. }
  143. if d._masterAddr != addr {
  144. d.switchMaster(addr)
  145. }
  146. return addr, nil
  147. }
  148. func (d *sentinelFailover) masterAddr() (string, error) {
  149. // Try last working sentinel.
  150. if d.sentinel != nil {
  151. addr, err := d.sentinel.GetMasterAddrByName(d.masterName).Result()
  152. if err == nil {
  153. addr := net.JoinHostPort(addr[0], addr[1])
  154. internal.Logf("sentinel: master=%q addr=%q", d.masterName, addr)
  155. return addr, nil
  156. }
  157. internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s", d.masterName, err)
  158. d._resetSentinel()
  159. }
  160. for i, sentinelAddr := range d.sentinelAddrs {
  161. sentinel := NewSentinelClient(&Options{
  162. Addr: sentinelAddr,
  163. DialTimeout: d.opt.DialTimeout,
  164. ReadTimeout: d.opt.ReadTimeout,
  165. WriteTimeout: d.opt.WriteTimeout,
  166. PoolSize: d.opt.PoolSize,
  167. PoolTimeout: d.opt.PoolTimeout,
  168. IdleTimeout: d.opt.IdleTimeout,
  169. })
  170. masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result()
  171. if err != nil {
  172. internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s",
  173. d.masterName, err)
  174. sentinel.Close()
  175. continue
  176. }
  177. // Push working sentinel to the top.
  178. d.sentinelAddrs[0], d.sentinelAddrs[i] = d.sentinelAddrs[i], d.sentinelAddrs[0]
  179. d.setSentinel(sentinel)
  180. addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
  181. return addr, nil
  182. }
  183. return "", errors.New("redis: all sentinels are unreachable")
  184. }
  185. func (d *sentinelFailover) switchMaster(masterAddr string) {
  186. internal.Logf(
  187. "sentinel: new master=%q addr=%q",
  188. d.masterName, masterAddr,
  189. )
  190. _ = d.Pool().Filter(func(cn *pool.Conn) bool {
  191. return cn.RemoteAddr().String() != masterAddr
  192. })
  193. d._masterAddr = masterAddr
  194. }
  195. func (d *sentinelFailover) setSentinel(sentinel *SentinelClient) {
  196. d.discoverSentinels(sentinel)
  197. d.sentinel = sentinel
  198. go d.listen(sentinel)
  199. }
  200. func (d *sentinelFailover) resetSentinel() error {
  201. var err error
  202. d.mu.Lock()
  203. if d.sentinel != nil {
  204. err = d._resetSentinel()
  205. }
  206. d.mu.Unlock()
  207. return err
  208. }
  209. func (d *sentinelFailover) _resetSentinel() error {
  210. err := d.sentinel.Close()
  211. d.sentinel = nil
  212. return err
  213. }
  214. func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
  215. sentinels, err := sentinel.Sentinels(d.masterName).Result()
  216. if err != nil {
  217. internal.Logf("sentinel: Sentinels master=%q failed: %s", d.masterName, err)
  218. return
  219. }
  220. for _, sentinel := range sentinels {
  221. vals := sentinel.([]interface{})
  222. for i := 0; i < len(vals); i += 2 {
  223. key := vals[i].(string)
  224. if key == "name" {
  225. sentinelAddr := vals[i+1].(string)
  226. if !contains(d.sentinelAddrs, sentinelAddr) {
  227. internal.Logf(
  228. "sentinel: discovered new sentinel=%q for master=%q",
  229. sentinelAddr, d.masterName,
  230. )
  231. d.sentinelAddrs = append(d.sentinelAddrs, sentinelAddr)
  232. }
  233. }
  234. }
  235. }
  236. }
  237. func (d *sentinelFailover) listen(sentinel *SentinelClient) {
  238. var pubsub *PubSub
  239. for {
  240. if pubsub == nil {
  241. pubsub = sentinel.PubSub()
  242. if err := pubsub.Subscribe("+switch-master"); err != nil {
  243. internal.Logf("sentinel: Subscribe failed: %s", err)
  244. pubsub.Close()
  245. d.resetSentinel()
  246. return
  247. }
  248. }
  249. msg, err := pubsub.ReceiveMessage()
  250. if err != nil {
  251. if err != pool.ErrClosed {
  252. internal.Logf("sentinel: ReceiveMessage failed: %s", err)
  253. pubsub.Close()
  254. }
  255. d.resetSentinel()
  256. return
  257. }
  258. switch msg.Channel {
  259. case "+switch-master":
  260. parts := strings.Split(msg.Payload, " ")
  261. if parts[0] != d.masterName {
  262. internal.Logf("sentinel: ignore addr for master=%q", parts[0])
  263. continue
  264. }
  265. addr := net.JoinHostPort(parts[3], parts[4])
  266. d.mu.Lock()
  267. if d._masterAddr != addr {
  268. d.switchMaster(addr)
  269. }
  270. d.mu.Unlock()
  271. }
  272. }
  273. }
  // contains reports whether str is an element of slice. A nil or empty
  // slice contains nothing.
  func contains(slice []string, str string) bool {
  	for i := 0; i < len(slice); i++ {
  		if slice[i] == str {
  			return true
  		}
  	}
  	return false
  }