service.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. package migrate
  2. import (
  3. "context"
  4. defaultsql "database/sql"
  5. "fmt"
  6. "go-common/app/job/live/push-search/conf"
  7. "go-common/app/job/live/push-search/dao/migrate"
  8. "go-common/app/job/live/push-search/model"
  9. "go-common/library/database/sql"
  10. "strconv"
  11. "sync"
  12. "time"
  13. )
  14. const _sql = "select roomid, short_id,uid,uname,area,title,tags, mtime,a.ctime,try_time,user_cover,a.lock_status,hidden_status,attentions,live_time,area_v2_id,area_v2_parent_id,b.name as area_v2_name, virtual,round_status,on_flag,online, cover from ap_room a left join ap_room_area_v2 b on a.area_v2_id=b.id where roomid > %d order by roomid asc limit 100 "
  15. const hbaseTable = "live:PushSearch"
  16. const hbaseFamily = "search"
  17. type message struct {
  18. rowKey string
  19. values map[string]map[string][]byte
  20. }
  21. type MService struct {
  22. c *conf.Config
  23. dao *migrate.Dao
  24. hChan []chan *message
  25. waiterChan *sync.WaitGroup
  26. mainWaiter *sync.WaitGroup
  27. }
  28. func NewMigrateS(c *conf.Config) (s *MService) {
  29. s = &MService{
  30. c: c,
  31. dao: migrate.NewMigrate(c),
  32. hChan: make([]chan *message, c.MigrateNum),
  33. waiterChan: new(sync.WaitGroup),
  34. mainWaiter: new(sync.WaitGroup),
  35. }
  36. //ap room 表 binlog qps 高, hash roomId 并行
  37. for i := 0; i < c.MigrateNum; i++ {
  38. ch := make(chan *message, 1024)
  39. s.hChan[i] = ch
  40. go s.handle(ch)
  41. }
  42. return s
  43. }
  44. func (ms *MService) Migrate (roomid string, isTest string) {
  45. id, err := strconv.Atoi(roomid)
  46. if err != nil {
  47. fmt.Println("roomid error")
  48. }
  49. ms.mainWaiter.Add(1)
  50. defer ms.mainWaiter.Done()
  51. var rows *sql.Rows
  52. online := &defaultsql.NullInt64{}
  53. cover := &defaultsql.NullString{}
  54. areaV2Name := &defaultsql.NullString{}
  55. for {
  56. rows, err := ms.dao.RoomDb.Query(context.TODO(), fmt.Sprintf(_sql, id))
  57. if err != nil {
  58. fmt.Println("query error:%+v", err)
  59. return
  60. }
  61. for rows.Next() {
  62. r := new(model.TableField)
  63. if err = rows.Scan(&r.RoomId, &r.ShortId, &r.Uid, &r.UName, &r.Area, &r.Title, &r.Tag, &r.MTime, &r.CTime, &r.TryTime, &r.UserCover, &r.LockStatus, &r.HiddenStatus, &r.Attentions, &r.LiveTime, &r.AreaV2Id, &r.AreaV2ParentId, areaV2Name, &r.Virtual, &r.RoundStatus, &r.OnFlag, online, cover); err != nil {
  64. if !online.Valid {
  65. r.Online = 0
  66. }
  67. if !cover.Valid {
  68. r.Cover = ""
  69. }
  70. }
  71. r.AreaV2Name = areaV2Name.String
  72. r.Online = int(online.Int64)
  73. r.Cover = cover.String
  74. zijie := ms.generateSearchInfo(r)
  75. if r.LiveTime != "0000-00-00 00:00:00" {
  76. fmt.Println(r.RoomId, "jump live room")
  77. continue
  78. }
  79. values := map[string]map[string][]byte{hbaseFamily: zijie}
  80. rowKey := ms.rowKey(r.RoomId)
  81. m := &message{
  82. rowKey: rowKey,
  83. values: values,
  84. }
  85. ms.hChan[r.RoomId % ms.c.MigrateNum] <- m
  86. fmt.Println(r.RoomId)
  87. if isTest == "1" {
  88. return
  89. }
  90. id = r.RoomId
  91. }
  92. }
  93. rows.Close()
  94. }
  95. func (ms *MService) handle(c chan *message) {
  96. ms.waiterChan.Add(1)
  97. defer ms.waiterChan.Done()
  98. for {
  99. msgData, ok := <-c
  100. if !ok {
  101. fmt.Println("close chan")
  102. return
  103. }
  104. ms.dao.SearchHBase.PutStr(context.TODO(), hbaseTable, msgData.rowKey, msgData.values)
  105. }
  106. }
  107. func (ms *MService) Close() {
  108. ms.dao.Close()
  109. ms.mainWaiter.Wait()
  110. for _, ch := range ms.hChan {
  111. close(ch)
  112. }
  113. ms.waiterChan.Wait()
  114. ms.dao.SearchHBase.Close()
  115. }
  116. func (ms *MService) rowKey(roomId int) string{
  117. key := fmt.Sprintf("%d_%d", roomId % 10, roomId)
  118. return key
  119. }
  120. func (ms *MService) generateSearchInfo(new *model.TableField) (retByte map[string][]byte){
  121. newByteMap := make(map[string][]byte)
  122. newByteMap["id"] = []byte(strconv.Itoa(new.RoomId))
  123. newByteMap["short_id"] = []byte(strconv.Itoa(new.ShortId))
  124. newByteMap["uid"] = []byte(strconv.FormatInt(new.Uid, 10))
  125. newByteMap["uname"] = []byte(new.UName)
  126. newByteMap["category"] = []byte(strconv.Itoa(new.Area))
  127. newByteMap["title"] = []byte(new.Title)
  128. newByteMap["tag"] = []byte(new.Tag)
  129. tryTime, _ := time.ParseInLocation("2006-01-02T15:04:05+08:00", new.TryTime, time.Local)
  130. tryTimeStr := tryTime.Format("2006-01-02 15:04:05")
  131. if tryTimeStr == "0001-01-01 00:00:00" {
  132. tryTimeStr = "0000-00-00 00:00:00"
  133. new.TryTime = tryTimeStr
  134. }
  135. newByteMap["try_time"] = []byte(tryTimeStr)
  136. newByteMap["cover"] = []byte(new.Cover)
  137. newByteMap["user_cover"] = []byte(new.UserCover)
  138. lockStatus, _ := time.ParseInLocation("2006-01-02T15:04:05+08:00", new.LockStatus, time.Local)
  139. lockStatusStr := lockStatus.Format("2006-01-02 15:04:05")
  140. if lockStatusStr == "0001-01-01 00:00:00" {
  141. lockStatusStr = "0000-00-00 00:00:00"
  142. new.LockStatus = lockStatusStr
  143. }
  144. newByteMap["lock_status"] = []byte(strconv.Itoa(ms.getLockStatus(lockStatusStr)))
  145. hiddenStatus, _ := time.ParseInLocation("2006-01-02T15:04:05+08:00", new.HiddenStatus, time.Local)
  146. hiddenStatusStr := hiddenStatus.Format("2006-01-02 15:04:05")
  147. if hiddenStatusStr == "0001-01-01 00:00:00" {
  148. hiddenStatusStr = "0000-00-00 00:00:00"
  149. new.HiddenStatus = hiddenStatusStr
  150. }
  151. newByteMap["hidden_status"] = []byte(strconv.Itoa(ms.getHiddenStatus(hiddenStatusStr)))
  152. newByteMap["attentions"] = []byte(strconv.Itoa(new.Attentions))
  153. newByteMap["attention"] = []byte(strconv.Itoa(new.Attentions))
  154. newByteMap["online"] = []byte(strconv.Itoa(new.Online))
  155. liveTime, _ := time.ParseInLocation("2006-01-02T15:04:05+08:00", new.LiveTime, time.Local)
  156. liveTimeStr := liveTime.Format("2006-01-02 15:04:05")
  157. if liveTimeStr == "0001-01-01 00:00:00" {
  158. liveTimeStr = "0000-00-00 00:00:00"
  159. new.LiveTime = liveTimeStr
  160. }
  161. newByteMap["live_time"] = []byte(liveTimeStr)
  162. newByteMap["area_v2_id"] = []byte(strconv.Itoa(new.AreaV2Id))
  163. newByteMap["ord"] = []byte(strconv.Itoa(new.AreaV2ParentId))
  164. newByteMap["arcrank"] = []byte(strconv.Itoa(new.Virtual))
  165. cTime, _ := time.ParseInLocation("2006-01-02T15:04:05+08:00", new.CTime, time.Local)
  166. cTimeStr := cTime.Format("2006-01-02 15:04:05")
  167. if cTimeStr == "0001-01-01 00:00:00" {
  168. new.CTime = "0000-00-00 00:00:00"
  169. }else{
  170. new.CTime = cTimeStr
  171. }
  172. mTime, _ := time.ParseInLocation("2006-01-02T15:04:05+08:00", new.MTime, time.Local)
  173. mTimeStr := mTime.Format("2006-01-02 15:04:05")
  174. if mTimeStr == "0001-01-01 00:00:00" {
  175. new.MTime = "0000-00-00 00:00:00"
  176. }else{
  177. new.MTime = mTimeStr
  178. }
  179. newByteMap["lastupdate"] = []byte(ms.getLastUpdate(new))
  180. newByteMap["is_live"] = []byte(strconv.Itoa(ms.getLiveStatus(new)))
  181. newByteMap["s_category"] = []byte(new.AreaV2Name)
  182. return newByteMap
  183. }
  184. //获取直播状态
  185. func (ms *MService) getLiveStatus(roomInfo *model.TableField) int{
  186. if roomInfo.LiveTime != "0000-00-00 00:00:00" {
  187. return 1
  188. }
  189. if roomInfo.RoundStatus == 1 && roomInfo.OnFlag == 1{
  190. return 2
  191. }
  192. return 0
  193. }
  194. //获取房间最后更新时间
  195. func (ms *MService) getLastUpdate(roomInfo *model.TableField) string{
  196. if roomInfo.MTime != "0000-00-00 00:00:00" {
  197. return roomInfo.MTime
  198. }
  199. return roomInfo.CTime
  200. }
  201. func (ms *MService) getLockStatus(lockStatus string) int{
  202. status := 0
  203. if lockStatus != "0000-00-00 00:00:00" {
  204. status = 1
  205. }
  206. return status
  207. }
  208. func (ms *MService) getHiddenStatus(HiddenStatus string) int{
  209. status := 0
  210. if HiddenStatus != "0000-00-00 00:00:00" {
  211. status = 1
  212. }
  213. return status
  214. }