dao.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package dao
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/common/openplatform/encoding"
  6. acc "go-common/app/service/main/account/api"
  7. itemv1 "go-common/app/service/openplatform/ticket-item/api/grpc/v1"
  8. "go-common/app/service/openplatform/ticket-sales/conf"
  9. "go-common/app/service/openplatform/ticket-sales/model"
  10. "go-common/library/cache"
  11. "go-common/library/cache/redis"
  12. "go-common/library/database/sql"
  13. "go-common/library/log"
  14. "go-common/library/naming/discovery"
  15. bm "go-common/library/net/http/blademaster"
  16. "go-common/library/net/rpc/warden"
  17. "go-common/library/net/rpc/warden/resolver"
  18. "go-common/library/queue/databus"
  19. )
  20. //Dao 数据操作层结构体
  21. type Dao struct {
  22. c *conf.Config
  23. db *sql.DB
  24. redis *redis.Pool
  25. expire int32
  26. cache *cache.Cache
  27. httpClientR *bm.Client
  28. itemClient itemv1.ItemClient
  29. accClient acc.AccountClient
  30. databus *databus.Databus
  31. }
  32. //go:generate $GOPATH/src/go-common/app/tool/cache/gen
  33. type _cache interface {
  34. // cache: -nullcache=&model.Promotion{PromoID:-1} -check_null_code=$!=nil&&$.PromoID==-1
  35. Promo(c context.Context, promoID int64) (*model.Promotion, error)
  36. // cache: -nullcache=&model.PromotionGroup{GroupID:-1} -check_null_code=$!=nil&&$.GroupID==-1
  37. PromoGroup(c context.Context, groupID int64) (*model.PromotionGroup, error)
  38. // cache: -nullcache=&model.PromotionOrder{OrderID:-1} -check_null_code=$!=nil&&$.OrderID==-1
  39. PromoOrder(c context.Context, orderID int64) (*model.PromotionOrder, error)
  40. // cache: -nullcache=[]*model.PromotionOrder{{GroupID:-1}} -check_null_code=len($)==1&&$[0].GroupID==-1
  41. PromoOrders(c context.Context, groupID int64) ([]*model.PromotionOrder, error)
  42. // cache: -nullcache=[]*model.OrderMain{{OrderID:-1}} -check_null_code=len($)==1&&$[0].OrderID==-1
  43. Orders(context.Context, *model.OrderMainQuerier) ([]*model.OrderMain, error)
  44. // cache: -nullcache=-1 -check_null_code=$==-1
  45. OrderCount(context.Context, *model.OrderMainQuerier) (int64, error)
  46. // cache: -nullcache=&model.OrderDetail{OrderID:0} -check_null_code=$!=nil&&$.OrderID==0
  47. OrderDetails(context.Context, []int64) (map[int64]*model.OrderDetail, error)
  48. // cache: -nullcache=[]*model.OrderSKU{{OrderID:-1}} -check_null_code=len($)==1&&$[0].OrderID==-1
  49. OrderSKUs(context.Context, []int64) (map[int64][]*model.OrderSKU, error)
  50. // cache: -nullcache=&model.OrderPayCharge{ChargeID:""} -check_null_code=$!=nil&&$.ChargeID==""
  51. OrderPayCharges(context.Context, []int64) (map[int64]*model.OrderPayCharge, error)
  52. // cache:
  53. SkuByItemID(c context.Context, itemID int64) (map[string]*model.SKUStock, error)
  54. // cache:
  55. GetSKUs(c context.Context, skuIds []int64, withNewStock bool) (map[int64]*model.SKUStock, error)
  56. // cache:
  57. Stocks(c context.Context, keys []int64, isLocked bool) (res map[int64]int64, err error)
  58. // cache: -nullcache=[]*model.Ticket{{ID:-1}} -check_null_code=len($)==1&&$[0].ID==-1
  59. TicketsByOrderID(c context.Context, orderID int64) (res []*model.Ticket, err error)
  60. // cache: -nullcache=[]*model.Ticket{{ID:-1}} -check_null_code=len($)==1&&$[0].ID==-1
  61. TicketsByScreen(c context.Context, screenID int64, UID int64) (res []*model.Ticket, err error)
  62. // cache: -nullcache=&model.Ticket{ID:-1} -check_null_code=$!=nil&&$.ID==-1
  63. TicketsByID(c context.Context, id []int64) (res map[int64]*model.Ticket, err error)
  64. // cache: -nullcache=&model.TicketSend{ID:-1} -check_null_code=$!=nil&&$.ID==-1
  65. TicketSend(c context.Context, SendTID []int64, TIDType string) (res map[int64]*model.TicketSend, err error)
  66. }
  67. // FIXME this just a example
  68. func newItemClient(cfg *warden.ClientConfig) itemv1.ItemClient {
  69. cc, err := warden.NewClient(cfg).Dial(context.Background(), "discovery://default/ticket.service.item")
  70. if err != nil {
  71. panic(err)
  72. }
  73. return itemv1.NewItemClient(cc)
  74. }
  75. func newAccClient(cfg *warden.ClientConfig) acc.AccountClient {
  76. cc, err := warden.NewClient(cfg).Dial(context.Background(), "discovery://default/account.service")
  77. if err != nil {
  78. panic(err)
  79. }
  80. return acc.NewAccountClient(cc)
  81. }
  82. //New 根据配置文件 生成一个 Dao struct
  83. func New(c *conf.Config) (d *Dao) {
  84. resolver.Register(discovery.New(nil))
  85. d = &Dao{
  86. c: c,
  87. db: sql.NewMySQL(c.DB.Master),
  88. redis: redis.NewPool(c.Redis.Master),
  89. cache: cache.New(1, 1024),
  90. expire: int32(time.Duration(c.Redis.Expire) / time.Second),
  91. httpClientR: bm.NewClient(c.HTTPClient.Read),
  92. itemClient: newItemClient(c.GRPCClient["item"]),
  93. accClient: newAccClient(c.GRPCClient["account"]),
  94. databus: databus.New(c.Databus["update"]),
  95. }
  96. return
  97. }
  98. // Ping ping 方法
  99. func (d *Dao) Ping(c context.Context) (err error) {
  100. conn := d.redis.Get(c)
  101. defer conn.Close()
  102. _, err = conn.Do("PING")
  103. if err != nil {
  104. return
  105. }
  106. err = d.db.Ping(c)
  107. return
  108. }
  109. //Close 关闭redis 和 db 连接
  110. func (d *Dao) Close() (err error) {
  111. d.redis.Close()
  112. d.db.Close()
  113. return
  114. }
  115. //BeginTx 开启事务
  116. func (d *Dao) BeginTx(c context.Context) (conn *sql.Tx, err error) {
  117. return d.db.Begin(c)
  118. }
  119. //LogX 记录日志,args:请求参数 res:正常返回 err:错误返回 ld:更多日志项
  120. func LogX(ctx context.Context, args interface{}, res interface{}, err error, ld ...log.D) {
  121. l := len(ld)
  122. u := 0
  123. ld1 := make([]log.D, l)
  124. for i := 0; i < l; i++ {
  125. if ld[i].Key != "" {
  126. ld1[u] = ld[i]
  127. u++
  128. }
  129. }
  130. ld1 = ld1[:u]
  131. if args != nil {
  132. ld1 = append(ld1, log.KV("args", encoding.JSON(args)))
  133. }
  134. if err != nil {
  135. ld1 = append(ld1, log.KV("log", err.Error()))
  136. log.Errorv(ctx, ld1...)
  137. } else if log.V(3) {
  138. if res == nil {
  139. log.Infov(ctx, ld1...)
  140. return
  141. }
  142. ld1 = append(ld1, log.KV("log", encoding.JSON(res)))
  143. log.Infov(ctx, ld1...)
  144. }
  145. }
  146. //DatabusPub 向databus发布消息
  147. func (d *Dao) DatabusPub(ctx context.Context, action string, data interface{}) error {
  148. type input struct {
  149. Action string `json:"action"`
  150. Data interface{} `json:"data"`
  151. }
  152. err := d.databus.Send(ctx, action, &input{action, data})
  153. if err != nil {
  154. log.Error("pub databus failed action:%s, data:%+v", action, data)
  155. }
  156. return err
  157. }