sven.go 8.0 KB


  1. package paladin
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "io/ioutil"
  8. "net/http"
  9. "net/url"
  10. "os"
  11. "path"
  12. "strconv"
  13. "sync"
  14. "time"
  15. "go-common/library/conf/env"
  16. "go-common/library/ecode"
  17. "go-common/library/log"
  18. xip "go-common/library/net/ip"
  19. "go-common/library/net/netutil"
  20. "github.com/pkg/errors"
  21. )
  22. const (
  23. _apiGet = "http://%s/config/v2/get?%s"
  24. _apiCheck = "http://%s/config/v2/check?%s"
  25. _maxLoadRetries = 3
  26. )
  27. var (
  28. _ Client = &sven{}
  29. svenHost string
  30. svenVersion string
  31. svenPath string
  32. svenToken string
  33. svenAppoint string
  34. svenTreeid string
  35. _debug bool
  36. )
  37. func init() {
  38. flag.StringVar(&svenHost, "conf_host", os.Getenv("CONF_HOST"), `config api host.`)
  39. flag.StringVar(&svenVersion, "conf_version", os.Getenv("CONF_VERSION"), `app version.`)
  40. flag.StringVar(&svenPath, "conf_path", os.Getenv("CONF_PATH"), `config file path.`)
  41. flag.StringVar(&svenToken, "conf_token", os.Getenv("CONF_TOKEN"), `config token.`)
  42. flag.StringVar(&svenAppoint, "conf_appoint", os.Getenv("CONF_APPOINT"), `config appoint.`)
  43. flag.StringVar(&svenTreeid, "tree_id", os.Getenv("TREE_ID"), `tree id.`)
  44. if env.DeployEnv == env.DeployEnvDev {
  45. _debug = true
  46. }
  47. }
// watcher pairs a set of subscribed keys with the channel on which matching
// events are delivered.
type watcher struct {
	// keys lists the subscribed keys; an empty list matches every key (see HasKey).
	keys []string
	// ch carries the events handed to Handle.
	ch chan Event
}
  52. func newWatcher(keys []string) *watcher {
  53. return &watcher{keys: keys, ch: make(chan Event, 5)}
  54. }
  55. func (w *watcher) HasKey(key string) bool {
  56. if len(w.keys) == 0 {
  57. return true
  58. }
  59. for _, k := range w.keys {
  60. if k == key {
  61. return true
  62. }
  63. }
  64. return false
  65. }
  66. func (w *watcher) Handle(event Event) {
  67. select {
  68. case w.ch <- event:
  69. default:
  70. log.Error("paladin: discard event:%+v", event)
  71. }
  72. }
  73. func (w *watcher) Chan() <-chan Event {
  74. return w.ch
  75. }
  76. func (w *watcher) Close() {
  77. close(w.ch)
  78. }
