monitor.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "go-common/app/admin/main/apm/model/monitor"
  8. "go-common/library/log"
  9. xtime "go-common/library/time"
  10. )
  11. // AddMonitor get monitor data and insert
  12. func (s *Service) AddMonitor(c context.Context) (err error) {
  13. var (
  14. rpc = make([]*monitor.Monitor, 0)
  15. http = make([]*monitor.Monitor, 0)
  16. tcs = make([]*monitor.Monitor, 0)
  17. dabs = make([]*monitor.Monitor, 0)
  18. key = make([]string, 0)
  19. value = make([]interface{}, 0)
  20. mt = &monitor.Monitor{}
  21. insertSQL = "INSERT INTO `monitor`(`app_id`, `interface`, `count`, `cost`) VALUES %s"
  22. )
  23. if rpc, err = s.RPCMonitor(c); err != nil {
  24. return
  25. }
  26. if http, err = s.HTTPMonitor(c); err != nil {
  27. return
  28. }
  29. if mt, err = s.Members(c); err != nil {
  30. return
  31. }
  32. if tcs, err = s.TenCent(c); err != nil {
  33. return
  34. }
  35. if dabs, err = s.DataBus(c); err != nil {
  36. return
  37. }
  38. rpc = append(rpc, http...)
  39. rpc = append(rpc, dabs...)
  40. rpc = append(rpc, tcs...)
  41. rpc = append(rpc, mt)
  42. if len(rpc) == 0 {
  43. err = fmt.Errorf("monitor data empty")
  44. log.Error("s.AddMonitor error(%v)", err)
  45. return
  46. }
  47. for _, mt := range rpc {
  48. key = append(key, "(?, ?, ?, ?)")
  49. value = append(value, mt.AppID, mt.Interface, mt.Count, mt.Cost)
  50. }
  51. return s.DB.Model(&monitor.Monitor{}).Exec(fmt.Sprintf(insertSQL, strings.Join(key, ",")), value...).Error
  52. }
  53. // AppNameList return app list
  54. func (s *Service) AppNameList(c context.Context) (apps []string) {
  55. return s.c.Apps.Name
  56. }
  57. // PrometheusList return PrometheusList data
  58. func (s *Service) PrometheusList(c context.Context, app, method, mType string) (ret *monitor.MoniRet, err error) {
  59. var (
  60. mt = &monitor.Monitor{}
  61. mts = make([]*monitor.Monitor, 0)
  62. )
  63. if err = s.DB.Select("interface,count,cost,mtime").Where("app_id = ?", app+"-"+method).Group("mtime,interface").Order("interface,mtime").Find(&mts).Error; err != nil {
  64. log.Error("s.PrometheusList query all error(%v)", err)
  65. return
  66. }
  67. if len(mts) < 1 {
  68. return
  69. }
  70. if err = s.DB.Where("app_id = ?", app+"-"+method).First(mt).Error; err != nil {
  71. log.Error("s.Prometheus query first error(%v)", err)
  72. return
  73. }
  74. return merge(s.packing(mts), s.times(mt.MTime), mType), err
  75. }
  76. // BroadCastList return BroadCastList data
  77. func (s *Service) BroadCastList(c context.Context) (ret *monitor.MoniRet, err error) {
  78. var (
  79. mt = &monitor.Monitor{}
  80. mts = make([]*monitor.Monitor, 0)
  81. )
  82. if err = s.DB.Select("substring_index(interface, '_', -1) as temp_name,sum(count) as count,mtime").Where("interface REGEXP 'ESTAB|InBound|OutBound|InPacket|OutPacket$'").Group("mtime,temp_name").Order("temp_name,mtime").Find(&mts).Error; err != nil {
  83. log.Error("s.BroadCastList query error(%v)", err)
  84. return
  85. }
  86. if err = s.DB.Where("interface REGEXP 'ESTAB|InBound|OutBound|InPacket|OutPacket$'").First(mt).Error; err != nil {
  87. log.Error("s.BroadCastList query first error(%v)", err)
  88. return
  89. }
  90. return merge(s.packing(mts), s.times(mt.MTime), "count"), err
  91. }
  92. // DataBusList return DataBusList data
  93. func (s *Service) DataBusList(c context.Context) (ret *monitor.MoniRet, err error) {
  94. var (
  95. mts = make([]*monitor.Monitor, 0)
  96. mt = &monitor.Monitor{}
  97. )
  98. if err = s.DB.Select("interface,count,mtime").Where("app_id=?", "kafka-databus").Group("mtime,interface").Order("interface,mtime").Find(&mts).Error; err != nil {
  99. log.Error("s.MonitorList query error(%v)", err)
  100. return
  101. }
  102. if len(mts) < 1 {
  103. return
  104. }
  105. if err = s.DB.Where("app_id=?", "kafka-databus").First(mt).Error; err != nil {
  106. log.Error("s.DataBusList query first error(%v)", err)
  107. return
  108. }
  109. return merge(s.packing(mts), s.times(mt.MTime), "count"), err
  110. }
  111. // OnlineList return online data
  112. func (s *Service) OnlineList(c context.Context) (ret *monitor.MoniRet, err error) {
  113. var (
  114. mts = make([]*monitor.Monitor, 0)
  115. mt = &monitor.Monitor{}
  116. )
  117. if err = s.DB.Select("interface,count,mtime").Where("app_id=?", "online").Find(&mts).Error; err != nil {
  118. log.Error("s.OnlineList query error(%v)", err)
  119. }
  120. if len(mts) < 1 {
  121. return
  122. }
  123. if err = s.DB.Where("app_id=?", "online").First(mt).Error; err != nil {
  124. log.Error("s.OnlineList query error(%v)", err)
  125. }
  126. return merge(s.packing(mts), s.times(mt.MTime), "count"), err
  127. }
  128. // merge .
  129. func merge(dts []*monitor.Data, strs []string, mType string) (ret *monitor.MoniRet) {
  130. items := make([]*monitor.Items, 0)
  131. for _, dt := range dts {
  132. var (
  133. yAxis []int64
  134. item = &monitor.Items{}
  135. )
  136. if mType == "count" {
  137. yAxis = formatArray(strs, dt.Times, dt.Counts)
  138. } else {
  139. yAxis = formatArray(strs, dt.Times, dt.Costs)
  140. }
  141. item.Interface = dt.Interface
  142. item.YAxis = yAxis
  143. items = append(items, item)
  144. }
  145. if len(items) > 0 {
  146. ret = &monitor.MoniRet{
  147. XAxis: strs,
  148. Items: items,
  149. }
  150. }
  151. return
  152. }
  153. // formatArray formatArray missing data by time
  154. func formatArray(strs, times []string, counts []int64) []int64 {
  155. var newCounts []int64
  156. for _, str := range strs {
  157. if len(counts) < 1 {
  158. break
  159. }
  160. if ok, index := inArray(times, str); ok {
  161. newCounts = append(newCounts, counts[index])
  162. } else {
  163. newCounts = append(newCounts, 0)
  164. }
  165. }
  166. return newCounts
  167. }
  168. // inArray check key in array or not and return position
  169. func inArray(arrays []string, key string) (bool, int) {
  170. for index, arr := range arrays {
  171. if key == arr {
  172. return true, index
  173. }
  174. }
  175. return false, 0
  176. }
  177. // times return a standard string time array
  178. func (s *Service) times(t xtime.Time) []string {
  179. var (
  180. nextDay string
  181. tList = []string{t.Time().Format("2006-01-02")}
  182. curDay = time.Now().Format("2006-01-02")
  183. nextTime = t.Time().Add(24 * time.Hour)
  184. )
  185. for {
  186. nextDay = nextTime.Format("2006-01-02")
  187. tList = append(tList, nextDay)
  188. nextTime = nextTime.Add(24 * time.Hour)
  189. if nextDay == curDay {
  190. break
  191. }
  192. }
  193. return tList
  194. }
  195. // packing .
  196. func (s *Service) packing(mts []*monitor.Monitor) (data []*monitor.Data) {
  197. d := &monitor.Data{}
  198. for k, mt := range mts {
  199. if mt.Interface == "" {
  200. mt.Interface = mt.TempName
  201. }
  202. if d.Interface != mt.Interface {
  203. if d.Interface != "" {
  204. data = append(data, d)
  205. }
  206. d = &monitor.Data{
  207. Interface: mt.Interface,
  208. Counts: []int64{mt.Count},
  209. Costs: []int64{mt.Cost},
  210. Times: []string{mt.MTime.Time().Format("2006-01-02")},
  211. }
  212. continue
  213. }
  214. d.Counts = append(d.Counts, mt.Count)
  215. d.Costs = append(d.Costs, mt.Cost)
  216. d.Times = append(d.Times, mt.MTime.Time().Format("2006-01-02"))
  217. if k == len(mts)-1 {
  218. data = append(data, d)
  219. }
  220. }
  221. return
  222. }