123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
- package service
- import (
- "context"
- "encoding/json"
- "fmt"
- "math"
- "net/url"
- "strconv"
- "time"
- "go-common/app/job/main/upload/conf"
- "go-common/app/job/main/upload/dao"
- "go-common/library/log"
- bm "go-common/library/net/http/blademaster"
- xhttp "go-common/library/net/http/blademaster"
- "go-common/library/queue/databus"
- )
- const (
- _downloadFmt = "%s/bfs/%s/%s"
- _uploadAdminAddFmt = "%s/add"
- )
- // Service struct
- type Service struct {
- c *conf.Config
- dao *dao.Dao
- }
- // Meta describe databus message value format from bfs proxy
- type Meta struct {
- Bucket string `json:"bucket"`
- Filename string `json:"filename"`
- Mine string `json:"mine"`
- }
- // New init
- func New(c *conf.Config) (s *Service) {
- s = &Service{
- c: c,
- dao: dao.New(c),
- }
- Run(c)
- return s
- }
- // Ping Service
- func (s *Service) Ping(c context.Context) (err error) {
- return s.dao.Ping(c)
- }
- // Close Service
- func (s *Service) Close() {
- s.dao.Close()
- }
- // Job .
- type Job struct {
- downloadHost string
- uploadHost string
- uploadAdminHost string
- xclient *xhttp.Client
- sub *databus.Databus
- ch chan *Meta
- routineCount int
- AIYellowingConsumer *databus.Databus //consume response from ai
- AIYellowingProducer *databus.Databus //produce request to ai
- AIYellowingExceptBuckets map[string]bool
- Threshold *conf.Threshold //score threshold
- }
- // AIReqMessage defined send and receive databus message format with AI
- type AIReqMessage struct {
- Bucket string `json:"bucket"`
- FileName string `json:"file_name"`
- URL string `json:"url"`
- IsYellow bool `json:"is_yellow"`
- }
- //AIRespMessage defined response databus message format from AI
- type AIRespMessage struct {
- URL string `json:"url"`
- FileName string `json:"file_name"`
- Bucket string `json:"bucket"`
- Sex float64 `json:"sex"`
- Violent float64 `json:"violent"`
- Blood float64 `json:"blood"`
- Politics float64 `json:"politics"`
- IsYellow bool `json:"is_yellow"`
- ErrorCode int64 `json:"error_code"`
- ErrorMsg string `json:"error_msg"`
- }
- // Add describe params of /x/admin/bfs-upload/add
- type Add struct {
- Bucket string `json:"bucket"`
- FileName string `json:"filename"`
- }
- // AddResp describe response of /x/admin/bfs-upload/add
- type AddResp struct {
- Code int `json:"code"`
- Message string `json:"message"`
- TTL int `json:"ttl"`
- }
- // Run .
- func Run(cfg *conf.Config) {
- exceptBuckets := make(map[string]bool)
- for _, bucket := range cfg.AIYellowing.ExceptBuckets {
- exceptBuckets[bucket] = false
- }
- if cfg.Threshold == nil {
- cfg.Threshold = &conf.Threshold{
- Sex: 5000,
- Politics: 5000,
- Blood: 5000,
- Violent: 5000,
- }
- }
- if cfg.Threshold.Sex == 0 {
- cfg.Threshold.Sex = 5000
- }
- if cfg.Threshold.Politics == 0 {
- cfg.Threshold.Politics = 5000
- }
- if cfg.Threshold.Blood == 0 {
- cfg.Threshold.Blood = 5000
- }
- if cfg.Threshold.Violent == 0 {
- cfg.Threshold.Violent = 5000
- }
- j := &Job{
- downloadHost: cfg.DonwloadHost,
- uploadHost: cfg.UploadHost,
- uploadAdminHost: cfg.UploadAdminHost,
- routineCount: cfg.RoutineCount,
- xclient: bm.NewClient(cfg.HTTPClient),
- sub: databus.New(cfg.Databus),
- ch: make(chan *Meta, 1024),
- AIYellowingConsumer: databus.New(cfg.AIYellowing.Consumer),
- AIYellowingProducer: databus.New(cfg.AIYellowing.Producer),
- AIYellowingExceptBuckets: exceptBuckets,
- Threshold: cfg.Threshold,
- }
- go j.aireqproc()
- go j.proxysyncproc(context.Background())
- go j.aiResp(context.Background()) // deal ai response
- }
- func (j *Job) aireqproc() {
- for i := 0; i < 10; i++ {
- go func() {
- for meta := range j.ch {
- j.aiReq(context.Background(), meta)
- }
- }()
- }
- }
- // Run .
- func (j *Job) proxysyncproc(ctx context.Context) {
- var (
- err error
- meta *Meta
- )
- for {
- msg, ok := <-j.sub.Messages()
- if !ok {
- log.Error("consume msg error, uploadproc exit!")
- return
- }
- if err = msg.Commit(); err != nil {
- log.Error("msg.Commit() error(%v)", err)
- continue
- }
- meta = new(Meta)
- if err = json.Unmarshal(msg.Value, meta); err != nil {
- log.Error("Job.run.Unmarshal(key:%v),err(%v)", msg.Key, err)
- continue
- }
- // if bucket need yellow inspects
- _, ok = j.AIYellowingExceptBuckets[meta.Bucket]
- if !ok {
- log.Info("bfs-proxy sync a msg:(%+v) meta(%+v) need to req ai", msg, meta)
- j.add(meta)
- }
- }
- }
- func (j *Job) add(item *Meta) {
- j.ch <- item
- }
- // addRecord add a record into upload-admin if a pic is yellow
- func (j *Job) addRecord(aiMsg *AIRespMessage) (err error) {
- var (
- uploadAdminAddURL string
- )
- addResp := new(AddResp)
- params := url.Values{}
- params.Add("bucket", aiMsg.Bucket)
- params.Add("filename", aiMsg.FileName)
- params.Add("url", aiMsg.URL)
- params.Add("sex", strconv.Itoa(int(math.Round(aiMsg.Sex*10000))))
- params.Add("politics", strconv.Itoa(int(math.Round(aiMsg.Politics*10000))))
- uploadAdminAddURL = fmt.Sprintf(_uploadAdminAddFmt, j.uploadAdminHost)
- if err = j.xclient.Post(context.TODO(), uploadAdminAddURL, "", params, addResp); err != nil {
- return
- }
- if addResp.Code != 0 {
- log.Error("call /x/admin/bfs-upload/add code error, code(%d),message(%s)", addResp.Code, addResp.Message)
- return
- }
- log.Info("upload-admin add success(%+v)", aiMsg)
- return
- }
- // RetryAddRecord try to add a record to upload-admin
- func (j *Job) RetryAddRecord(aiMsg *AIRespMessage) (err error) {
- attempts := 3
- for i := 0; i < attempts; i++ {
- err = j.addRecord(aiMsg)
- if err == nil {
- return
- }
- time.Sleep(1 * time.Second)
- }
- return
- }
|