mengde.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. package mengde
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "net"
  9. "net/http"
  10. "os"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. "go-common/library/log"
  17. )
  18. const (
  19. kSvenCheckTimeout = time.Minute
  20. kSvenGetTimeout = 30 * time.Second
  21. kSvenHost = "http://config.bilibili.co"
  22. kSvenCheckAPI = kSvenHost + "/config/v2/check"
  23. kSvenGetAPI = kSvenHost + "/config/v2/get"
  24. kDefaultHost = "invalid-host-name"
  25. kDefaultIP = "127.0.0.1"
  26. kCodeNotModified = -304
  27. )
  28. type MengdeClient struct {
  29. treeID string
  30. zone string
  31. env string
  32. build string
  33. token string
  34. host string
  35. ip string
  36. config atomic.Value
  37. notify chan *MengdeConfig
  38. changeSignal chan struct{}
  39. httpClient *http.Client
  40. ctx context.Context
  41. cancel context.CancelFunc
  42. closeSignal chan struct{}
  43. configLoadWg sync.WaitGroup
  44. configNotifyWg sync.WaitGroup
  45. }
// MengdeConfig is one versioned snapshot of the configuration.
type MengdeConfig struct {
	// Version is the config version number reported by the service; the
	// client uses -1 when no config has been loaded.
	Version int
	// Config maps each config entry's name to its whitespace-trimmed content.
	Config map[string]string
}
  50. func NewMengdeClient(treeID string, zone string, env string, build string, token string) (*MengdeClient, error) {
  51. host, err := os.Hostname()
  52. if err != nil {
  53. host = kDefaultHost
  54. }
  55. ip := kDefaultIP
  56. addr, err := net.InterfaceAddrs()
  57. if err == nil {
  58. for _, a := range addr {
  59. if n, ok := a.(*net.IPNet); ok && !n.IP.IsLoopback() && n.IP.To4() != nil {
  60. ip = n.IP.String()
  61. }
  62. }
  63. }
  64. c := &MengdeClient{
  65. treeID: treeID,
  66. zone: zone,
  67. env: env,
  68. build: build,
  69. token: token,
  70. host: host,
  71. ip: ip,
  72. notify: make(chan *MengdeConfig),
  73. changeSignal: make(chan struct{}, 1),
  74. httpClient: new(http.Client),
  75. closeSignal: make(chan struct{}),
  76. }
  77. c.ctx, c.cancel = context.WithCancel(context.Background())
  78. version, config, err := c.svenConfigSync()
  79. if err != nil {
  80. return nil, err
  81. }
  82. c.config.Store(&MengdeConfig{
  83. Version: version,
  84. Config: config,
  85. })
  86. c.changeSignal <- struct{}{}
  87. c.configLoadWg.Add(1)
  88. go func() {
  89. defer c.configLoadWg.Done()
  90. c.configLoadProcess()
  91. }()
  92. c.configNotifyWg.Add(1)
  93. go func() {
  94. defer c.configNotifyWg.Done()
  95. c.configNotifyProcess()
  96. }()
  97. return c, nil
  98. }
