service.go 7.1 KB


  1. package esports
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "time"
  7. "go-common/app/job/main/web-goblin/conf"
  8. "go-common/app/job/main/web-goblin/dao/esports"
  9. mdlesp "go-common/app/job/main/web-goblin/model/esports"
  10. arcclient "go-common/app/service/main/archive/api"
  11. arcmdl "go-common/app/service/main/archive/api"
  12. favrpc "go-common/app/service/main/favorite/api/gorpc"
  13. "go-common/app/service/main/favorite/model"
  14. mdlfav "go-common/app/service/main/favorite/model"
  15. "go-common/library/ecode"
  16. "go-common/library/log"
  17. "github.com/pkg/errors"
  18. )
  19. const (
  20. _favUsers = 1000
  21. _favTryTimes = 3
  22. _defContest = 0
  23. _linkinfo = "点击前往直播间"
  24. _pushinfo = "进入直播>>"
  25. _msgSize = 1000
  26. _tpMessage = 0
  27. _tpPush = 1
  28. _arcMaxLimit = 50
  29. )
  30. // Service struct
  31. type Service struct {
  32. c *conf.Config
  33. dao *esports.Dao
  34. // rpc
  35. fav *favrpc.Service
  36. arcClient arcclient.ArchiveClient
  37. }
  38. // New init
  39. func New(c *conf.Config) (s *Service) {
  40. s = &Service{
  41. c: c,
  42. dao: esports.New(c),
  43. fav: favrpc.New2(c.FavoriteRPC),
  44. }
  45. var err error
  46. if s.arcClient, err = arcclient.NewClient(c.ArcClient); err != nil {
  47. panic(err)
  48. }
  49. go s.contests()
  50. go s.contestsPush()
  51. go s.arcScore()
  52. return s
  53. }
  54. func (s *Service) contests() {
  55. var (
  56. err error
  57. contests []*mdlesp.Contest
  58. )
  59. for {
  60. stime := time.Now().Add(time.Duration(s.c.Rule.Before))
  61. etime := stime.Add(time.Duration(s.c.Rule.SleepInterval))
  62. if contests, err = s.dao.Contests(context.Background(), stime.Unix(), etime.Unix()); err != nil {
  63. time.Sleep(time.Second)
  64. log.Error("contests s.dao.Contests stime(%d) error(%v)", stime.Unix(), err)
  65. continue
  66. }
  67. for _, contest := range contests {
  68. tmpContest := contest
  69. go s.sendContests(tmpContest)
  70. }
  71. time.Sleep(time.Duration(s.c.Rule.SleepInterval))
  72. }
  73. }
  74. func (s *Service) contestsPush() {
  75. var (
  76. err error
  77. contests []*mdlesp.Contest
  78. )
  79. for {
  80. stime := time.Now()
  81. etime := stime.Add(time.Second)
  82. if contests, err = s.dao.Contests(context.Background(), stime.Unix(), etime.Unix()); err != nil {
  83. time.Sleep(time.Millisecond * 100)
  84. log.Error("contestsPush s.dao.Contests stime(%d) error(%v)", stime.Unix(), err)
  85. continue
  86. }
  87. for _, contest := range contests {
  88. tmpContest := contest
  89. go s.pubContests(tmpContest)
  90. }
  91. time.Sleep(time.Second)
  92. }
  93. }
  94. // Ping Service
  95. func (s *Service) Ping(c context.Context) (err error) {
  96. return s.dao.Ping(c)
  97. }
  98. // Close Service
  99. func (s *Service) Close() {
  100. s.dao.Close()
  101. }
  102. func (s *Service) sendContests(contest *mdlesp.Contest) (err error) {
  103. var (
  104. mids []int64
  105. msg string
  106. )
  107. link := fmt.Sprintf("#{%s}{\"https://live.bilibili.com/%d\"}", _linkinfo, contest.LiveRoom)
  108. if mids, msg, err = s.midsParams(contest, link, _tpMessage); err != nil {
  109. log.Error("sendContests s.midsParams(%+v) mids_total(%d) error(%v)", contest, len(mids), err)
  110. return
  111. }
  112. s.dao.Batch(mids, msg, contest, _msgSize, s.dao.SendMessage)
  113. return
  114. }
  115. func (s *Service) pubContests(contest *mdlesp.Contest) (err error) {
  116. var (
  117. mids []int64
  118. msg string
  119. )
  120. if mids, msg, err = s.midsParams(contest, _pushinfo, _tpPush); err != nil {
  121. log.Error("pubContests s.midsParams(%+v) mids_total(%d) error(%v)", contest, len(mids), err)
  122. return
  123. }
  124. s.dao.Batch(mids, msg, contest, s.c.Push.PartSize, s.dao.NoticeUser)
  125. return
  126. }
  127. func (s *Service) midsParams(contest *mdlesp.Contest, link string, tp int) (mids []int64, msg string, err error) {
  128. var (
  129. userList *mdlfav.UserList
  130. teams []*mdlesp.Team
  131. homeID, awayID int64
  132. tMap map[int64]string
  133. pageCount int
  134. )
  135. if userList, err = s.favUsers(contest.ID, 1); err != nil || userList == nil || len(userList.List) == 0 {
  136. log.Error("s.favUsers contestID(%v) first error(%+v)", contest.ID, err)
  137. return
  138. }
  139. ms := make(map[int64]struct{}, userList.Page.Total)
  140. for _, user := range userList.List {
  141. if _, ok := ms[user.Mid]; ok {
  142. continue
  143. }
  144. ms[user.Mid] = struct{}{}
  145. mids = append(mids, user.Mid)
  146. }
  147. if userList.Page.Size == 0 {
  148. pageCount = 0
  149. } else {
  150. pageCount = int(math.Ceil(float64(userList.Page.Total) / float64(userList.Page.Size)))
  151. }
  152. for i := 2; i <= pageCount; i++ {
  153. if userList, err = s.favUsers(contest.ID, i); err != nil || userList == nil || len(userList.List) == 0 {
  154. log.Error("s.favUsers contestID(%v) pn(%d) error(%+v)", contest.ID, i, err)
  155. err = nil
  156. continue
  157. }
  158. for _, user := range userList.List {
  159. if _, ok := ms[user.Mid]; ok {
  160. continue
  161. }
  162. ms[user.Mid] = struct{}{}
  163. mids = append(mids, user.Mid)
  164. }
  165. }
  166. if len(mids) == 0 {
  167. err = ecode.RequestErr
  168. return
  169. }
  170. tm := time.Unix(contest.Stime, 0)
  171. stime := tm.Format("2006-01-02 15:04:05")
  172. if contest.Special == _defContest {
  173. homeID = contest.HomeID
  174. awayID = contest.AwayID
  175. if teams, err = s.dao.Teams(context.Background(), homeID, awayID); err != nil || len(teams) == 0 {
  176. log.Error("midsParams s.dao.Teams homeID(%d) awayID(%d) error(%v)", homeID, awayID, err)
  177. return
  178. }
  179. tMap = make(map[int64]string, 2)
  180. for _, temp := range teams {
  181. tMap[temp.ID] = temp.Title
  182. }
  183. if tp == _tpMessage {
  184. msg = fmt.Sprintf(s.c.Rule.AlertBodyDefault, contest.SeasonTitle, stime, tMap[contest.HomeID], tMap[contest.AwayID], link)
  185. } else if tp == _tpPush {
  186. msg = fmt.Sprintf(s.c.Push.BodyDefault, contest.SeasonTitle, tMap[contest.HomeID], tMap[contest.AwayID], link)
  187. }
  188. } else {
  189. if tp == _tpMessage {
  190. msg = fmt.Sprintf(s.c.Rule.AlertBodySpecial, contest.SeasonTitle, stime, contest.SpecialName, link)
  191. } else if tp == _tpPush {
  192. msg = fmt.Sprintf(s.c.Push.BodySpecial, contest.SeasonTitle, contest.SpecialName, link)
  193. }
  194. }
  195. count := len(mids)
  196. log.Info("midsParams get contest cid(%d) users number(%d)", contest.ID, count)
  197. return
  198. }
  199. func (s *Service) favUsers(cid int64, pn int) (res *mdlfav.UserList, err error) {
  200. for i := 0; i < _favTryTimes; i++ {
  201. if res, err = s.fav.Users(context.Background(), &model.ArgUsers{Type: model.TypeEsports, Oid: cid, Pn: pn, Ps: _favUsers}); err == nil {
  202. break
  203. }
  204. time.Sleep(time.Millisecond * 100)
  205. }
  206. if err != nil {
  207. err = errors.Wrapf(err, "favUsers s.fav.Users cid(%d) pn(%d)", cid, pn)
  208. }
  209. return
  210. }
  211. func (s *Service) arcScore() {
  212. var (
  213. id int64
  214. c = context.Background()
  215. )
  216. for {
  217. av, err := s.dao.Arcs(c, id, _arcMaxLimit)
  218. if err != nil {
  219. log.Error("ArcScore s.dao.Arcs ID(%d) Limit(%d) error(%v)", id, _arcMaxLimit, err)
  220. id = id + int64(_arcMaxLimit)
  221. time.Sleep(time.Second)
  222. continue
  223. }
  224. if len(av) == 0 {
  225. id = 0
  226. time.Sleep(time.Duration(s.c.Rule.ScoreSleep))
  227. continue
  228. }
  229. go s.upArcScore(c, av)
  230. id = av[len(av)-1].ID
  231. time.Sleep(time.Second)
  232. }
  233. }
  234. func (s *Service) upArcScore(c context.Context, partArcs []*mdlesp.Arc) (err error) {
  235. var (
  236. partAids []int64
  237. arcsReply *arcmdl.ArcsReply
  238. )
  239. for _, arc := range partArcs {
  240. partAids = append(partAids, arc.Aid)
  241. }
  242. if len(partAids) == 0 {
  243. return
  244. }
  245. if arcsReply, err = s.arcClient.Arcs(c, &arcmdl.ArcsRequest{Aids: partAids}); err != nil || arcsReply == nil {
  246. log.Error("upArcScore s.arcClient.Arcs(%v) error(%v)", partAids, err)
  247. return
  248. }
  249. if len(arcsReply.Arcs) > 0 {
  250. if err = s.dao.UpArcScore(c, partArcs, arcsReply.Arcs); err != nil {
  251. log.Error("upArcScore s.dao.UpArcScore arcs(%+v) error(%v)", arcsReply, err)
  252. }
  253. }
  254. return
  255. }