sns_log.go 673 B

123456789101112131415161718192021222324252627282930313233
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "time"
  6. "go-common/app/job/main/passport-sns/model"
  7. "go-common/library/log"
  8. )
  9. func (s *Service) snsLogConsume() {
  10. for {
  11. msg, ok := <-s.snsLogConsumer.Messages()
  12. if !ok {
  13. log.Error("s.snsLogConsumer.Messages closed")
  14. return
  15. }
  16. snsLog := &model.SnsLog{}
  17. if err := json.Unmarshal(msg.Value, snsLog); err != nil {
  18. log.Error("json.Unmarshal(%s) error(%+v)", string(msg.Value), err)
  19. continue
  20. }
  21. log.Info("receive msg snsLog(%+v)", snsLog)
  22. for {
  23. if _, err := s.d.AddSnsLog(context.Background(), snsLog); err != nil {
  24. time.Sleep(100 * time.Millisecond)
  25. continue
  26. }
  27. break
  28. }
  29. }
  30. }