tx.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package redis
  2. import (
  3. "github.com/go-redis/redis/internal/pool"
  4. "github.com/go-redis/redis/internal/proto"
  5. )
// TxFailedErr is the error reported when a Redis transaction fails
// (see the documentation of Tx.Pipelined).
const TxFailedErr = proto.RedisError("redis: transaction failed")
// Tx implements Redis transactions as described in
// http://redis.io/topics/transactions. It is NOT safe for concurrent use
// by multiple goroutines, because Exec resets the list of watched keys.
// If you don't need WATCH, it is better to use Pipeline.
type Tx struct {
	// statefulCmdable supplies the command methods; newTx points its
	// processor at Tx.Process.
	statefulCmdable
	// baseClient carries the options and the connection pool; newTx
	// populates it with a sticky pool wrapping the parent client's pool.
	baseClient
}
  16. func (c *Client) newTx() *Tx {
  17. tx := Tx{
  18. baseClient: baseClient{
  19. opt: c.opt,
  20. connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), true),
  21. },
  22. }
  23. tx.baseClient.init()
  24. tx.statefulCmdable.setProcessor(tx.Process)
  25. return &tx
  26. }
  27. func (c *Client) Watch(fn func(*Tx) error, keys ...string) error {
  28. tx := c.newTx()
  29. if len(keys) > 0 {
  30. if err := tx.Watch(keys...).Err(); err != nil {
  31. _ = tx.Close()
  32. return err
  33. }
  34. }
  35. err := fn(tx)
  36. _ = tx.Close()
  37. return err
  38. }
  39. // Close closes the transaction, releasing any open resources.
  40. func (c *Tx) Close() error {
  41. _ = c.Unwatch().Err()
  42. return c.baseClient.Close()
  43. }
  44. // Watch marks the keys to be watched for conditional execution
  45. // of a transaction.
  46. func (c *Tx) Watch(keys ...string) *StatusCmd {
  47. args := make([]interface{}, 1+len(keys))
  48. args[0] = "watch"
  49. for i, key := range keys {
  50. args[1+i] = key
  51. }
  52. cmd := NewStatusCmd(args...)
  53. c.Process(cmd)
  54. return cmd
  55. }
  56. // Unwatch flushes all the previously watched keys for a transaction.
  57. func (c *Tx) Unwatch(keys ...string) *StatusCmd {
  58. args := make([]interface{}, 1+len(keys))
  59. args[0] = "unwatch"
  60. for i, key := range keys {
  61. args[1+i] = key
  62. }
  63. cmd := NewStatusCmd(args...)
  64. c.Process(cmd)
  65. return cmd
  66. }
  67. func (c *Tx) Pipeline() Pipeliner {
  68. pipe := Pipeline{
  69. exec: c.processTxPipeline,
  70. }
  71. pipe.statefulCmdable.setProcessor(pipe.Process)
  72. return &pipe
  73. }
  74. // Pipelined executes commands queued in the fn in a transaction
  75. // and restores the connection state to normal.
  76. //
  77. // When using WATCH, EXEC will execute commands only if the watched keys
  78. // were not modified, allowing for a check-and-set mechanism.
  79. //
  80. // Exec always returns list of commands. If transaction fails
  81. // TxFailedErr is returned. Otherwise Exec returns error of the first
  82. // failed command or nil.
  83. func (c *Tx) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  84. return c.Pipeline().Pipelined(fn)
  85. }
  86. func (c *Tx) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  87. return c.Pipelined(fn)
  88. }
  89. func (c *Tx) TxPipeline() Pipeliner {
  90. return c.Pipeline()
  91. }