123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570 |
- package v1
- import (
- "context"
- "crypto/md5"
- "encoding/hex"
- "encoding/json"
- "fmt"
- "time"
- "go-common/app/infra/config/model"
- "go-common/library/database/sql"
- "go-common/library/ecode"
- "go-common/library/log"
- xtime "go-common/library/time"
- )
- const (
- _buildVerKey = "%s_%s_%s"
- _pushKey = "%s_%s_%s"
- _cacheKey = "%s_%s_%d_%s"
- _cacheKey2 = "%s_%s_%d_%s_2"
- _fileKey = "%s_%d"
- )
- var (
- addBusiness = "前端接口api添加配置版本"
- addInfo = "添加版本:%d"
- copyBusiness = "前端接口api拷贝配置版本"
- copyInfo = "拷贝版本:%d,新的版本:%d"
- updateBusiness = "前端接口api更新配置"
- updateInfo = "更新版本:%d"
- )
- // PushKey push sub id
- func pushKey(svr, host, env string) string {
- return fmt.Sprintf(_pushKey, svr, host, env)
- }
- // buildVerKey version mapping key
- func buildVerKey(svr, bver, env string) string {
- return fmt.Sprintf(_buildVerKey, svr, bver, env)
- }
- // cacheKey config cache key
- func cacheKey(svr, bver, env string, ver int64) string {
- return fmt.Sprintf(_cacheKey, svr, bver, ver, env)
- }
- // cacheKey config cache key
- func cacheKey2(svr, bver, env string, ver int64) string {
- return fmt.Sprintf(_cacheKey2, svr, bver, ver, env)
- }
- // fileKey
- func fileKey(filename string, ver int64) string {
- return fmt.Sprintf(_fileKey, filename, ver)
- }
- // tokenKey
- func tokenKey(svr, env string) string {
- return fmt.Sprintf("%s_%s", svr, env)
- }
- // genConfig generate config
- func genConfig(ver int64, cs []*model.NSValue) (conf *model.Content, err error) {
- var b []byte
- data := make(map[string]string)
- for _, c := range cs {
- data[c.Name] = c.Config
- }
- if b, err = json.Marshal(data); err != nil {
- return
- }
- mb := md5.Sum(b)
- conf = &model.Content{
- Version: ver,
- Md5: hex.EncodeToString(mb[:]),
- Content: string(b),
- }
- return
- }
- // genConfig2 generate config
- func genConfig2(ver int64, cs []*model.NSValue, ns map[int64]string) (conf *model.Content, err error) {
- var (
- b []byte
- v string
- ok bool
- s *model.Namespace
- )
- nsc := make(map[string]*model.Namespace)
- for _, c := range cs {
- if v, ok = ns[c.NamespaceID]; !ok && c.NamespaceID != 0 {
- continue
- }
- if s, ok = nsc[v]; !ok {
- s = &model.Namespace{Name: v, Data: map[string]string{}}
- nsc[v] = s
- }
- s.Data[c.Name] = c.Config
- }
- if b, err = json.Marshal(nsc); err != nil {
- return
- }
- mb := md5.Sum(b)
- conf = &model.Content{
- Version: ver,
- Md5: hex.EncodeToString(mb[:]),
- Content: string(b),
- }
- return
- }
- // Push version to clients & generate config caches
- func (s *Service) Push(c context.Context, svr *model.Service) (err error) {
- var (
- hosts []*model.Host
- values []*model.NSValue
- conf *model.Content
- conf2 *model.Content
- namespaces map[int64]string
- )
- if values, err = s.dao.Values(c, svr.Version); err != nil {
- return
- }
- if namespaces, err = s.dao.Namespaces(c, svr.Version); err != nil {
- return
- }
- if len(values) == 0 {
- err = fmt.Errorf("config values is empty. svr:%s, host:%s, buildVer:%s, ver:%d", svr.Name, svr.Host, svr.BuildVersion, svr.Version)
- log.Error("%v", err)
- return
- }
- // compatible old version sdk
- if conf, err = genConfig(svr.Version, values); err != nil {
- log.Error("get config value:%s error(%v) ", values, err)
- return
- }
- cacheKey := cacheKey(svr.Name, svr.BuildVersion, svr.Env, svr.Version)
- if err = s.dao.SetFile(cacheKey, conf); err != nil {
- log.Error("set confCashe error. svr:%s, buildVer:%s, ver:%d", svr.Name, svr.BuildVersion, svr.Env)
- err = nil
- }
- if conf2, err = genConfig2(svr.Version, values, namespaces); err != nil {
- log.Error("get config2 value:%s error(%v) ", values, err)
- return
- }
- cacheKey2 := cacheKey2(svr.Name, svr.BuildVersion, svr.Env, svr.Version)
- if err = s.dao.SetFile(cacheKey2, conf2); err != nil {
- log.Error("set confCashe2 error. svr:%s, buildVer:%s, ver:%d", svr.Name, svr.BuildVersion, svr.Env)
- err = nil
- }
- s.setVersion(svr.Name, svr.BuildVersion, svr.Env, svr.Version)
- // push hosts
- if hosts, err = s.dao.Hosts(c, svr.Name, svr.Env); err != nil {
- log.Error("get hosts error. svr:%s, buildVer:%s, ver:%d", svr.Name, svr.BuildVersion, svr.Version)
- err = nil
- return
- }
- for _, h := range hosts {
- if h.State == model.HostOnline {
- pushKey := pushKey(h.Service, h.Name, svr.Env)
- if ok := s.pubEvent(pushKey, &model.Version{Version: conf.Version}); ok {
- log.Info("s.events.Pub(%s, %d) ok: %t", pushKey, conf.Version, ok)
- }
- }
- }
- return
- }
- // Config return config content.
- func (s *Service) Config(c context.Context, svr *model.Service) (conf *model.Content, err error) {
- var values []*model.NSValue
- if err = s.appAuth(c, svr.Name, svr.Env, svr.Token); err != nil {
- return
- }
- cacheName := cacheKey(svr.Name, svr.BuildVersion, svr.Env, svr.Version)
- if conf, err = s.dao.File(cacheName); err == nil {
- return
- }
- if values, err = s.dao.Values(c, svr.Version); err != nil {
- return
- }
- if len(values) == 0 {
- err = fmt.Errorf("config values is empty. svr:%s, host:%s, buildVer:%s, ver:%d", svr.Name, svr.Host, svr.BuildVersion, svr.Version)
- log.Error("%v", err)
- return
- }
- if conf, err = genConfig(svr.Version, values); err != nil {
- log.Error("get config value:%s error(%v) ", values, err)
- return
- }
- if err = s.dao.SetFile(cacheName, conf); err != nil {
- err = nil
- }
- return
- }
- // Config2 return config content.
- func (s *Service) Config2(c context.Context, svr *model.Service) (conf *model.Content, err error) {
- var (
- values []*model.NSValue
- namespaces map[int64]string
- )
- if err = s.appAuth(c, svr.Name, svr.Env, svr.Token); err != nil {
- return
- }
- cacheName := cacheKey2(svr.Name, svr.BuildVersion, svr.Env, svr.Version)
- if conf, err = s.dao.File(cacheName); err == nil {
- return
- }
- if namespaces, err = s.dao.Namespaces(c, svr.Version); err != nil {
- return
- }
- if values, err = s.dao.Values(c, svr.Version); err != nil {
- return
- }
- if len(values) == 0 {
- err = fmt.Errorf("config values is empty. svr:%s, host:%s, buildVer:%s, ver:%d", svr.Name, svr.Host, svr.BuildVersion, svr.Version)
- log.Error("%v", err)
- return
- }
- if conf, err = genConfig2(svr.Version, values, namespaces); err != nil {
- log.Error("get config value:(%s) error(%v) ", values, err)
- return
- }
- if err = s.dao.SetFile(cacheName, conf); err != nil {
- err = nil
- }
- return
- }
- // File get one file content.
- func (s *Service) File(c context.Context, svr *model.Service) (val string, err error) {
- var (
- curVer int64
- ok bool
- )
- if err = s.appAuth(c, svr.Name, svr.Env, svr.Token); err != nil {
- return
- }
- if svr.Version != model.UnknownVersion {
- curVer = svr.Version
- } else {
- curVer, ok = s.version(svr.Name, svr.BuildVersion, svr.Env)
- if !ok {
- if curVer, err = s.dao.BuildVersion(c, svr.Name, svr.BuildVersion, svr.Env); err != nil {
- log.Error("BuildVersion(%v) error(%v)", svr, err)
- return
- }
- s.setVersion(svr.Name, svr.BuildVersion, svr.Env, curVer)
- }
- }
- fKey := fileKey(svr.File, curVer)
- if val, err = s.dao.FileStr(fKey); err == nil {
- return
- }
- if val, err = s.dao.Value(c, svr.File, curVer); err != nil {
- log.Error("Value(%v) error(%v)", svr.File, err)
- return
- }
- s.dao.SetFileStr(fKey, val)
- return
- }
- // CheckVersion check client version.
- func (s *Service) CheckVersion(c context.Context, rhost *model.Host, env, token string) (evt chan *model.Version, err error) {
- var (
- curVer int64
- )
- if err = s.appAuth(c, rhost.Service, env, token); err != nil {
- return
- }
- // set heartbeat
- rhost.HeartbeatTime = xtime.Time(time.Now().Unix())
- if err = s.dao.SetHost(c, rhost, rhost.Service, env); err != nil {
- err = nil
- }
- evt = make(chan *model.Version, 1)
- if rhost.Appoint > 0 {
- if rhost.Appoint != rhost.ConfigVersion {
- evt <- &model.Version{Version: rhost.Appoint}
- }
- return
- }
- // get current version, return if has new config version
- if curVer, err = s.curVer(c, rhost.Service, rhost.BuildVersion, env); err != nil {
- return
- }
- if curVer == model.UnknownVersion {
- err = ecode.NothingFound
- return
- }
- if curVer != rhost.ConfigVersion {
- evt <- &model.Version{Version: curVer}
- return
- }
- pushKey := pushKey(rhost.Service, rhost.Name, env)
- s.eLock.Lock()
- s.events[pushKey] = evt
- s.eLock.Unlock()
- return
- }
- // AppAuth check app is auth
- func (s *Service) appAuth(c context.Context, svr, env, token string) (err error) {
- var (
- dbToken string
- ok bool
- tokenKey = tokenKey(svr, env)
- )
- s.eLock.RLock()
- dbToken, ok = s.token[tokenKey]
- s.eLock.RUnlock()
- if !ok {
- if dbToken, err = s.dao.Token(c, svr, env); err != nil {
- log.Error("Token(%v,%v) error(%v)", svr, env, err)
- return
- }
- s.SetToken(c, svr, env, dbToken)
- }
- if dbToken != token {
- err = ecode.AccessDenied
- }
- return
- }
- // SetToken update Token
- func (s *Service) SetToken(c context.Context, svr, env, token string) {
- tokenKey := tokenKey(svr, env)
- s.eLock.Lock()
- s.token[tokenKey] = token
- s.eLock.Unlock()
- }
- // Hosts return client hosts.
- func (s *Service) Hosts(c context.Context, svr, env string) (hosts []*model.Host, err error) {
- return s.dao.Hosts(c, svr, env)
- }
- // VersionSuccess return client versions which configuration is complete
- func (s *Service) VersionSuccess(c context.Context, svr, env, bver string) (versions *model.Versions, err error) {
- var (
- vers []*model.ReVer
- ver int64
- )
- if vers, err = s.dao.Versions(c, svr, env, model.ConfigEnd); err != nil {
- log.Error("Versions(%v,%v,%v) error(%v)", svr, env, bver, err)
- return
- }
- if ver, err = s.dao.BuildVersion(c, svr, bver, env); err != nil {
- log.Error("BuildVersion(%v) error(%v)", svr, err)
- return
- }
- versions = &model.Versions{
- Version: vers,
- DefVer: ver,
- }
- return
- }
- // VersionIng return client versions which configuration is creating
- func (s *Service) VersionIng(c context.Context, svr, env string) (vers []int64, err error) {
- var (
- res []*model.ReVer
- )
- if res, err = s.dao.Versions(c, svr, env, model.ConfigIng); err != nil {
- log.Error("Versions(%v,%v) error(%v)", svr, env, err)
- return
- }
- vers = make([]int64, 0)
- for _, reVer := range res {
- vers = append(vers, reVer.Version)
- }
- return
- }
- // Builds all builds
- func (s *Service) Builds(c context.Context, svr, env string) (builds []string, err error) {
- return s.dao.Builds(c, svr, env)
- }
- // AddConfigs insert config into db.
- func (s *Service) AddConfigs(c context.Context, svr, env, token, user string, data map[string]string) (err error) {
- var (
- svrID int64
- ver int64
- )
- if err = s.appAuth(c, svr, env, token); err != nil {
- return
- }
- if svrID, err = s.dao.ServiceID(c, svr, env); err != nil {
- return
- }
- var tx *sql.Tx
- if tx, err = s.dao.BeginTran(c); err != nil {
- log.Error("begin tran error(%v)", err)
- return
- }
- if ver, err = s.dao.TxInsertVer(tx, svrID, user); err != nil {
- tx.Rollback()
- return
- }
- if len(data) != 0 {
- if err = s.dao.TxInsertValues(c, tx, ver, user, data); err != nil {
- tx.Rollback()
- return
- }
- }
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit error(%v)", err)
- return
- }
- s.dao.InsertLog(c, user, addBusiness, fmt.Sprintf(addInfo, ver))
- return
- }
- // CopyConfigs copy config in newVer.
- func (s *Service) CopyConfigs(c context.Context, svr, env, token, user string, build string) (ver int64, err error) {
- var (
- svrID int64
- curVer int64
- values []*model.NSValue
- )
- if err = s.appAuth(c, svr, env, token); err != nil {
- return
- }
- if curVer, err = s.curVer(c, svr, build, env); err != nil {
- return
- }
- if values, err = s.dao.Values(c, curVer); err != nil {
- return
- }
- data := make(map[string]string)
- for _, c := range values {
- data[c.Name] = c.Config
- }
- if svrID, err = s.dao.ServiceID(c, svr, env); err != nil {
- return
- }
- var tx *sql.Tx
- if tx, err = s.dao.BeginTran(c); err != nil {
- log.Error("begin tran error(%v)", err)
- return
- }
- if ver, err = s.dao.TxInsertVer(tx, svrID, user); err != nil {
- tx.Rollback()
- return
- }
- if err = s.dao.TxInsertValues(c, tx, ver, user, data); err != nil {
- tx.Rollback()
- return
- }
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit error(%v)", err)
- return
- }
- s.dao.InsertLog(c, user, copyBusiness, fmt.Sprintf(copyInfo, curVer, ver))
- return
- }
- // UpdateConfigs update config.
- func (s *Service) UpdateConfigs(c context.Context, svr, env, token, user string, ver int64, data map[string]string) (err error) {
- var (
- values []*model.NSValue
- addData = make(map[string]string)
- udata = make(map[string]string)
- )
- if err = s.appAuth(c, svr, env, token); err != nil {
- return
- }
- if len(data) == 0 {
- return
- }
- if values, err = s.dao.Values(c, ver); err != nil {
- return
- }
- if len(values) == 0 {
- return ecode.NothingFound
- }
- oldData := make(map[string]string)
- for _, c := range values {
- oldData[c.Name] = c.Config
- }
- for k, v := range data {
- if _, ok := oldData[k]; ok {
- udata[k] = v
- } else {
- addData[k] = v
- }
- }
- var tx *sql.Tx
- if tx, err = s.dao.BeginTran(c); err != nil {
- log.Error("begin tran error(%v)", err)
- return
- }
- if len(addData) != 0 {
- if err = s.dao.TxInsertValues(c, tx, ver, user, addData); err != nil {
- tx.Rollback()
- return
- }
- }
- if len(udata) != 0 {
- if err = s.dao.TxUpdateValues(tx, ver, user, udata); err != nil {
- tx.Rollback()
- return
- }
- }
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit error(%v)", err)
- return
- }
- s.dao.InsertLog(c, user, updateBusiness, fmt.Sprintf(updateInfo, ver))
- return
- }
- func (s *Service) version(svr, bver, env string) (ver int64, ok bool) {
- verKey := buildVerKey(svr, bver, env)
- s.vLock.RLock()
- ver, ok = s.versions[verKey]
- s.vLock.RUnlock()
- return
- }
- func (s *Service) setVersion(svr, bver, env string, ver int64) {
- verKey := buildVerKey(svr, bver, env)
- s.vLock.Lock()
- s.versions[verKey] = ver
- s.vLock.Unlock()
- }
- // ClearHost clear service hosts.
- func (s *Service) ClearHost(c context.Context, svr, env string) (err error) {
- return s.dao.ClearHost(c, svr, env)
- }
- // pubEvent publish a event to chan.
- func (s *Service) pubEvent(key string, evt *model.Version) (ok bool) {
- s.eLock.RLock()
- c, ok := s.events[key]
- s.eLock.RUnlock()
- if ok {
- c <- evt
- }
- return
- }
- // Unsub unsub a event.
- func (s *Service) Unsub(svr, host, env string) {
- key := pushKey(svr, host, env)
- s.eLock.Lock()
- delete(s.events, key)
- s.eLock.Unlock()
- }
- func (s *Service) curVer(c context.Context, svr, build, env string) (ver int64, err error) {
- var ok bool
- // get current version, return if has new config version
- ver, ok = s.version(svr, build, env)
- if !ok {
- if ver, err = s.dao.BuildVersion(c, svr, build, env); err != nil {
- return
- }
- s.setVersion(svr, build, env, ver)
- }
- return
- }
|