server.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. // Package grpc generate by warden_gen
  2. package grpc
  3. import (
  4. "context"
  5. pb "go-common/app/interface/main/history/api/grpc"
  6. "go-common/app/interface/main/history/model"
  7. service "go-common/app/interface/main/history/service"
  8. "go-common/library/net/rpc/warden"
  9. )
  10. // New History warden rpc server
  11. func New(c *warden.ServerConfig, svr *service.Service) *warden.Server {
  12. ws := warden.NewServer(c)
  13. pb.RegisterHistoryServer(ws.Server(), &server{svr})
  14. ws, err := ws.Start()
  15. if err != nil {
  16. panic(err)
  17. }
  18. return ws
  19. }
  20. type server struct {
  21. svr *service.Service
  22. }
  23. var _ pb.HistoryServer = &server{}
  24. // AddHistory add hisotry progress into hbase.
  25. func (s *server) AddHistory(ctx context.Context, req *pb.AddHistoryReq) (*pb.AddHistoryReply, error) {
  26. tp, err := model.MustCheckBusiness(req.H.Business)
  27. if err != nil {
  28. return nil, err
  29. }
  30. h := &model.History{
  31. Mid: req.H.Mid,
  32. Aid: req.H.Aid,
  33. Sid: req.H.Sid,
  34. Epid: req.H.Epid,
  35. TP: tp,
  36. Business: req.H.Business,
  37. STP: int8(req.H.Stp),
  38. Cid: req.H.Cid,
  39. DT: int8(req.H.Dt),
  40. Pro: req.H.Pro,
  41. Unix: req.H.Unix,
  42. }
  43. return nil, s.svr.AddHistory(ctx, req.Mid, req.Rtime, h)
  44. }
  45. // Progress get view progress from cache/hbase.
  46. func (s *server) Progress(ctx context.Context, req *pb.ProgressReq) (*pb.ProgressReply, error) {
  47. histories, err := s.svr.Progress(ctx, req.Mid, req.Aids)
  48. if err != nil {
  49. return nil, err
  50. }
  51. reply := &pb.ProgressReply{Res: make(map[int64]*pb.ModelHistory)}
  52. for k, v := range histories {
  53. reply.Res[k] = &pb.ModelHistory{
  54. Mid: v.Mid,
  55. Aid: v.Aid,
  56. Sid: v.Sid,
  57. Epid: v.Epid,
  58. Business: v.Business,
  59. Stp: int32(v.STP),
  60. Cid: v.Cid,
  61. Dt: int32(v.DT),
  62. Pro: v.Pro,
  63. Unix: v.Unix,
  64. }
  65. }
  66. return reply, nil
  67. }
  68. // Position get view progress from cache/hbase.
  69. func (s *server) Position(ctx context.Context, req *pb.PositionReq) (*pb.PositionReply, error) {
  70. tp, err := model.MustCheckBusiness(req.Business)
  71. if err != nil {
  72. return nil, err
  73. }
  74. h, err := s.svr.Position(ctx, req.Mid, req.Aid, tp)
  75. if err != nil {
  76. return nil, err
  77. }
  78. reply := &pb.PositionReply{
  79. Res: &pb.ModelHistory{
  80. Mid: h.Mid,
  81. Aid: h.Aid,
  82. Sid: h.Sid,
  83. Epid: h.Epid,
  84. Business: h.Business,
  85. Stp: int32(h.STP),
  86. Cid: h.Cid,
  87. Dt: int32(h.DT),
  88. Pro: h.Pro,
  89. Unix: h.Unix,
  90. },
  91. }
  92. return reply, err
  93. }
  94. // ClearHistory clear user's historys.
  95. func (s *server) ClearHistory(ctx context.Context, req *pb.ClearHistoryReq) (*pb.ClearHistoryReply, error) {
  96. var tps []int8
  97. for _, b := range req.Businesses {
  98. tp, err := model.MustCheckBusiness(b)
  99. if err != nil {
  100. return nil, err
  101. }
  102. tps = append(tps, tp)
  103. }
  104. return nil, s.svr.ClearHistory(ctx, req.Mid, tps)
  105. }
  106. // Histories return the user all av history.
  107. func (s *server) Histories(ctx context.Context, req *pb.HistoriesReq) (*pb.HistoriesReply, error) {
  108. tp, err := model.MustCheckBusiness(req.Business)
  109. if err != nil {
  110. return nil, err
  111. }
  112. resources, err := s.svr.Histories(ctx, req.Mid, tp, int(req.Pn), int(req.Ps))
  113. if err != nil {
  114. return nil, err
  115. }
  116. reply := &pb.HistoriesReply{}
  117. for _, v := range resources {
  118. reply.Res = append(reply.Res,
  119. &pb.ModelResource{
  120. Mid: v.Mid,
  121. Oid: v.Oid,
  122. Sid: v.Sid,
  123. Epid: v.Epid,
  124. Business: v.Business,
  125. Stp: int32(v.STP),
  126. Cid: v.Cid,
  127. Dt: int32(v.DT),
  128. Pro: v.Pro,
  129. Unix: v.Unix,
  130. })
  131. }
  132. return reply, nil
  133. }
  134. // HistoryCursor return the user all av history.
  135. func (s *server) HistoryCursor(ctx context.Context, req *pb.HistoryCursorReq) (*pb.HistoryCursorReply, error) {
  136. tp, err := model.MustCheckBusiness(req.Business)
  137. if err != nil {
  138. return nil, err
  139. }
  140. var tps []int8
  141. for _, b := range req.Businesses {
  142. t, er := model.MustCheckBusiness(b)
  143. if er != nil {
  144. return nil, er
  145. }
  146. tps = append(tps, t)
  147. }
  148. resources, err := s.svr.HistoryCursor(ctx, req.Mid, req.Max, req.ViewAt, int(req.Ps), tp, tps, req.Ip)
  149. if err != nil {
  150. return nil, err
  151. }
  152. reply := &pb.HistoryCursorReply{}
  153. for _, v := range resources {
  154. reply.Res = append(reply.Res,
  155. &pb.ModelResource{
  156. Mid: v.Mid,
  157. Oid: v.Oid,
  158. Sid: v.Sid,
  159. Epid: v.Epid,
  160. Business: v.Business,
  161. Stp: int32(v.STP),
  162. Cid: v.Cid,
  163. Dt: int32(v.DT),
  164. Pro: v.Pro,
  165. Unix: v.Unix,
  166. })
  167. }
  168. return reply, nil
  169. }
  170. // Delete .
  171. func (s *server) Delete(ctx context.Context, req *pb.DeleteReq) (*pb.DeleteReply, error) {
  172. var his []*model.History
  173. for _, b := range req.His {
  174. tp, err := model.MustCheckBusiness(b.Business)
  175. if err != nil {
  176. return nil, err
  177. }
  178. h := &model.History{
  179. Mid: b.Mid,
  180. Aid: b.Aid,
  181. Sid: b.Sid,
  182. Epid: b.Epid,
  183. TP: tp,
  184. Business: b.Business,
  185. STP: int8(b.Stp),
  186. Cid: b.Cid,
  187. DT: int8(b.Dt),
  188. Pro: b.Pro,
  189. Unix: b.Unix,
  190. }
  191. his = append(his, h)
  192. }
  193. return nil, s.svr.Delete(ctx, req.Mid, his)
  194. }
  195. // FlushHistory flush to hbase from cache.
  196. func (s *server) FlushHistory(ctx context.Context, req *pb.FlushHistoryReq) (*pb.FlushHistoryReply, error) {
  197. return nil, s.svr.FlushHistory(ctx, req.Mids, req.Stime)
  198. }