service.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "math"
  7. "net/url"
  8. "strconv"
  9. "time"
  10. "go-common/app/job/main/upload/conf"
  11. "go-common/app/job/main/upload/dao"
  12. "go-common/library/log"
  13. bm "go-common/library/net/http/blademaster"
  14. xhttp "go-common/library/net/http/blademaster"
  15. "go-common/library/queue/databus"
  16. )
  17. const (
  18. _downloadFmt = "%s/bfs/%s/%s"
  19. _uploadAdminAddFmt = "%s/add"
  20. )
  21. // Service struct
  22. type Service struct {
  23. c *conf.Config
  24. dao *dao.Dao
  25. }
  26. // Meta describe databus message value format from bfs proxy
  27. type Meta struct {
  28. Bucket string `json:"bucket"`
  29. Filename string `json:"filename"`
  30. Mine string `json:"mine"`
  31. }
  32. // New init
  33. func New(c *conf.Config) (s *Service) {
  34. s = &Service{
  35. c: c,
  36. dao: dao.New(c),
  37. }
  38. Run(c)
  39. return s
  40. }
  41. // Ping Service
  42. func (s *Service) Ping(c context.Context) (err error) {
  43. return s.dao.Ping(c)
  44. }
  45. // Close Service
  46. func (s *Service) Close() {
  47. s.dao.Close()
  48. }
  49. // Job .
  50. type Job struct {
  51. downloadHost string
  52. uploadHost string
  53. uploadAdminHost string
  54. xclient *xhttp.Client
  55. sub *databus.Databus
  56. ch chan *Meta
  57. routineCount int
  58. AIYellowingConsumer *databus.Databus //consume response from ai
  59. AIYellowingProducer *databus.Databus //produce request to ai
  60. AIYellowingExceptBuckets map[string]bool
  61. Threshold *conf.Threshold //score threshold
  62. }
  63. // AIReqMessage defined send and receive databus message format with AI
  64. type AIReqMessage struct {
  65. Bucket string `json:"bucket"`
  66. FileName string `json:"file_name"`
  67. URL string `json:"url"`
  68. IsYellow bool `json:"is_yellow"`
  69. }
  70. //AIRespMessage defined response databus message format from AI
  71. type AIRespMessage struct {
  72. URL string `json:"url"`
  73. FileName string `json:"file_name"`
  74. Bucket string `json:"bucket"`
  75. Sex float64 `json:"sex"`
  76. Violent float64 `json:"violent"`
  77. Blood float64 `json:"blood"`
  78. Politics float64 `json:"politics"`
  79. IsYellow bool `json:"is_yellow"`
  80. ErrorCode int64 `json:"error_code"`
  81. ErrorMsg string `json:"error_msg"`
  82. }
  83. // Add describe params of /x/admin/bfs-upload/add
  84. type Add struct {
  85. Bucket string `json:"bucket"`
  86. FileName string `json:"filename"`
  87. }
  88. // AddResp describe response of /x/admin/bfs-upload/add
  89. type AddResp struct {
  90. Code int `json:"code"`
  91. Message string `json:"message"`
  92. TTL int `json:"ttl"`
  93. }
  94. // Run .
  95. func Run(cfg *conf.Config) {
  96. exceptBuckets := make(map[string]bool)
  97. for _, bucket := range cfg.AIYellowing.ExceptBuckets {
  98. exceptBuckets[bucket] = false
  99. }
  100. if cfg.Threshold == nil {
  101. cfg.Threshold = &conf.Threshold{
  102. Sex: 5000,
  103. Politics: 5000,
  104. Blood: 5000,
  105. Violent: 5000,
  106. }
  107. }
  108. if cfg.Threshold.Sex == 0 {
  109. cfg.Threshold.Sex = 5000
  110. }
  111. if cfg.Threshold.Politics == 0 {
  112. cfg.Threshold.Politics = 5000
  113. }
  114. if cfg.Threshold.Blood == 0 {
  115. cfg.Threshold.Blood = 5000
  116. }
  117. if cfg.Threshold.Violent == 0 {
  118. cfg.Threshold.Violent = 5000
  119. }
  120. j := &Job{
  121. downloadHost: cfg.DonwloadHost,
  122. uploadHost: cfg.UploadHost,
  123. uploadAdminHost: cfg.UploadAdminHost,
  124. routineCount: cfg.RoutineCount,
  125. xclient: bm.NewClient(cfg.HTTPClient),
  126. sub: databus.New(cfg.Databus),
  127. ch: make(chan *Meta, 1024),
  128. AIYellowingConsumer: databus.New(cfg.AIYellowing.Consumer),
  129. AIYellowingProducer: databus.New(cfg.AIYellowing.Producer),
  130. AIYellowingExceptBuckets: exceptBuckets,
  131. Threshold: cfg.Threshold,
  132. }
  133. go j.aireqproc()
  134. go j.proxysyncproc(context.Background())
  135. go j.aiResp(context.Background()) // deal ai response
  136. }
  137. func (j *Job) aireqproc() {
  138. for i := 0; i < 10; i++ {
  139. go func() {
  140. for meta := range j.ch {
  141. j.aiReq(context.Background(), meta)
  142. }
  143. }()
  144. }
  145. }
  146. // Run .
  147. func (j *Job) proxysyncproc(ctx context.Context) {
  148. var (
  149. err error
  150. meta *Meta
  151. )
  152. for {
  153. msg, ok := <-j.sub.Messages()
  154. if !ok {
  155. log.Error("consume msg error, uploadproc exit!")
  156. return
  157. }
  158. if err = msg.Commit(); err != nil {
  159. log.Error("msg.Commit() error(%v)", err)
  160. continue
  161. }
  162. meta = new(Meta)
  163. if err = json.Unmarshal(msg.Value, meta); err != nil {
  164. log.Error("Job.run.Unmarshal(key:%v),err(%v)", msg.Key, err)
  165. continue
  166. }
  167. // if bucket need yellow inspects
  168. _, ok = j.AIYellowingExceptBuckets[meta.Bucket]
  169. if !ok {
  170. log.Info("bfs-proxy sync a msg:(%+v) meta(%+v) need to req ai", msg, meta)
  171. j.add(meta)
  172. }
  173. }
  174. }
  175. func (j *Job) add(item *Meta) {
  176. j.ch <- item
  177. }
  178. // addRecord add a record into upload-admin if a pic is yellow
  179. func (j *Job) addRecord(aiMsg *AIRespMessage) (err error) {
  180. var (
  181. uploadAdminAddURL string
  182. )
  183. addResp := new(AddResp)
  184. params := url.Values{}
  185. params.Add("bucket", aiMsg.Bucket)
  186. params.Add("filename", aiMsg.FileName)
  187. params.Add("url", aiMsg.URL)
  188. params.Add("sex", strconv.Itoa(int(math.Round(aiMsg.Sex*10000))))
  189. params.Add("politics", strconv.Itoa(int(math.Round(aiMsg.Politics*10000))))
  190. uploadAdminAddURL = fmt.Sprintf(_uploadAdminAddFmt, j.uploadAdminHost)
  191. if err = j.xclient.Post(context.TODO(), uploadAdminAddURL, "", params, addResp); err != nil {
  192. return
  193. }
  194. if addResp.Code != 0 {
  195. log.Error("call /x/admin/bfs-upload/add code error, code(%d),message(%s)", addResp.Code, addResp.Message)
  196. return
  197. }
  198. log.Info("upload-admin add success(%+v)", aiMsg)
  199. return
  200. }
  201. // RetryAddRecord try to add a record to upload-admin
  202. func (j *Job) RetryAddRecord(aiMsg *AIRespMessage) (err error) {
  203. attempts := 3
  204. for i := 0; i < attempts; i++ {
  205. err = j.addRecord(aiMsg)
  206. if err == nil {
  207. return
  208. }
  209. time.Sleep(1 * time.Second)
  210. }
  211. return
  212. }