matcher.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. package dispatch
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "github.com/ipipdotnet/ipdb-go"
  6. "go-common/app/service/live/broadcast-proxy/conf"
  7. "go-common/app/service/live/broadcast-proxy/expr"
  8. "go-common/library/log"
  9. "math"
  10. "math/rand"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. )
  15. type Matcher struct {
  16. ipDataV4 *ipdb.City
  17. ipDataV6 *ipdb.City
  18. heapPool sync.Pool
  19. Config string
  20. MaxLimit int `json:"ip_max_limit"`
  21. DefaultDomain string `json:"default_domain"`
  22. WildcardDomainSuffix string `json:"wildcard_domain_suffix"`
  23. CommonDispatch struct {
  24. ChinaDispatch struct {
  25. ChinaTelecom *CommonBucket `json:"china_telecom"`
  26. ChinaUnicom *CommonBucket `json:"china_unicom"`
  27. CMCC *CommonBucket `json:"cmcc"`
  28. ChinaOther *CommonBucket `json:"other"`
  29. } `json:"china"`
  30. OverseaDispatch []*CommonRuleBucket `json:"oversea"`
  31. UnknownAreaDispatch *CommonBucket `json:"unknown"`
  32. } `json:"danmaku_common_dispatch"`
  33. VIPDispatch []*VIPRuleBucket `json:"danmaku_vip_dispatch"`
  34. ServerGroup map[string][]string `json:"danmaku_comet_group"`
  35. ServerHost map[string]string `json:"danmaku_comet_host"`
  36. IPBlack []string `json:"ip_black"`
  37. TempV6 []string `json:"temp_v6"`
  38. forbiddenIP map[string]struct{}
  39. }
  // CommonBucket describes one dispatch target as two weight tables keyed by
  // server group name: Master supplies the first pick, Slave the remaining ones.
  type CommonBucket struct {
  	// Master maps group name to weight; groups with weight <= 0 are never picked.
  	Master map[string]int `json:"master"`
  	// Slave maps group name to weight; groups with weight <= 0 are never picked.
  	Slave map[string]int `json:"slave"`
  }
  44. type CommonRuleBucket struct {
  45. CommonBucket
  46. Rule string `json:"rule"`
  47. RuleExpr expr.Expr
  48. }
  49. type VIPRuleBucket struct {
  50. Rule string `json:"rule"`
  51. RuleExpr expr.Expr
  52. IP []string `json:"ip"`
  53. Group []string `json:"group"`
  54. }
  55. func NewMatcher(matcherConfig []byte, ipDataV4 *ipdb.City, ipDataV6 *ipdb.City, dispatchConfig *conf.DispatchConfig) (*Matcher, error) {
  56. matcher := new(Matcher)
  57. matcher.heapPool = sync.Pool{
  58. New: func() interface{} {
  59. return NewMinHeap()
  60. },
  61. }
  62. matcher.forbiddenIP = make(map[string]struct{})
  63. if ipDataV4 == nil || ipDataV6 == nil {
  64. return nil, errors.New("invalid IP database")
  65. }
  66. matcher.ipDataV4 = ipDataV4
  67. matcher.ipDataV6 = ipDataV6
  68. matcher.Config = string(matcherConfig)
  69. if err := json.Unmarshal(matcherConfig, matcher); err != nil {
  70. return nil, err
  71. }
  72. for _, ip := range matcher.IPBlack {
  73. matcher.forbiddenIP[ip] = struct{}{}
  74. }
  75. parser := expr.NewExpressionParser()
  76. for _, oversea := range matcher.CommonDispatch.OverseaDispatch {
  77. if oversea.Rule == "" {
  78. oversea.Rule = "true"
  79. }
  80. if err := parser.Parse(oversea.Rule); err != nil {
  81. log.Error("[Matcher] Parse rule expr:%s, error:%+v", oversea.Rule, err)
  82. return nil, err
  83. }
  84. for _, variable := range parser.GetVariable() {
  85. if variable != "$lng" && variable != "$lat" {
  86. return nil, errors.New("oversea dispatch only supports variable $lng and $lat")
  87. }
  88. }
  89. oversea.RuleExpr = parser.GetExpr()
  90. }
  91. for _, vip := range matcher.VIPDispatch {
  92. if err := parser.Parse(vip.Rule); err != nil {
  93. log.Error("[Matcher] Parse rule expr:%s, error:%+v", vip.Rule, err)
  94. return nil, err
  95. }
  96. for _, variable := range parser.GetVariable() {
  97. if variable != "$uid" {
  98. return nil, errors.New("vip dispatch only supports variable $uid")
  99. }
  100. }
  101. if len(parser.GetVariable()) == 0 {
  102. return nil, errors.New("vip dispatch must contains variable $uid")
  103. }
  104. vip.RuleExpr = parser.GetExpr()
  105. }
  106. if matcher.MaxLimit == 0 && dispatchConfig != nil {
  107. matcher.MaxLimit = dispatchConfig.MaxLimit
  108. }
  109. if matcher.DefaultDomain == "" && dispatchConfig != nil {
  110. matcher.DefaultDomain = dispatchConfig.DefaultDomain
  111. }
  112. if matcher.WildcardDomainSuffix == "" && dispatchConfig != nil {
  113. matcher.WildcardDomainSuffix = dispatchConfig.WildcardDomainSuffix
  114. }
  115. return matcher, nil
  116. }
  117. func (matcher *Matcher) GetConfig() string {
  118. return matcher.Config
  119. }
  120. func (matcher *Matcher) Dispatch(ip string, uid int64) ([]string, []string) {
  121. danmakuIP := matcher.dispatchInternal(ip, uid)
  122. danmakuHost := make([]string, 0, len(danmakuIP))
  123. for _, singleDanmakuIP := range danmakuIP {
  124. if host, ok := matcher.ServerHost[singleDanmakuIP]; ok {
  125. danmakuHost = append(danmakuHost, host+matcher.WildcardDomainSuffix)
  126. }
  127. }
  128. danmakuIP = append(danmakuIP, matcher.DefaultDomain)
  129. danmakuHost = append(danmakuHost, matcher.DefaultDomain)
  130. return danmakuIP, danmakuHost
  131. }
  132. func (matcher *Matcher) dispatchInternal(ip string, uid int64) []string {
  133. if _, ok := matcher.forbiddenIP[ip]; ok {
  134. return []string{}
  135. }
  136. // VIP Dispatch
  137. vipDispatchEnv := make(map[expr.Var]interface{})
  138. vipDispatchEnv[expr.Var("$uid")] = uid
  139. for _, vip := range matcher.VIPDispatch {
  140. if v, err := expr.SafetyEvalBool(vip.RuleExpr, vipDispatchEnv); v && err == nil {
  141. return matcher.pickFromVIPRuleBucket(vip)
  142. } else {
  143. if err != nil {
  144. log.Error("[Matcher] VIP dispatch, uid:%d, eval rule expr:%s error:%+v", uid, vip.Rule, err)
  145. }
  146. }
  147. }
  148. // Common Dispatch
  149. var ipDatabase *ipdb.City
  150. for i := 0; i < len(ip); i++ {
  151. if ip[i] == '.' {
  152. ipDatabase = matcher.ipDataV4
  153. break
  154. } else if ip[i] == ':' {
  155. ipDatabase = matcher.ipDataV6
  156. //break
  157. //TODO: this is temp solution, replace this block with "break" here when all server supports IPv6
  158. return matcher.randomPickN(matcher.TempV6, matcher.MaxLimit)
  159. }
  160. }
  161. if ipDatabase == nil {
  162. return matcher.pickFromCommonBucket(matcher.CommonDispatch.UnknownAreaDispatch)
  163. }
  164. detail, err := ipDatabase.FindMap(ip, "EN")
  165. if err != nil {
  166. return matcher.pickFromCommonBucket(matcher.CommonDispatch.UnknownAreaDispatch)
  167. }
  168. country := strings.TrimSpace(detail["country_name"])
  169. province := strings.TrimSpace(detail["region_name"])
  170. isp := strings.TrimSpace(detail["isp_domain"])
  171. latitude, _ := strconv.ParseFloat(detail["latitude"], 64)
  172. longitude, _ := strconv.ParseFloat(detail["longitude"], 64)
  173. if country != "China" && country != "Reserved" && country != "LAN Address" && country != "Loopback" {
  174. return matcher.pickFromCommonRuleBucket(matcher.CommonDispatch.OverseaDispatch, latitude, longitude)
  175. } else if country == "China" {
  176. if province == "Hong Kong" || province == "Macau" || province == "Taiwan" {
  177. return matcher.pickFromCommonRuleBucket(matcher.CommonDispatch.OverseaDispatch, latitude, longitude)
  178. } else {
  179. switch isp {
  180. case "ChinaTelecom":
  181. return matcher.pickFromCommonBucket(matcher.CommonDispatch.ChinaDispatch.ChinaTelecom)
  182. case "ChinaMobile":
  183. return matcher.pickFromCommonBucket(matcher.CommonDispatch.ChinaDispatch.CMCC)
  184. case "ChinaUnicom":
  185. return matcher.pickFromCommonBucket(matcher.CommonDispatch.ChinaDispatch.ChinaUnicom)
  186. default:
  187. return matcher.pickFromCommonBucket(matcher.CommonDispatch.ChinaDispatch.ChinaOther)
  188. }
  189. }
  190. } else {
  191. return matcher.pickFromCommonBucket(matcher.CommonDispatch.UnknownAreaDispatch)
  192. }
  193. }
  194. func (matcher *Matcher) pickFromCommonRuleBucket(overseaBucket []*CommonRuleBucket, latitude float64, longitude float64) []string {
  195. overseaDispatchEnv := make(map[expr.Var]interface{})
  196. overseaDispatchEnv[expr.Var("$lat")] = latitude
  197. overseaDispatchEnv[expr.Var("$lng")] = longitude
  198. for _, bucket := range overseaBucket {
  199. if v, err := expr.SafetyEvalBool(bucket.RuleExpr, overseaDispatchEnv); v && err == nil {
  200. return matcher.pickFromCommonBucket(&bucket.CommonBucket)
  201. }
  202. }
  203. return []string{}
  204. }
  205. func (matcher *Matcher) pickOneFromWeightedGroup(groupWeightDict map[string]int) (string, string) {
  206. var luckyKey float64
  207. var luckyGroup string
  208. for group, weight := range groupWeightDict {
  209. if weight > 0 {
  210. key := math.Pow(rand.Float64(), 1.0/float64(weight))
  211. if key >= luckyKey {
  212. luckyKey = key
  213. luckyGroup = group
  214. }
  215. }
  216. }
  217. luckyIP := matcher.ServerGroup[luckyGroup]
  218. if len(luckyIP) == 0 {
  219. return "", ""
  220. }
  221. return matcher.randomPickOne(luckyIP), luckyGroup
  222. }
  223. func (matcher *Matcher) pickNFromWeightedGroup(groupWeightDict map[string]int, n int, groupIgnore string) []string {
  224. h := matcher.heapPool.Get().(*MinHeap)
  225. for group, weight := range groupWeightDict {
  226. if group != groupIgnore && weight > 0 {
  227. key := math.Pow(rand.Float64(), 1.0/float64(weight))
  228. if h.HeapLength() < n {
  229. h.HeapPush(group, key)
  230. } else {
  231. _, top, _ := h.HeapTop()
  232. if key > top {
  233. h.HeapPush(group, key)
  234. h.HeapPop()
  235. }
  236. }
  237. }
  238. }
  239. r := make([]string, 0, n)
  240. for h.HeapLength() > 0 {
  241. v, _, _ := h.HeapPop()
  242. member := matcher.ServerGroup[v.(string)]
  243. if len(member) > 0 {
  244. r = append(r, matcher.randomPickOne(member))
  245. }
  246. }
  247. matcher.heapPool.Put(h)
  248. return r
  249. }
  250. func (matcher *Matcher) pickFromCommonBucket(b *CommonBucket) []string {
  251. r := make([]string, 0, matcher.MaxLimit)
  252. masterIP, masterGroup := matcher.pickOneFromWeightedGroup(b.Master)
  253. if masterIP != "" {
  254. r = append(r, masterIP)
  255. }
  256. for _, slaveIP := range matcher.pickNFromWeightedGroup(b.Slave, matcher.MaxLimit-len(r), masterGroup) {
  257. r = append(r, slaveIP)
  258. }
  259. return r
  260. }
  261. func (matcher *Matcher) pickFromVIPRuleBucket(b *VIPRuleBucket) []string {
  262. var length int
  263. for _, group := range b.Group {
  264. length += len(matcher.ServerGroup[group])
  265. }
  266. length += len(b.IP)
  267. candidate := make([]string, length)
  268. i := 0
  269. for _, group := range b.Group {
  270. i += copy(candidate[i:], matcher.ServerGroup[group])
  271. }
  272. i += copy(candidate[i:], b.IP)
  273. return matcher.randomPickN(candidate, matcher.MaxLimit)
  274. }
  275. func (matcher *Matcher) randomPickOne(s []string) string {
  276. return s[rand.Intn(len(s))]
  277. }
  278. func (matcher *Matcher) randomPickN(s []string, n int) []string {
  279. var r []string
  280. if n > len(s) {
  281. n = len(s)
  282. }
  283. for _, v := range rand.Perm(len(s))[0:n] {
  284. r = append(r, s[v])
  285. }
  286. return r
  287. }