dao.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package dao
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "net/http"
  7. "time"
  8. "go-common/app/admin/ep/merlin/conf"
  9. "go-common/library/cache/memcache"
  10. "go-common/library/database/orm"
  11. "go-common/library/log"
  12. xhttp "go-common/library/net/http/blademaster"
  13. "go-common/library/sync/pipeline/fanout"
  14. dclient "github.com/docker/docker/client"
  15. "github.com/jinzhu/gorm"
  16. "gopkg.in/gomail.v2"
  17. "gopkg.in/h2non/gock.v1"
  18. )
  19. // Dao dao.
  20. type Dao struct {
  21. c *conf.Config
  22. httpClient *xhttp.Client
  23. db *gorm.DB
  24. email *gomail.Dialer
  25. mc *memcache.Pool
  26. cache *fanout.Fanout
  27. hubCache *fanout.Fanout
  28. expire int32
  29. dockerClient *dclient.Client
  30. }
  31. // New init mysql db.
  32. func New(c *conf.Config) *Dao {
  33. var (
  34. dc *dclient.Client
  35. err error
  36. )
  37. if dc, err = dclient.NewClientWithOpts(dclient.FromEnv); err != nil {
  38. log.Error("docker client init error(%v)", err)
  39. panic(err)
  40. }
  41. return &Dao{
  42. c: c,
  43. httpClient: xhttp.NewClient(c.HTTPClient),
  44. db: orm.NewMySQL(c.ORM),
  45. email: gomail.NewDialer(c.Mail.Host, c.Mail.Port, c.Mail.Username, c.Mail.Password),
  46. mc: memcache.NewPool(c.Memcache.Config),
  47. cache: fanout.New("cache", fanout.Worker(10), fanout.Buffer(10240)),
  48. hubCache: fanout.New("hubCache", fanout.Worker(10), fanout.Buffer(10240)),
  49. expire: int32(time.Duration(c.Memcache.Expire) / time.Second),
  50. dockerClient: dc,
  51. }
  52. }
  53. // Close close the resource.
  54. func (d *Dao) Close() {
  55. if d.db != nil {
  56. d.db.Close()
  57. }
  58. if d.mc != nil {
  59. d.mc.Close()
  60. }
  61. }
  62. // SetProxy set test proxy.
  63. func (d *Dao) SetProxy() {
  64. d.httpClient.SetTransport(gock.DefaultTransport)
  65. }
  66. func (d *Dao) newRequest(method, url string, v interface{}) (req *http.Request, err error) {
  67. body := &bytes.Buffer{}
  68. if method != http.MethodGet {
  69. if err = json.NewEncoder(body).Encode(v); err != nil {
  70. log.Error("json encode value(%s) err(?) ", v, err)
  71. return
  72. }
  73. }
  74. if req, err = http.NewRequest(method, url, body); err != nil {
  75. log.Error("http new request url(?) err(?)", url, err)
  76. }
  77. return
  78. }
  79. // Ping verify server is ok.
  80. func (d *Dao) Ping(c context.Context) (err error) {
  81. if err = d.db.DB().Ping(); err != nil {
  82. log.Info("dao.cloudDB.Ping() error(%v)", err)
  83. }
  84. return
  85. }
  86. // tokenCacheSave The err does not need to return, because this method is irrelevant.
  87. func (d *Dao) tokenCacheSave(c context.Context, cacheItem *memcache.Item) {
  88. var f = func(c context.Context) {
  89. var (
  90. conn = d.mc.Get(c)
  91. err error
  92. )
  93. defer conn.Close()
  94. if err = conn.Set(cacheItem); err != nil {
  95. log.Error("AddCache conn.Set(%s) error(%v)", cacheItem.Key, err)
  96. }
  97. }
  98. if err := d.cache.Do(c, f); err != nil {
  99. log.Error("Token cache save err(%v)", err)
  100. }
  101. }