// sven is sven config client.
// It holds the most recently fetched config values together with the set of
// registered watchers that receive change events.
type sven struct {
	// values is the current config map; load and watchproc replace it via Store.
	values *Map
	// wmu guards watchers.
	wmu sync.RWMutex
	// watchers is the set of watchers registered through WatchEvent.
	watchers map[*watcher]struct{}
	// httpCli performs the check and config HTTP requests.
	httpCli *http.Client
	// backoff supplies the delay between retries in load and watchproc.
	backoff *netutil.BackoffConfig
}
  87. // NewSven new a config client.
  88. func NewSven() (Client, error) {
  89. s := &sven{
  90. values: new(Map),
  91. watchers: make(map[*watcher]struct{}),
  92. httpCli: &http.Client{Timeout: 60 * time.Second},
  93. backoff: &netutil.BackoffConfig{
  94. MaxDelay: 5 * time.Second,
  95. BaseDelay: 1.0 * time.Second,
  96. Factor: 1.6,
  97. Jitter: 0.2,
  98. },
  99. }
  100. if err := s.checkEnv(); err != nil {
  101. return nil, err
  102. }
  103. ver, err := s.load()
  104. if err != nil {
  105. return nil, err
  106. }
  107. go s.watchproc(ver)
  108. return s, nil
  109. }
  110. func (s *sven) checkEnv() error {
  111. if svenHost == "" || svenVersion == "" || svenPath == "" || svenToken == "" || svenTreeid == "" {
  112. return fmt.Errorf("config env invalid. conf_host(%s) conf_version(%s) conf_path(%s) conf_token(%s) conf_appoint(%s) tree_id(%s)", svenHost, svenVersion, svenPath, svenToken, svenAppoint, svenTreeid)
  113. }
  114. return nil
  115. }
  116. // Get return value by key.
  117. func (s *sven) Get(key string) *Value {
  118. return s.values.Get(key)
  119. }
  120. // GetAll return value map.
  121. func (s *sven) GetAll() *Map {
  122. return s.values
  123. }
  124. // WatchEvent watch with the specified keys.
  125. func (s *sven) WatchEvent(ctx context.Context, keys ...string) <-chan Event {
  126. w := newWatcher(keys)
  127. s.wmu.Lock()
  128. s.watchers[w] = struct{}{}
  129. s.wmu.Unlock()
  130. return w.Chan()
  131. }
  132. // Close close watcher.
  133. func (s *sven) Close() (err error) {
  134. s.wmu.RLock()
  135. for w := range s.watchers {
  136. w.Close()
  137. }
  138. s.wmu.RUnlock()
  139. return
  140. }
  141. func (s *sven) fireEvent(event Event) {
  142. s.wmu.RLock()
  143. for w := range s.watchers {
  144. if w.HasKey(event.Key) {
  145. w.Handle(event)
  146. }
  147. }
  148. s.wmu.RUnlock()
  149. }
  150. func (s *sven) load() (ver int64, err error) {
  151. var (
  152. v *version
  153. cs []*content
  154. )
  155. if v, err = s.check(-1); err != nil {
  156. log.Error("paladin: s.check(-1) error(%v)", err)
  157. return
  158. }
  159. for i := 0; i < _maxLoadRetries; i++ {
  160. if cs, err = s.config(v); err == nil {
  161. all := make(map[string]*Value, len(cs))
  162. for _, v := range cs {
  163. all[v.Name] = &Value{val: v.Config, raw: v.Config}
  164. }
  165. s.values.Store(all)
  166. return v.Version, nil
  167. }
  168. log.Error("paladin: s.config(%v) error(%v)", ver, err)
  169. time.Sleep(s.backoff.Backoff(i))
  170. }
  171. return 0, err
  172. }
  173. func (s *sven) watchproc(ver int64) {
  174. var retry int
  175. for {
  176. v, err := s.check(ver)
  177. if err != nil {
  178. if ecode.NotModified.Equal(err) {
  179. time.Sleep(time.Second)
  180. continue
  181. }
  182. log.Error("paladin: s.check(%d) error(%v)", ver, err)
  183. retry++
  184. time.Sleep(s.backoff.Backoff(retry))
  185. continue
  186. }
  187. cs, err := s.config(v)
  188. if err != nil {
  189. log.Error("paladin: s.config(%v) error(%v)", ver, err)
  190. retry++
  191. time.Sleep(s.backoff.Backoff(retry))
  192. continue
  193. }
  194. all := s.values.Load()
  195. news := make(map[string]*Value, len(cs))
  196. for _, v := range cs {
  197. if _, ok := all[v.Name]; !ok {
  198. go s.fireEvent(Event{Event: EventAdd, Key: v.Name, Value: v.Config})
  199. } else if v.Config != "" {
  200. go s.fireEvent(Event{Event: EventUpdate, Key: v.Name, Value: v.Config})
  201. } else {
  202. go s.fireEvent(Event{Event: EventRemove, Key: v.Name, Value: v.Config})
  203. }
  204. news[v.Name] = &Value{val: v.Config, raw: v.Config}
  205. }
  206. for k, v := range all {
  207. if _, ok := news[k]; !ok {
  208. news[k] = v
  209. }
  210. }
  211. s.values.Store(news)
  212. ver = v.Version
  213. retry = 0
  214. }
  215. }
