// Source file: degrade.go

  1. package cache
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "fmt"
  6. "net/http"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "go-common/library/ecode"
  12. "go-common/library/log"
  13. bm "go-common/library/net/http/blademaster"
  14. "go-common/library/net/http/blademaster/middleware/cache/store"
  15. )
  16. const (
  17. _degradeInterval = 60 * 10
  18. _degradePrefix = "bm.degrade"
  19. )
  20. var (
  21. _degradeBytes = []byte(fmt.Sprintf("{\"code\":%d, \"message\":\"\"}", ecode.Degrade))
  22. )
  23. // Degrader is the common degrader instance.
  24. type Degrader struct {
  25. lock sync.RWMutex
  26. urls map[string]*state
  27. expire int32
  28. ch chan *result
  29. pool sync.Pool // degradeWriter pool
  30. }
  31. // argsDegrader means the degrade will happened by args policy
  32. type argsDegrader struct {
  33. *Degrader
  34. args []string
  35. }
  36. type degradeWriter struct {
  37. *Degrader
  38. ctx *bm.Context
  39. response http.ResponseWriter
  40. store store.Store
  41. key string
  42. state *state
  43. }
  44. type state struct {
  45. // FIXME(zhoujiahui): using transient map to avoid potential memory leak?
  46. // record last cached time
  47. sync.RWMutex
  48. gens map[string]*int64
  49. }
  50. type result struct {
  51. key string
  52. value []byte
  53. store store.Store
  54. }
  55. var _ http.ResponseWriter = &degradeWriter{}
  56. var _ Policy = &argsDegrader{}
  57. // NewDegrader will create a new degrade struct
  58. func NewDegrader(expire int32) (d *Degrader) {
  59. d = &Degrader{
  60. urls: make(map[string]*state),
  61. ch: make(chan *result, 1024),
  62. expire: expire,
  63. }
  64. d.pool.New = func() interface{} {
  65. return &degradeWriter{
  66. Degrader: d,
  67. }
  68. }
  69. go d.degradeproc()
  70. return
  71. }
  72. func (d *Degrader) degradeproc() {
  73. for {
  74. r := <-d.ch
  75. if err := r.store.Set(context.Background(), r.key, r.value, d.expire); err != nil {
  76. log.Error("store write key(%s) error(%v)", r.key, err)
  77. }
  78. }
  79. }
  80. // Args means this path will be degrade by specified args
  81. func (d *Degrader) Args(args ...string) Policy {
  82. return &argsDegrader{
  83. Degrader: d,
  84. args: args,
  85. }
  86. }
  87. func (d *Degrader) state(path string) *state {
  88. d.lock.RLock()
  89. s, ok := d.urls[path]
  90. d.lock.RUnlock()
  91. if !ok {
  92. s = &state{
  93. gens: make(map[string]*int64),
  94. }
  95. d.lock.Lock()
  96. d.urls[path] = s
  97. d.lock.Unlock()
  98. }
  99. return s
  100. }
  101. // Key is used to identify response cache key in most key-value store
  102. func (ad *argsDegrader) Key(ctx *bm.Context) string {
  103. req := ctx.Request
  104. path := req.URL.Path
  105. params := req.Form
  106. vs := make([]string, 0, len(ad.args))
  107. for _, arg := range ad.args {
  108. vs = append(vs, params.Get(arg))
  109. }
  110. return fmt.Sprintf("%s:%s_%x", _degradePrefix, strings.Replace(path, "/", "_", -1), md5.Sum([]byte(strings.Join(vs, "-"))))
  111. }
  112. // Handler is used to execute degrade service
  113. func (ad *argsDegrader) Handler(store store.Store) bm.HandlerFunc {
  114. return func(ctx *bm.Context) {
  115. req := ctx.Request
  116. path := req.URL.Path
  117. writer := ad.pool.Get().(*degradeWriter)
  118. writer.response = ctx.Writer
  119. writer.ctx = ctx
  120. writer.store = store
  121. writer.state = ad.state(path)
  122. writer.key = ad.Key(ctx)
  123. ctx.Writer = writer // replace to degrade writer
  124. ctx.Next()
  125. ad.pool.Put(writer)
  126. }
  127. }
  128. func (w *degradeWriter) Header() http.Header { return w.response.Header() }
  129. func (w *degradeWriter) WriteHeader(code int) { w.response.WriteHeader(code) }
  130. func (w *degradeWriter) Write(data []byte) (size int, err error) {
  131. e := w.ctx.Error
  132. // if an degrade error code is raised from upstream,
  133. // degrade this request directly
  134. if e != nil {
  135. if ec := ecode.Cause(e); ec.Code() == ecode.Degrade.Code() {
  136. return w.write()
  137. }
  138. }
  139. // write origin response
  140. if size, err = w.response.Write(data); err != nil {
  141. return
  142. }
  143. // error raised, this is a unsuccessful response
  144. if e != nil {
  145. return
  146. }
  147. // is required to cache
  148. if !w.state.required(w.key) {
  149. return
  150. }
  151. // async cache succeeded response for further degradation
  152. select {
  153. case w.ch <- &result{key: w.key, value: data, store: w.store}:
  154. default:
  155. }
  156. return
  157. }
  158. func (w *degradeWriter) write() (int, error) {
  159. data, err := w.store.Get(w.ctx, w.key)
  160. if err != nil || len(data) == 0 {
  161. // FIXME(zhoujiahui): The default response data should be respect to render type or content-type header
  162. data = _degradeBytes
  163. }
  164. return w.response.Write(data)
  165. }
  166. // check is required to cache response
  167. // it depends on last cache time and _degradeInterval
  168. func (st *state) required(key string) bool {
  169. now := time.Now().Unix()
  170. st.RLock()
  171. pLast, ok := st.gens[key]
  172. st.RUnlock()
  173. if !ok {
  174. st.Lock()
  175. pLast = new(int64)
  176. st.gens[key] = pLast
  177. st.Unlock()
  178. }
  179. last := atomic.LoadInt64(pLast)
  180. if now-last < _degradeInterval {
  181. return false
  182. }
  183. return atomic.CompareAndSwapInt64(pLast, last, now)
  184. }