notify.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "go-common/app/admin/main/workflow/model/param"
  7. "go-common/app/job/main/workflow/model"
  8. "go-common/library/database/elastic"
  9. "go-common/library/ecode"
  10. "go-common/library/log"
  11. )
  // Audit-log search target, queried by searchAuditLog.
  const (
  	// LOGAPPID is the elastic app ID used for audit-log requests.
  	LOGAPPID = "log_audit"
  	// LOGINDEX is the audit-log index those requests are run against.
  	LOGINDEX = "log_audit_11_all"
  )

  // Workflow group search target, queried by NotNotifyGroupSearch.
  const (
  	// GROUPAPPID is the elastic app ID used for group requests.
  	GROUPAPPID = "workflow_group_common"
  	// GROUPINDEX is the index group requests are run against.
  	GROUPINDEX = "workflow_group_common"
  )

  // Workflow challenge search target, queried by NotNotifyChallSearch.
  const (
  	// CHALLAPPID is the elastic app ID used for challenge requests.
  	CHALLAPPID = "workflow_chall_common"
  	// CHALLINDEX is the index challenge requests are run against.
  	CHALLINDEX = "workflow_chall_common"
  )

  const (
  	// BUSINESS is the value the "business" field must equal in the group and
  	// challenge searches.
  	BUSINESS = 1
  	// MC is the value placed in the MC field of every outgoing message.
  	MC = "1_15_1"
  )

  // notifyState holds the "state" values a group must have to be picked up by
  // NotNotifyGroupSearch.
  var notifyState = []int64{0, 2}
  32. // notifyproc .
  33. func (s *Service) notifyproc(c context.Context) {
  34. for {
  35. if err := s.sendMessage(c); err != nil {
  36. log.Error("s.sendMessage type(%d) error(%v)", err)
  37. time.Sleep(time.Second * 3)
  38. continue
  39. }
  40. time.Sleep(time.Second * 60)
  41. }
  42. }
  43. // SendMessage .
  44. func (s *Service) sendMessage(c context.Context) (err error) {
  45. var (
  46. group []*model.GroupRes
  47. chall map[int64][]*model.ChallRes
  48. gids []int64
  49. )
  50. if group, err = s.NotNotifyGroupSearch(c); err != nil {
  51. log.Error("s.NotNotifyGroupSearch error(%v)", err)
  52. return
  53. }
  54. if len(group) <= 0 {
  55. log.Info("group search length is 0")
  56. return
  57. }
  58. for _, g := range group {
  59. gids = append(gids, g.ID)
  60. }
  61. if chall, err = s.NotNotifyChallSearch(c, gids); err != nil {
  62. log.Error("s.NotNotifyChallSearch error(%v)", err)
  63. return
  64. }
  65. if len(chall) <= 0 {
  66. log.Info("chall search length is 0")
  67. return
  68. }
  69. for _, g := range group {
  70. mids := []int64{}
  71. ok := false
  72. var chs []*model.ChallRes
  73. if chs, ok = chall[g.ID]; !ok {
  74. log.Error("gid(%d) in group but not in chall", g.ID)
  75. continue
  76. }
  77. for _, ch := range chs {
  78. mids = append(mids, ch.MID)
  79. }
  80. if len(mids) <= 0 {
  81. log.Error("gid(%d) with mid in chall is empty", g.ID)
  82. continue
  83. }
  84. param := &param.MessageParam{
  85. Type: "json",
  86. Source: 1,
  87. DataType: 4,
  88. MC: MC,
  89. Title: "您的投诉已收到",
  90. Context: fmt.Sprintf("您对稿件(av%d)的举报我们已经收到。感谢您对 bilibili 社区秩序的维护,哔哩哔哩 (゜-゜)つロ 干杯~ ", g.OID),
  91. MidList: mids,
  92. }
  93. log.Info("send message param(%+v)", param)
  94. if err = s.dao.SendMessage(c, chs, param); err != nil {
  95. log.Error("s.dao.SendMessage error(%v)", err)
  96. }
  97. }
  98. return
  99. }
  100. // NotNotifyGroupSearch .
  101. func (s *Service) NotNotifyGroupSearch(c context.Context) (result []*model.GroupRes, err error) {
  102. var (
  103. pn = 1
  104. ps = 1000
  105. tempRes *model.SearchGroup
  106. )
  107. frontTenMin, _ := time.ParseDuration("-10m")
  108. frontTwelveMin, _ := time.ParseDuration("-12m")
  109. frontTenMinFormat := time.Now().Add(frontTenMin).Format("2006-01-02 15:04:05")
  110. frontTwelveMinFormat := time.Now().Add(frontTwelveMin).Format("2006-01-02 15:04:05")
  111. e := elastic.NewElastic(nil)
  112. r := e.NewRequest(GROUPAPPID).Index(GROUPINDEX).Fields("id", "oid").
  113. WhereEq("business", BUSINESS).
  114. WhereIn("state", notifyState).
  115. WhereRange("lasttime", frontTwelveMinFormat, frontTenMinFormat, elastic.RangeScopeLoRo).
  116. Pn(pn).
  117. Ps(ps)
  118. if err = r.Scan(context.Background(), &tempRes); err != nil {
  119. log.Error("elastic search group Scan error(%v)", err)
  120. return
  121. }
  122. log.Info("groupparams is(%v)", r.Params())
  123. res := tempRes.Result
  124. if len(res) <= 0 {
  125. return
  126. }
  127. // search audit log
  128. var logGids map[int64][]*model.LogRes
  129. if logGids, err = s.searchAuditLog(c, res); err != nil {
  130. log.Error("s.searchAuditLog error(%v)", err)
  131. return
  132. }
  133. for _, r := range res {
  134. if _, ok := logGids[r.OID]; !ok {
  135. result = append(result, r)
  136. }
  137. }
  138. return
  139. }
  140. // searchAuditLog .
  141. func (s *Service) searchAuditLog(c context.Context, grp []*model.GroupRes) (logGids map[int64][]*model.LogRes, err error) {
  142. pn := 1
  143. ps := 1000
  144. oids := []int64{}
  145. logGids = make(map[int64][]*model.LogRes)
  146. for _, g := range grp {
  147. oids = append(oids, g.OID)
  148. }
  149. e := elastic.NewElastic(nil)
  150. for {
  151. var res *model.AuditLog
  152. r := e.NewRequest(LOGAPPID).Index(LOGINDEX).Fields("int_1", "oid").
  153. WhereIn("oid", oids).
  154. WhereEq("type", 2).
  155. WhereEq("business", 11).
  156. WhereEq("action", "notify_users_received").
  157. Pn(pn).
  158. Ps(ps)
  159. if err = r.Scan(context.Background(), res); err != nil {
  160. log.Error("elastic search audit log Scan error(%v)", err)
  161. return
  162. }
  163. if res == nil {
  164. time.Sleep(time.Second * 3)
  165. continue
  166. }
  167. if res.Page.Total > 10000 {
  168. log.Error("elastic search audit log result too long")
  169. err = ecode.ServerErr
  170. return
  171. }
  172. log.Info("auditlogparams is(%v)", r.Params())
  173. logRes := res.Result
  174. for _, lg := range logRes {
  175. logGids[lg.Oid] = append(logGids[lg.Oid], lg)
  176. }
  177. if len(res.Result) < ps {
  178. break
  179. }
  180. pn++
  181. }
  182. return
  183. }
  184. // NotNotifyChallSearch .
  185. func (s *Service) NotNotifyChallSearch(c context.Context, gids []int64) (chall map[int64][]*model.ChallRes, err error) {
  186. var (
  187. pn = 1
  188. ps = 1000
  189. )
  190. if len(gids) <= 0 {
  191. return
  192. }
  193. chall = make(map[int64][]*model.ChallRes)
  194. frontTenMin, _ := time.ParseDuration("-10m")
  195. frontTwelveMin, _ := time.ParseDuration("-12m")
  196. frontTenMinFormat := time.Now().Add(frontTenMin).Format("2006-01-02 15:04:05")
  197. frontTwelveMinFormat := time.Now().Add(frontTwelveMin).Format("2006-01-02 15:04:05")
  198. e := elastic.NewElastic(nil)
  199. for {
  200. res := &model.SearchChall{}
  201. r := e.NewRequest(CHALLAPPID).Index(CHALLINDEX).Fields([]string{"id", "gid", "mid", "oid"}...).
  202. WhereEq("business", BUSINESS).
  203. WhereEq("state", 0).
  204. WhereIn("gid", gids).
  205. WhereRange("ctime", frontTwelveMinFormat, frontTenMinFormat, elastic.RangeScopeLoRo).
  206. Pn(pn).
  207. Ps(ps)
  208. if err = r.Scan(context.Background(), &res); err != nil {
  209. log.Error("elastic search chall Scan error(%v)", err)
  210. time.Sleep(time.Second * 3)
  211. continue
  212. }
  213. log.Info("challparams is(%v)", r.Params())
  214. for _, r := range res.Result {
  215. chall[r.GID] = append(chall[r.GID], r)
  216. }
  217. if len(res.Result) < ps {
  218. break
  219. }
  220. pn++
  221. }
  222. return
  223. }