// version is the data payload of the check endpoint.
type version struct {
	// Version is the version number reported by the server.
	Version int64 `json:"version"`
	// Diffs is sent back to the get endpoint as the "ids" parameter.
	Diffs []int64 `json:"diffs"`
}
// config is the data payload of the get endpoint.
type config struct {
	Version int64 `json:"version"`
	// Content is a JSON document that decodes into a list of content entries.
	Content string `json:"content"`
	Md5     string `json:"md5"`
}
// content is one named config entry decoded from config.Content.
type content struct {
	Cid int64 `json:"cid"`
	// Name is used both as the value key and as the file name under svenPath.
	Name string `json:"name"`
	// Config is the entry's raw text.
	Config string `json:"config"`
}
  230. func (s *sven) check(ver int64) (v *version, err error) {
  231. params := newParams()
  232. params.Set("version", strconv.FormatInt(ver, 10))
  233. params.Set("appoint", svenAppoint)
  234. var res struct {
  235. Code int `json:"code"`
  236. Data *version `json:"data"`
  237. }
  238. uri := fmt.Sprintf(_apiCheck, svenHost, params.Encode())
  239. if _debug {
  240. fmt.Printf("paladin: check(%d) uri(%s)\n", ver, uri)
  241. }
  242. req, err := http.NewRequest("GET", uri, nil)
  243. if err != nil {
  244. return
  245. }
  246. resp, err := s.httpCli.Do(req)
  247. if err != nil {
  248. return
  249. }
  250. defer resp.Body.Close()
  251. if resp.StatusCode != http.StatusOK {
  252. err = errors.Errorf("paladin: httpCli.GET(%s) error(%d)", params.Encode(), resp.StatusCode)
  253. return
  254. }
  255. b, err := ioutil.ReadAll(resp.Body)
  256. if err != nil {
  257. return
  258. }
  259. if err = json.Unmarshal(b, &res); err != nil {
  260. return
  261. }
  262. if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
  263. err = ec
  264. return
  265. }
  266. if res.Data == nil {
  267. err = errors.Errorf("paladin: http version is nil. params(%s)", params.Encode())
  268. return
  269. }
  270. v = res.Data
  271. return
  272. }
  273. func (s *sven) config(ver *version) (cts []*content, err error) {
  274. ids, _ := json.Marshal(ver.Diffs)
  275. params := newParams()
  276. params.Set("version", strconv.FormatInt(ver.Version, 10))
  277. params.Set("ids", string(ids))
  278. var res struct {
  279. Code int `json:"code"`
  280. Data *config `json:"data"`
  281. }
  282. uri := fmt.Sprintf(_apiGet, svenHost, params.Encode())
  283. if _debug {
  284. fmt.Printf("paladin: config(%+v) uri(%s)\n", ver, uri)
  285. }
  286. req, err := http.NewRequest("GET", uri, nil)
  287. if err != nil {
  288. return
  289. }
  290. resp, err := s.httpCli.Do(req)
  291. if err != nil {
  292. return
  293. }
  294. defer resp.Body.Close()
  295. if resp.StatusCode != http.StatusOK {
  296. err = errors.Errorf("paladin: httpCli.GET(%s) error(%d)", params.Encode(), resp.StatusCode)
  297. return
  298. }
  299. b, err := ioutil.ReadAll(resp.Body)
  300. if err != nil {
  301. return
  302. }
  303. if err = json.Unmarshal(b, &res); err != nil {
  304. return
  305. }
  306. if !ecode.Int(res.Code).Equal(ecode.OK) || res.Data == nil {
  307. err = errors.Errorf("paladin: http config is nil. params(%s) ecode(%d)", params.Encode(), res.Code)
  308. return
  309. }
  310. if err = json.Unmarshal([]byte(res.Data.Content), &cts); err != nil {
  311. return
  312. }
  313. for _, c := range cts {
  314. if err = ioutil.WriteFile(path.Join(svenPath, c.Name), []byte(c.Config), 0644); err != nil {
  315. return
  316. }
  317. }
  318. return
  319. }
  320. func newParams() url.Values {
  321. params := url.Values{}
  322. params.Set("service", serviceName())
  323. params.Set("build", svenVersion)
  324. params.Set("token", svenToken)
  325. params.Set("hostname", env.Hostname)
  326. params.Set("ip", ipAddr())
  327. return params
  328. }
  329. func ipAddr() string {
  330. if env.IP != "" {
  331. return env.IP
  332. }
  333. return xip.InternalIP()
  334. }
  335. func serviceName() string {
  336. return fmt.Sprintf("%s_%s_%s", svenTreeid, env.DeployEnv, env.Zone)
  337. }