sven.go 6.8 KB


  1. package grocery
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "io/ioutil"
  7. "net"
  8. "net/http"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "sync/atomic"
  14. "time"
  15. )
  // Tunables and endpoints used by SvenClient.
  const (
  	// kLogChanSize is the buffer capacity of the channel that carries log records.
  	kLogChanSize = 255

  	// Per-request deadlines for the version-check and config-fetch calls.
  	kSvenCheckTimeout = 1 * time.Minute
  	kSvenGetTimeout   = 30 * time.Second

  	// Base URL of the config service and the two endpoints built from it.
  	kSvenHost     = "http://config.bilibili.co"
  	kSvenCheckAPI = kSvenHost + "/config/v2/check"
  	kSvenGetAPI   = kSvenHost + "/config/v2/get"

  	// Fallback identity values used when the local hostname or a
  	// non-loopback IPv4 address cannot be determined.
  	kDefaultHost = "invalid-host-name"
  	kDefaultIP   = "127.0.0.1"

  	// kCodeNotModified is the response code for which the version check
  	// hands back the version it was asked about.
  	kCodeNotModified = -304
  )
  27. type SvenClient struct {
  28. treeID string
  29. zone string
  30. env string
  31. build string
  32. token string
  33. host string
  34. ip string
  35. config atomic.Value
  36. log chan *LogRecord
  37. notify chan *SvenConfig
  38. changeSignal chan struct{}
  39. httpClient *http.Client
  40. ctx context.Context
  41. cancel context.CancelFunc
  42. closeSignal chan struct{}
  43. configLoadWg sync.WaitGroup
  44. configNotifyWg sync.WaitGroup
  45. }
  // SvenConfig is one configuration snapshot.
  type SvenConfig struct {
  	// Version is the version number of this snapshot; the client uses -1
  	// when no snapshot is available.
  	Version int
  	// Config maps a configuration entry name to its content.
  	Config map[string]string
  }
  50. func NewSvenClient(treeID string, zone string, env string, build string, token string) (*SvenClient, error) {
  51. host, err := os.Hostname()
  52. if err != nil {
  53. host = kDefaultHost
  54. }
  55. ip := kDefaultIP
  56. addr, err := net.InterfaceAddrs()
  57. if err == nil {
  58. for _, a := range addr {
  59. if n, ok := a.(*net.IPNet); ok && !n.IP.IsLoopback() && n.IP.To4() != nil {
  60. ip = n.IP.String()
  61. }
  62. }
  63. }
  64. c := &SvenClient{
  65. treeID: treeID,
  66. zone: zone,
  67. env: env,
  68. build: build,
  69. token: token,
  70. host: host,
  71. ip: ip,
  72. log: make(chan *LogRecord, kLogChanSize),
  73. notify: make(chan *SvenConfig),
  74. changeSignal: make(chan struct{}, 1),
  75. httpClient: new(http.Client),
  76. closeSignal: make(chan struct{}),
  77. }
  78. c.ctx, c.cancel = context.WithCancel(context.Background())
  79. version, config, err := c.svenConfigSync()
  80. if err != nil {
  81. return nil, err
  82. }
  83. c.config.Store(&SvenConfig{
  84. Version: version,
  85. Config: config,
  86. })
  87. c.changeSignal <- struct{}{}
  88. c.configLoadWg.Add(1)
  89. go func() {
  90. defer c.configLoadWg.Done()
  91. c.configLoadProcess()
  92. }()
  93. c.configNotifyWg.Add(1)
  94. go func() {
  95. defer c.configNotifyWg.Done()
  96. c.configNotifyProcess()
  97. }()
  98. return c, nil
  99. }
  100. func (c *SvenClient) Close() {
  101. close(c.closeSignal)
  102. c.cancel()
  103. c.configLoadWg.Wait()
  104. close(c.changeSignal)
  105. c.configNotifyWg.Wait()
  106. }
  107. func (c *SvenClient) Config() *SvenConfig {
  108. result := &SvenConfig{
  109. Version: -1,
  110. Config: make(map[string]string),
  111. }
  112. if config, ok := c.config.Load().(*SvenConfig); ok {
  113. result.Version = config.Version
  114. for k, v := range config.Config {
  115. result.Config[k] = v
  116. }
  117. }
  118. return result
  119. }
  120. func (c *SvenClient) ConfigNotify() <-chan *SvenConfig {
  121. return c.notify
  122. }
  123. func (c *SvenClient) LogNotify() <-chan *LogRecord {
  124. return c.log
  125. }
  126. func (c *SvenClient) configLoadProcess() {
  127. for {
  128. select {
  129. case <-c.closeSignal:
  130. return
  131. default:
  132. }
  133. current, ok := c.config.Load().(*SvenConfig)
  134. if !ok {
  135. current = &SvenConfig{
  136. Version: -1,
  137. }
  138. }
  139. version, err := c.svenCheckVersion(current.Version)
  140. //config not modified
  141. if version == current.Version {
  142. c.postLog(INFO, "config not modified")
  143. continue
  144. }
  145. if err != nil {
  146. c.postLog(ERROR, err.Error())
  147. continue
  148. }
  149. if current.Version == version {
  150. continue
  151. }
  152. config, err := c.svenGetConfig(version)
  153. if err != nil {
  154. c.postLog(ERROR, err.Error())
  155. continue
  156. }
  157. c.config.Store(&SvenConfig{
  158. Version: version,
  159. Config: config,
  160. })
  161. select {
  162. case c.changeSignal <- struct{}{}:
  163. default:
  164. }
  165. }
  166. }
  167. func (c *SvenClient) configNotifyProcess() {
  168. for range c.changeSignal {
  169. select {
  170. case <-c.closeSignal:
  171. return
  172. case c.notify <- c.Config():
  173. }
  174. }
  175. }
  176. func (c *SvenClient) postLog(lv level, msg string) {
  177. select {
  178. case c.log <- &LogRecord{Level: lv, Message: msg}:
  179. default:
  180. }
  181. }
  182. func (c *SvenClient) svenConfigSync() (int, map[string]string, error) {
  183. version, err := c.svenCheckVersion(-1)
  184. if err != nil {
  185. return -1, nil, err
  186. }
  187. config, err := c.svenGetConfig(version)
  188. if err != nil {
  189. return -1, nil, err
  190. }
  191. return version, config, nil
  192. }
  193. func (c *SvenClient) svenCheckVersion(version int) (int, error) {
  194. var err error
  195. req, err := http.NewRequest("GET", kSvenCheckAPI, nil)
  196. if err != nil {
  197. return -1, err
  198. }
  199. q := req.URL.Query()
  200. q.Add("build", c.build)
  201. q.Add("hostname", c.host)
  202. q.Add("ip", c.ip)
  203. q.Add("service", strings.Join([]string{c.treeID, c.env, c.zone}, "_"))
  204. q.Add("token", c.token)
  205. q.Add("version", strconv.Itoa(version))
  206. req.URL.RawQuery = q.Encode()
  207. ctx, cancel := context.WithTimeout(c.ctx, kSvenCheckTimeout)
  208. defer cancel()
  209. req = req.WithContext(ctx)
  210. resp, err := c.httpClient.Do(req)
  211. if err != nil {
  212. return -1, err
  213. }
  214. defer resp.Body.Close()
  215. result, err := ioutil.ReadAll(resp.Body)
  216. if err != nil {
  217. return -1, err
  218. }
  219. var svenCheckRespBody struct {
  220. Code int `json:"code"`
  221. Message string `json:"message"`
  222. Data struct {
  223. Version int `json:"version"`
  224. } `json:"data"`
  225. }
  226. if err = json.Unmarshal(result, &svenCheckRespBody); err != nil {
  227. return -1, err
  228. }
  229. if svenCheckRespBody.Code != 0 {
  230. if svenCheckRespBody.Code == kCodeNotModified {
  231. return version, nil
  232. }
  233. return -1, errors.New(svenCheckRespBody.Message)
  234. }
  235. return svenCheckRespBody.Data.Version, nil
  236. }
  237. func (c *SvenClient) svenGetConfig(version int) (map[string]string, error) {
  238. var err error
  239. req, err := http.NewRequest("GET", kSvenGetAPI, nil)
  240. if err != nil {
  241. return nil, err
  242. }
  243. q := req.URL.Query()
  244. q.Add("build", c.build)
  245. q.Add("hostname", c.host)
  246. q.Add("ip", c.ip)
  247. q.Add("service", strings.Join([]string{c.treeID, c.env, c.zone}, "_"))
  248. q.Add("token", c.token)
  249. q.Add("version", strconv.Itoa(version))
  250. req.URL.RawQuery = q.Encode()
  251. ctx, cancel := context.WithTimeout(c.ctx, kSvenGetTimeout)
  252. defer cancel()
  253. req = req.WithContext(ctx)
  254. resp, err := c.httpClient.Do(req)
  255. if err != nil {
  256. return nil, err
  257. }
  258. defer resp.Body.Close()
  259. result, err := ioutil.ReadAll(resp.Body)
  260. if err != nil {
  261. return nil, err
  262. }
  263. var svenGetRespBody struct {
  264. Code int `json:"code"`
  265. Message string `json:"message"`
  266. TTL int `json:"ttl"`
  267. Data struct {
  268. Version int `json:"version"`
  269. MD5 string `json:"md5"`
  270. Content string `json:"content"`
  271. } `json:"data"`
  272. }
  273. if err = json.Unmarshal(result, &svenGetRespBody); err != nil {
  274. return nil, err
  275. }
  276. if svenGetRespBody.Code != 0 {
  277. return nil, errors.New(svenGetRespBody.Message)
  278. }
  279. var configContent []struct {
  280. Name string `json:"name"`
  281. Config string `json:"config"`
  282. }
  283. if err = json.Unmarshal([]byte(svenGetRespBody.Data.Content), &configContent); err != nil {
  284. return nil, err
  285. }
  286. config := make(map[string]string)
  287. for _, c := range configContent {
  288. config[c.Name] = strings.TrimSpace(c.Config)
  289. }
  290. return config, nil
  291. }