http.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package http
  2. import (
  3. "hash/crc32"
  4. "math/rand"
  5. "strconv"
  6. "sync/atomic"
  7. "time"
  8. "go-common/library/log"
  9. bm "go-common/library/net/http/blademaster"
  10. "go-common/library/rate"
  11. "go-common/library/rate/limit"
  12. "go-common/library/rate/limit/bench/stress/conf"
  13. "go-common/library/rate/limit/bench/stress/service"
  14. "go-common/library/rate/vegas"
  15. )
  16. var (
  17. svc *service.Service
  18. req int64
  19. qps int64
  20. )
  21. // Init init
  22. func Init(c *conf.Config) {
  23. rand.Seed(time.Now().Unix())
  24. initService(c)
  25. // init router
  26. engineInner := bm.DefaultServer(c.BM.Inner)
  27. outerRouter(engineInner)
  28. if err := engineInner.Start(); err != nil {
  29. log.Error("xhttp.Serve error(%v)", err)
  30. panic(err)
  31. }
  32. engineLocal := bm.DefaultServer(c.BM.Local)
  33. localRouter(engineLocal)
  34. if err := engineLocal.Start(); err != nil {
  35. log.Error("xhttp.Serve error(%v)", err)
  36. panic(err)
  37. }
  38. if log.V(1) {
  39. go calcuQPS()
  40. }
  41. }
  42. // initService init services.
  43. func initService(c *conf.Config) {
  44. // idfSvc = identify.New(c.Identify)
  45. svc = service.New(c)
  46. }
  47. // outerRouter init outer router api path.
  48. func outerRouter(e *bm.Engine) {
  49. v := vegas.New()
  50. go func() {
  51. ticker := time.NewTicker(time.Second * 3)
  52. defer ticker.Stop()
  53. for {
  54. <-ticker.C
  55. m := v.Stat()
  56. log.Info("vegas: limit(%d) inFlight(%d) minRtt(%v) rtt(%v)", m.Limit, m.InFlight, m.MinRTT, m.LastRTT)
  57. }
  58. }()
  59. l := limit.New(nil)
  60. //init api
  61. e.GET("/monitor/ping", ping)
  62. group := e.Group("/stress")
  63. group.GET("/normal", aqmTest)
  64. group.GET("/vegas", func(c *bm.Context) {
  65. start := time.Now()
  66. done, success := v.Acquire()
  67. if !success {
  68. done(time.Time{}, rate.Ignore)
  69. c.AbortWithStatus(509)
  70. return
  71. }
  72. defer done(start, rate.Success)
  73. c.Next()
  74. }, aqmTest)
  75. group.GET("/attack", func(c *bm.Context) {
  76. done, err := l.Allow(c)
  77. defer done(rate.Success)
  78. if err != nil {
  79. c.AbortWithStatus(509)
  80. return
  81. }
  82. c.Next()
  83. }, aqmTest)
  84. }
  85. func calcuQPS() {
  86. var creq, breq int64
  87. for {
  88. time.Sleep(time.Second * 5)
  89. creq = atomic.LoadInt64(&req)
  90. delta := creq - breq
  91. atomic.StoreInt64(&qps, delta/5)
  92. breq = creq
  93. log.Info("HTTP QPS:%d", atomic.LoadInt64(&qps))
  94. }
  95. }
  96. func aqmTest(c *bm.Context) {
  97. params := c.Request.Form
  98. sleep, err := strconv.ParseInt(params.Get("sleep"), 10, 64)
  99. if err == nil {
  100. time.Sleep(time.Millisecond * time.Duration(sleep))
  101. }
  102. atomic.AddInt64(&req, 1)
  103. for i := 0; i < 3000+rand.Intn(3000); i++ {
  104. crc32.Checksum([]byte(`testasdwfwfsddsfgwddcscsc
  105. http://git.bilibili.co/platform/go-common/merge_requests/new?merge_request%5Bsource_branch%5D=stress%2Fcodel`), crc32.IEEETable)
  106. }
  107. }
  108. // ping check server ok.
  109. func ping(c *bm.Context) {
  110. }
  111. // innerRouter init local router api path.
  112. func localRouter(e *bm.Engine) {
  113. //init api
  114. e.GET("/monitor/ping", ping)
  115. group := e.Group("/x/main/stress")
  116. {
  117. group.GET("", aqmTest)
  118. }
  119. }