// data.go (2.8 KB)
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "go-common/library/log"
  7. "go-common/library/sync/errgroup"
  8. )
  // ROOM_LEN is the largest number of room IDs that minuteDataToCacheList
  // passes to a single CreateCacheList call.
  const (
  	ROOM_LEN = 300
  )
  10. //实时数据处理逻辑,生成list
  11. func (s *Service) minuteDataToCacheList() {
  12. ctx := context.TODO()
  13. ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
  14. defer cancel()
  15. ctx = GetTraceLogCtx(ctx, "minuteDataToCacheList")
  16. log.Infow(ctx, "log", "minuteDataToCacheList_start")
  17. //获取需要生成的数据的content列表
  18. contentMap, err := s.dao.GetContentMap(ctx)
  19. if err != nil {
  20. log.Errorw(ctx, "log", fmt.Sprintf("minuteDataToCacheList_GetContentMap_error:reply=%v;err=%v", contentMap, err))
  21. return
  22. }
  23. //获取全量开播房间
  24. allLiveingRoomIds, err := s.dao.GetAllLiveRoomIds(ctx)
  25. if allLiveingRoomIds == nil || err != nil {
  26. log.Errorw(ctx, "log", fmt.Sprintf("minuteDataToCacheList_allLiveingRoomIds_error:reply=%v;err=%v", allLiveingRoomIds, err))
  27. return
  28. }
  29. eg := errgroup.Group{}
  30. for content := range contentMap {
  31. log.Infow(ctx, fmt.Sprintf("minuteDataToCacheList_start:%s", content))
  32. eg.Go(func(contentParam string) func() error {
  33. slice := make([]int64, 0)
  34. for i := 0; i < len(allLiveingRoomIds); {
  35. end := ROOM_LEN + i
  36. if ROOM_LEN+i >= len(allLiveingRoomIds) {
  37. end = len(allLiveingRoomIds)
  38. }
  39. slice = allLiveingRoomIds[i:end]
  40. if len(slice) <= 0 {
  41. break
  42. } else {
  43. s.dao.CreateCacheList(ctx, slice, contentParam)
  44. }
  45. i = end
  46. }
  47. log.Infow(ctx, "log", fmt.Sprintf("minuteDataToCacheList_end_content=%s;err=%v", contentParam, err))
  48. return nil
  49. }(content))
  50. }
  51. eg.Wait()
  52. log.Infow(ctx, "log", "minuteDataToCacheList_end")
  53. return
  54. }
  55. func (s *Service) minuteDataToDB() {
  56. ctx := context.TODO()
  57. ctx, cancel := context.WithTimeout(ctx, time.Minute*3)
  58. defer cancel()
  59. ctx = GetTraceLogCtx(ctx, "minuteDataToDB")
  60. log.Infow(ctx, "log", "minuteDataToDB_start")
  61. //获取需要生成的数据的content列表
  62. contentMap, err := s.dao.GetContentMap(ctx)
  63. if err != nil {
  64. log.Errorw(ctx, "log", fmt.Sprintf("data_allLiveingRoomIds_error:reply=%v;err=%v", contentMap, err))
  65. return
  66. }
  67. //获取全量开播房间
  68. allLiveingRoomIds, err := s.dao.GetAllLiveRoomIds(ctx)
  69. if allLiveingRoomIds == nil || err != nil {
  70. log.Errorw(ctx, "log", fmt.Sprintf("data_allLiveingRoomIds_error:reply=%v;err=%v", allLiveingRoomIds, err))
  71. return
  72. }
  73. eg := errgroup.Group{}
  74. for content := range contentMap {
  75. log.Info("minuteDataToCacheList_start:" + content)
  76. eg.Go(func(contentParam string) func() error {
  77. return func() (err error) {
  78. for _, roomId := range allLiveingRoomIds {
  79. s.dao.CreateDBData(ctx, []int64{int64(roomId)}, contentParam)
  80. }
  81. log.Infow(ctx, "log", fmt.Sprintf("minuteDataToCacheList_end_content=%s;err=%v", contentParam, err))
  82. return
  83. }
  84. }(content))
  85. }
  86. eg.Wait()
  87. log.Infow(ctx, "log", fmt.Sprintf("minuteDataToDB_end"))
  88. return
  89. }