pipeline.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package redis
  2. // Not thread-safe.
  3. type Pipeline struct {
  4. *Client
  5. closed bool
  6. }
  7. func (c *Client) Pipeline() *Pipeline {
  8. return &Pipeline{
  9. Client: &Client{
  10. baseClient: &baseClient{
  11. opt: c.opt,
  12. connPool: c.connPool,
  13. cmds: make([]Cmder, 0),
  14. },
  15. },
  16. }
  17. }
  18. func (c *Client) Pipelined(f func(*Pipeline) error) ([]Cmder, error) {
  19. pc := c.Pipeline()
  20. if err := f(pc); err != nil {
  21. return nil, err
  22. }
  23. cmds, err := pc.Exec()
  24. pc.Close()
  25. return cmds, err
  26. }
  27. func (c *Pipeline) Close() error {
  28. c.closed = true
  29. return nil
  30. }
  31. func (c *Pipeline) Discard() error {
  32. if c.closed {
  33. return errClosed
  34. }
  35. c.cmds = c.cmds[:0]
  36. return nil
  37. }
  38. // Exec always returns list of commands and error of the first failed
  39. // command if any.
  40. func (c *Pipeline) Exec() ([]Cmder, error) {
  41. if c.closed {
  42. return nil, errClosed
  43. }
  44. cmds := c.cmds
  45. c.cmds = make([]Cmder, 0)
  46. if len(cmds) == 0 {
  47. return []Cmder{}, nil
  48. }
  49. cn, err := c.conn()
  50. if err != nil {
  51. setCmdsErr(cmds, err)
  52. return cmds, err
  53. }
  54. if err := c.execCmds(cn, cmds); err != nil {
  55. c.freeConn(cn, err)
  56. return cmds, err
  57. }
  58. c.putConn(cn)
  59. return cmds, nil
  60. }
  61. func (c *Pipeline) execCmds(cn *conn, cmds []Cmder) error {
  62. if err := c.writeCmd(cn, cmds...); err != nil {
  63. setCmdsErr(cmds, err)
  64. return err
  65. }
  66. var firstCmdErr error
  67. for _, cmd := range cmds {
  68. if err := cmd.parseReply(cn.rd); err != nil {
  69. if firstCmdErr == nil {
  70. firstCmdErr = err
  71. }
  72. }
  73. }
  74. return firstCmdErr
  75. }