redis.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501
  1. package redis
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "os"
  7. "time"
  8. "github.com/go-redis/redis/internal"
  9. "github.com/go-redis/redis/internal/pool"
  10. "github.com/go-redis/redis/internal/proto"
  11. )
  12. // Nil reply Redis returns when key does not exist.
  13. const Nil = proto.Nil
  14. func init() {
  15. SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
  16. }
  17. func SetLogger(logger *log.Logger) {
  18. internal.Logger = logger
  19. }
  20. type baseClient struct {
  21. opt *Options
  22. connPool pool.Pooler
  23. process func(Cmder) error
  24. processPipeline func([]Cmder) error
  25. processTxPipeline func([]Cmder) error
  26. onClose func() error // hook called when client is closed
  27. }
  28. func (c *baseClient) init() {
  29. c.process = c.defaultProcess
  30. c.processPipeline = c.defaultProcessPipeline
  31. c.processTxPipeline = c.defaultProcessTxPipeline
  32. }
  33. func (c *baseClient) String() string {
  34. return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
  35. }
  36. func (c *baseClient) newConn() (*pool.Conn, error) {
  37. cn, err := c.connPool.NewConn()
  38. if err != nil {
  39. return nil, err
  40. }
  41. if !cn.Inited {
  42. if err := c.initConn(cn); err != nil {
  43. _ = c.connPool.CloseConn(cn)
  44. return nil, err
  45. }
  46. }
  47. return cn, nil
  48. }
  49. func (c *baseClient) getConn() (*pool.Conn, error) {
  50. cn, err := c.connPool.Get()
  51. if err != nil {
  52. return nil, err
  53. }
  54. if !cn.Inited {
  55. err := c.initConn(cn)
  56. if err != nil {
  57. c.connPool.Remove(cn)
  58. return nil, err
  59. }
  60. }
  61. return cn, nil
  62. }
  63. func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool {
  64. if internal.IsBadConn(err, false) {
  65. c.connPool.Remove(cn)
  66. return false
  67. }
  68. c.connPool.Put(cn)
  69. return true
  70. }
  71. func (c *baseClient) initConn(cn *pool.Conn) error {
  72. cn.Inited = true
  73. if c.opt.Password == "" &&
  74. c.opt.DB == 0 &&
  75. !c.opt.readOnly &&
  76. c.opt.OnConnect == nil {
  77. return nil
  78. }
  79. conn := newConn(c.opt, cn)
  80. _, err := conn.Pipelined(func(pipe Pipeliner) error {
  81. if c.opt.Password != "" {
  82. pipe.Auth(c.opt.Password)
  83. }
  84. if c.opt.DB > 0 {
  85. pipe.Select(c.opt.DB)
  86. }
  87. if c.opt.readOnly {
  88. pipe.ReadOnly()
  89. }
  90. return nil
  91. })
  92. if err != nil {
  93. return err
  94. }
  95. if c.opt.OnConnect != nil {
  96. return c.opt.OnConnect(conn)
  97. }
  98. return nil
  99. }
  100. // WrapProcess wraps function that processes Redis commands.
  101. func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
  102. c.process = fn(c.process)
  103. }
  104. func (c *baseClient) Process(cmd Cmder) error {
  105. return c.process(cmd)
  106. }
  107. func (c *baseClient) defaultProcess(cmd Cmder) error {
  108. for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
  109. if attempt > 0 {
  110. time.Sleep(c.retryBackoff(attempt))
  111. }
  112. cn, err := c.getConn()
  113. if err != nil {
  114. cmd.setErr(err)
  115. if internal.IsRetryableError(err, true) {
  116. continue
  117. }
  118. return err
  119. }
  120. cn.SetWriteTimeout(c.opt.WriteTimeout)
  121. if err := writeCmd(cn, cmd); err != nil {
  122. c.releaseConn(cn, err)
  123. cmd.setErr(err)
  124. if internal.IsRetryableError(err, true) {
  125. continue
  126. }
  127. return err
  128. }
  129. cn.SetReadTimeout(c.cmdTimeout(cmd))
  130. err = cmd.readReply(cn)
  131. c.releaseConn(cn, err)
  132. if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) {
  133. continue
  134. }
  135. return err
  136. }
  137. return cmd.Err()
  138. }
  139. func (c *baseClient) retryBackoff(attempt int) time.Duration {
  140. return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
  141. }
  142. func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
  143. if timeout := cmd.readTimeout(); timeout != nil {
  144. return *timeout
  145. }
  146. return c.opt.ReadTimeout
  147. }
  148. // Close closes the client, releasing any open resources.
  149. //
  150. // It is rare to Close a Client, as the Client is meant to be
  151. // long-lived and shared between many goroutines.
  152. func (c *baseClient) Close() error {
  153. var firstErr error
  154. if c.onClose != nil {
  155. if err := c.onClose(); err != nil && firstErr == nil {
  156. firstErr = err
  157. }
  158. }
  159. if err := c.connPool.Close(); err != nil && firstErr == nil {
  160. firstErr = err
  161. }
  162. return firstErr
  163. }
  164. func (c *baseClient) getAddr() string {
  165. return c.opt.Addr
  166. }
  167. func (c *baseClient) WrapProcessPipeline(
  168. fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
  169. ) {
  170. c.processPipeline = fn(c.processPipeline)
  171. c.processTxPipeline = fn(c.processTxPipeline)
  172. }
  173. func (c *baseClient) defaultProcessPipeline(cmds []Cmder) error {
  174. return c.generalProcessPipeline(cmds, c.pipelineProcessCmds)
  175. }
  176. func (c *baseClient) defaultProcessTxPipeline(cmds []Cmder) error {
  177. return c.generalProcessPipeline(cmds, c.txPipelineProcessCmds)
  178. }
  179. type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error)
  180. func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) error {
  181. for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
  182. if attempt > 0 {
  183. time.Sleep(c.retryBackoff(attempt))
  184. }
  185. cn, err := c.getConn()
  186. if err != nil {
  187. setCmdsErr(cmds, err)
  188. return err
  189. }
  190. canRetry, err := p(cn, cmds)
  191. if err == nil || internal.IsRedisError(err) {
  192. c.connPool.Put(cn)
  193. break
  194. }
  195. c.connPool.Remove(cn)
  196. if !canRetry || !internal.IsRetryableError(err, true) {
  197. break
  198. }
  199. }
  200. return firstCmdsErr(cmds)
  201. }
  202. func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
  203. cn.SetWriteTimeout(c.opt.WriteTimeout)
  204. if err := writeCmd(cn, cmds...); err != nil {
  205. setCmdsErr(cmds, err)
  206. return true, err
  207. }
  208. // Set read timeout for all commands.
  209. cn.SetReadTimeout(c.opt.ReadTimeout)
  210. return true, pipelineReadCmds(cn, cmds)
  211. }
  212. func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error {
  213. for _, cmd := range cmds {
  214. err := cmd.readReply(cn)
  215. if err != nil && !internal.IsRedisError(err) {
  216. return err
  217. }
  218. }
  219. return nil
  220. }
  221. func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
  222. cn.SetWriteTimeout(c.opt.WriteTimeout)
  223. if err := txPipelineWriteMulti(cn, cmds); err != nil {
  224. setCmdsErr(cmds, err)
  225. return true, err
  226. }
  227. // Set read timeout for all commands.
  228. cn.SetReadTimeout(c.opt.ReadTimeout)
  229. if err := c.txPipelineReadQueued(cn, cmds); err != nil {
  230. setCmdsErr(cmds, err)
  231. return false, err
  232. }
  233. return false, pipelineReadCmds(cn, cmds)
  234. }
  235. func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error {
  236. multiExec := make([]Cmder, 0, len(cmds)+2)
  237. multiExec = append(multiExec, NewStatusCmd("MULTI"))
  238. multiExec = append(multiExec, cmds...)
  239. multiExec = append(multiExec, NewSliceCmd("EXEC"))
  240. return writeCmd(cn, multiExec...)
  241. }
  242. func (c *baseClient) txPipelineReadQueued(cn *pool.Conn, cmds []Cmder) error {
  243. // Parse queued replies.
  244. var statusCmd StatusCmd
  245. if err := statusCmd.readReply(cn); err != nil {
  246. return err
  247. }
  248. for _ = range cmds {
  249. err := statusCmd.readReply(cn)
  250. if err != nil && !internal.IsRedisError(err) {
  251. return err
  252. }
  253. }
  254. // Parse number of replies.
  255. line, err := cn.Rd.ReadLine()
  256. if err != nil {
  257. if err == Nil {
  258. err = TxFailedErr
  259. }
  260. return err
  261. }
  262. switch line[0] {
  263. case proto.ErrorReply:
  264. return proto.ParseErrorReply(line)
  265. case proto.ArrayReply:
  266. // ok
  267. default:
  268. err := fmt.Errorf("redis: expected '*', but got line %q", line)
  269. return err
  270. }
  271. return nil
  272. }
  273. //------------------------------------------------------------------------------
  274. // Client is a Redis client representing a pool of zero or more
  275. // underlying connections. It's safe for concurrent use by multiple
  276. // goroutines.
  277. type Client struct {
  278. baseClient
  279. cmdable
  280. ctx context.Context
  281. }
  282. // NewClient returns a client to the Redis Server specified by Options.
  283. func NewClient(opt *Options) *Client {
  284. opt.init()
  285. c := Client{
  286. baseClient: baseClient{
  287. opt: opt,
  288. connPool: newConnPool(opt),
  289. },
  290. }
  291. c.baseClient.init()
  292. c.init()
  293. return &c
  294. }
  295. func (c *Client) init() {
  296. c.cmdable.setProcessor(c.Process)
  297. }
  298. func (c *Client) Context() context.Context {
  299. if c.ctx != nil {
  300. return c.ctx
  301. }
  302. return context.Background()
  303. }
  304. func (c *Client) WithContext(ctx context.Context) *Client {
  305. if ctx == nil {
  306. panic("nil context")
  307. }
  308. c2 := c.copy()
  309. c2.ctx = ctx
  310. return c2
  311. }
  312. func (c *Client) copy() *Client {
  313. cp := *c
  314. cp.init()
  315. return &cp
  316. }
  317. // Options returns read-only Options that were used to create the client.
  318. func (c *Client) Options() *Options {
  319. return c.opt
  320. }
  321. type PoolStats pool.Stats
  322. // PoolStats returns connection pool stats.
  323. func (c *Client) PoolStats() *PoolStats {
  324. stats := c.connPool.Stats()
  325. return (*PoolStats)(stats)
  326. }
  327. func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  328. return c.Pipeline().Pipelined(fn)
  329. }
  330. func (c *Client) Pipeline() Pipeliner {
  331. pipe := Pipeline{
  332. exec: c.processPipeline,
  333. }
  334. pipe.statefulCmdable.setProcessor(pipe.Process)
  335. return &pipe
  336. }
  337. func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  338. return c.TxPipeline().Pipelined(fn)
  339. }
  340. // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
  341. func (c *Client) TxPipeline() Pipeliner {
  342. pipe := Pipeline{
  343. exec: c.processTxPipeline,
  344. }
  345. pipe.statefulCmdable.setProcessor(pipe.Process)
  346. return &pipe
  347. }
  348. func (c *Client) pubSub() *PubSub {
  349. return &PubSub{
  350. opt: c.opt,
  351. newConn: func(channels []string) (*pool.Conn, error) {
  352. return c.newConn()
  353. },
  354. closeConn: c.connPool.CloseConn,
  355. }
  356. }
  357. // Subscribe subscribes the client to the specified channels.
  358. // Channels can be omitted to create empty subscription.
  359. func (c *Client) Subscribe(channels ...string) *PubSub {
  360. pubsub := c.pubSub()
  361. if len(channels) > 0 {
  362. _ = pubsub.Subscribe(channels...)
  363. }
  364. return pubsub
  365. }
  366. // PSubscribe subscribes the client to the given patterns.
  367. // Patterns can be omitted to create empty subscription.
  368. func (c *Client) PSubscribe(channels ...string) *PubSub {
  369. pubsub := c.pubSub()
  370. if len(channels) > 0 {
  371. _ = pubsub.PSubscribe(channels...)
  372. }
  373. return pubsub
  374. }
  375. //------------------------------------------------------------------------------
  376. // Conn is like Client, but its pool contains single connection.
  377. type Conn struct {
  378. baseClient
  379. statefulCmdable
  380. }
  381. func newConn(opt *Options, cn *pool.Conn) *Conn {
  382. c := Conn{
  383. baseClient: baseClient{
  384. opt: opt,
  385. connPool: pool.NewSingleConnPool(cn),
  386. },
  387. }
  388. c.baseClient.init()
  389. c.statefulCmdable.setProcessor(c.Process)
  390. return &c
  391. }
  392. func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  393. return c.Pipeline().Pipelined(fn)
  394. }
  395. func (c *Conn) Pipeline() Pipeliner {
  396. pipe := Pipeline{
  397. exec: c.processPipeline,
  398. }
  399. pipe.statefulCmdable.setProcessor(pipe.Process)
  400. return &pipe
  401. }
  402. func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  403. return c.TxPipeline().Pipelined(fn)
  404. }
  405. // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
  406. func (c *Conn) TxPipeline() Pipeliner {
  407. pipe := Pipeline{
  408. exec: c.processTxPipeline,
  409. }
  410. pipe.statefulCmdable.setProcessor(pipe.Process)
  411. return &pipe
  412. }