dao.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "go-common/app/service/main/search/conf"
  6. "go-common/library/log"
  7. "go-common/library/stat/prom"
  8. elastic "gopkg.in/olivere/elastic.v5"
  9. )
  10. type Dao struct {
  11. // conf
  12. c *conf.Config
  13. // esPool
  14. esPool map[string]*elastic.Client
  15. // sms
  16. sms *sms
  17. }
  18. // New init dao
  19. func New(c *conf.Config) (d *Dao) {
  20. d = &Dao{
  21. c: c,
  22. }
  23. d.sms = newSMS(d)
  24. // cluster
  25. d.esPool = newEsPool(c, d)
  26. return
  27. }
  28. // BulkItem .
  29. type BulkItem interface {
  30. IndexName() string
  31. IndexType() string
  32. IndexID() string
  33. }
  34. // BulkMapItem .
  35. type BulkMapItem interface {
  36. IndexName() string
  37. IndexType() string
  38. IndexID() string
  39. PField() map[string]interface{}
  40. }
  41. // newEsCluster cluster action
  42. func newEsPool(c *conf.Config, d *Dao) (esCluster map[string]*elastic.Client) {
  43. esCluster = make(map[string]*elastic.Client)
  44. for esName, e := range c.Es {
  45. if client, err := elastic.NewClient(elastic.SetURL(e.Addr...)); err == nil {
  46. esCluster[esName] = client
  47. } else {
  48. PromError("es:集群连接失败", "cluster: %s, %v", esName, err)
  49. if err := d.SendSMS(fmt.Sprintf("[search-job]%s集群连接失败", esName)); err != nil {
  50. PromError("es:集群连接短信失败", "cluster: %s, %v", esName, err)
  51. }
  52. }
  53. }
  54. return
  55. }
  56. // PromError prometheus error count.
  57. func PromError(name, format string, args ...interface{}) {
  58. prom.BusinessErrCount.Incr(name)
  59. log.Error(format, args...)
  60. }
  61. // Ping health of db.
  62. func (d *Dao) Ping(c context.Context) (err error) {
  63. if err = d.pingESCluster(c); err != nil {
  64. PromError("es:ping", "Ping %v", err)
  65. }
  66. return
  67. }
  68. // pingESCluster ping es cluster
  69. func (d *Dao) pingESCluster(ctx context.Context) (err error) {
  70. for name := range d.c.Es {
  71. client, ok := d.esPool[name]
  72. if !ok {
  73. continue
  74. }
  75. _, _, err = client.Ping(d.c.Es["replyExternal"].Addr[0]).Do(ctx)
  76. if err != nil {
  77. PromError("archiveESClient:Ping", "dao.pingESCluster error(%v) ", err)
  78. return
  79. }
  80. }
  81. return
  82. }