history_service.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. package service
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/interface/main/history/model"
  6. hisapi "go-common/app/service/main/history/api/grpc"
  7. history "go-common/app/service/main/history/model"
  8. "go-common/library/log"
  9. "go-common/library/net/metadata"
  10. "github.com/pkg/errors"
  11. )
  12. func (s *Service) serviceRun(f func()) {
  13. select {
  14. case s.serviceChan <- f:
  15. default:
  16. log.Error("serviceChan full")
  17. }
  18. }
  19. func (s *Service) serviceproc() {
  20. for {
  21. f := <-s.serviceChan
  22. f()
  23. }
  24. }
  25. func (s *Service) serviceAdd(arg *model.History) {
  26. s.serviceRun(func() {
  27. h := arg.ConvertServiceType()
  28. arg := &hisapi.AddHistoryReq{
  29. Mid: h.Mid,
  30. Business: h.Business,
  31. Kid: h.Kid,
  32. Aid: h.Aid,
  33. Sid: h.Sid,
  34. Epid: h.Epid,
  35. Cid: h.Cid,
  36. SubType: h.SubType,
  37. Device: h.Device,
  38. Progress: h.Progress,
  39. ViewAt: h.ViewAt,
  40. }
  41. s.hisRPC.AddHistory(context.Background(), arg)
  42. })
  43. }
  44. func (s *Service) serviceAdds(mid int64, hs []*model.History) {
  45. s.serviceRun(func() {
  46. arg := &hisapi.AddHistoriesReq{}
  47. for _, a := range hs {
  48. h := a.ConvertServiceType()
  49. arg.Histories = append(arg.Histories, &hisapi.AddHistoryReq{
  50. Mid: mid,
  51. Business: h.Business,
  52. Kid: h.Kid,
  53. Aid: h.Aid,
  54. Sid: h.Sid,
  55. Epid: h.Epid,
  56. Cid: h.Cid,
  57. SubType: h.SubType,
  58. Device: h.Device,
  59. Progress: h.Progress,
  60. ViewAt: h.ViewAt,
  61. })
  62. }
  63. s.hisRPC.AddHistories(context.Background(), arg)
  64. })
  65. }
  66. func (s *Service) serviceDel(ctx context.Context, mid int64, his []*model.History) error {
  67. arg := &hisapi.DelHistoriesReq{
  68. Mid: mid,
  69. Records: []*hisapi.DelHistoriesReq_Record{},
  70. }
  71. var aids []int64
  72. for _, v := range his {
  73. if v.TP == model.TypePGC || v.TP == model.TypeUGC {
  74. aids = append(aids, v.Aid)
  75. }
  76. }
  77. seasonMap := make(map[int64]*model.BangumiSeason)
  78. if len(aids) > 0 {
  79. seasonMap, _ = s.season(ctx, mid, aids, metadata.String(ctx, metadata.RemoteIP))
  80. }
  81. for _, h := range his {
  82. if value, ok := seasonMap[h.Aid]; ok && value != nil {
  83. arg.Records = append(arg.Records, &hisapi.DelHistoriesReq_Record{
  84. Business: model.BusinessByTP(model.TypePGC),
  85. ID: value.ID,
  86. })
  87. log.Warn("seasonMap(%d,%v)season:%d", mid, h.Aid, value.ID)
  88. }
  89. arg.Records = append(arg.Records, &hisapi.DelHistoriesReq_Record{
  90. Business: model.BusinessByTP(h.TP),
  91. ID: h.Aid,
  92. })
  93. }
  94. if _, err := s.hisRPC.DelHistories(ctx, arg); err != nil {
  95. log.Error("s.hisRPC.DelHistories(%+v %+v) err:%+v", his, arg, errors.WithStack(err))
  96. return err
  97. }
  98. return nil
  99. }
  100. func (s *Service) serviceClear(mid int64, tps []int8) {
  101. s.serviceRun(func() {
  102. arg := &hisapi.ClearHistoryReq{
  103. Mid: mid,
  104. }
  105. for _, t := range tps {
  106. arg.Businesses = append(arg.Businesses, model.BusinessByTP(t))
  107. }
  108. s.hisRPC.ClearHistory(context.Background(), arg)
  109. })
  110. }
  111. func (s *Service) serviceDels(ctx context.Context, mid int64, aids []int64, typ int8) error {
  112. arg := &hisapi.DelHistoriesReq{
  113. Mid: mid,
  114. Records: []*hisapi.DelHistoriesReq_Record{},
  115. }
  116. seasonMap := make(map[int64]*model.BangumiSeason)
  117. if typ == 0 {
  118. seasonMap, _ = s.season(ctx, mid, aids, metadata.String(ctx, metadata.RemoteIP))
  119. }
  120. b := model.BusinessByTP(typ)
  121. for _, aid := range aids {
  122. if value, ok := seasonMap[aid]; ok && value != nil {
  123. arg.Records = append(arg.Records, &hisapi.DelHistoriesReq_Record{
  124. Business: model.BusinessByTP(model.TypePGC),
  125. ID: value.ID,
  126. })
  127. log.Warn("seasonMap(%d,%v)season:%d", mid, aid, value.ID)
  128. }
  129. arg.Records = append(arg.Records, &hisapi.DelHistoriesReq_Record{
  130. Business: b,
  131. ID: aid,
  132. })
  133. }
  134. if _, err := s.hisRPC.DelHistories(ctx, arg); err != nil {
  135. log.Error("s.hisRPC.DelHistories(%v %+v) err:%+v", mid, arg, errors.WithStack(err))
  136. return err
  137. }
  138. return nil
  139. }
  140. func (s *Service) serviceHide(mid int64, hide bool) {
  141. s.serviceRun(func() {
  142. arg := &hisapi.UpdateUserHideReq{
  143. Mid: mid,
  144. Hide: hide,
  145. }
  146. s.hisRPC.UpdateUserHide(context.Background(), arg)
  147. })
  148. }
  149. func (s *Service) serviceHistoryCursor(c context.Context, mid int64, kid int64, businesses []string, business string, viewAt int64, ps int) ([]*model.Resource, error) {
  150. if viewAt == 0 {
  151. viewAt = time.Now().Unix()
  152. }
  153. arg := &hisapi.UserHistoriesReq{
  154. Mid: mid,
  155. Businesses: businesses,
  156. Business: business,
  157. Kid: kid,
  158. ViewAt: viewAt,
  159. Ps: int64(ps),
  160. }
  161. reply, err := s.hisRPC.UserHistories(c, arg)
  162. if err != nil {
  163. log.Error("s.hisRPC.UserHistories(%+v) err:%+v", arg, err)
  164. return nil, err
  165. }
  166. if reply == nil {
  167. return nil, err
  168. }
  169. his := make([]*model.Resource, 0)
  170. for _, v := range reply.Histories {
  171. tp, _ := model.CheckBusiness(v.Business)
  172. his = append(his, &model.Resource{
  173. Mid: v.Mid,
  174. Oid: v.Aid,
  175. Sid: v.Sid,
  176. Epid: v.Epid,
  177. TP: tp,
  178. Business: v.Business,
  179. STP: int8(v.SubType),
  180. Cid: v.Cid,
  181. DT: int8(v.Device),
  182. Pro: int64(v.Progress),
  183. Unix: v.ViewAt,
  184. })
  185. }
  186. return his, nil
  187. }
  188. func (s *Service) servicePnPsCursor(c context.Context, mid int64, businesses []string, pn, ps int) ([]*model.History, []int64, error) {
  189. if pn*ps > 1000 {
  190. return nil, nil, nil
  191. }
  192. arg := &hisapi.UserHistoriesReq{
  193. Mid: mid,
  194. Businesses: businesses,
  195. Ps: int64(pn * ps),
  196. ViewAt: time.Now().Unix(),
  197. }
  198. reply, err := s.hisRPC.UserHistories(c, arg)
  199. if err != nil {
  200. log.Error("s.hisRPC.UserHistories(%+v) err:%+v", arg, err)
  201. return nil, nil, err
  202. }
  203. if reply == nil {
  204. return nil, nil, err
  205. }
  206. size := len(reply.Histories)
  207. start := (pn - 1) * ps
  208. end := start + ps - 1
  209. switch {
  210. case size > start && size > end:
  211. reply.Histories = reply.Histories[start : end+1]
  212. case size > start && size <= end:
  213. reply.Histories = reply.Histories[start:]
  214. default:
  215. reply.Histories = make([]*history.History, 0)
  216. }
  217. var epids []int64
  218. his := make([]*model.History, 0)
  219. for _, v := range reply.Histories {
  220. tp, _ := model.CheckBusiness(v.Business)
  221. if tp == model.TypePGC {
  222. epids = append(epids, v.Epid)
  223. }
  224. his = append(his, &model.History{
  225. Mid: v.Mid,
  226. Aid: v.Aid,
  227. Sid: v.Sid,
  228. Epid: v.Epid,
  229. TP: tp,
  230. Business: v.Business,
  231. STP: int8(v.SubType),
  232. Cid: v.Cid,
  233. DT: int8(v.Device),
  234. Pro: int64(v.Progress),
  235. Unix: v.ViewAt,
  236. })
  237. }
  238. return his, epids, nil
  239. }
  240. func (s *Service) servicePosition(c context.Context, mid int64, business string, kids []int64) (map[int64]*model.History, error) {
  241. arg := &hisapi.HistoriesReq{
  242. Mid: mid,
  243. Business: business,
  244. Kids: kids,
  245. }
  246. reply, err := s.hisRPC.Histories(c, arg)
  247. if err != nil {
  248. log.Error("s.hisRPC.Histories(%+v) err:%+v", arg, err)
  249. return nil, err
  250. }
  251. if reply == nil {
  252. return nil, err
  253. }
  254. now := time.Now().Unix() - 8*60*60
  255. his := make(map[int64]*model.History)
  256. for _, v := range reply.Histories {
  257. if business == model.BusinessByTP(model.TypeUGC) && v.ViewAt < now {
  258. continue
  259. }
  260. tp, _ := model.CheckBusiness(v.Business)
  261. his[v.Aid] = &model.History{
  262. Mid: v.Mid,
  263. Aid: v.Aid,
  264. Sid: v.Sid,
  265. Epid: v.Epid,
  266. TP: tp,
  267. Business: v.Business,
  268. STP: int8(v.SubType),
  269. Cid: v.Cid,
  270. DT: int8(v.Device),
  271. Pro: int64(v.Progress),
  272. Unix: v.ViewAt,
  273. }
  274. }
  275. return his, nil
  276. }
  277. func (s *Service) serviceHistoryType(c context.Context, mid int64, business string, kids []int64) ([]*model.History, error) {
  278. arg := &hisapi.HistoriesReq{
  279. Mid: mid,
  280. Business: business,
  281. Kids: kids,
  282. }
  283. reply, err := s.hisRPC.Histories(c, arg)
  284. if err != nil {
  285. log.Error("s.hisRPC.Histories(%+v) err:%+v", arg, err)
  286. return nil, err
  287. }
  288. if reply == nil {
  289. return nil, err
  290. }
  291. his := make([]*model.History, 0)
  292. for _, v := range reply.Histories {
  293. tp, _ := model.CheckBusiness(v.Business)
  294. his = append(his, &model.History{
  295. Mid: v.Mid,
  296. Aid: v.Aid,
  297. Sid: v.Sid,
  298. Epid: v.Epid,
  299. TP: tp,
  300. Business: v.Business,
  301. STP: int8(v.SubType),
  302. Cid: v.Cid,
  303. DT: int8(v.Device),
  304. Pro: int64(v.Progress),
  305. Unix: v.ViewAt,
  306. })
  307. }
  308. return his, nil
  309. }
  310. func (s *Service) serviceHideState(c context.Context, mid int64) (int64, error) {
  311. arg := &hisapi.UserHideReq{
  312. Mid: mid,
  313. }
  314. reply, err := s.hisRPC.UserHide(c, arg)
  315. if err != nil {
  316. log.Error("s.hisRPC.UserHide(%d) err:%+v", mid, err)
  317. return 0, err
  318. }
  319. if !reply.Hide {
  320. return 0, nil
  321. }
  322. return 1, nil
  323. }