prometheus.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. package service
  2. import (
  3. "context"
  4. "crypto/sha1"
  5. "encoding/hex"
  6. "fmt"
  7. "net/http"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "go-common/app/admin/main/apm/model/monitor"
  13. "go-common/library/log"
  14. "github.com/gogo/protobuf/sortkeys"
  15. )
  // PromQL templates used by this file. The %s verbs are filled in with
  // fmt.Sprintf by the callers below.
  const (
  	// countQuery: per-method request rate. Verbs: metric infix (the callers in
  	// this file pass "rpc" or "http"), app label.
  	countQuery = "sum(rate(go_%s_server_count{app='%s'}[2m])) by (method)"
  	// costQuery: per-method average of the positive 5m increase of the
  	// *_server_sum metric. Verbs: metric infix, app label.
  	costQuery = "avg(increase(go_%s_server_sum{app='%s'}[5m]) >0) by (method)"
  	// inPacketQuery / outPacketQuery / inBoundQuery / outBoundQuery: per-device
  	// network rates, excluding devices whose name starts with "lo" or "bond".
  	// Each template takes the same instance_name regex twice: once for the
  	// metric name without the _total suffix and once for the name with it,
  	// joined with "or" so whichever series exists is returned.
  	//
  	// The second matcher of inPacketQuery used to read instance_name='~%s',
  	// i.e. an equality match against the literal string "~<names>", which can
  	// never select a series; it is the regex matcher =~ like everywhere else.
  	inPacketQuery  = "irate(node_network_receive_packets{instance_name=~'%s', device!~'^(lo|bond).*'}[5m]) or irate(node_network_receive_packets_total{instance_name=~'%s', device!~'^(lo|bond).*'}[5m])"
  	outPacketQuery = "irate(node_network_transmit_packets{instance_name=~'%s', device!~\"^(lo|bond).*\"}[5m]) or irate(node_network_transmit_packets_total{instance_name=~'%s', device!~\"^(lo|bond).*\"}[5m])"
  	// The byte-based queries multiply by 8 (bytes to bits).
  	inBoundQuery  = "irate(node_network_receive_bytes{instance_name=~'%s', device!~\"^(lo|bond).*\"}[5m]) * 8 or irate(node_network_receive_bytes_total{instance_name=~'%s', device!~\"^(lo|bond).*\"}[5m]) * 8"
  	outBoundQuery = "irate(node_network_transmit_bytes{instance_name=~'%s', device!~\"^(lo|bond).*\"}[5m]) * 8 or irate(node_network_transmit_bytes_total{instance_name=~'%s', device!~\"^(lo|bond).*\"}[5m]) * 8"
  	// tcpStatQuery: maximum node_tcp_stat value with stat='ESTAB' for the
  	// matching instances. Verb: instance_name regex.
  	tcpStatQuery = "max(node_tcp_stat{instance_name=~'%s', stat='ESTAB'}) by (stat)"
  	// producerQuery / consumerQuery: 2m increase of the databus counter summed
  	// over all series for the given operation. No verbs.
  	producerQuery = "sum(increase(go_databus_counter{operation='producer_msg_speed'}[2m]))"
  	consumerQuery = "sum(increase(go_databus_counter{operation='consumer_msg_speed'}[2m]))"
  )
  27. // PrometheusRes http result
  28. type PrometheusRes struct {
  29. RetCode int `json:"RetCode"`
  30. Data []*Prometheus `json:"data"`
  31. }
  // Prometheus is a single series of a range query result.
  type Prometheus struct {
  	// Metric carries the series labels; only "method" is decoded.
  	Metric struct {
  		Method string `json:"method"`
  	} `json:"metric"`
  	// Values holds the samples of the series. prometheus() reads the second
  	// element of the first sample as a string-encoded number.
  	Values [][]interface{} `json:"values"`
  }
  // Max pairs a string key with an int64 value.
  type Max struct {
  	K string
  	V int64
  }
  44. // CommonRes packet or bound result data
  45. type CommonRes struct {
  46. RetCode int `json:"RetCode"`
  47. Data []*Common `json:"data"`
  48. }
  // Common is a single series of an instant query result.
  type Common struct {
  	// Metric carries the series labels; only "instance_name" is decoded.
  	Metric struct {
  		InstanceName string `json:"instance_name"`
  	} `json:"metric"`
  	// Value is the sample of the series. pack() reads its second element as a
  	// string-encoded number.
  	Value []interface{} `json:"value"`
  }
  56. // auth calc sign
  57. func (s *Service) auth(params url.Values) string {
  58. var (
  59. sortKey = make([]string, 0)
  60. hash = sha1.New()
  61. signature string
  62. str string
  63. )
  64. for key := range params {
  65. sortKey = append(sortKey, key)
  66. }
  67. sortkeys.Strings(sortKey)
  68. for _, key := range sortKey {
  69. str += key + params.Get(key)
  70. }
  71. str += s.c.Prometheus.Secret
  72. hash.Write([]byte(str))
  73. signature = hex.EncodeToString(hash.Sum(nil))
  74. return signature
  75. }
  76. // PrometheusProxy Get Prometheus Data
  77. func (s *Service) PrometheusProxy(c context.Context, params url.Values, res interface{}) (err error) {
  78. var (
  79. req *http.Request
  80. uri = s.c.Prometheus.URL
  81. )
  82. if req, err = s.client.NewRequest(http.MethodPost, uri, "", params); err != nil {
  83. log.Error("s.PrometheusProxy.NewRequest error(%v) params(%s)", err, params.Encode())
  84. return
  85. }
  86. if err = s.client.Do(c, req, res); err != nil {
  87. log.Error("s.PrometheusProxy.Client.Do error(%v) params(%s)", err, params.Encode())
  88. }
  89. return
  90. }
  91. // prometheus get prometheus data
  92. func (s *Service) prometheus(c context.Context, method string) (mts []*monitor.Monitor, err error) {
  93. var (
  94. sign, ins string
  95. names []string
  96. params = url.Values{}
  97. )
  98. ins, _ = s.GetInterfaces(c)
  99. mts = make([]*monitor.Monitor, 0)
  100. params.Add("Action", "GetPromDataRange")
  101. params.Add("PublicKey", s.c.Prometheus.Key)
  102. params.Add("DataSource", "app")
  103. sign = s.auth(params)
  104. params.Add("Signature", sign)
  105. date := time.Now().Format("2006-01-02")
  106. params.Set("Start", date+" 23:00:00")
  107. params.Set("End", date+" 23:00:10")
  108. params.Set("Step", "30")
  109. names = s.c.Apps.Name
  110. for _, name := range names {
  111. var (
  112. costRet = &PrometheusRes{}
  113. countRet = &PrometheusRes{}
  114. )
  115. params.Set("Query", fmt.Sprintf(costQuery, method, name))
  116. if err = s.PrometheusProxy(c, params, costRet); err != nil {
  117. return
  118. }
  119. params.Set("Query", fmt.Sprintf(countQuery, method, name))
  120. if err = s.PrometheusProxy(c, params, countRet); err != nil {
  121. return
  122. }
  123. for _, val := range costRet.Data {
  124. var (
  125. count float64
  126. api = val.Metric.Method
  127. )
  128. if api == "inner.Ping" || len(val.Values) < 1 || len(val.Values[0]) < 1 {
  129. continue
  130. }
  131. cost, _ := strconv.ParseFloat(val.Values[0][1].(string), 64)
  132. if int64(cost) < s.c.Apps.Max && !strings.Contains(ins, api) {
  133. continue
  134. }
  135. for _, v := range countRet.Data {
  136. if api == v.Metric.Method {
  137. count, _ = strconv.ParseFloat(v.Values[0][1].(string), 64)
  138. break
  139. }
  140. }
  141. mt := &monitor.Monitor{
  142. AppID: name + "-" + method,
  143. Interface: api,
  144. Count: int64(count),
  145. Cost: int64(cost),
  146. }
  147. mts = append(mts, mt)
  148. }
  149. }
  150. return
  151. }
  152. // GetInterfaces .
  153. func (s *Service) GetInterfaces(c context.Context) (string, error) {
  154. mt := &monitor.Monitor{}
  155. if err := s.DB.Select("group_concat(interface) as interface").Find(mt).Error; err != nil {
  156. log.Error("s.GetInterfaces query error(%v)", err)
  157. return "", err
  158. }
  159. return mt.Interface, nil
  160. }
  161. // RPCMonitor get rpc monitor data
  162. func (s *Service) RPCMonitor(c context.Context) ([]*monitor.Monitor, error) {
  163. return s.prometheus(c, "rpc")
  164. }
  165. // HTTPMonitor get http monitor data
  166. func (s *Service) HTTPMonitor(c context.Context) ([]*monitor.Monitor, error) {
  167. return s.prometheus(c, "http")
  168. }
  169. // DataBus return DataBus monitor data
  170. func (s *Service) DataBus(c context.Context) (mts []*monitor.Monitor, err error) {
  171. var (
  172. sign string
  173. params = url.Values{}
  174. proRet = &CommonRes{}
  175. conRet = &CommonRes{}
  176. )
  177. mts = make([]*monitor.Monitor, 0)
  178. sign = s.auth(params)
  179. params.Add("Action", "GetCurrentPromData")
  180. params.Add("PublicKey", s.c.Prometheus.Key)
  181. params.Add("DataSource", "app")
  182. params.Add("Signature", sign)
  183. params.Set("Query", producerQuery)
  184. if err = s.PrometheusProxy(c, params, proRet); err != nil {
  185. return
  186. }
  187. mts = append(mts, pack(proRet.Data, "kafka-databus", "producer")...)
  188. params.Set("Query", consumerQuery)
  189. if err = s.PrometheusProxy(c, params, conRet); err != nil {
  190. return
  191. }
  192. mts = append(mts, pack(conRet.Data, "kafka-databus", "consumer")...)
  193. return
  194. }
  195. // TenCent return TenCent monitor data
  196. func (s *Service) TenCent(c context.Context) (mts []*monitor.Monitor, err error) {
  197. return s.BroadCast(c, s.c.BroadCast.TenCent, "tencent_main")
  198. }
  199. // KingSoft return KingSoft monitor data
  200. func (s *Service) KingSoft(c context.Context) (mts []*monitor.Monitor, err error) {
  201. return s.BroadCast(c, s.c.BroadCast.KingSoft, "kingsoft_main")
  202. }
  203. // BroadCast return monitor data
  204. func (s *Service) BroadCast(c context.Context, appName []string, app string) (mts []*monitor.Monitor, err error) {
  205. var (
  206. sign string
  207. names string
  208. params = url.Values{}
  209. inPacket = &CommonRes{}
  210. outPacket = &CommonRes{}
  211. inBound = &CommonRes{}
  212. outBound = &CommonRes{}
  213. tcpStat = &CommonRes{}
  214. )
  215. mts = make([]*monitor.Monitor, 0)
  216. params.Add("Action", "GetCurrentPromData")
  217. params.Add("PublicKey", s.c.Prometheus.Key)
  218. sign = s.auth(params)
  219. params.Add("Signature", sign)
  220. names = strings.Join(appName, "|")
  221. params.Set("Query", fmt.Sprintf(inPacketQuery, names, names))
  222. params.Set("DataSource", app)
  223. if err = s.PrometheusProxy(c, params, inPacket); err != nil {
  224. return
  225. }
  226. mts = append(mts, pack(inPacket.Data, "", "InPacket")...)
  227. params.Set("Query", fmt.Sprintf(outPacketQuery, names, names))
  228. if err = s.PrometheusProxy(c, params, outPacket); err != nil {
  229. return
  230. }
  231. mts = append(mts, pack(outPacket.Data, "", "OutPacket")...)
  232. params.Set("Query", fmt.Sprintf(inBoundQuery, names, names))
  233. if err = s.PrometheusProxy(c, params, inBound); err != nil {
  234. return
  235. }
  236. mts = append(mts, pack(inBound.Data, "", "InBound")...)
  237. params.Set("Query", fmt.Sprintf(outBoundQuery, names, names))
  238. if err = s.PrometheusProxy(c, params, outBound); err != nil {
  239. return
  240. }
  241. mts = append(mts, pack(outBound.Data, "", "OutBound")...)
  242. for _, name := range appName {
  243. params.Set("Query", fmt.Sprintf(tcpStatQuery, name))
  244. if err = s.PrometheusProxy(c, params, tcpStat); err != nil {
  245. return
  246. }
  247. mts = append(mts, pack(tcpStat.Data, name, "ESTAB")...)
  248. }
  249. return
  250. }
  251. // pack .
  252. func pack(pts []*Common, K, V string) (mts []*monitor.Monitor) {
  253. for _, pt := range pts {
  254. if K != "" {
  255. pt.Metric.InstanceName = K
  256. }
  257. count, _ := strconv.ParseFloat(pt.Value[1].(string), 64)
  258. if count == 0 {
  259. continue
  260. }
  261. mt := &monitor.Monitor{
  262. AppID: pt.Metric.InstanceName,
  263. Interface: pt.Metric.InstanceName + "_" + V,
  264. Count: int64(count),
  265. }
  266. mts = append(mts, mt)
  267. }
  268. return mts
  269. }