service.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "sync"
  8. "time"
  9. "go-common/app/admin/main/dm/conf"
  10. "go-common/app/admin/main/dm/dao"
  11. oplogDao "go-common/app/admin/main/dm/dao/oplog"
  12. "go-common/app/admin/main/dm/model"
  13. "go-common/app/admin/main/dm/model/oplog"
  14. accountApi "go-common/app/service/main/account/api"
  15. archive "go-common/app/service/main/archive/api/gorpc"
  16. "go-common/library/log"
  17. "go-common/library/log/infoc"
  18. "go-common/library/sync/pipeline/fanout"
  19. )
  20. // Service define Service struct
  21. type Service struct {
  22. // dao
  23. dao *dao.Dao
  24. oplogDao *oplogDao.Dao
  25. // rpc
  26. accountRPC accountApi.AccountClient
  27. arcRPC *archive.Service2
  28. dmOperationLogSvc *infoc.Infoc
  29. bakInfoc *infoc.Infoc
  30. opsLogCh chan *oplog.Infoc
  31. reduceMoralChan chan *model.ReduceMoral
  32. blockUserChan chan *model.BlockUser
  33. msgReporterChan chan *model.ReportMsg
  34. msgPosterChan chan *model.ReportMsg
  35. actionChan chan *model.Action
  36. // async proc
  37. cache *fanout.Fanout
  38. moniOidMap map[int64]struct{}
  39. oidLock sync.Mutex
  40. }
  41. // New new a Service and return.
  42. func New(c *conf.Config) *Service {
  43. s := &Service{
  44. // dao
  45. dao: dao.New(c),
  46. oplogDao: oplogDao.New(c),
  47. // rpc
  48. arcRPC: archive.New2(c.ArchiveRPC),
  49. dmOperationLogSvc: infoc.New(c.Infoc2),
  50. bakInfoc: infoc.New(c.InfocBak),
  51. reduceMoralChan: make(chan *model.ReduceMoral, 1024),
  52. blockUserChan: make(chan *model.BlockUser, 1024),
  53. msgReporterChan: make(chan *model.ReportMsg, 1024),
  54. msgPosterChan: make(chan *model.ReportMsg, 1024),
  55. actionChan: make(chan *model.Action, 1024),
  56. opsLogCh: make(chan *oplog.Infoc, 1024),
  57. cache: fanout.New("cache", fanout.Worker(1), fanout.Buffer(1024)),
  58. moniOidMap: make(map[int64]struct{}),
  59. }
  60. accountRPC, err := accountApi.NewClient(c.AccountRPC)
  61. if err != nil {
  62. panic(err)
  63. }
  64. s.accountRPC = accountRPC
  65. go s.changeReportStatProc()
  66. go s.actionproc()
  67. go s.oplogproc()
  68. go s.monitorproc()
  69. return s
  70. }
  71. // Ping check server ok
  72. func (s *Service) Ping(c context.Context) (err error) {
  73. if err = s.dao.Ping(c); err != nil {
  74. return
  75. }
  76. return
  77. }
  78. func (s *Service) addAction(action *model.Action) {
  79. select {
  80. case s.actionChan <- action:
  81. default:
  82. log.Error("action channel is full,action(%v) is discard", action)
  83. }
  84. }
  85. func (s *Service) actionproc() {
  86. for action := range s.actionChan {
  87. if err := s.dao.SendAction(context.TODO(), fmt.Sprint(action.Oid), action); err != nil {
  88. log.Error("dao.SendAction(%v) error(%v)", action, err)
  89. }
  90. }
  91. }
  92. func (s *Service) changeReportStatProc() {
  93. for {
  94. select {
  95. case msg := <-s.reduceMoralChan:
  96. if err := s.dao.ReduceMoral(context.TODO(), msg); err != nil {
  97. log.Error("s.dao.ReduceMoral(msg:%v) error(%v)", msg, err)
  98. }
  99. case msg := <-s.msgReporterChan:
  100. if err := s.dao.SendMsgToReporter(context.TODO(), msg); err != nil {
  101. log.Error("s.dao.SendMsgToReporter(msg:%v) error(%v)", msg, err)
  102. }
  103. case msg := <-s.msgPosterChan:
  104. if err := s.dao.SendMsgToPoster(context.TODO(), msg); err != nil {
  105. log.Error("s.dao.SendMsgToPoster(msg:%v) error(%v)", msg, err)
  106. }
  107. case msg := <-s.blockUserChan:
  108. if err := s.dao.BlockUser(context.TODO(), msg); err != nil {
  109. log.Error("s.dao.BlockUser(msg:%v) error(%v)", msg, err)
  110. }
  111. }
  112. }
  113. }
  114. func (s *Service) oplogproc() {
  115. for opLog := range s.opsLogCh {
  116. if len(opLog.Subject) == 0 || len(opLog.CurrentVal) == 0 || opLog.Source <= 0 ||
  117. opLog.Operator <= 0 || opLog.OperatorType <= 0 {
  118. log.Warn("oplogproc() it is an illegal log, warn(%v, %v, %v)", opLog.Subject, opLog.Subject, opLog.CurrentVal)
  119. continue
  120. } else {
  121. for _, dmid := range opLog.DMIds {
  122. s.dmOperationLogSvc.Info(opLog.Subject, strconv.FormatInt(opLog.Oid, 10), fmt.Sprint(opLog.Type),
  123. strconv.FormatInt(dmid, 10), opLog.Source.String(), opLog.OriginVal,
  124. opLog.CurrentVal, strconv.FormatInt(opLog.Operator, 10), opLog.OperatorType.String(),
  125. opLog.OperationTime, opLog.Remark)
  126. // 将管理员操作日志额外上报一份(用于验证数据报表完整性)
  127. s.bakInfoc.Info(opLog.Subject, strconv.FormatInt(opLog.Oid, 10), fmt.Sprint(opLog.Type),
  128. strconv.FormatInt(dmid, 10), opLog.Source.String(), opLog.OriginVal,
  129. opLog.CurrentVal, strconv.FormatInt(opLog.Operator, 10), opLog.OperatorType.String(),
  130. opLog.OperationTime, opLog.Remark)
  131. if strings.Contains(opLog.Subject, "\n") {
  132. log.Error("\n found in opLog.Subject(%s)", opLog.Source)
  133. }
  134. }
  135. }
  136. }
  137. }
  138. // OpLog put a new infoc format operation log into the channel
  139. func (s *Service) OpLog(c context.Context, cid, operator int64, typ int32, dmids []int64, subject, originVal, currentVal, remark string, source oplog.Source, operatorType oplog.OperatorType) (err error) {
  140. infoLog := new(oplog.Infoc)
  141. infoLog.Oid = cid
  142. infoLog.Type = typ
  143. infoLog.DMIds = dmids
  144. infoLog.Subject = subject
  145. infoLog.OriginVal = originVal
  146. infoLog.CurrentVal = currentVal
  147. infoLog.OperationTime = strconv.FormatInt(time.Now().Unix(), 10)
  148. infoLog.Source = source
  149. infoLog.OperatorType = operatorType
  150. infoLog.Operator = operator
  151. infoLog.Remark = remark
  152. select {
  153. case s.opsLogCh <- infoLog:
  154. default:
  155. err = fmt.Errorf("opsLogCh full")
  156. log.Error("opsLogCh full (%v)", infoLog)
  157. }
  158. return
  159. }
  160. // QueryOpLogs query operation logs of damku equals dmid
  161. func (s *Service) QueryOpLogs(c context.Context, dmid int64) (infos []*oplog.InfocResult, err error) {
  162. result, err := s.oplogDao.QueryOpLogs(c, dmid)
  163. if err != nil {
  164. return
  165. }
  166. for _, logVal := range result {
  167. var (
  168. tmp = &oplog.InfocResult{}
  169. )
  170. val, err := strconv.Atoi(logVal.CurrentVal)
  171. if err != nil {
  172. err = nil
  173. continue
  174. }
  175. switch logVal.Subject {
  176. case "status":
  177. tmp.Subject = model.StateDesc(int32(val))
  178. case "pool":
  179. if val == 0 {
  180. tmp.Subject = "普通弹幕池"
  181. } else if val == 1 {
  182. tmp.Subject = "字幕弹幕池"
  183. } else if val == 2 {
  184. tmp.Subject = "特殊弹幕池"
  185. }
  186. case "attribute":
  187. if val == 2 || (int32(val)>>model.AttrProtect)&int32(1) == 1 {
  188. tmp.Subject = "弹幕保护"
  189. } else if val == 3 || (int32(val)>>model.AttrProtect)&int32(1) == 0 {
  190. tmp.Subject = "取消弹幕保护"
  191. }
  192. default:
  193. tmp.Subject = logVal.Subject
  194. }
  195. tmp.CurrentVal = logVal.CurrentVal
  196. tmp.OperatorType = logVal.OperatorType
  197. if logVal.OperatorType == "用户" || logVal.OperatorType == "UP主" {
  198. mid, _ := strconv.ParseInt(logVal.Operator, 10, 64)
  199. arg3 := &accountApi.MidReq{Mid: mid}
  200. uInfo, err := s.accountRPC.Info3(c, arg3)
  201. if err != nil {
  202. tmp.Operator = logVal.Operator
  203. log.Error("s.accRPC.Info2(%v) error(%v)", arg3, err)
  204. err = nil
  205. } else {
  206. tmp.Operator = uInfo.GetInfo().GetName()
  207. }
  208. } else {
  209. tmp.Operator = logVal.Operator
  210. }
  211. OperationTimeStamp, _ := strconv.ParseInt(logVal.OperationTime, 10, 64)
  212. tmp.OperationTime = time.Unix(OperationTimeStamp, 0).Format("2006-01-02 15:04:05")
  213. tmp.Remark = "操作来源:" + logVal.Source + ";操作员身份:" + logVal.OperatorType + ";备注:" + logVal.Remark
  214. infos = append(infos, tmp)
  215. }
  216. return
  217. }
  218. func (s *Service) monitorproc() {
  219. for {
  220. time.Sleep(3 * time.Second)
  221. s.oidLock.Lock()
  222. oidMap := s.moniOidMap
  223. s.moniOidMap = make(map[int64]struct{})
  224. s.oidLock.Unlock()
  225. for oid := range oidMap {
  226. sub, err := s.dao.Subject(context.TODO(), model.SubTypeVideo, oid)
  227. if err != nil || sub == nil {
  228. continue
  229. }
  230. if err := s.updateMonitorCnt(context.TODO(), sub); err != nil {
  231. log.Error("s.updateMonitorCnt(%+v) error(%v)", sub, err)
  232. }
  233. }
  234. }
  235. }