123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372 |
- package paladin
- import (
- "context"
- "encoding/json"
- "flag"
- "fmt"
- "io/ioutil"
- "net/http"
- "net/url"
- "os"
- "path"
- "strconv"
- "sync"
- "time"
- "go-common/library/conf/env"
- "go-common/library/ecode"
- "go-common/library/log"
- xip "go-common/library/net/ip"
- "go-common/library/net/netutil"
- "github.com/pkg/errors"
- )
// URL templates for the sven config HTTP API and the retry budget used by
// the initial load.
const (
	// _maxLoadRetries is the number of attempts load makes to fetch the
	// config contents before giving up.
	_maxLoadRetries = 3

	// _apiCheck is the version-check endpoint; the verbs are the host and
	// the encoded query string.
	_apiCheck = "http://%s/config/v2/check?%s"
	// _apiGet is the config-download endpoint; the verbs are the host and
	// the encoded query string.
	_apiGet = "http://%s/config/v2/get?%s"
)
- var (
- _ Client = &sven{}
- svenHost string
- svenVersion string
- svenPath string
- svenToken string
- svenAppoint string
- svenTreeid string
- _debug bool
- )
- func init() {
- flag.StringVar(&svenHost, "conf_host", os.Getenv("CONF_HOST"), `config api host.`)
- flag.StringVar(&svenVersion, "conf_version", os.Getenv("CONF_VERSION"), `app version.`)
- flag.StringVar(&svenPath, "conf_path", os.Getenv("CONF_PATH"), `config file path.`)
- flag.StringVar(&svenToken, "conf_token", os.Getenv("CONF_TOKEN"), `config token.`)
- flag.StringVar(&svenAppoint, "conf_appoint", os.Getenv("CONF_APPOINT"), `config appoint.`)
- flag.StringVar(&svenTreeid, "tree_id", os.Getenv("TREE_ID"), `tree id.`)
- if env.DeployEnv == env.DeployEnvDev {
- _debug = true
- }
- }
- type watcher struct {
- keys []string
- ch chan Event
- }
- func newWatcher(keys []string) *watcher {
- return &watcher{keys: keys, ch: make(chan Event, 5)}
- }
- func (w *watcher) HasKey(key string) bool {
- if len(w.keys) == 0 {
- return true
- }
- for _, k := range w.keys {
- if k == key {
- return true
- }
- }
- return false
- }
- func (w *watcher) Handle(event Event) {
- select {
- case w.ch <- event:
- default:
- log.Error("paladin: discard event:%+v", event)
- }
- }
- func (w *watcher) Chan() <-chan Event {
- return w.ch
- }
- func (w *watcher) Close() {
- close(w.ch)
- }
- // sven is sven config client.
- type sven struct {
- values *Map
- wmu sync.RWMutex
- watchers map[*watcher]struct{}
- httpCli *http.Client
- backoff *netutil.BackoffConfig
- }
- // NewSven new a config client.
- func NewSven() (Client, error) {
- s := &sven{
- values: new(Map),
- watchers: make(map[*watcher]struct{}),
- httpCli: &http.Client{Timeout: 60 * time.Second},
- backoff: &netutil.BackoffConfig{
- MaxDelay: 5 * time.Second,
- BaseDelay: 1.0 * time.Second,
- Factor: 1.6,
- Jitter: 0.2,
- },
- }
- if err := s.checkEnv(); err != nil {
- return nil, err
- }
- ver, err := s.load()
- if err != nil {
- return nil, err
- }
- go s.watchproc(ver)
- return s, nil
- }
- func (s *sven) checkEnv() error {
- if svenHost == "" || svenVersion == "" || svenPath == "" || svenToken == "" || svenTreeid == "" {
- return fmt.Errorf("config env invalid. conf_host(%s) conf_version(%s) conf_path(%s) conf_token(%s) conf_appoint(%s) tree_id(%s)", svenHost, svenVersion, svenPath, svenToken, svenAppoint, svenTreeid)
- }
- return nil
- }
- // Get return value by key.
- func (s *sven) Get(key string) *Value {
- return s.values.Get(key)
- }
- // GetAll return value map.
- func (s *sven) GetAll() *Map {
- return s.values
- }
- // WatchEvent watch with the specified keys.
- func (s *sven) WatchEvent(ctx context.Context, keys ...string) <-chan Event {
- w := newWatcher(keys)
- s.wmu.Lock()
- s.watchers[w] = struct{}{}
- s.wmu.Unlock()
- return w.Chan()
- }
- // Close close watcher.
- func (s *sven) Close() (err error) {
- s.wmu.RLock()
- for w := range s.watchers {
- w.Close()
- }
- s.wmu.RUnlock()
- return
- }
- func (s *sven) fireEvent(event Event) {
- s.wmu.RLock()
- for w := range s.watchers {
- if w.HasKey(event.Key) {
- w.Handle(event)
- }
- }
- s.wmu.RUnlock()
- }
- func (s *sven) load() (ver int64, err error) {
- var (
- v *version
- cs []*content
- )
- if v, err = s.check(-1); err != nil {
- log.Error("paladin: s.check(-1) error(%v)", err)
- return
- }
- for i := 0; i < _maxLoadRetries; i++ {
- if cs, err = s.config(v); err == nil {
- all := make(map[string]*Value, len(cs))
- for _, v := range cs {
- all[v.Name] = &Value{val: v.Config, raw: v.Config}
- }
- s.values.Store(all)
- return v.Version, nil
- }
- log.Error("paladin: s.config(%v) error(%v)", ver, err)
- time.Sleep(s.backoff.Backoff(i))
- }
- return 0, err
- }
- func (s *sven) watchproc(ver int64) {
- var retry int
- for {
- v, err := s.check(ver)
- if err != nil {
- if ecode.NotModified.Equal(err) {
- time.Sleep(time.Second)
- continue
- }
- log.Error("paladin: s.check(%d) error(%v)", ver, err)
- retry++
- time.Sleep(s.backoff.Backoff(retry))
- continue
- }
- cs, err := s.config(v)
- if err != nil {
- log.Error("paladin: s.config(%v) error(%v)", ver, err)
- retry++
- time.Sleep(s.backoff.Backoff(retry))
- continue
- }
- all := s.values.Load()
- news := make(map[string]*Value, len(cs))
- for _, v := range cs {
- if _, ok := all[v.Name]; !ok {
- go s.fireEvent(Event{Event: EventAdd, Key: v.Name, Value: v.Config})
- } else if v.Config != "" {
- go s.fireEvent(Event{Event: EventUpdate, Key: v.Name, Value: v.Config})
- } else {
- go s.fireEvent(Event{Event: EventRemove, Key: v.Name, Value: v.Config})
- }
- news[v.Name] = &Value{val: v.Config, raw: v.Config}
- }
- for k, v := range all {
- if _, ok := news[k]; !ok {
- news[k] = v
- }
- }
- s.values.Store(news)
- ver = v.Version
- retry = 0
- }
- }
// Wire types of the sven config API responses.
type (
	// version is the payload of a check response.
	version struct {
		// Version is the config version number reported by the server.
		Version int64 `json:"version"`
		// Diffs is sent back as the "ids" parameter when fetching the config.
		Diffs []int64 `json:"diffs"`
	}

	// config is the payload of a get response.
	config struct {
		Version int64 `json:"version"`
		// Content is a JSON document that decodes into a list of content.
		Content string `json:"content"`
		Md5     string `json:"md5"`
	}

	// content is one named config entry inside config.Content.
	content struct {
		Cid int64 `json:"cid"`
		// Name is the config key and the file name the entry is written to.
		Name string `json:"name"`
		// Config is the raw text of the entry.
		Config string `json:"config"`
	}
)
- func (s *sven) check(ver int64) (v *version, err error) {
- params := newParams()
- params.Set("version", strconv.FormatInt(ver, 10))
- params.Set("appoint", svenAppoint)
- var res struct {
- Code int `json:"code"`
- Data *version `json:"data"`
- }
- uri := fmt.Sprintf(_apiCheck, svenHost, params.Encode())
- if _debug {
- fmt.Printf("paladin: check(%d) uri(%s)\n", ver, uri)
- }
- req, err := http.NewRequest("GET", uri, nil)
- if err != nil {
- return
- }
- resp, err := s.httpCli.Do(req)
- if err != nil {
- return
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- err = errors.Errorf("paladin: httpCli.GET(%s) error(%d)", params.Encode(), resp.StatusCode)
- return
- }
- b, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return
- }
- if err = json.Unmarshal(b, &res); err != nil {
- return
- }
- if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
- err = ec
- return
- }
- if res.Data == nil {
- err = errors.Errorf("paladin: http version is nil. params(%s)", params.Encode())
- return
- }
- v = res.Data
- return
- }
- func (s *sven) config(ver *version) (cts []*content, err error) {
- ids, _ := json.Marshal(ver.Diffs)
- params := newParams()
- params.Set("version", strconv.FormatInt(ver.Version, 10))
- params.Set("ids", string(ids))
- var res struct {
- Code int `json:"code"`
- Data *config `json:"data"`
- }
- uri := fmt.Sprintf(_apiGet, svenHost, params.Encode())
- if _debug {
- fmt.Printf("paladin: config(%+v) uri(%s)\n", ver, uri)
- }
- req, err := http.NewRequest("GET", uri, nil)
- if err != nil {
- return
- }
- resp, err := s.httpCli.Do(req)
- if err != nil {
- return
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- err = errors.Errorf("paladin: httpCli.GET(%s) error(%d)", params.Encode(), resp.StatusCode)
- return
- }
- b, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return
- }
- if err = json.Unmarshal(b, &res); err != nil {
- return
- }
- if !ecode.Int(res.Code).Equal(ecode.OK) || res.Data == nil {
- err = errors.Errorf("paladin: http config is nil. params(%s) ecode(%d)", params.Encode(), res.Code)
- return
- }
- if err = json.Unmarshal([]byte(res.Data.Content), &cts); err != nil {
- return
- }
- for _, c := range cts {
- if err = ioutil.WriteFile(path.Join(svenPath, c.Name), []byte(c.Config), 0644); err != nil {
- return
- }
- }
- return
- }
- func newParams() url.Values {
- params := url.Values{}
- params.Set("service", serviceName())
- params.Set("build", svenVersion)
- params.Set("token", svenToken)
- params.Set("hostname", env.Hostname)
- params.Set("ip", ipAddr())
- return params
- }
- func ipAddr() string {
- if env.IP != "" {
- return env.IP
- }
- return xip.InternalIP()
- }
- func serviceName() string {
- return fmt.Sprintf("%s_%s_%s", svenTreeid, env.DeployEnv, env.Zone)
- }
|