fan_group.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package dao
  2. import (
  3. "bytes"
  4. "encoding/hex"
  5. "fmt"
  6. "os"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "go-common/app/interface/main/push-archive/conf"
  11. "go-common/app/interface/main/push-archive/model"
  12. "go-common/library/log"
  13. )
  14. // FanGroup 粉丝分组
  15. type FanGroup struct {
  16. // 组名
  17. Name string
  18. // 粉丝与up主的关注关系
  19. RelationType int
  20. Hitby string
  21. // 限制条数
  22. Limit int
  23. PerUpperLimit int
  24. LimitExpire int32
  25. // 本组获取粉丝的hbase信息
  26. HBaseTable string
  27. HBaseFamily []string
  28. MsgTemplateDesc string
  29. MsgTemplate string
  30. }
  31. func fanGroupKey(relationType int, name string) string {
  32. return fmt.Sprintf(`%d#%s`, relationType, name)
  33. }
  34. // NewFanGroups 实例化,验证配置, 若配置错误,则panic
  35. func NewFanGroups(config *conf.Config) (grp map[string]*FanGroup) {
  36. grp = make(map[string]*FanGroup)
  37. for _, g := range config.ArcPush.FanGroup {
  38. if g.Name == "" {
  39. log.Error("NewFanGroups config ArcPush.FanGroup.Name/hitby must not be empty")
  40. break
  41. }
  42. // 粉丝和up主的关系配置验证
  43. if g.RelationType != model.RelationAttention && g.RelationType != model.RelationSpecial {
  44. log.Error("NewFanGroups config ArcPush.FanGroup.RelationType not exist(%d)", g.RelationType)
  45. break
  46. }
  47. if g.Hitby != model.GroupDataTypeDefault && g.Hitby != model.GroupDataTypeHBase &&
  48. g.Hitby != model.GroupDataTypeAbtest && g.Hitby != model.GroupDataTypeAbComparison {
  49. log.Error("NewFanGroups config ArcPush.FanGroup.hitby(%s) must in [default,hbase]", g.Hitby)
  50. break
  51. }
  52. key := fanGroupKey(g.RelationType, g.Name)
  53. if _, ok := grp[key]; ok {
  54. log.Error("NewFanGroups config ArcPush.FanGroup.relationtype(%d) and name(%s) must be unique", g.RelationType, g.Name)
  55. break
  56. }
  57. // hbase配置
  58. if g.HBaseTable != "" && len(g.HBaseFamily) == 0 {
  59. log.Error("NewFanGroups config ArcPush.FanGroup.HbaseTable(%s) & HbaseFamily(%v) must exist togather", g.HBaseTable, g.HBaseFamily)
  60. break
  61. }
  62. msgTemp, err := decodeMsgTemplate(g.Name, g.MsgTemplate)
  63. if err != nil {
  64. log.Error("NewFanGroups config ArcPush.FanGroup.MsgTemplate(%s) decodeMsgTemplate error(%v)", g.MsgTemplate, err)
  65. break
  66. }
  67. if msgTemp != g.MsgTemplateDesc {
  68. log.Error("NewFanGroups config ArcPush.FanGroup.MsgTemplate decodeMsgTemplate(%s) must equal to MsgTemplateDesc(%s)", msgTemp, g.MsgTemplateDesc)
  69. break
  70. }
  71. if len(strings.SplitN(msgTemp, "\r\n", 2)) != 2 {
  72. log.Error("NewFanGroups config ArcPush.FanGroup.MsgTemplate(%s) decodeMsgTemplate(%s) must contains `\r\n`", g.MsgTemplate, msgTemp)
  73. break
  74. }
  75. grp[key] = &FanGroup{
  76. Name: strings.TrimSpace(g.Name),
  77. RelationType: g.RelationType,
  78. Hitby: strings.TrimSpace(g.Hitby),
  79. Limit: g.Limit,
  80. PerUpperLimit: g.PerUpperLimit,
  81. LimitExpire: int32(time.Duration(g.LimitExpire) / time.Second),
  82. HBaseTable: strings.TrimSpace(g.HBaseTable),
  83. HBaseFamily: g.HBaseFamily,
  84. MsgTemplateDesc: g.MsgTemplateDesc,
  85. MsgTemplate: msgTemp,
  86. }
  87. }
  88. if len(grp) < len(config.ArcPush.FanGroup) {
  89. fmt.Printf("NewFanGroups failed\r\n\r\n")
  90. os.Exit(1)
  91. }
  92. return
  93. }
  94. // decodeMsgTemplate 将ascii格式的文案模版,解码成中文格式---防止某些服务器不支持中文配置
  95. func decodeMsgTemplate(groupName string, temp string) (decode string, err error) {
  96. if temp == "" {
  97. return
  98. }
  99. b, err := hex.DecodeString(temp)
  100. if err != nil {
  101. log.Error("DecodeMsgTemplate hex.DecodeString error(%v) groupName(%s), temp(%s)", err, groupName, temp)
  102. return
  103. }
  104. buf := new(bytes.Buffer)
  105. temp = string(b)
  106. rows := strings.Split(temp[1:len(temp)-1], "\\r\\n")
  107. lenRows := len(rows) - 1
  108. for k, row := range rows {
  109. parts := strings.Split(row, "%s")
  110. lenParts := len(parts) - 1
  111. for kp, str := range parts {
  112. words := strings.Split(str, "\\u")
  113. for _, w := range words {
  114. if len(w) < 1 {
  115. continue
  116. }
  117. wi, err := strconv.ParseInt(w, 16, 32)
  118. if err != nil {
  119. log.Error("DecodeMsgTemplate error(%v) groupName(%s), decode(%s), word(%s)", err, groupName, temp, w)
  120. return "", err
  121. }
  122. buf.WriteString(fmt.Sprintf("%c", wi))
  123. }
  124. if kp >= lenParts {
  125. continue
  126. }
  127. buf.WriteString("%s")
  128. }
  129. if k >= lenRows {
  130. continue
  131. }
  132. buf.WriteString("\r\n")
  133. }
  134. decode = buf.String()
  135. return
  136. }
  137. // FansByHBase hbase表中查询粉丝所关联的up主,过滤up不在hbase结果中的粉丝
  138. func (d *Dao) FansByHBase(upper int64, fanGroupKey string, fans *[]int64) (result []int64, excluded []int64) {
  139. g := d.FanGroups[fanGroupKey]
  140. // 不过滤
  141. if len(g.HBaseTable) == 0 {
  142. result = *fans
  143. return
  144. }
  145. params := model.NewBatchParam(map[string]interface{}{
  146. "base": upper,
  147. "table": g.HBaseTable,
  148. "family": g.HBaseFamily,
  149. "result": &result,
  150. "excluded": &excluded,
  151. "handler": d.filterFanByUpper,
  152. }, nil)
  153. Batch(fans, 100, 1, params, d.FilterFans)
  154. return
  155. }
  156. // FansByActiveTime 配置了默认活跃时间,则批量过滤粉丝是否在活跃时间段内,否则不推送;未配置则不过滤活跃时间;若希望没有默认活跃时间但希望过滤活跃时间,配置成[0]
  157. func (d *Dao) FansByActiveTime(hour int, fans *[]int64) (result []int64, excluded []int64) {
  158. // 未配置则不过滤活跃时间
  159. if len(d.ActiveDefaultTime) <= 0 {
  160. result = *fans
  161. excluded = []int64{}
  162. return
  163. }
  164. params := model.NewBatchParam(map[string]interface{}{
  165. "base": hour,
  166. "table": "dm_member_push_active_hour",
  167. "family": []string{"p"},
  168. "result": &result,
  169. "excluded": &excluded,
  170. "handler": d.filterFanByActive,
  171. }, nil)
  172. Batch(fans, 100, 1, params, d.FilterFans)
  173. return
  174. }