pool.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package memcache
  2. import (
  3. "context"
  4. "io"
  5. "time"
  6. "go-common/library/container/pool"
  7. "go-common/library/stat"
  8. xtime "go-common/library/time"
  9. )
  10. var stats = stat.Cache
  11. // Config memcache config.
  12. type Config struct {
  13. *pool.Config
  14. Name string // memcache name, for trace
  15. Proto string
  16. Addr string
  17. DialTimeout xtime.Duration
  18. ReadTimeout xtime.Duration
  19. WriteTimeout xtime.Duration
  20. }
  21. // Pool memcache connection pool struct.
  22. type Pool struct {
  23. p pool.Pool
  24. c *Config
  25. }
  26. // NewPool new a memcache conn pool.
  27. func NewPool(c *Config) (p *Pool) {
  28. if c.DialTimeout <= 0 || c.ReadTimeout <= 0 || c.WriteTimeout <= 0 {
  29. panic("must config memcache timeout")
  30. }
  31. p1 := pool.NewList(c.Config)
  32. cnop := DialConnectTimeout(time.Duration(c.DialTimeout))
  33. rdop := DialReadTimeout(time.Duration(c.ReadTimeout))
  34. wrop := DialWriteTimeout(time.Duration(c.WriteTimeout))
  35. p1.New = func(ctx context.Context) (io.Closer, error) {
  36. conn, err := Dial(c.Proto, c.Addr, cnop, rdop, wrop)
  37. return &traceConn{Conn: conn, address: c.Addr}, err
  38. }
  39. p = &Pool{p: p1, c: c}
  40. return
  41. }
  42. // Get gets a connection. The application must close the returned connection.
  43. // This method always returns a valid connection so that applications can defer
  44. // error handling to the first use of the connection. If there is an error
  45. // getting an underlying connection, then the connection Err, Do, Send, Flush
  46. // and Receive methods return that error.
  47. func (p *Pool) Get(ctx context.Context) Conn {
  48. c, err := p.p.Get(ctx)
  49. if err != nil {
  50. return errorConnection{err}
  51. }
  52. c1, _ := c.(Conn)
  53. return &pooledConnection{p: p, c: c1.WithContext(ctx), ctx: ctx}
  54. }
  55. // Close release the resources used by the pool.
  56. func (p *Pool) Close() error {
  57. return p.p.Close()
  58. }
  59. type pooledConnection struct {
  60. p *Pool
  61. c Conn
  62. ctx context.Context
  63. }
  64. func pstat(key string, t time.Time, err error) {
  65. stats.Timing(key, int64(time.Since(t)/time.Millisecond))
  66. if err != nil {
  67. if msg := formatErr(err); msg != "" {
  68. stats.Incr("memcache", msg)
  69. }
  70. }
  71. }
  72. func (pc *pooledConnection) Close() error {
  73. c := pc.c
  74. if _, ok := c.(errorConnection); ok {
  75. return nil
  76. }
  77. pc.c = errorConnection{ErrConnClosed}
  78. pc.p.p.Put(context.Background(), c, c.Err() != nil)
  79. return nil
  80. }
  81. func (pc *pooledConnection) Err() error {
  82. return pc.c.Err()
  83. }
  84. func (pc *pooledConnection) Set(item *Item) (err error) {
  85. now := time.Now()
  86. err = pc.c.Set(item)
  87. pstat("memcache:set", now, err)
  88. return
  89. }
  90. func (pc *pooledConnection) Add(item *Item) (err error) {
  91. now := time.Now()
  92. err = pc.c.Add(item)
  93. pstat("memcache:add", now, err)
  94. return
  95. }
  96. func (pc *pooledConnection) Replace(item *Item) (err error) {
  97. now := time.Now()
  98. err = pc.c.Replace(item)
  99. pstat("memcache:replace", now, err)
  100. return
  101. }
  102. func (pc *pooledConnection) CompareAndSwap(item *Item) (err error) {
  103. now := time.Now()
  104. err = pc.c.CompareAndSwap(item)
  105. pstat("memcache:cas", now, err)
  106. return
  107. }
  108. func (pc *pooledConnection) Get(key string) (r *Item, err error) {
  109. now := time.Now()
  110. r, err = pc.c.Get(key)
  111. pstat("memcache:get", now, err)
  112. return
  113. }
  114. func (pc *pooledConnection) GetMulti(keys []string) (res map[string]*Item, err error) {
  115. // if keys is empty slice returns empty map direct
  116. if len(keys) == 0 {
  117. return make(map[string]*Item), nil
  118. }
  119. now := time.Now()
  120. res, err = pc.c.GetMulti(keys)
  121. pstat("memcache:gets", now, err)
  122. return
  123. }
  124. func (pc *pooledConnection) Touch(key string, timeout int32) (err error) {
  125. now := time.Now()
  126. err = pc.c.Touch(key, timeout)
  127. pstat("memcache:touch", now, err)
  128. return
  129. }
  130. func (pc *pooledConnection) Scan(item *Item, v interface{}) error {
  131. return pc.c.Scan(item, v)
  132. }
  133. func (pc *pooledConnection) WithContext(ctx context.Context) Conn {
  134. // TODO: set context
  135. pc.ctx = ctx
  136. return pc
  137. }
  138. func (pc *pooledConnection) Delete(key string) (err error) {
  139. now := time.Now()
  140. err = pc.c.Delete(key)
  141. pstat("memcache:delete", now, err)
  142. return
  143. }
  144. func (pc *pooledConnection) Increment(key string, delta uint64) (newValue uint64, err error) {
  145. now := time.Now()
  146. newValue, err = pc.c.Increment(key, delta)
  147. pstat("memcache:increment", now, err)
  148. return
  149. }
  150. func (pc *pooledConnection) Decrement(key string, delta uint64) (newValue uint64, err error) {
  151. now := time.Now()
  152. newValue, err = pc.c.Decrement(key, delta)
  153. pstat("memcache:decrement", now, err)
  154. return
  155. }
  156. type errorConnection struct{ err error }
  157. func (ec errorConnection) Err() error { return ec.err }
  158. func (ec errorConnection) Close() error { return ec.err }
  159. func (ec errorConnection) Add(item *Item) error { return ec.err }
  160. func (ec errorConnection) Set(item *Item) error { return ec.err }
  161. func (ec errorConnection) Replace(item *Item) error { return ec.err }
  162. func (ec errorConnection) CompareAndSwap(item *Item) error { return ec.err }
  163. func (ec errorConnection) Get(key string) (*Item, error) { return nil, ec.err }
  164. func (ec errorConnection) GetMulti(keys []string) (map[string]*Item, error) { return nil, ec.err }
  165. func (ec errorConnection) Touch(key string, timeout int32) error { return ec.err }
  166. func (ec errorConnection) Delete(key string) error { return ec.err }
  167. func (ec errorConnection) Increment(key string, delta uint64) (uint64, error) { return 0, ec.err }
  168. func (ec errorConnection) Decrement(key string, delta uint64) (uint64, error) { return 0, ec.err }
  169. func (ec errorConnection) Scan(item *Item, v interface{}) error { return ec.err }
  170. func (ec errorConnection) WithContext(ctx context.Context) Conn { return ec }