dao.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "go-common/app/service/bbq/video/api/grpc/v1"
  7. "go-common/app/service/bbq/video/conf"
  8. "go-common/app/service/bbq/video/model/grpc"
  9. account "go-common/app/service/main/account/api"
  10. archive "go-common/app/service/main/archive/api"
  11. "go-common/library/cache/redis"
  12. "go-common/library/conf/env"
  13. xsql "go-common/library/database/sql"
  14. "go-common/library/ecode"
  15. "go-common/library/log"
  16. bm "go-common/library/net/http/blademaster"
  17. "go-common/library/net/rpc/warden"
  18. "go-common/library/queue/databus"
  19. "go-common/library/sync/pipeline/fanout"
  20. "net/url"
  21. "reflect"
  22. "strconv"
  23. jsoniter "github.com/json-iterator/go"
  24. )
// _cache is the interface consumed by the tool invoked through the
// go:generate directive below (go-common/app/tool/cache/gen).
// NOTE(review): the "// cache:" line above each method appears to carry that
// tool's per-method options (batch error policy, null-cache placeholder and
// null check); confirm against the generator's documentation before editing.
//
//go:generate $GOPATH/src/go-common/app/tool/cache/gen
type _cache interface {
	// cache: -batch_err=break -nullcache=&v1.VideoBase{Svid:-1} -check_null_code=$==nil||$.Svid==-1
	VideoBase(c context.Context, svid []int64) (map[int64]*v1.VideoBase, error)
}
// Dao bundles the storage handles and remote-service clients used by this
// package. Instances are created by New.
type Dao struct {
	// c is the configuration the Dao was built from.
	c *conf.Config
	// redis is the redis connection pool created from c.Redis.
	redis *redis.Pool
	// cache is the fanout instance created with the name "cache".
	cache *fanout.Fanout
	// db is the MySQL handle created from c.MySQL; Ping and BeginTran use it.
	db *xsql.DB
	// cmsdb is the MySQL handle created from c.CMSMySQL.
	cmsdb *xsql.DB
	// httpClient is the blademaster HTTP client created from c.BM.Client.
	httpClient *bm.Client
	// AccountClient is the gRPC client dialed with the "account" GRPCClient entry.
	AccountClient account.AccountClient
	// cmsPub is the databus handle built from the "cms" Databus entry.
	cmsPub *databus.Databus
	// archiveSub is the databus handle built from the "archive" Databus entry;
	// it is nil outside the production deploy environment or when that entry
	// is absent.
	archiveSub *databus.Databus
	// archiveFilters is populated by newArchiveFilters during New.
	archiveFilters *ArchiveFilters
	// ArchiveClient is the gRPC client dialed with the "archive" GRPCClient entry.
	ArchiveClient archive.ArchiveClient
	// bvcPlayClient is the gRPC client dialed with the "bvcplay" GRPCClient entry.
	bvcPlayClient grpc.PlayurlServiceClient
}
  45. // New init mysql db
  46. func New(c *conf.Config) (dao *Dao) {
  47. dao = &Dao{
  48. c: c,
  49. redis: redis.NewPool(c.Redis),
  50. cache: fanout.New("cache"),
  51. db: xsql.NewMySQL(c.MySQL),
  52. cmsdb: xsql.NewMySQL(c.CMSMySQL),
  53. httpClient: bm.NewClient(c.BM.Client),
  54. cmsPub: databus.New(conf.Conf.Databus["cms"]),
  55. archiveSub: newArchiveSub(c),
  56. AccountClient: newAccountClient(c.GRPCClient["account"]),
  57. ArchiveClient: newArchiveClient(c.GRPCClient["archive"]),
  58. bvcPlayClient: newBVCPlayClient(c.GRPCClient["bvcplay"]),
  59. }
  60. dao.newArchiveFilters(conf.ArchiveRules)
  61. return
  62. }
  63. // newBVCPlayClient .
  64. func newBVCPlayClient(cfg *conf.GRPCConf) grpc.PlayurlServiceClient {
  65. cc, err := warden.NewClient(cfg.WardenConf).Dial(context.Background(), cfg.Addr)
  66. if err != nil {
  67. panic(err)
  68. }
  69. return grpc.NewPlayurlServiceClient(cc)
  70. }
  71. // newAccountClient .
  72. func newAccountClient(cfg *conf.GRPCConf) account.AccountClient {
  73. cc, err := warden.NewClient(cfg.WardenConf).Dial(context.Background(), cfg.Addr)
  74. if err != nil {
  75. panic(err)
  76. }
  77. return account.NewAccountClient(cc)
  78. }
  79. // newArchiveClient .
  80. func newArchiveClient(cfg *conf.GRPCConf) archive.ArchiveClient {
  81. cc, err := warden.NewClient(cfg.WardenConf).Dial(context.Background(), cfg.Addr)
  82. if err != nil {
  83. panic(err)
  84. }
  85. return archive.NewArchiveClient(cc)
  86. }
  87. // newArchiveFilters .
  88. func (d *Dao) newArchiveFilters(c *conf.Rules) {
  89. response, err := d.ArchiveClient.Types(context.Background(), &archive.NoArgRequest{})
  90. if err != nil {
  91. panic(err)
  92. }
  93. detailFilter := &ArchiveDetailFilter{
  94. rule: c.Archive,
  95. archiveType: response.Types,
  96. }
  97. upFilter := &ArchiveUpFilter{
  98. rule: c.Up,
  99. accountClient: d.AccountClient,
  100. }
  101. dimensionFilter := &ArchivePageFilter{
  102. rule: c.Dimension,
  103. archiveClient: d.ArchiveClient,
  104. }
  105. d.archiveFilters = NewArchiveFilters(detailFilter, upFilter, dimensionFilter)
  106. }
  107. func newArchiveSub(c *conf.Config) *databus.Databus {
  108. if env.DeployEnv != env.DeployEnvProd {
  109. return nil
  110. }
  111. if _, ok := c.Databus["archive"]; !ok {
  112. return nil
  113. }
  114. return databus.New(c.Databus["archive"])
  115. }
  116. // Close close the resource.
  117. func (d *Dao) Close() {
  118. d.redis.Close()
  119. d.db.Close()
  120. }
  121. // Ping dao ping
  122. func (d *Dao) Ping(c context.Context) error {
  123. // TODO: if you need use mc,redis, please add
  124. return d.db.Ping(c)
  125. }
  126. // BeginTran begin mysql transaction
  127. func (d *Dao) BeginTran(c context.Context) (*xsql.Tx, error) {
  128. return d.db.Begin(c)
  129. }
  130. // ReplyHTTPCommon 评论公用请求
  131. func replyHTTPCommon(c context.Context, httpClient *bm.Client, path string, method string, data map[string]interface{}, ip string) (r []byte, err error) {
  132. params := url.Values{}
  133. t := reflect.TypeOf(data).Kind()
  134. if t == reflect.Map {
  135. for k, v := range data {
  136. // params.Set(k, v.(string))
  137. switch reflect.TypeOf(v).Kind() {
  138. case reflect.Int64:
  139. params.Set(k, strconv.FormatInt(v.(int64), 10))
  140. case reflect.Int16:
  141. params.Set(k, strconv.FormatInt(int64(v.(int16)), 10))
  142. case reflect.String:
  143. params.Set(k, v.(string))
  144. case reflect.Int:
  145. params.Set(k, strconv.FormatInt(int64(v.(int)), 10))
  146. }
  147. }
  148. }
  149. log.V(5).Infov(c, log.KV("log", fmt.Sprintf("reply req url(%s)", path+"?"+params.Encode())))
  150. req, err := httpClient.NewRequest(method, path, ip, params)
  151. if err != nil {
  152. log.Errorv(c, log.KV("log", fmt.Sprintf("reply url(%s) error(%v)", path+"?"+params.Encode(), err)))
  153. return
  154. }
  155. var res struct {
  156. Code int `json:"code"`
  157. Msg string `json:"message"`
  158. Data json.RawMessage `json:"data"`
  159. }
  160. var json = jsoniter.ConfigCompatibleWithStandardLibrary
  161. if err = httpClient.Do(c, req, &res); err != nil {
  162. str, _ := json.Marshal(res)
  163. log.Errorv(c, log.KV("log", fmt.Sprintf("reply ret data(%s) err[%v]", str, err)))
  164. return
  165. }
  166. str, _ := json.Marshal(res)
  167. log.V(5).Infov(c, log.KV("log", fmt.Sprintf("reply ret data(%s)", str)))
  168. if res.Code != 0 {
  169. err = ecode.Int(res.Code)
  170. log.Warnv(c, log.KV("log", fmt.Sprintf("reply url(%s) error(%v)", path+"?"+params.Encode(), err)))
  171. }
  172. r = res.Data
  173. return
  174. }