push.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package dao
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/md5"
  6. "encoding/hex"
  7. "fmt"
  8. "go-common/app/interface/live/push-live/model"
  9. "go-common/library/log"
  10. "go-common/library/xstr"
  11. "io"
  12. "mime/multipart"
  13. "net/http"
  14. "net/url"
  15. "sort"
  16. "strconv"
  17. "time"
  18. )
  // _response is the JSON envelope decoded from the push API reply.
  // Push treats a reply as failed when Code is non-zero or Data is zero.
  type _response struct {
      // Code is read from the "code" field of the reply.
      Code int `json:"code"`
      // Data is read from the "data" field of the reply.
      Data int `json:"data"`
  }
  23. // BatchPush 批量推送,失败重试
  24. func (d *Dao) BatchPush(fans *[]int64, task *model.ApPushTask) (total int) {
  25. limit := d.c.Push.PushOnceLimit
  26. retry := d.c.Push.PushRetryTimes
  27. var times int
  28. for {
  29. var (
  30. mids []int64
  31. err error
  32. )
  33. uuid := d.GetUUID(task, times)
  34. l := len(*fans)
  35. if l == 0 {
  36. break
  37. } else if l <= limit {
  38. mids = (*fans)[:l]
  39. } else {
  40. mids = (*fans)[:limit]
  41. l = limit
  42. }
  43. *fans = (*fans)[l:]
  44. for i := 0; i < retry; i++ {
  45. // 每次投递成功结束循环
  46. if err = d.Push(mids, task, uuid); err == nil {
  47. total += len(mids) //单次投递成功数
  48. break
  49. }
  50. time.Sleep(time.Duration(time.Second * 3))
  51. }
  52. times++
  53. if err != nil {
  54. // 重试若干次仍然失败,需要记录日志并且配置elk告警
  55. log.Error("[dao.push|BatchPush] retry push failed. error(%+v), retry times(%d), task(%+v)", err, retry, task)
  56. }
  57. }
  58. return
  59. }
  60. // Push 调用推送接口
  61. func (d *Dao) Push(fans []int64, task *model.ApPushTask, uuid string) (err error) {
  62. if len(fans) == 0 {
  63. log.Info("[dao.push|Push] empty fans. task(%+v)", task)
  64. return
  65. }
  66. // 业务参数
  67. businessID, token := d.getPushBusiness(task.Group)
  68. buf := new(bytes.Buffer)
  69. w := multipart.NewWriter(buf)
  70. w.WriteField("app_id", strconv.Itoa(d.c.Push.AppID))
  71. w.WriteField("business_id", strconv.Itoa(businessID))
  72. w.WriteField("alert_title", d.GetPushTemplate(task.Group, task.AlertTitle))
  73. w.WriteField("alert_body", task.AlertBody)
  74. w.WriteField("mids", xstr.JoinInts(fans))
  75. w.WriteField("link_type", strconv.Itoa(task.LinkType))
  76. w.WriteField("link_value", task.LinkValue)
  77. w.WriteField("expire_time", strconv.Itoa(task.ExpireTime))
  78. w.WriteField("group", task.Group)
  79. w.WriteField("uuid", uuid)
  80. w.Close()
  81. // 签名
  82. query := map[string]string{
  83. "ts": strconv.FormatInt(time.Now().Unix(), 10),
  84. "appkey": d.c.HTTPClient.Key,
  85. }
  86. query["sign"] = d.getSign(query, d.c.HTTPClient.Secret)
  87. requestURL := fmt.Sprintf("%s?ts=%s&appkey=%s&sign=%s", d.c.Push.MultiAPI, query["ts"], query["appkey"], query["sign"])
  88. // request
  89. req, err := http.NewRequest(http.MethodPost, requestURL, buf)
  90. if err != nil {
  91. log.Error("[dao.push|Push] http.NewRequest error(%+v), url(%s), uuid(%s), task(%+v)",
  92. err, requestURL, uuid, task)
  93. PromError("[dao.push|Push] http:NewRequest")
  94. return
  95. }
  96. req.Header.Set("Content-Type", w.FormDataContentType())
  97. req.Header.Set("Authorization", "token="+token)
  98. res := &_response{}
  99. if err = d.httpClient.Do(context.TODO(), req, res); err != nil {
  100. log.Error("[dao|push|Push] httpClient.Do error(%+v), url(%s), uuid(%s), task(%+v)",
  101. err, requestURL, uuid, task)
  102. PromError("[dao.push|Push] http:Do")
  103. return
  104. }
  105. // response
  106. if res.Code != 0 || res.Data == 0 {
  107. log.Error("[dao.push|Push] push failed. url(%s), uuid(%s), response(%+v), task(%+v)", requestURL, uuid, res, task)
  108. err = fmt.Errorf("[dao.push|Push] push failed. url(%s), uuid(%s), response(%+v), task(%+v)", requestURL, uuid, res, task)
  109. } else {
  110. log.Info("[dao.push|Push] push success. url(%s), uuid(%s), response(%+v), task(%+v)", requestURL, uuid, res, task)
  111. }
  112. return
  113. }
  114. // GetPushTemplate 根据类型返回不同的推送文案
  115. func (d *Dao) GetPushTemplate(group string, part string) (template string) {
  116. switch group {
  117. case model.SpecialGroup:
  118. template = fmt.Sprintf(d.c.Push.SpecialCopyWriting, part)
  119. case model.AttentionGroup:
  120. template = fmt.Sprintf(d.c.Push.DefaultCopyWriting, part)
  121. default:
  122. template = part
  123. }
  124. return
  125. }
  126. // GetUUID 构造一个每次请求的uuid
  127. func (d *Dao) GetUUID(task *model.ApPushTask, times int) string {
  128. var b bytes.Buffer
  129. b.WriteString(strconv.Itoa(times))
  130. b.WriteString(task.Group) // Group必须加入uuid计算,区分单次开播提醒关注与特别关注
  131. b.WriteString(strconv.Itoa(d.c.Push.BusinessID))
  132. b.WriteString(strconv.FormatInt(task.TargetID, 10))
  133. b.WriteString(strconv.Itoa(task.ExpireTime))
  134. b.WriteString(strconv.FormatInt(time.Now().UnixNano(), 10))
  135. mh := md5.Sum(b.Bytes())
  136. uuid := hex.EncodeToString(mh[:])
  137. return uuid
  138. }
  139. // getSign 获取签名
  140. func (d *Dao) getSign(params map[string]string, secret string) (sign string) {
  141. keys := []string{}
  142. for k := range params {
  143. keys = append(keys, k)
  144. }
  145. sort.Strings(keys)
  146. buf := bytes.Buffer{}
  147. for _, k := range keys {
  148. if buf.Len() > 0 {
  149. buf.WriteByte('&')
  150. }
  151. buf.WriteString(url.QueryEscape(k) + "=")
  152. buf.WriteString(url.QueryEscape(params[k]))
  153. }
  154. hash := md5.New()
  155. io.WriteString(hash, buf.String()+secret)
  156. return fmt.Sprintf("%x", hash.Sum(nil))
  157. }
  158. // getPushBusiness 获取推送配置
  159. func (d *Dao) getPushBusiness(group string) (businessID int, token string) {
  160. // 预约走单独的白名单通道,business id 和token不一样
  161. if group == "activity_appointment" {
  162. businessID = 41
  163. token = "13aoowdzm0u8pcqdoulvj5vdkihohtcj"
  164. } else {
  165. businessID = d.c.Push.BusinessID
  166. token = d.c.Push.BusinessToken
  167. }
  168. return
  169. }