upstream-summary.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package dao
  2. import (
  3. "context"
  4. "github.com/pkg/errors"
  5. "go-common/app/service/video/stream-mng/model"
  6. "go-common/library/database/sql"
  7. )
  8. // 存储上行调度信息
  9. const (
  10. _insertUpStreamInfo = "INSERT INTO `upstream_info` (room_id,cdn,platform,ip,country,city,isp) values(?,?,?,?,?,?,?)"
  11. _getSummaryUpStreamRtmp = "SELECT cdn, count(id) as value FROM upstream_info where mtime >= FROM_UNIXTIME(?) AND mtime <= FROM_UNIXTIME(?) group by cdn"
  12. _getSummaryUpStreamISP = "SELECT isp, count(id) as value FROM upstream_info where mtime >= FROM_UNIXTIME(?) AND mtime <= FROM_UNIXTIME(?) group by isp"
  13. _getSummaryUpStreamCountry = "SELECT country, count(id) as value FROM upstream_info where mtime >= FROM_UNIXTIME(?) AND mtime <= FROM_UNIXTIME(?) group by country"
  14. _getSummaryUpStreamPlatform = "SELECT platform, count(id) as value FROM upstream_info where mtime >= FROM_UNIXTIME(?) AND mtime <= FROM_UNIXTIME(?) group by platform"
  15. _getSummaryUpStreamCity = "SELECT city, count(id) as value FROM upstream_info where mtime >= FROM_UNIXTIME(?) AND mtime <= FROM_UNIXTIME(?) group by city"
  16. )
  17. // CreateUpStreamDispatch 创建一条上行调度信息
  18. func (d *Dao) CreateUpStreamDispatch(c context.Context, info *model.UpStreamInfo) error {
  19. _, err := d.stmtUpStreamDispatch.Exec(c, info.RoomID, info.CDN, info.PlatForm, info.IP, info.Country, info.City, info.ISP)
  20. return err
  21. }
  22. // GetSummaryUpStreamRtmp 得到统计信息
  23. func (d *Dao) GetSummaryUpStreamRtmp(c context.Context, start int64, end int64) (infos []*model.SummaryUpStreamRtmp, err error) {
  24. res := []*model.SummaryUpStreamRtmp{}
  25. var rows *sql.Rows
  26. if rows, err = d.tidb.Query(c, _getSummaryUpStreamRtmp, start, end); err != nil {
  27. err = errors.WithStack(err)
  28. return
  29. }
  30. defer rows.Close()
  31. for rows.Next() {
  32. info := new(model.SummaryUpStreamRtmp)
  33. if err = rows.Scan(&info.CDN, &info.Count); err != nil {
  34. err = errors.WithStack(err)
  35. infos = nil
  36. return
  37. }
  38. res = append(res, info)
  39. }
  40. err = rows.Err()
  41. return res, err
  42. }
  43. // GetSummaryUpStreamISP 得到ISP统计信息
  44. func (d *Dao) GetSummaryUpStreamISP(c context.Context, start int64, end int64) (infos []*model.SummaryUpStreamRtmp, err error) {
  45. res := []*model.SummaryUpStreamRtmp{}
  46. var rows *sql.Rows
  47. if rows, err = d.tidb.Query(c, _getSummaryUpStreamISP, start, end); err != nil {
  48. err = errors.WithStack(err)
  49. return
  50. }
  51. defer rows.Close()
  52. for rows.Next() {
  53. info := new(model.SummaryUpStreamRtmp)
  54. if err = rows.Scan(&info.ISP, &info.Count); err != nil {
  55. err = errors.WithStack(err)
  56. infos = nil
  57. return
  58. }
  59. res = append(res, info)
  60. }
  61. err = rows.Err()
  62. return res, err
  63. }
  64. // GetSummaryUpStreamCountry 得到Country统计信息
  65. func (d *Dao) GetSummaryUpStreamCountry(c context.Context, start int64, end int64) (infos []*model.SummaryUpStreamRtmp, err error) {
  66. res := []*model.SummaryUpStreamRtmp{}
  67. var rows *sql.Rows
  68. if rows, err = d.tidb.Query(c, _getSummaryUpStreamCountry, start, end); err != nil {
  69. err = errors.WithStack(err)
  70. return
  71. }
  72. defer rows.Close()
  73. for rows.Next() {
  74. info := new(model.SummaryUpStreamRtmp)
  75. if err = rows.Scan(&info.Country, &info.Count); err != nil {
  76. err = errors.WithStack(err)
  77. infos = nil
  78. return
  79. }
  80. res = append(res, info)
  81. }
  82. err = rows.Err()
  83. return res, err
  84. }
  85. // GetSummaryUpStreamPlatform 得到Platform统计信息
  86. func (d *Dao) GetSummaryUpStreamPlatform(c context.Context, start int64, end int64) (infos []*model.SummaryUpStreamRtmp, err error) {
  87. res := []*model.SummaryUpStreamRtmp{}
  88. var rows *sql.Rows
  89. if rows, err = d.tidb.Query(c, _getSummaryUpStreamPlatform, start, end); err != nil {
  90. err = errors.WithStack(err)
  91. return
  92. }
  93. defer rows.Close()
  94. for rows.Next() {
  95. info := new(model.SummaryUpStreamRtmp)
  96. if err = rows.Scan(&info.PlatForm, &info.Count); err != nil {
  97. err = errors.WithStack(err)
  98. infos = nil
  99. return
  100. }
  101. res = append(res, info)
  102. }
  103. err = rows.Err()
  104. return res, err
  105. }
  106. // GetSummaryUpStreamCity 得到City统计信息
  107. func (d *Dao) GetSummaryUpStreamCity(c context.Context, start int64, end int64) (infos []*model.SummaryUpStreamRtmp, err error) {
  108. res := []*model.SummaryUpStreamRtmp{}
  109. var rows *sql.Rows
  110. if rows, err = d.tidb.Query(c, _getSummaryUpStreamCity, start, end); err != nil {
  111. err = errors.WithStack(err)
  112. return
  113. }
  114. defer rows.Close()
  115. for rows.Next() {
  116. info := new(model.SummaryUpStreamRtmp)
  117. if err = rows.Scan(&info.City, &info.Count); err != nil {
  118. err = errors.WithStack(err)
  119. infos = nil
  120. return
  121. }
  122. res = append(res, info)
  123. }
  124. err = rows.Err()
  125. return res, err
  126. }