result.go 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. package service
  2. import (
  3. "encoding/json"
  4. "go-common/app/job/main/videoup/model/archive"
  5. "go-common/app/job/main/videoup/model/message"
  6. "go-common/library/log"
  7. )
  // _archive is the table name that arcResultConsumer compares a message's
  // Table field against before forwarding the archive.
  const _archive = "archive"
  11. // arcResultConsumer consume archive result databus message
  12. func (s *Service) arcResultConsumer() {
  13. defer s.wg.Done()
  14. var (
  15. msgs = s.arcResultSub.Messages()
  16. err error
  17. )
  18. for {
  19. msg, ok := <-msgs
  20. if !ok {
  21. log.Error("s.arcResultSub.Messages closed")
  22. return
  23. }
  24. msg.Commit()
  25. s.arcResultMo++
  26. m := &message.ArcResult{}
  27. if err = json.Unmarshal(msg.Value, m); err != nil {
  28. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  29. continue
  30. }
  31. newArc := &archive.Result{}
  32. if err = json.Unmarshal(m.New, newArc); err != nil {
  33. log.Error("json.Unmarshal(%s) error(%v)", m.New, err)
  34. continue
  35. }
  36. log.Info("arcResultConsumer Topic(%s) partition(%d) offset(%d) commit start", msg.Topic, msg.Partition, msg.Offset)
  37. if m.Table == _archive {
  38. log.Info("arcResultConsumer aid(%d) SendBblog msg(%v)", newArc.Aid, newArc)
  39. s.sendBblog(newArc)
  40. }
  41. }
  42. }