// Close stops both background goroutines and waits for them to exit.
//
// It closes channels unconditionally, so calling it more than once panics.
func (c *MengdeClient) Close() {
	// Tell both goroutines to stop; this also unblocks a notifier that is
	// waiting for a receiver on the notify channel.
	close(c.closeSignal)
	// Cancel the parent context so any in-flight HTTP request returns.
	c.cancel()
	// The loader sends on changeSignal, so it must have exited before that
	// channel is closed below.
	c.configLoadWg.Wait()
	// Closing changeSignal ends the notifier's range loop if it is idle.
	close(c.changeSignal)
	c.configNotifyWg.Wait()
}
  106. func (c *MengdeClient) Config() *MengdeConfig {
  107. result := &MengdeConfig{
  108. Version: -1,
  109. Config: make(map[string]string),
  110. }
  111. if config, ok := c.config.Load().(*MengdeConfig); ok {
  112. result.Version = config.Version
  113. for k, v := range config.Config {
  114. result.Config[k] = v
  115. }
  116. }
  117. return result
  118. }
  119. func (c *MengdeClient) ConfigNotify() <-chan *MengdeConfig {
  120. return c.notify
  121. }
  122. func (c *MengdeClient) configLoadProcess() {
  123. for {
  124. select {
  125. case <-c.closeSignal:
  126. return
  127. default:
  128. }
  129. current, ok := c.config.Load().(*MengdeConfig)
  130. if !ok {
  131. current = &MengdeConfig{
  132. Version: -1,
  133. }
  134. }
  135. version, err := c.svenCheckVersion(current.Version)
  136. //config not modified
  137. if version == current.Version {
  138. log.Info("[sven]check version config not modified")
  139. continue
  140. }
  141. if err != nil {
  142. log.Error("[sven]check version error:%s", err.Error())
  143. continue
  144. }
  145. if current.Version == version {
  146. continue
  147. }
  148. config, err := c.svenGetConfig(version)
  149. if err != nil {
  150. log.Error(err.Error())
  151. continue
  152. }
  153. c.config.Store(&MengdeConfig{
  154. Version: version,
  155. Config: config,
  156. })
  157. select {
  158. case c.changeSignal <- struct{}{}:
  159. default:
  160. }
  161. }
  162. }
  163. func (c *MengdeClient) configNotifyProcess() {
  164. for range c.changeSignal {
  165. select {
  166. case <-c.closeSignal:
  167. return
  168. case c.notify <- c.Config():
  169. }
  170. }
  171. }
  172. func (c *MengdeClient) svenConfigSync() (int, map[string]string, error) {
  173. version, err := c.svenCheckVersion(-1)
  174. if err != nil {
  175. return -1, nil, err
  176. }
  177. config, err := c.svenGetConfig(version)
  178. if err != nil {
  179. return -1, nil, err
  180. }
  181. return version, config, nil
  182. }
  183. func (c *MengdeClient) svenCheckVersion(version int) (int, error) {
  184. var err error
  185. req, err := http.NewRequest("GET", kSvenCheckAPI, nil)
  186. if err != nil {
  187. return -1, err
  188. }
  189. q := req.URL.Query()
  190. q.Add("build", c.build)
  191. q.Add("hostname", c.host)
  192. q.Add("ip", c.ip)
  193. q.Add("service", strings.Join([]string{c.treeID, c.env, c.zone}, "_"))
  194. q.Add("token", c.token)
  195. q.Add("version", strconv.Itoa(version))
  196. req.URL.RawQuery = q.Encode()
  197. ctx, cancel := context.WithTimeout(c.ctx, kSvenCheckTimeout)
  198. defer cancel()
  199. req = req.WithContext(ctx)
  200. resp, err := c.httpClient.Do(req)
  201. if err != nil {
  202. return -1, err
  203. }
  204. defer resp.Body.Close()
  205. if resp.StatusCode != http.StatusOK {
  206. return -1, errors.New(fmt.Sprintf("http request URL:%s result:%s", req.URL.RawQuery, resp.Status))
  207. }
  208. result, err := ioutil.ReadAll(resp.Body)
  209. if err != nil {
  210. return -1, err
  211. }
  212. var svenCheckRespBody struct {
  213. Code int `json:"code"`
  214. Message string `json:"message"`
  215. Data struct {
  216. Version int `json:"version"`
  217. } `json:"data"`
  218. }
  219. if err = json.Unmarshal(result, &svenCheckRespBody); err != nil {
  220. return -1, err
  221. }
  222. if svenCheckRespBody.Code != 0 {
  223. if svenCheckRespBody.Code == kCodeNotModified {
  224. return version, nil
  225. }
  226. return -1, errors.New(svenCheckRespBody.Message)
  227. }
  228. return svenCheckRespBody.Data.Version, nil
  229. }
  230. func (c *MengdeClient) svenGetConfig(version int) (map[string]string, error) {
  231. var err error
  232. req, err := http.NewRequest("GET", kSvenGetAPI, nil)
  233. if err != nil {
  234. return nil, err
  235. }
  236. q := req.URL.Query()
  237. q.Add("build", c.build)
  238. q.Add("hostname", c.host)
  239. q.Add("ip", c.ip)
  240. q.Add("service", strings.Join([]string{c.treeID, c.env, c.zone}, "_"))
  241. q.Add("token", c.token)
  242. q.Add("version", strconv.Itoa(version))
  243. req.URL.RawQuery = q.Encode()
  244. ctx, cancel := context.WithTimeout(c.ctx, kSvenGetTimeout)
  245. defer cancel()
  246. req = req.WithContext(ctx)
  247. resp, err := c.httpClient.Do(req)
  248. if err != nil {
  249. return nil, err
  250. }
  251. defer resp.Body.Close()
  252. if resp.StatusCode != http.StatusOK {
  253. return nil, errors.New(fmt.Sprintf("http request URL:%s result:%s", req.URL.RawQuery, resp.Status))
  254. }
  255. result, err := ioutil.ReadAll(resp.Body)
  256. if err != nil {
  257. return nil, err
  258. }
  259. var svenGetRespBody struct {
  260. Code int `json:"code"`
  261. Message string `json:"message"`
  262. TTL int `json:"ttl"`
  263. Data struct {
  264. Version int `json:"version"`
  265. MD5 string `json:"md5"`
  266. Content string `json:"content"`
  267. } `json:"data"`
  268. }
  269. if err = json.Unmarshal(result, &svenGetRespBody); err != nil {
  270. return nil, err
  271. }
  272. if svenGetRespBody.Code != 0 {
  273. return nil, errors.New(svenGetRespBody.Message)
  274. }
  275. var configContent []struct {
  276. Name string `json:"name"`
  277. Config string `json:"config"`
  278. }
  279. if err = json.Unmarshal([]byte(svenGetRespBody.Data.Content), &configContent); err != nil {
  280. return nil, err
  281. }
  282. config := make(map[string]string)
  283. for _, c := range configContent {
  284. config[c.Name] = strings.TrimSpace(c.Config)
  285. }
  286. return config, nil
  287. }