dao.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package dao
  2. import (
  3. "context"
  4. "net"
  5. "net/http"
  6. "time"
  7. "fmt"
  8. "go-common/app/service/bbq/search/conf"
  9. "go-common/library/cache/redis"
  10. xsql "go-common/library/database/sql"
  11. "gopkg.in/olivere/elastic.v5"
  12. )
  13. // Dao dao
  14. type Dao struct {
  15. c *conf.Config
  16. redis *redis.Pool
  17. db *xsql.DB
  18. esPool map[string]*elastic.Client
  19. httpClient []*http.Client
  20. }
  21. // New init mysql db
  22. func New(c *conf.Config) (dao *Dao) {
  23. dao = &Dao{
  24. c: c,
  25. redis: redis.NewPool(c.Redis),
  26. db: xsql.NewMySQL(c.MySQL),
  27. esPool: newEsPool(c.Es),
  28. httpClient: dao.createHTTPClient(),
  29. }
  30. dao.createESIndex(_bbqEsName, _videoIndex, _videoMapping)
  31. return
  32. }
  33. // newEsPool new es cluster action
  34. func newEsPool(esInfo map[string]*conf.Es) (esCluster map[string]*elastic.Client) {
  35. esCluster = make(map[string]*elastic.Client)
  36. for esName, e := range esInfo {
  37. client, err := elastic.NewClient(elastic.SetURL(e.Addr...), elastic.SetSniff(false))
  38. if err != nil {
  39. panic(fmt.Sprintf("es:集群连接失败, cluster: %s, %v", esName, err))
  40. }
  41. esCluster[esName] = client
  42. }
  43. return
  44. }
  45. // createESIndex 初始化es索引
  46. func (d *Dao) createESIndex(esName, index, mapping string) {
  47. exist, err := d.esPool[esName].IndexExists(index).Do(context.Background())
  48. if err != nil {
  49. panic(fmt.Sprintf("check if index exists, name(%s) error (%v)", esName, err))
  50. }
  51. if exist {
  52. return
  53. }
  54. if _, err = d.esPool[esName].CreateIndex(index).Body(mapping).Do(context.Background()); err != nil {
  55. panic(fmt.Sprintf("create index, name(%s) error (%v)", esName, err))
  56. }
  57. }
  58. // createHTTPClient .
  59. func (d *Dao) createHTTPClient() []*http.Client {
  60. clients := make([]*http.Client, 0)
  61. for i := 0; i < 10; i++ {
  62. client := &http.Client{
  63. Transport: &http.Transport{
  64. Proxy: http.ProxyFromEnvironment,
  65. DialContext: (&net.Dialer{
  66. Timeout: 30 * time.Second,
  67. KeepAlive: 30 * time.Second,
  68. }).DialContext,
  69. MaxIdleConns: 100,
  70. MaxIdleConnsPerHost: 100,
  71. IdleConnTimeout: 30 * time.Second,
  72. },
  73. Timeout: 2 * time.Second,
  74. }
  75. clients = append(clients, client)
  76. }
  77. return clients
  78. }
  79. // Close close the resource.
  80. func (d *Dao) Close() {
  81. d.redis.Close()
  82. d.db.Close()
  83. }
  84. // Ping dao ping
  85. func (d *Dao) Ping(c context.Context) error {
  86. // TODO: if you need use mc,redis, please add
  87. return d.db.Ping(c)
  88. }