12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769 |
- package v2
- import (
- "context"
- "crypto/md5"
- "encoding/hex"
- "encoding/json"
- "fmt"
- "strconv"
- "strings"
- "time"
- "go-common/app/infra/config/model"
- "go-common/library/ecode"
- "go-common/library/log"
- xtime "go-common/library/time"
- )
// Format strings for the keys of the service's in-memory maps and of the
// generated config cache files.
const (
	_buildVerKey  = "%d_%s"
	_pushKey      = "%s_%s"
	_cacheKey     = "%s_%d"
	_pushForceKey = "%s_%s_%s"
	_tagForceKey  = "%d_%s_%d_tagforce"
	_tagIDkey     = "%d_%s_tagID"
	_lastForceKey = "%d_%s_lastForce"
)

// lastForceKey returns the key of the last-forced-version entry for an app ID
// (svr) and build version (bver).
//
// It formats with _lastForceKey; the previous implementation used _tagIDkey,
// which produced exactly the same string as tagIDkey and left _lastForceKey
// unused.
func lastForceKey(svr int64, bver string) string {
	return fmt.Sprintf(_lastForceKey, svr, bver)
}
// tagIDkey returns the key "<svr>_<bver>_tagID" for an app ID (svr) and a
// build version (bver).
func tagIDkey(svr int64, bver string) string {
	return strconv.FormatInt(svr, 10) + "_" + bver + "_tagID"
}
// tagForceKey returns the key "<svr>_<bver>_<tagID>_tagforce" for an app ID
// (svr), a build version (bver) and a tag ID.
func tagForceKey(svr int64, bver string, tagID int64) string {
	app := strconv.FormatInt(svr, 10)
	tag := strconv.FormatInt(tagID, 10)
	return app + "_" + bver + "_" + tag + "_tagforce"
}
// pushForceKey returns the key "<svr>_<host>_<ip>" identifying one host of a
// service in the forced-version map.
func pushForceKey(svr, host, ip string) string {
	return svr + "_" + host + "_" + ip
}
// pushKey returns the subscription key "<svr>_<host>" of a host.
func pushKey(svr, host string) string {
	return svr + "_" + host
}
// buildVerKey returns the version mapping key "<svr>_<bver>" for an app ID
// (svr) and a build version (bver).
func buildVerKey(svr int64, bver string) string {
	return strconv.FormatInt(svr, 10) + "_" + bver
}
// cacheKey returns the config cache key "<svr>_<ver>" for a service name and
// a config version.
func cacheKey(svr string, ver int64) string {
	return svr + "_" + strconv.FormatInt(ver, 10)
}
- // genConfig generate config
- func genConfig(ver int64, values []*model.Value) (conf *model.Content, err error) {
- var b []byte
- if b, err = json.Marshal(values); err != nil {
- return
- }
- mb := md5.Sum(b)
- conf = &model.Content{
- Version: ver,
- Md5: hex.EncodeToString(mb[:]),
- Content: string(b),
- }
- return
- }
- // Push version to clients & generate config caches
- func (s *Service) Push(c context.Context, svr *model.Service) (err error) {
- var (
- hosts []*model.Host
- values []*model.Value
- conf *model.Content
- app *model.App
- ids []int64
- force int8
- )
- if ids, err = s.dao.ConfIDs(svr.Version); err != nil {
- return
- }
- if values, err = s.dao.ConfigsByIDs(ids); err != nil {
- return
- }
- if len(values) == 0 {
- err = fmt.Errorf("config values is empty. svr:%s, host:%s, buildVer:%s, ver:%d", svr.Name, svr.Host, svr.BuildVersion, svr.Version)
- log.Error("%v", err)
- return
- }
- // compatible old version sdk
- if conf, err = genConfig(svr.Version, values); err != nil {
- log.Error("get config value:%s error(%v) ", values, err)
- return
- }
- cacheName := cacheKey(svr.Name, svr.Version)
- if err = s.dao.SetFile(cacheName, conf); err != nil {
- return
- }
- if app, err = s.app(svr.Name); err != nil {
- return
- }
- tag := s.setTag(app.ID, svr.BuildVersion, &cacheTag{Tag: svr.Version, ConfIDs: ids})
- s.setTagID(app.ID, tag.C.Tag, svr.BuildVersion)
- if force, err = s.dao.TagForce(svr.Version); err != nil {
- return
- }
- if force == 1 {
- s.setLFVForces(svr.Version, app.ID, svr.BuildVersion)
- }
- // push hosts
- if hosts, err = s.dao.Hosts(c, svr.Name); err != nil {
- log.Error("get hosts error. svr:%s, buildVer:%s, ver:%d", svr.Name, svr.BuildVersion, svr.Version)
- err = nil
- return
- }
- for _, h := range hosts {
- if h.State == model.HostOnline {
- pushKey := pushKey(h.Service, h.Name)
- if ok := s.pubEvent(pushKey, &model.Diff{Version: svr.Version, Diffs: tag.diff()}); ok {
- log.Info("s.events.Pub(%s, %d) ok: %t", pushKey, conf.Version, ok)
- }
- }
- }
- return
- }
- // Config return config content.
- func (s *Service) Config(c context.Context, svrName, token string, ver int64, ids []int64) (conf *model.Content, err error) {
- var (
- values, all []*model.Value
- )
- if _, err = s.appAuth(c, svrName, token); err != nil {
- return
- }
- cacheName := cacheKey(svrName, ver)
- if conf, err = s.dao.File(cacheName); err == nil {
- if len(ids) == 0 {
- return
- }
- if err = json.Unmarshal([]byte(conf.Content), &all); err != nil {
- return
- }
- } else {
- if all, err = s.values(ver); err != nil {
- return
- }
- if conf, err = genConfig(ver, all); err != nil {
- return
- }
- cacheName := cacheKey(svrName, ver)
- if err = s.dao.SetFile(cacheName, conf); err != nil {
- return
- }
- if len(ids) == 0 {
- return
- }
- }
- if len(all) == 0 {
- err = ecode.NothingFound
- return
- }
- for _, v := range all {
- for _, id := range ids {
- if v.ConfigID == id {
- values = append(values, v)
- }
- }
- }
- if len(values) == 0 {
- log.Error("Config(%s,%v) error", svrName, ids)
- err = ecode.NothingFound
- return
- }
- if conf, err = genConfig(ver, values); err != nil {
- log.Error("get config value:%s error(%v) ", values, err)
- return
- }
- return
- }
- // File get one file content.
- func (s *Service) File(c context.Context, svr *model.Service) (val string, err error) {
- var (
- curVer, appID int64
- all []*model.Value
- conf *model.Content
- tag *curTag
- )
- if appID, err = s.appAuth(c, svr.Name, svr.Token); err != nil {
- return
- }
- if svr.Version != model.UnknownVersion {
- curVer = svr.Version
- } else {
- // get current version, return if has new config version
- if tag, err = s.curTag(appID, svr.BuildVersion); err != nil {
- return
- }
- curVer = tag.cur()
- }
- cacheName := cacheKey(svr.Name, svr.Version)
- if conf, err = s.dao.File(cacheName); err == nil {
- if err = json.Unmarshal([]byte(conf.Content), &all); err != nil {
- return
- }
- } else {
- if all, err = s.values(curVer); err != nil {
- return
- }
- if conf, err = genConfig(curVer, all); err != nil {
- return
- }
- cacheName := cacheKey(svr.Name, curVer)
- if err = s.dao.SetFile(cacheName, conf); err != nil {
- return
- }
- }
- for _, v := range all {
- if v.Name == svr.File {
- val = v.Config
- break
- }
- }
- return
- }
- // CheckVersion check client version.
- func (s *Service) CheckVersion(c context.Context, rhost *model.Host, token string) (evt chan *model.Diff, err error) {
- var (
- appID, tagID int64
- tag *curTag
- ForceVersion int64
- tagForce int8
- lastForce int64
- )
- if appID, err = s.appAuth(c, rhost.Service, token); err != nil {
- return
- }
- // set heartbeat
- rhost.HeartbeatTime = xtime.Time(time.Now().Unix())
- s.dao.SetHost(c, rhost, rhost.Service)
- evt = make(chan *model.Diff, 1)
- //请开始你的表演
- tagForce, err = s.tagForce(appID, rhost.BuildVersion)
- if err != nil {
- return
- }
- rhost.Force = tagForce
- s.dao.SetHost(c, rhost, rhost.Service)
- if rhost.ConfigVersion > 0 {
- // force has a higher priority than appoint
- if ForceVersion, err = s.curForce(rhost, appID); err != nil {
- return
- }
- if ForceVersion > 0 {
- rhost.ForceVersion = ForceVersion
- s.dao.SetHost(c, rhost, rhost.Service)
- if ForceVersion != rhost.ConfigVersion {
- evt <- &model.Diff{Version: ForceVersion}
- }
- return
- }
- if lastForce, err = s.lastForce(appID, rhost.ConfigVersion, rhost.BuildVersion); err != nil {
- return
- }
- if rhost.ConfigVersion <= lastForce {
- if rhost.ConfigVersion != lastForce {
- evt <- &model.Diff{Version: lastForce}
- }
- return
- }
- }
- if rhost.Appoint > 0 {
- if rhost.Appoint != rhost.ConfigVersion {
- evt <- &model.Diff{Version: rhost.Appoint}
- }
- return
- }
- //结束表演
- // get current version, return if has new config version
- if tag, err = s.curTag(appID, rhost.BuildVersion); err != nil {
- return
- }
- if tagID = tag.cur(); tagID == 0 {
- err = ecode.NothingFound
- return
- }
- if tagID != rhost.ConfigVersion {
- ver := &model.Diff{Version: tagID}
- if rhost.ConfigVersion == tag.old() {
- ver.Diffs = tag.diff()
- }
- evt <- ver
- return
- }
- pushKey := pushKey(rhost.Service, rhost.Name)
- s.eLock.Lock()
- s.events[pushKey] = evt
- s.eLock.Unlock()
- return
- }
- // values get configs by tag id.
- func (s *Service) values(tagID int64) (values []*model.Value, err error) {
- var (
- ids []int64
- )
- if ids, err = s.dao.ConfIDs(tagID); err != nil {
- return
- }
- values, err = s.dao.ConfigsByIDs(ids)
- return
- }
- // appAuth check app is auth
- func (s *Service) appAuth(c context.Context, svr, token string) (appID int64, err error) {
- app, err := s.app(svr)
- if err != nil {
- return
- }
- if app.Token != token {
- err = ecode.AccessDenied
- return
- }
- appID = app.ID
- return
- }
- // Hosts return client hosts.
- func (s *Service) Hosts(c context.Context, svr string) (hosts []*model.Host, err error) {
- return s.dao.Hosts(c, svr)
- }
- // Builds all builds
- func (s *Service) Builds(c context.Context, svr string) (builds []string, err error) {
- var (
- app *model.App
- )
- if app, err = s.app(svr); err != nil {
- return
- }
- return s.dao.BuildsByAppID(app.ID)
- }
- // VersionSuccess return client versions which configuration is complete
- func (s *Service) VersionSuccess(c context.Context, svr, bver string) (versions *model.Versions, err error) {
- var (
- vers []*model.ReVer
- ver int64
- app *model.App
- )
- if app, err = s.app(svr); err != nil {
- return
- }
- if ver, err = s.dao.TagID(app.ID, bver); err != nil {
- log.Error("BuildVersion(%v) error(%v)", svr, err)
- return
- }
- if vers, err = s.dao.Tags(app.ID); err != nil {
- return
- }
- versions = &model.Versions{
- Version: vers,
- DefVer: ver,
- }
- return
- }
- // ClearHost clear service hosts.
- func (s *Service) ClearHost(c context.Context, svr string) (err error) {
- return s.dao.ClearHost(c, svr)
- }
- // pubEvent publish a event to chan.
- func (s *Service) pubEvent(key string, evt *model.Diff) (ok bool) {
- s.eLock.RLock()
- c, ok := s.events[key]
- s.eLock.RUnlock()
- if ok {
- c <- evt
- }
- return
- }
- // Unsub unsub a event.
- func (s *Service) Unsub(svr, host string) {
- key := pushKey(svr, host)
- s.eLock.Lock()
- delete(s.events, key)
- s.eLock.Unlock()
- }
- func (s *Service) curTag(appID int64, build string) (tag *curTag, err error) {
- var (
- ok bool
- tagID int64
- ids []int64
- tagForce int8
- )
- // get current version, return if has new config version
- verKey := buildVerKey(appID, build)
- s.tLock.RLock()
- tag, ok = s.tags[verKey]
- s.tLock.RUnlock()
- if !ok {
- if tagID, err = s.dao.TagID(appID, build); err != nil {
- return
- }
- if ids, err = s.dao.ConfIDs(tagID); err != nil {
- return
- }
- if tagForce, err = s.dao.TagForce(tagID); err != nil {
- return
- }
- tag = s.setTag(appID, build, &cacheTag{Tag: tagID, ConfIDs: ids, Force: tagForce})
- }
- return
- }
- func (s *Service) setTag(appID int64, bver string, cTag *cacheTag) (nTag *curTag) {
- var (
- oTag *curTag
- ok bool
- )
- verKey := buildVerKey(appID, bver)
- nTag = &curTag{C: cTag}
- s.tLock.Lock()
- oTag, ok = s.tags[verKey]
- if ok && oTag.C != nil {
- nTag.O = oTag.C
- }
- s.tags[verKey] = nTag
- s.tLock.Unlock()
- return
- }
- func (s *Service) app(svr string) (app *model.App, err error) {
- var (
- ok bool
- treeID int64
- )
- s.aLock.RLock()
- app, ok = s.apps[svr]
- s.aLock.RUnlock()
- if !ok {
- arrs := strings.Split(svr, "_")
- if len(arrs) != 3 {
- err = ecode.RequestErr
- return
- }
- if treeID, err = strconv.ParseInt(arrs[0], 10, 64); err != nil {
- return
- }
- if app, err = s.dao.AppByTree(arrs[2], arrs[1], treeID); err != nil {
- log.Error("Token(%v) error(%v)", svr, err)
- return
- }
- s.aLock.Lock()
- s.apps[svr] = app
- s.aLock.Unlock()
- }
- return app, nil
- }
- //SetToken set token.
- func (s *Service) SetToken(svr, token string) (err error) {
- var (
- ok bool
- treeID int64
- app, nApp *model.App
- )
- s.aLock.RLock()
- app, ok = s.apps[svr]
- s.aLock.RUnlock()
- if ok {
- nApp = &model.App{ID: app.ID, Name: app.Name, Token: token}
- } else {
- arrs := strings.Split(svr, "_")
- if len(arrs) != 3 {
- err = ecode.RequestErr
- return
- }
- if treeID, err = strconv.ParseInt(arrs[0], 10, 64); err != nil {
- return
- }
- if nApp, err = s.dao.AppByTree(arrs[2], arrs[1], treeID); err != nil {
- log.Error("Token(%v) error(%v)", svr, err)
- return
- }
- }
- s.aLock.Lock()
- s.apps[svr] = nApp
- s.aLock.Unlock()
- return
- }
- //TmpBuilds get builds.
- func (s *Service) TmpBuilds(svr, env string) (builds []string, err error) {
- var (
- apps []*model.DBApp
- appIDs []int64
- )
- switch env {
- case "10":
- env = "dev"
- case "11":
- env = "fat1"
- case "13":
- env = "uat"
- case "14":
- env = "pre"
- case "3":
- env = "prod"
- default:
- }
- if apps, err = s.dao.AppsByNameEnv(svr, env); err != nil {
- return
- }
- if len(apps) == 0 {
- err = ecode.NothingFound
- return
- }
- for _, app := range apps {
- appIDs = append(appIDs, app.ID)
- }
- return s.dao.BuildsByAppIDs(appIDs)
- }
- // AppService ...
- func (s *Service) AppService(zone, env, token string) (service string, err error) {
- var (
- ok bool
- key string
- app *model.App
- )
- key = fmt.Sprintf("%s_%s_%s", zone, env, token)
- s.bLock.RLock()
- app, ok = s.services[key]
- s.bLock.RUnlock()
- if !ok {
- app, err = s.dao.AppGet(zone, env, token)
- if err != nil {
- log.Error("AppService error(%v)", err)
- return
- }
- s.bLock.Lock()
- s.services[key] = app
- s.bLock.Unlock()
- }
- service = fmt.Sprintf("%d_%s_%s", app.TreeID, app.Env, app.Zone)
- return
- }
- // Force version to clients & generate config caches
- func (s *Service) Force(c context.Context, svr *model.Service, hostnames map[string]string, sType int8) (err error) {
- var (
- values []*model.Value
- ids []int64
- )
- if sType == 1 { // push 1 clear other
- if ids, err = s.dao.ConfIDs(svr.Version); err != nil {
- return
- }
- if values, err = s.dao.ConfigsByIDs(ids); err != nil {
- return
- }
- if len(values) == 0 {
- err = fmt.Errorf("config values is empty. svr:%s, host:%s, buildVer:%s, ver:%d", svr.Name, svr.Host, svr.BuildVersion, svr.Version)
- log.Error("%v", err)
- return
- }
- }
- for key, val := range hostnames {
- pushForceKey := pushForceKey(svr.Name, key, val)
- s.pubForce(pushForceKey, svr.Version)
- log.Info("s.events.Pub(%s, %d)", pushForceKey, svr.Version)
- }
- return
- }
- func (s *Service) curForce(rhost *model.Host, appID int64) (version int64, err error) {
- var (
- ok bool
- )
- // get force version
- pushForceKey := pushForceKey(rhost.Service, rhost.Name, rhost.IP)
- s.fLock.RLock()
- version, ok = s.forces[pushForceKey]
- s.fLock.RUnlock()
- if !ok {
- if version, err = s.dao.Force(appID, rhost.Name); err != nil {
- return
- }
- s.fLock.Lock()
- s.forces[pushForceKey] = version
- s.fLock.Unlock()
- }
- return
- }
- // pubEvent publish a forces.
- func (s *Service) pubForce(key string, version int64) {
- s.fLock.Lock()
- s.forces[key] = version
- s.fLock.Unlock()
- }
- func (s *Service) tagForce(appID int64, build string) (force int8, err error) {
- var (
- ok bool
- tagID int64
- )
- key := tagIDkey(appID, build)
- s.tagIDLock.RLock()
- tagID, ok = s.tagID[key]
- s.tagIDLock.RUnlock()
- if !ok {
- if tagID, err = s.dao.TagID(appID, build); err != nil {
- return
- }
- s.setTagID(appID, tagID, build)
- }
- verKey := tagForceKey(appID, build, tagID)
- s.tfLock.RLock()
- force, ok = s.forceType[verKey]
- s.tfLock.RUnlock()
- if !ok {
- if force, err = s.dao.TagForce(tagID); err != nil {
- return
- }
- s.tfLock.Lock()
- s.forceType[verKey] = force
- s.tfLock.Unlock()
- }
- return
- }
- func (s *Service) setTagID(appID, tagID int64, build string) {
- key := tagIDkey(appID, build)
- s.tagIDLock.Lock()
- s.tagID[key] = tagID
- s.tagIDLock.Unlock()
- }
- func (s *Service) lastForce(appID, version int64, build string) (lastForce int64, err error) {
- var ok bool
- var buildID int64
- key := lastForceKey(appID, build)
- s.lfvLock.RLock()
- lastForce, ok = s.lfvforces[key]
- s.lfvLock.RUnlock()
- if !ok {
- if buildID, err = s.dao.BuildID(appID, build); err != nil {
- return
- }
- if lastForce, err = s.dao.LastForce(appID, buildID); err != nil {
- return
- }
- s.lfvLock.Lock()
- s.lfvforces[key] = lastForce
- s.lfvLock.Unlock()
- }
- return
- }
- func (s *Service) setLFVForces(lastForce, appID int64, build string) {
- key := lastForceKey(appID, build)
- s.lfvLock.Lock()
- s.lfvforces[key] = lastForce
- s.lfvLock.Unlock()
- }
- //CheckLatest ...
- func (s *Service) CheckLatest(c context.Context, rhost *model.Host, token string) (ver int64, err error) {
- var (
- appID, tagID int64
- tag *curTag
- )
- if appID, err = s.appAuth(c, rhost.Service, token); err != nil {
- return
- }
- // get current version, return if has new config version
- if tag, err = s.curTag(appID, rhost.BuildVersion); err != nil {
- return
- }
- if tagID = tag.cur(); tagID == 0 {
- err = ecode.NothingFound
- return
- }
- ver = tagID
- return
- }
- // ConfigCheck return config content.
- func (s *Service) ConfigCheck(c context.Context, svrName, token string, ver int64, ids []int64) (conf *model.Content, err error) {
- var (
- values, all []*model.Value
- appID int64
- tagAll *model.DBTag
- )
- if appID, err = s.appAuth(c, svrName, token); err != nil {
- return
- }
- if tagAll, err = s.dao.TagAll(ver); err != nil {
- return
- }
- if appID != tagAll.AppID {
- err = ecode.RequestErr
- return
- }
- cacheName := cacheKey(svrName, ver)
- if conf, err = s.dao.File(cacheName); err == nil {
- if len(ids) == 0 {
- return
- }
- if err = json.Unmarshal([]byte(conf.Content), &all); err != nil {
- return
- }
- } else {
- if all, err = s.values(ver); err != nil {
- return
- }
- if conf, err = genConfig(ver, all); err != nil {
- return
- }
- cacheName := cacheKey(svrName, ver)
- if err = s.dao.SetFile(cacheName, conf); err != nil {
- return
- }
- if len(ids) == 0 {
- return
- }
- }
- if len(all) == 0 {
- err = ecode.NothingFound
- return
- }
- for _, v := range all {
- for _, id := range ids {
- if v.ConfigID == id {
- values = append(values, v)
- }
- }
- }
- if len(values) == 0 {
- log.Error("Config(%s,%v) error", svrName, ids)
- err = ecode.NothingFound
- return
- }
- if conf, err = genConfig(ver, values); err != nil {
- log.Error("get config value:%s error(%v) ", values, err)
- return
- }
- return
- }
|