service.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package service
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "math"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "go-common/app/interface/live/push-live/conf"
  12. "go-common/app/interface/live/push-live/dao"
  13. "go-common/library/cache/redis"
  14. "go-common/library/log"
  15. "go-common/library/queue/databus"
  16. )
  // _limitDecreaseUUIDKey is the fmt pattern of the key used to detect repeated
  // limit-decrease requests; getUniqueKey fills in the %s verb.
  var _limitDecreaseUUIDKey = "ld:%s"

  // Sentinel errors of the limit-decrease flow.
  var (
  	// errLimitRequestRepeat is returned by limitDecreaseUnique when the
  	// request key already exists.
  	errLimitRequestRepeat = errors.New("limit decrease request repeat")
  	// errConvertMidString is returned by LimitDecrease when the mid list
  	// cannot be converted.
  	errConvertMidString = errors.New("convert mid string error")
  	// errConvertBusiness is returned by LimitDecrease when business is not
  	// an integer.
  	errConvertBusiness = errors.New("convert business error")
  )
  23. // Service struct
  24. type Service struct {
  25. c *conf.Config
  26. dao *dao.Dao
  27. liveStartSub *databus.Databus
  28. liveCommonSub *databus.Databus
  29. wg sync.WaitGroup
  30. closeCh chan bool
  31. pushTypes []string
  32. intervalExpired int32
  33. mutex sync.RWMutex
  34. }
  35. // New init
  36. func New(c *conf.Config) (s *Service) {
  37. s = &Service{
  38. c: c,
  39. dao: dao.New(c),
  40. liveStartSub: databus.New(c.LiveRoomSub),
  41. liveCommonSub: databus.New(c.LiveCommonSub),
  42. closeCh: make(chan bool),
  43. pushTypes: make([]string, 0, 4),
  44. mutex: sync.RWMutex{},
  45. }
  46. s.wg.Add(1)
  47. go s.loadPushConfig()
  48. for i := 0; i < c.Push.ConsumerProcNum; i++ {
  49. s.wg.Add(1)
  50. go s.liveMessageConsumeproc()
  51. }
  52. return s
  53. }
  54. // loadPushConfig Load push config
  55. func (s *Service) loadPushConfig() {
  56. var ctx = context.TODO()
  57. defer s.wg.Done()
  58. for {
  59. select {
  60. case _, ok := <-s.closeCh:
  61. if !ok {
  62. log.Info("[service.push|loadPushConfig] s.loadPushConfig is closed by closeCh")
  63. return
  64. }
  65. default:
  66. }
  67. // get push delay time
  68. interval, err := s.dao.GetPushInterval(ctx)
  69. if err != nil || interval < 0 {
  70. time.Sleep(time.Duration(time.Minute))
  71. continue
  72. }
  73. s.mutex.Lock()
  74. s.intervalExpired = interval
  75. s.mutex.Unlock()
  76. // get push options
  77. types, err := s.dao.GetPushConfig(ctx)
  78. if err != nil || len(types) == 0 {
  79. time.Sleep(time.Duration(time.Minute))
  80. continue
  81. }
  82. s.mutex.Lock()
  83. s.pushTypes = types
  84. s.mutex.Unlock()
  85. time.Sleep(time.Duration(time.Minute))
  86. }
  87. }
  88. // safeGetExpired
  89. func (s *Service) safeGetExpired() int32 {
  90. s.mutex.RLock()
  91. expired := s.intervalExpired
  92. s.mutex.RUnlock()
  93. return expired
  94. }
  95. // LimitDecrease do mid string limit decrease
  96. func (s *Service) LimitDecrease(ctx context.Context, business, targetID, uuid, midStr string) (err error) {
  97. var (
  98. f *dao.Filter
  99. mids []int64
  100. b int
  101. )
  102. // 判断请求是否重复
  103. err = s.limitDecreaseUnique(getUniqueKey(business, targetID, uuid))
  104. if err != nil {
  105. log.Error("[service.service|LimitDecrease] limitDecreaseUnique error(%v), uuid(%s), business(%s), targetID(%s), mid(%s)",
  106. err, uuid, business, targetID, midStr)
  107. return
  108. }
  109. b, err = strconv.Atoi(business)
  110. if err != nil {
  111. log.Error("[service.service|LimitDecrease] strconv business params error(%v)", err)
  112. err = errConvertBusiness
  113. return
  114. }
  115. filterConf := &dao.FilterConfig{
  116. Business: b,
  117. DailyExpired: dailyExpired(time.Now())}
  118. // convert mid string to []int64
  119. mids, err = s.convertStrToInt64(midStr)
  120. if err != nil {
  121. log.Error("[service.service|LimitDecrease] convertStrToInt64 error(%v), business(%s), uuid(%s), mids(%s)",
  122. err, business, uuid, midStr)
  123. err = errConvertMidString
  124. return
  125. }
  126. // aysnc decrease limit
  127. f, err = s.dao.NewFilter(filterConf)
  128. if err != nil {
  129. log.Error("[service.service|LimitDecrease] new filter error(%v), business(%s), uuid(%s), mids(%v)",
  130. err, business, uuid, mids)
  131. return
  132. }
  133. go f.BatchDecreaseLimit(ctx, mids)
  134. return
  135. }
  136. // Ping Service
  137. func (s *Service) Ping(c context.Context) (err error) {
  138. return nil
  139. }
  140. // Close Service
  141. func (s *Service) Close() {
  142. close(s.closeCh)
  143. s.subClose()
  144. s.wg.Wait()
  145. s.dao.Close()
  146. }
  147. // subClose Close all sub channels
  148. func (s *Service) subClose() {
  149. s.liveCommonSub.Close()
  150. s.liveStartSub.Close()
  151. }
  // dailyExpired returns the number of whole seconds left between from and
  // the start of the following calendar day in from's location, rounded down.
  func dailyExpired(from time.Time) float64 {
  	year, month, day := from.Date()
  	dayStart := time.Date(year, month, day, 0, 0, 0, 0, from.Location())
  	nextDayStart := dayStart.AddDate(0, 0, 1)
  	remaining := nextDayStart.Sub(from)
  	return math.Floor(remaining.Seconds())
  }
  158. // convertStrToInt64 convert mid string to []int64 slice
  159. func (s *Service) convertStrToInt64(m string) (mInts []int64, err error) {
  160. var (
  161. mSplit []string
  162. errCount int
  163. )
  164. if m == "" {
  165. return
  166. }
  167. mSplit = strings.Split(m, ",")
  168. for _, mStr := range mSplit {
  169. mInt, convErr := strconv.Atoi(mStr)
  170. if convErr != nil {
  171. log.Error("[service.push|formatMidstr] convert mid(%v), error(%v)", mStr, convErr)
  172. errCount++
  173. continue
  174. }
  175. mInts = append(mInts, int64(mInt))
  176. }
  177. if errCount == len(mSplit) {
  178. err = fmt.Errorf("[service.push|formatMidstr] convert all mid failed, midstr(%s)", m)
  179. }
  180. return
  181. }
  182. // limitDecreaseUnique
  183. func (s *Service) limitDecreaseUnique(key string) (err error) {
  184. var (
  185. conn redis.Conn
  186. reply interface{}
  187. )
  188. defer func() {
  189. if conn != nil {
  190. conn.Close()
  191. }
  192. }()
  193. conn, err = redis.Dial(s.c.Redis.PushInterval.Proto, s.c.Redis.PushInterval.Addr, s.dao.RedisOption()...)
  194. if err != nil {
  195. log.Error("[service.service|limitDecreaseUnique] redis.Dial error(%v)", err)
  196. return
  197. }
  198. // redis cache exists judgement
  199. reply, err = conn.Do("SET", key, time.Now(), "EX", dailyExpired(time.Now()), "NX")
  200. if err != nil {
  201. return
  202. }
  203. // key exists
  204. if reply == nil {
  205. err = errLimitRequestRepeat
  206. return
  207. }
  208. return
  209. }
  210. // getUniqueKey get request unique key
  211. func getUniqueKey(a, b, c string) string {
  212. return fmt.Sprintf(_limitDecreaseUUIDKey, a+b+c)
  213. }