archive_result.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. package service
  2. import (
  3. "encoding/json"
  4. "go-common/app/job/main/videoup-report/model/archive"
  5. "go-common/library/log"
  6. )
  7. func (s *Service) arcResultConsume() {
  8. defer s.waiter.Done()
  9. var (
  10. err error
  11. msgs = s.arcResultSub.Messages()
  12. )
  13. for {
  14. msg, open := <-msgs
  15. if !open {
  16. log.Info("arcResultConsume s.arcResultSub.Messages is closed")
  17. return
  18. }
  19. msg.Commit()
  20. if msg == nil {
  21. continue
  22. }
  23. log.Info("arcResultConsume consume key(%s) offset(%d) value(%s)", msg.Key, msg.Offset, string(msg.Value))
  24. m := &archive.Message{}
  25. if err = json.Unmarshal(msg.Value, m); err != nil {
  26. log.Error("arcResultConsume json.Unmarshal error(%v)", err)
  27. continue
  28. }
  29. if m.Table != _archive {
  30. continue
  31. }
  32. nw := &archive.Archive{}
  33. if err = json.Unmarshal(m.New, nw); err != nil {
  34. log.Error("arcResultConsume json.Unmarshal error(%v) msg.new(%s)", err, string(m.New))
  35. continue
  36. }
  37. nw.ID = nw.AID
  38. var old *archive.Archive
  39. if m.Action == _insertAct {
  40. old = nil
  41. } else if m.Action == _updateAct {
  42. old = &archive.Archive{}
  43. if err = json.Unmarshal(m.Old, old); err != nil {
  44. log.Error("arcResultConsume json.Unmarshal error(%v) msg.old(%s)", err, string(m.Old))
  45. continue
  46. }
  47. old.ID = old.AID
  48. }
  49. go s.arcStateChange(nw, old, false)
  50. }
  51. }