archive.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. package data
  2. import (
  3. "bytes"
  4. "crypto/md5"
  5. "encoding/hex"
  6. "sort"
  7. "strconv"
  8. "time"
  9. "go-common/app/interface/main/creative/model/data"
  10. "go-common/library/ecode"
  11. "go-common/library/log"
  12. "github.com/tsuna/gohbase/hrpc"
  13. "golang.org/x/net/context"
  14. )
  15. func hbaseMd5Key(aid int64) string {
  16. hasher := md5.New()
  17. hasher.Write([]byte(strconv.Itoa(int(aid))))
  18. return hex.EncodeToString(hasher.Sum(nil))
  19. }
  20. // VideoQuitPoints get video quit points.
  21. func (d *Dao) VideoQuitPoints(c context.Context, cid int64) (res []int64, err error) {
  22. var (
  23. ctx, cancel = context.WithTimeout(c, time.Duration(d.c.HBase.ReadTimeout))
  24. tableName = HBaseVideoTablePrefix + time.Now().AddDate(0, 0, -1).Add(-12*time.Hour).Format("20060102") // change table at 12:00am
  25. backupTableName = HBaseVideoTablePrefix + time.Now().AddDate(0, 0, -2).Add(-12*time.Hour).Format("20060102") // change table at 12:00am
  26. key = hbaseMd5Key(cid)
  27. )
  28. defer cancel()
  29. result, err := d.hbase.GetStr(ctx, tableName, key)
  30. if err != nil {
  31. if result, err = d.hbase.GetStr(ctx, backupTableName, key); err != nil {
  32. log.Error("VideoQuitPoints d.hbase.GetStr backupTableName(%s)|cid(%d)|key(%v)|error(%v)", backupTableName, cid, key, err)
  33. err = ecode.CreativeDataErr
  34. return
  35. }
  36. }
  37. if result == nil {
  38. return
  39. }
  40. // get parts and max part for fill res
  41. partMap := make(map[int]int64)
  42. maxPart := 0
  43. for _, c := range result.Cells {
  44. if c != nil {
  45. part, _ := strconv.Atoi(string(c.Qualifier[:]))
  46. per, _ := strconv.ParseInt(string(c.Value[:]), 10, 64)
  47. partMap[part] = per
  48. if part > maxPart {
  49. maxPart = part
  50. }
  51. }
  52. }
  53. var restPercent int64 = 10000 // start from 100%
  54. for i := 1; i <= maxPart; i++ {
  55. if _, ok := partMap[i]; ok {
  56. restPercent = restPercent - partMap[i]
  57. }
  58. res = append(res, restPercent)
  59. }
  60. return
  61. }
  62. // ArchiveStat get the stat of archive.
  63. func (d *Dao) ArchiveStat(c context.Context, aid int64) (stat *data.ArchiveData, err error) {
  64. var (
  65. ctx, cancel = context.WithTimeout(c, time.Duration(d.c.HBase.ReadTimeout))
  66. tableName = HBaseArchiveTablePrefix + time.Now().AddDate(0, 0, -1).Add(-12*time.Hour).Format("20060102") // change table at 12:00am
  67. backupTableName = HBaseArchiveTablePrefix + time.Now().AddDate(0, 0, -2).Add(-12*time.Hour).Format("20060102") // change table at 12:00am
  68. key = hbaseMd5Key(aid)
  69. )
  70. defer cancel()
  71. result, err := d.hbase.GetStr(ctx, tableName, key)
  72. if err != nil {
  73. if result, err = d.hbase.GetStr(ctx, backupTableName, key); err != nil {
  74. log.Error("ArchiveStat d.hbase.GetStr backupTableName(%s)|aid(%d)|key(%v)|error(%v)", backupTableName, aid, key, err)
  75. err = ecode.CreativeDataErr
  76. return
  77. }
  78. }
  79. if result == nil {
  80. return
  81. }
  82. stat = &data.ArchiveData{}
  83. stat.ArchiveSource = &data.ArchiveSource{}
  84. stat.ArchiveGroup = &data.ArchiveGroup{}
  85. stat.ArchiveStat = &data.ArchiveStat{}
  86. for _, c := range result.Cells {
  87. if c == nil {
  88. continue
  89. }
  90. v, _ := strconv.ParseInt(string(c.Value[:]), 10, 64)
  91. if !bytes.Equal(c.Family, HBaseFamilyPlat) {
  92. continue
  93. }
  94. switch {
  95. case bytes.Equal(c.Qualifier, HBaseColumnWebPC):
  96. stat.ArchiveSource.WebPC = v
  97. case bytes.Equal(c.Qualifier, HBaseColumnWebH5):
  98. stat.ArchiveSource.WebH5 = v
  99. case bytes.Equal(c.Qualifier, HBaseColumnOutsite):
  100. stat.ArchiveSource.Outsite = v
  101. case bytes.Equal(c.Qualifier, HBaseColumnIOS):
  102. stat.ArchiveSource.IOS = v
  103. case bytes.Equal(c.Qualifier, HBaseColumnAndroid):
  104. stat.ArchiveSource.Android = v
  105. case bytes.Equal(c.Qualifier, HBaseColumnElse):
  106. stat.ArchiveSource.Others = v
  107. case bytes.Equal(c.Qualifier, HBaseColumnFans):
  108. stat.ArchiveGroup.Fans = v
  109. case bytes.Equal(c.Qualifier, HBaseColumnGuest):
  110. stat.ArchiveGroup.Guest = v
  111. case bytes.Equal(c.Qualifier, HBaseColumnAll):
  112. stat.ArchiveStat.Play = v
  113. case bytes.Equal(c.Qualifier, HBaseColumnCoin):
  114. stat.ArchiveStat.Coin = v
  115. case bytes.Equal(c.Qualifier, HBaseColumnElec):
  116. stat.ArchiveStat.Elec = v
  117. case bytes.Equal(c.Qualifier, HBaseColumnFav):
  118. stat.ArchiveStat.Fav = v
  119. case bytes.Equal(c.Qualifier, HBaseColumnShare):
  120. stat.ArchiveStat.Share = v
  121. //【稿件分析】增加:播放、弹幕、评论、点赞 v:play v:danmu v:reply v:likes
  122. case bytes.Equal(c.Qualifier, []byte("play")):
  123. stat.ArchiveStat.Play = v
  124. case bytes.Equal(c.Qualifier, []byte("danmu")):
  125. stat.ArchiveStat.Dm = v
  126. case bytes.Equal(c.Qualifier, []byte("reply")):
  127. stat.ArchiveStat.Reply = v
  128. case bytes.Equal(c.Qualifier, []byte("likes")):
  129. stat.ArchiveStat.Like = v
  130. }
  131. }
  132. stat.ArchiveSource.Mainsite = stat.ArchiveSource.WebPC + stat.ArchiveSource.WebH5
  133. stat.ArchiveSource.Mobile = stat.ArchiveSource.Android + stat.ArchiveSource.IOS
  134. return
  135. }
  136. // ArchiveArea get the count of area.
  137. func (d *Dao) ArchiveArea(c context.Context, aid int64) (res []*data.ArchiveArea, err error) {
  138. var (
  139. ctx, cancel = context.WithTimeout(c, time.Duration(d.c.HBase.ReadTimeout))
  140. tableName = HBaseAreaTablePrefix + time.Now().AddDate(0, 0, -1).Add(-12*time.Hour).Format("20060102") // change table at 12:00am
  141. backupTableName = HBaseAreaTablePrefix + time.Now().AddDate(0, 0, -2).Add(-12*time.Hour).Format("20060102") // change table at 12:00am
  142. key = hbaseMd5Key(aid)
  143. )
  144. defer cancel()
  145. result, err := d.hbase.GetStr(ctx, tableName, key)
  146. if err != nil {
  147. if result, err = d.hbase.GetStr(ctx, backupTableName, key); err != nil {
  148. log.Error("ArchiveArea d.hbase.GetStr backupTableName(%s)|aid(%d)|key(%v)|error(%v)", backupTableName, aid, key, err)
  149. err = ecode.CreativeDataErr
  150. return
  151. }
  152. }
  153. if result == nil {
  154. return
  155. }
  156. var countArr []int
  157. countMap := make(map[int64][]*data.ArchiveArea)
  158. countSet := make(map[int]struct{}) // empty struct{} for saving memory
  159. for _, c := range result.Cells {
  160. if c != nil {
  161. area := &data.ArchiveArea{}
  162. area.Location = string(c.Qualifier[:])
  163. area.Count, _ = strconv.ParseInt(string(c.Value[:]), 10, 64)
  164. countMap[area.Count] = append(countMap[area.Count], area)
  165. countSet[int(area.Count)] = struct{}{}
  166. }
  167. }
  168. for key := range countSet {
  169. countArr = append(countArr, key)
  170. }
  171. sort.Sort(sort.Reverse(sort.IntSlice(countArr)))
  172. for _, c := range countArr {
  173. if _, ok := countMap[int64(c)]; ok {
  174. res = append(res, countMap[int64(c)]...)
  175. }
  176. if len(res) >= 10 {
  177. res = res[:10] // exact 10 item
  178. break
  179. }
  180. }
  181. return
  182. }
  183. // BaseUpStat get base up stat.
  184. func (d *Dao) BaseUpStat(c context.Context, mid int64, date string) (stat *data.UpBaseStat, err error) {
  185. var (
  186. result *hrpc.Result
  187. ctx, cancel = context.WithTimeout(c, d.hbaseTimeOut)
  188. tableName = HBaseUpStatTablePrefix + date // change table at 12:00am
  189. key = hbaseMd5Key(mid)
  190. )
  191. defer cancel()
  192. if result, err = d.hbase.GetStr(ctx, tableName, key); err != nil {
  193. log.Error("BaseUpStat d.hbase.GetStr tableName(%s)|mid(%d)|key(%v)|error(%v)", tableName, mid, key, err)
  194. err = ecode.CreativeDataErr
  195. return
  196. }
  197. if result == nil {
  198. return
  199. }
  200. stat = &data.UpBaseStat{}
  201. for _, c := range result.Cells {
  202. if c == nil {
  203. continue
  204. }
  205. v, _ := strconv.ParseInt(string(c.Value[:]), 10, 64)
  206. if !bytes.Equal(c.Family, []byte("u")) {
  207. continue
  208. }
  209. switch {
  210. case bytes.Equal(c.Qualifier, []byte("play")):
  211. stat.View = v
  212. case bytes.Equal(c.Qualifier, []byte("dm")):
  213. stat.Dm = v
  214. case bytes.Equal(c.Qualifier, []byte("reply")):
  215. stat.Reply = v
  216. case bytes.Equal(c.Qualifier, []byte("fans")):
  217. stat.Fans = v
  218. case bytes.Equal(c.Qualifier, []byte("fav")):
  219. stat.Fav = v
  220. case bytes.Equal(c.Qualifier, []byte("like")):
  221. stat.Like = v
  222. case bytes.Equal(c.Qualifier, []byte("sh")):
  223. stat.Share = v
  224. case bytes.Equal(c.Qualifier, []byte("elec")): //【视频数据总览】增加:硬币、充电
  225. stat.Elec = v
  226. case bytes.Equal(c.Qualifier, []byte("coin")):
  227. stat.Coin = v
  228. }
  229. }
  230. log.Info("BaseUpStat d.hbase.GetStr tableName(%s)|mid(%d)|key(%v)|stat(%+v)", tableName, mid, key, stat)
  231. return
  232. }
  233. // UpArchiveStatQuery 获取最高播放/评论/弹幕/...数
  234. func (d *Dao) UpArchiveStatQuery(c context.Context, mid int64, date string) (res *data.ArchiveMaxStat, err error) {
  235. var (
  236. result *hrpc.Result
  237. ctx, cancel = context.WithTimeout(c, time.Duration(d.c.HBase.ReadTimeout))
  238. tableName = HBaseupArchiveStatQuery + date // change table at 12:00am
  239. rowkey = hbaseMd5Key(mid)
  240. )
  241. defer cancel()
  242. log.Info("UpArchiveStatQuery aid(%d)|rowkey(%s)", mid, rowkey)
  243. if result, err = d.hbase.GetStr(ctx, tableName, rowkey); err != nil {
  244. log.Error("UpArchiveStatQuery d.hbase.GetStr tableName(%s)|mid(%d)|rowkey(%+v)|error(%v)", tableName, mid, rowkey, err)
  245. err = ecode.CreativeDataErr
  246. return
  247. }
  248. if result == nil || len(result.Cells) == 0 {
  249. log.Warn("UpArchiveStatQuery no data tableName(%s)|mid(%d)|rowkey(%+v)", tableName, mid, rowkey)
  250. return
  251. }
  252. var cells data.ArchiveMaxStat
  253. err = parser.Parse(result.Cells, &cells)
  254. if err != nil {
  255. log.Error("UpArchiveStatQuery parser.Parse tableName(%s)|mid(%d)|rowkey(%+v)|error(%v)", tableName, mid, rowkey, err)
  256. err = ecode.CreativeDataErr
  257. return
  258. }
  259. res = &cells
  260. log.Info("UpArchiveStatQuery mid(%d)|rowkey(%s)|res(%+v)", mid, rowkey, *res)
  261. return
  262. }