aiyellowing.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "math"
  7. "time"
  8. "go-common/library/log"
  9. "go-common/library/queue/databus"
  10. "go-common/library/queue/databus/report"
  11. )
  12. // aiReq is a producer send file data to eroticism inspect serve
  13. func (j *Job) aiReq(ctx context.Context, meta *Meta) {
  14. url := fmt.Sprintf(_downloadFmt, j.downloadHost, meta.Bucket, meta.Filename)
  15. msg := &AIReqMessage{URL: url, FileName: meta.Filename, Bucket: meta.Bucket}
  16. if err := j.AIYellowingProducer.Send(ctx, meta.Filename, msg); err != nil {
  17. log.Error("d.databus.Send(%s,%v) error(%v)", meta.Bucket, *msg, err)
  18. return
  19. }
  20. log.Info("send msg to ai success: key:(%s),value:(%+v)", meta.Filename, msg)
  21. }
  22. // aiResp is a consumer receive message from eroticism inspect serve
  23. func (j *Job) aiResp(ctx context.Context) {
  24. var (
  25. msgs <-chan *databus.Message
  26. ok bool
  27. msg *databus.Message
  28. aiMsg *AIRespMessage
  29. err error
  30. )
  31. msgs = j.AIYellowingConsumer.Messages() //ai response message
  32. for {
  33. if msg, ok = <-msgs; !ok {
  34. log.Info("databus Consumer exit")
  35. break
  36. }
  37. if err = msg.Commit(); err != nil {
  38. log.Error("msg.Commit() error(%v)", err)
  39. continue
  40. }
  41. aiMsg = new(AIRespMessage)
  42. // todo: parse kafka message from ai and judge if add to db
  43. if err = json.Unmarshal(msg.Value, aiMsg); err != nil {
  44. log.Error("Job.run.Unmarshal(key:%s, value:%s),err(%v)", msg.Key, string(msg.Value), err)
  45. continue
  46. }
  47. log.Info("consume msg from ai success: key:(%s),value:(%+v)", msg.Key, aiMsg)
  48. if aiMsg.ErrorCode != 0 {
  49. log.Error("ai audit failed code: %d, message: %s", aiMsg.ErrorCode, aiMsg.ErrorMsg)
  50. continue
  51. }
  52. // deal ai result
  53. // sex > threshold will save in db
  54. sex := int(math.Round(aiMsg.Sex * 10000))
  55. politics := int(math.Round(aiMsg.Politics * 10000))
  56. blood := int(math.Round(aiMsg.Blood * 10000))
  57. violent := int(math.Round(aiMsg.Violent * 10000))
  58. if sex > j.Threshold.Sex || politics > j.Threshold.Politics || blood > j.Threshold.Blood || violent > j.Threshold.Violent {
  59. log.Warn("url:%s need save db points(sex:%d,politics:%d,blood:%d,violent:%d)", aiMsg.URL, sex, politics, blood, violent)
  60. if err = j.RetryAddRecord(aiMsg); err != nil {
  61. log.Error("j.RetryAddRecord(%+v) failed(%v)", aiMsg, err)
  62. continue
  63. } else {
  64. log.Info("call upload-admin success: %+v", aiMsg)
  65. }
  66. }
  67. //todo: send to audit log platform
  68. m := &report.ManagerInfo{
  69. Uname: "ai",
  70. UID: 0,
  71. Business: 161, //bfs 上传审核
  72. Type: 1, //ai审核
  73. Action: "ai",
  74. Index: []interface{}{aiMsg.URL, aiMsg.Bucket, aiMsg.FileName, int(math.Round(aiMsg.Sex * 10000)), int(math.Round(aiMsg.Politics * 10000))},
  75. Ctime: time.Now(),
  76. Content: map[string]interface{}{ //分数
  77. "sex": aiMsg.Sex,
  78. "violent": aiMsg.Violent,
  79. "politics": aiMsg.Politics,
  80. "blood": aiMsg.Blood,
  81. "is_yellow": aiMsg.IsYellow,
  82. },
  83. }
  84. if err = report.Manager(m); err != nil {
  85. log.Error("report.Manager() m(%+v) error:%v", m, err)
  86. continue
  87. }
  88. log.Info("send log report success: (%+v) \n", m)
  89. }
  90. }