123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980 |
- package service
- import (
- "bytes"
- "context"
- "encoding/json"
- "fmt"
- "io/ioutil"
- "net/http"
- "net/url"
- "strconv"
- "strings"
- "go-common/app/admin/main/apm/conf"
- cml "go-common/app/admin/main/apm/model/canal"
- "go-common/app/admin/main/apm/model/user"
- cgm "go-common/app/admin/main/config/model"
- "go-common/library/conf/env"
- "go-common/library/ecode"
- "go-common/library/log"
- bm "go-common/library/net/http/blademaster"
- "github.com/BurntSushi/toml"
- )
- const (
- _dialTimeout = "500ms"
- _readTimeout = "1s"
- _writeTimeout = "1s"
- _idleTimeout = "60s"
- _flavor = "mysql"
- _heartbeatPeriod = 60
- _canreadTimeout = 90
- )
- var (
- getBuildIDAPI = "%s/x/admin/config/build/builds"
- getConfigValueAPI = "%s/x/admin/config/config/value"
- getConfigIDAPI = "%s/x/admin/config/config/configs"
- getAllErrorsAPI = "%s/x/internal/canal/errors"
- createConfigAPI = "%s/x/admin/config/canal/config/create"
- configByNameAPI = "%s/x/admin/config/canal/name/configs"
- updateConfigAPI = "%s/x/admin/config/home/config/update"
- checkMasterAPI = "%s/x/internal/canal/master/check"
- ok = 0
- )
- type result struct {
- Data json.RawMessage `json:"data"`
- Code int `json:"code"`
- }
- type list []struct {
- ID int `json:"id"`
- }
- type configs struct {
- BuildFiles fileList `json:"build_files"`
- }
- type fileList []struct {
- ID int `json:"id"`
- Name string `json:"name"`
- }
- type groupInfo struct {
- Group string `json:"group"`
- Topic string `json:"topic"`
- AppID int `json:"app_id"`
- }
- type appInfo struct {
- ID int `json:"id"`
- AppKey string `json:"app_key"`
- AppSecret string `json:"app_secret"`
- }
- // ConfigProxy config proxy
- func (s *Service) ConfigProxy(c context.Context, method string, uri string, params url.Values, cookie string, args ...interface{}) (data json.RawMessage, err error) {
- //common params
- if params != nil && args == nil {
- params.Set("app_name", "main.common-arch.canal")
- params.Set("tree_id", "3766")
- params.Set("zone", env.Zone)
- params.Set("env", env.DeployEnv)
- }
- res := result{}
- fmt.Println("ConfigProxy uri=", uri, "params=", params.Encode())
- req, err := s.client.NewRequest(method, uri, "", params)
- if err != nil {
- log.Error("s.client.NewRequest() error(%v)", err)
- }
- if cookie != "" {
- req.Header.Set("Cookie", cookie)
- }
- if err = s.client.Do(c, req, &res); err != nil {
- log.Error("canal.request get url:"+uri+" params:(%v) error(%v)", params.Encode(), err)
- return
- }
- if res.Code != ok {
- log.Error("canal.request get url:"+uri+" params:(%v) returnCode:(%v)", params.Encode(), res.Code)
- return
- }
- data = res.Data
- return
- }
- func (s *Service) getBuildID(c context.Context, cookie string) (dat list, err error) {
- var (
- params = url.Values{}
- data = json.RawMessage{}
- )
- uri := fmt.Sprintf(getBuildIDAPI, conf.Conf.Host.SVENCo)
- if data, err = s.ConfigProxy(c, "GET", uri, params, cookie); err != nil {
- log.Error("getBuildID() response errors: %d", err)
- return
- }
- if data == nil {
- return
- }
- if err = json.Unmarshal([]byte(data), &dat); err != nil {
- log.Error("getBuildID() json.Unmarshal errors: %s", err)
- return
- }
- return
- }
- func (s *Service) getConfigID(c context.Context, id string, cookie string) (dat fileList, err error) {
- var (
- params = url.Values{}
- data = json.RawMessage{}
- )
- params.Set("build_id", id)
- uri := fmt.Sprintf(getConfigIDAPI, conf.Conf.Host.SVENCo)
- if data, err = s.ConfigProxy(c, "GET", uri, params, cookie); err != nil {
- log.Error("getConfigID() response errors: %d", err)
- return
- }
- if data == nil {
- return
- }
- res := &configs{}
- if err = json.Unmarshal([]byte(data), res); err != nil {
- log.Error("getConfigID() json.Unmarshal errors: %s", err)
- return
- }
- dat = res.BuildFiles
- return
- }
- func (s *Service) getConfigValue(c context.Context, id string, cookie string) (res *cml.Conf, err error) {
- var (
- params = url.Values{}
- data = json.RawMessage{}
- )
- params.Set("config_id", id)
- uri := fmt.Sprintf(getConfigValueAPI, conf.Conf.Host.SVENCo)
- if data, err = s.ConfigProxy(c, "GET", uri, params, cookie); err != nil {
- log.Error("getConfigValue() response errors: %d", err)
- return
- }
- if data == nil {
- return
- }
- res = new(cml.Conf)
- if err = json.Unmarshal([]byte(data), &res); err != nil {
- log.Error("getConfigValue() json.Unmarshal errors: %s", err)
- return
- }
- return
- }
- //ScanByAddrFromConfig 根据addr查询配置信息
- func (s *Service) ScanByAddrFromConfig(c context.Context, addr string, cookie string) (res *cml.Conf, err error) {
- var (
- buildIDList list
- configIDList fileList
- )
- //find build_id
- if buildIDList, err = s.getBuildID(c, cookie); err != nil {
- return
- }
- fmt.Println("BuildID=", buildIDList)
- for _, v := range buildIDList {
- if v.ID != 0 {
- //find config_id
- if configIDList, err = s.getConfigID(c, strconv.Itoa(v.ID), cookie); err != nil {
- return
- }
- fmt.Println("configID=", configIDList)
- for _, val := range configIDList {
- if val.ID != 0 && val.Name == addr+".toml" {
- //find config
- res, err = s.getConfigValue(c, strconv.Itoa(val.ID), cookie)
- return
- }
- continue
- }
- }
- continue
- }
- return
- }
- //ApplyAdd canal apply
- func (s *Service) ApplyAdd(c *bm.Context, v *cml.Canal, username string) (err error) {
- cnt := 0
- f := strings.Contains(v.Addr, ":")
- if !f {
- err = ecode.CanalAddrFmtErr
- return
- }
- if err = s.DBCanal.Model(&cml.Canal{}).Where("addr=?", v.Addr).Count(&cnt).Error; err != nil {
- log.Error("apmSvc.CanalAdd count error(%v)", err)
- err = ecode.RequestErr
- return
- }
- if cnt > 0 {
- err = ecode.CanalAddrExist
- return
- }
- canal := &cml.Canal{
- Addr: v.Addr,
- Cluster: v.Cluster,
- Leader: v.Leader,
- BinName: v.BinName,
- BinPos: v.BinPos,
- Remark: v.Remark,
- }
- if err = s.DBCanal.Create(canal).Error; err != nil {
- log.Error("apmSvc.CanalAdd create error(%v)", err)
- return
- }
- s.SendLog(*c, username, 0, 1, canal.ID, "apmSvc.CanalAdd", canal)
- return
- }
- //ApplyDelete canal delete
- func (s *Service) ApplyDelete(c *bm.Context, v *cml.ScanReq, username string) (err error) {
- cc := &cml.Canal{}
- if err = s.DBCanal.Model(&cml.Canal{}).Where("addr=?", v.Addr).Find(cc).Error; err != nil {
- log.Error("apmSvc.ApplyDelete count error(%v)", err)
- err = ecode.RequestErr
- return
- }
- id := cc.ID
- if err = s.DBCanal.Model(&cml.Canal{}).Where("id = ?", id).Update("is_delete", 1).Error; err != nil {
- log.Error("apmSvc.canalDelete canalDelete error(%v)", err)
- return
- }
- sqlLog := &map[string]interface{}{
- "SQLType": "delete",
- "Value": v.Addr,
- }
- s.SendLog(*c, username, 0, 3, id, "apmSvc.canalDelete", sqlLog)
- return
- }
- //ApplyEdit canal edit
- func (s *Service) ApplyEdit(c *bm.Context, v *cml.EditReq, username string) (err error) {
- cc := &cml.Canal{}
- if err = s.DBCanal.Where("id = ?", v.ID).Find(cc).Error; err != nil {
- log.Error("apmSvc.CanalEdit find(%d) error(%v)", v.ID, err)
- return
- }
- ups := map[string]interface{}{}
- if _, ok := c.Request.Form["bin_name"]; ok {
- ups["bin_name"] = v.BinName
- }
- if _, ok := c.Request.Form["bin_pos"]; ok {
- ups["bin_pos"] = v.BinPos
- }
- if _, ok := c.Request.Form["remark"]; ok {
- ups["remark"] = v.Remark
- }
- if _, ok := c.Request.Form["project"]; ok {
- ups["cluster"] = v.Project
- }
- if _, ok := c.Request.Form["leader"]; ok {
- ups["leader"] = v.Leader
- }
- if err = s.DBCanal.Model(&cml.Canal{}).Where("id = ?", v.ID).Updates(ups).Error; err != nil {
- log.Error("apmSvc.CanalEdit updates error(%v)", err)
- return
- }
- sqlLog := &map[string]interface{}{
- "SQLType": "update",
- "Where": "id = ?",
- "Value1": v.ID,
- "Update": ups,
- "Old": cc,
- }
- s.SendLog(*c, username, 0, 2, v.ID, "apmSvc.CanalEdit", sqlLog)
- return
- }
- //GetScanInfo is
- func (s *Service) GetScanInfo(c context.Context, v *cml.ScanReq, username string, cookie string) (confData *cml.Results, err error) {
- if confData, err = s.getDocFromConf(c, v.Addr, cookie); err != nil {
- return
- }
- if confData == nil {
- return
- }
- if err = s.Permit(c, username, user.CanalEdit); err != nil {
- confData.Document.Instance.User = ""
- confData.Document.Instance.Password = ""
- }
- return confData, nil
- }
- //GetAllErrors 调用x/internal/canal/errors 查询错误信息
- func (s *Service) GetAllErrors(c context.Context) (errs map[string]string, err error) {
- var (
- data json.RawMessage
- host string
- )
- type v struct {
- Error string `json:"error"`
- InstanceError map[string]string `json:"instance_error"`
- }
- if host, err = s.getCanalInstance(c); err != nil {
- return
- }
- uri := fmt.Sprintf(getAllErrorsAPI, host)
- if data, err = s.ConfigProxy(c, "GET", uri, nil, ""); err != nil {
- return
- }
- res := new(v)
- if err = json.Unmarshal([]byte(data), &res); err != nil {
- return
- }
- errs = res.InstanceError
- return
- }
- func (s *Service) getCanalInstance(c context.Context) (host string, err error) {
- params := url.Values{}
- params.Set("appid", "main.common-arch.canal")
- params.Set("env", env.DeployEnv)
- params.Set("hostname", env.Hostname)
- params.Set("status", "3")
- var ins struct {
- ZoneInstances map[string][]struct {
- Addrs []string `json:"addrs"`
- } `json:"zone_instances"`
- }
- resp, err := s.DiscoveryProxy(c, "GET", "fetch", params)
- if err != nil {
- return
- }
- rb, err := json.Marshal(resp)
- if err != nil {
- return
- }
- json.Unmarshal(rb, &ins)
- inss := ins.ZoneInstances[env.Zone]
- for _, zone := range inss {
- for _, addr := range zone.Addrs {
- if strings.Contains(addr, "http://") {
- host = addr
- break
- }
- }
- }
- return
- }
- //GetConfigsByName obtain configs from configByNameAPI
- func (s *Service) getConfigsByName(c context.Context, name string, cookie string) (configs *cgm.Config, err error) {
- var (
- params = url.Values{}
- data = json.RawMessage{}
- result []*cgm.Config
- )
- params.Set("token", conf.Conf.AppToken)
- params.Set("name", name+".toml")
- uri := fmt.Sprintf(configByNameAPI, conf.Conf.Canal.CANALSVENCo)
- if data, err = s.ConfigProxy(c, "POST", uri, params, cookie); err != nil {
- err = ecode.GetConfigByNameErr
- return
- }
- if data == nil {
- return
- }
- if err = json.Unmarshal([]byte(data), &result); err != nil {
- log.Error("configByNameAPI() json.Unmarshal errors: %s", err)
- return
- }
- if len(result) == 0 {
- return
- }
- configs = result[0]
- return
- }
- //ProcessCanalList get canal list
- func (s *Service) ProcessCanalList(c context.Context, v *cml.ListReq) (listdata *cml.Paper, err error) {
- type errorCanal struct {
- cml.Canal
- Error string `json:"error"`
- }
- var (
- cc []*errorCanal
- count int
- errMap map[string]string
- )
- query := " is_delete = 0 "
- if v.Addr != "" {
- query += fmt.Sprintf("and addr = '%s' ", v.Addr)
- }
- if v.Project != "" {
- query += fmt.Sprintf("and cluster = '%s' ", v.Project)
- }
- err = s.DBCanal.Where(query).Order("id DESC").Offset((v.Pn - 1) * v.Ps).Limit(v.Ps).Find(&cc).Error
- if err != nil {
- log.Error("apmSvc.CanalList error(%v)", err)
- return
- }
- err = s.DBCanal.Model(&cml.Canal{}).Where(query).Count(&count).Error
- if err != nil {
- log.Error("apmSvc.CanalList count error(%v)", err)
- return
- }
- // add error info
- if count > 0 {
- if errMap, err = s.GetAllErrors(c); err != nil {
- log.Error("apmSvc.DBCanalApply GetAllErrors error(%v)", err)
- }
- for _, va := range cc {
- va.Error = errMap[va.Addr]
- }
- }
- listdata = &cml.Paper{
- Pn: v.Pn,
- Ps: v.Ps,
- Items: cc,
- Total: count,
- }
- return
- }
- //ProcessApplyList get apply list
- func (s *Service) ProcessApplyList(c context.Context, v *cml.ListReq) (listdata *cml.Paper, err error) {
- var (
- cc []*cml.Apply
- count int
- )
- query := " state !=3 "
- if v.Addr != "" {
- query += fmt.Sprintf("and addr = '%s' ", v.Addr)
- }
- if v.Project != "" {
- query += fmt.Sprintf("and cluster = '%s' ", v.Project)
- }
- if v.Status > 0 {
- query += fmt.Sprintf("and state = '%d' ", v.Status)
- }
- err = s.DBCanal.Model(&cml.Apply{}).Where(query).Count(&count).Error
- if err != nil {
- log.Error("apmSvc.ApplyList count error(%v)", err)
- return
- }
- err = s.DBCanal.Where(query).Order("id DESC").Offset((v.Pn - 1) * v.Ps).Limit(v.Ps).Find(&cc).Error
- if err != nil {
- log.Error("apmSvc.ApplyList error(%v)", err)
- return
- }
- listdata = &cml.Paper{
- Pn: v.Pn,
- Ps: v.Ps,
- Items: cc,
- Total: count,
- }
- return
- }
- //ProcessConfigInfo process info to config center
- func (s *Service) ProcessConfigInfo(c context.Context, v *cml.ConfigReq, cookie string, username string) (err error) {
- var (
- confs *cgm.Config
- comment string
- query []map[string]string
- data []byte
- )
- if comment, err = s.jointConfigInfo(c, v, cookie); err != nil {
- return
- }
- if confs, err = s.getConfigsByName(c, v.Addr, cookie); err != nil {
- return
- }
- if confs != nil {
- query = []map[string]string{{
- "name": v.Addr + ".toml",
- "comment": comment,
- "mark": v.Mark,
- }}
- data, err = json.Marshal(query)
- if err != nil {
- return
- }
- va := url.Values{}
- va.Set("data", string(data))
- va.Set("user", username)
- if _, err = s.updateConfig(c, va, cookie); err != nil {
- return
- }
- } else {
- //never register config's
- params := url.Values{}
- params.Set("comment", comment)
- params.Set("mark", v.Mark)
- params.Set("state", "2")
- params.Set("from", "0")
- params.Set("name", v.Addr+".toml")
- params.Set("user", username)
- if _, err = s.createConfig(c, params, cookie); err != nil {
- return
- }
- }
- if confs, err = s.getConfigsByName(c, v.Addr, cookie); confs == nil {
- return
- }
- if err = s.dao.SetConfigID(confs.ID, v.Addr); err != nil {
- return
- }
- return
- }
- //CheckMaster canal check
- func (s *Service) CheckMaster(c context.Context, v *cml.ConfigReq) (err error) {
- res := result{}
- host, _ := s.getCanalInstance(c)
- params := url.Values{}
- params.Set("user", v.User)
- params.Set("password", v.Password)
- params.Set("addr", v.Addr)
- uri := fmt.Sprintf(checkMasterAPI, host)
- req, err := s.client.NewRequest("POST", uri, "", params)
- if err != nil {
- err = ecode.CheckMasterErr
- return
- }
- if err = s.client.Do(c, req, &res); err != nil {
- err = ecode.CheckMasterErr
- return
- }
- if res.Code != 0 {
- err = ecode.CheckMasterErr
- return
- }
- return
- }
- //ProcessCanalInfo is
- func (s *Service) ProcessCanalInfo(c context.Context, v *cml.ConfigReq, username string) (err error) {
- var (
- cnt = 0
- info = &cml.Canal{}
- )
- if err = s.DBCanal.Select("addr,cluster,leader").Where("addr=?", v.Addr).Find(info).Error; err == nil {
- if v.Project == "" {
- v.Project = info.Cluster
- }
- if v.Leader == "" {
- v.Leader = info.Leader
- }
- }
- if cnt, err = s.dao.CanalApplyCounts(v); err != nil {
- return
- }
- if cnt > 0 {
- if err = s.dao.CanalApplyEdit(v, username); err != nil {
- return
- }
- } else {
- if err = s.dao.CanalApplyCreate(v, username); err != nil {
- return
- }
- }
- return
- }
- //getCommentFromConf get document info from configbyname
- func (s *Service) getDocFromConf(c context.Context, addr string, cookie string) (confData *cml.Results, err error) {
- var conf *cgm.Config
- if conf, err = s.getConfigsByName(c, addr, cookie); err != nil {
- return
- }
- if conf == nil {
- return
- }
- confData = new(cml.Results)
- if _, err = toml.Decode(conf.Comment, &confData.Document); err != nil {
- err = ecode.ConfigParseErr
- log.Error("comment toml.decode error(%v)", err)
- return
- }
- row := &cml.Apply{}
- err = s.DBCanal.Model(&cml.Apply{}).Select("`cluster`,`leader`").Where("addr=?", addr).Scan(row).Error
- if err == nil {
- confData.Cluster = row.Cluster
- confData.Leader = row.Leader
- } else {
- res := &cml.Canal{}
- err = s.DBCanal.Model(&cml.Canal{}).Select("`cluster`,`leader`").Where("addr=?", addr).Scan(res).Error
- if err != nil {
- log.Error("canalinfo get error(%v)", err)
- return
- }
- confData.Cluster = res.Cluster
- confData.Leader = res.Leader
- }
- confData.ID = conf.ID
- confData.Addr = addr
- return
- }
- //ProcessConfigInfo is
- func (s *Service) jointConfigInfo(c context.Context, v *cml.ConfigReq, cookie string) (comment string, err error) {
- var (
- buf *bytes.Buffer
- cfg *cml.Config
- dbs []*cml.DB
- di *cml.Databus
- confData *cml.Results
- sid int64
- schemas = make(map[string]bool)
- )
- //analysis request params
- if err = json.Unmarshal([]byte(v.Databases), &dbs); err != nil {
- log.Error("apmSvc.jointConfigInfo Unmarshal error(%v)", err)
- err = ecode.DatabasesUnmarshalErr
- return
- }
- for _, db := range dbs {
- if db.Databus == nil {
- continue
- }
- //Find duplicate
- if schemas[db.Schema+db.Databus.Group] {
- log.Error("jointConfigInfo find duplicate databus group (%v)", db.Databus.Group)
- err = ecode.DatabusDuplErr
- return
- }
- schemas[db.Schema+db.Databus.Group] = true
- //get databusinfo
- if di, err = s.databusInfo(db.Databus.Group, db.Databus.Addr, db.Schema, db.Table); err != nil {
- return
- }
- db.Databus = di
- }
- if sid, err = s.getServerID(v.Addr); err != nil {
- err = ecode.CanalAddrFmtErr
- return
- }
- ist := &cml.Instance{
- Caddr: v.Addr,
- MonitorPeriod: v.MonitorPeriod,
- ServerID: sid,
- Flavor: _flavor,
- HeartbeatPeriod: _heartbeatPeriod,
- ReadTimeout: _canreadTimeout,
- DB: dbs,
- }
- if confData, err = s.getDocFromConf(c, v.Addr, cookie); err != nil {
- return
- }
- if confData != nil && v.User == "" {
- ist.User = confData.Document.Instance.User
- } else {
- ist.User = v.User
- }
- if confData != nil && v.Password == "" {
- ist.Password = confData.Document.Instance.Password
- } else {
- ist.Password = v.Password
- }
- cfg = &cml.Config{
- Instance: ist,
- }
- buf = new(bytes.Buffer)
- if err = toml.NewEncoder(buf).Encode(cfg); err != nil {
- return
- }
- comment = buf.String()
- return
- }
- //CreateConfig send requests to createConfigAPI
- func (s *Service) createConfig(c context.Context, params url.Values, cookie string) (res map[string]interface{}, err error) {
- var (
- data = json.RawMessage{}
- )
- uri := fmt.Sprintf(createConfigAPI, conf.Conf.Canal.CANALSVENCo)
- params.Set("token", conf.Conf.AppToken)
- if data, err = s.ConfigProxy(c, "POST", uri, params, cookie); err != nil {
- err = ecode.ConfigCreateErr
- return
- }
- if data == nil {
- return
- }
- if err = json.Unmarshal([]byte(data), &res); err != nil {
- log.Error("updateConfigAPI() json.Unmarshal errors: %s", err)
- return
- }
- return
- }
- //UpdateConfig send requests to updateConfigAPI
- func (s *Service) updateConfig(c context.Context, params url.Values, cookie string) (res map[string]interface{}, err error) {
- var (
- data = json.RawMessage{}
- )
- uri := fmt.Sprintf(updateConfigAPI, conf.Conf.Canal.CANALSVENCo)
- params.Set("token", conf.Conf.AppToken)
- if data, err = s.ConfigProxy(c, "POST", uri, params, cookie); err != nil {
- err = ecode.ConfigUpdateErr
- return
- }
- if data == nil {
- return
- }
- if err = json.Unmarshal([]byte(data), &res); err != nil {
- log.Error("updateConfigAPI() json.Unmarshal errors: %s", err)
- return
- }
- return
- }
- //DatabusInfo joint databusinfo
- func (s *Service) databusInfo(group string, addr string, schema string, table []*cml.Table) (d *cml.Databus, err error) {
- var (
- ai appInfo
- gi groupInfo
- act string
- )
- if ai, err = s.getAppInfo(group); err != nil {
- err = ecode.DatabusAppErr
- return
- }
- if gi, _, err = s.getGroupInfo(group); err != nil {
- err = ecode.DatabusGroupErr
- return
- }
- act = s.getAction(group)
- name := "canal/" + schema
- d = &cml.Databus{
- Key: ai.AppKey,
- Secret: ai.AppSecret,
- Group: group,
- Topic: gi.Topic,
- Action: act,
- Name: name,
- Proto: "tcp",
- Addr: addr,
- Idle: 1,
- Active: len(table),
- DialTimeout: _dialTimeout,
- ReadTimeout: _readTimeout,
- WriteTimeout: _writeTimeout,
- IdleTimeout: _idleTimeout,
- }
- return
- }
- //getAppInfo according group get appinfo
- func (s *Service) getAppInfo(group string) (ai appInfo, err error) {
- var table string
- g, new, _ := s.getGroupInfo(group)
- if !new {
- table = "app"
- } else {
- table = "app2"
- }
- err = s.DBDatabus.Table(table).Select("`id`,`app_key`,`app_secret`").Where("`id`= ?", g.AppID).Find(&ai).Error
- if err != nil {
- log.Error("apmSvc.getAppInfo error(%v)", err)
- return
- }
- return
- }
- //getGroupInfo according group get groupinfo
- func (s *Service) getGroupInfo(group string) (gi groupInfo, new bool, err error) {
- err = s.DBDatabus.Table("auth2").Select("auth2.group,topic.topic,auth2.app_id").Joins("join topic on topic.id=auth2.topic_id").Where("auth2.group= ?", group).Scan(&gi).Error
- if err == nil {
- new = true
- return
- }
- err = s.DBDatabus.Table("auth").Select("group_name as `group`,topic,app_id").Where("group_name = ?", group).Find(&gi).Error
- if err != nil {
- log.Error("apmSvc.getGroupInfo error(%v", err)
- return
- }
- return
- }
- //getAction according group get action
- func (s *Service) getAction(group string) (action string) {
- if strings.HasSuffix(group, "-P") {
- action = "pub"
- } else if strings.HasSuffix(group, "-S") {
- action = "sub"
- } else {
- action = "notify"
- }
- return
- }
- //TableInfo get array table info
- func (s *Service) TableInfo(table string) (infos []*cml.Table, err error) {
- info := strings.Split(table, ",")
- tab := make([]*cml.Table, len(info))
- for i := range info {
- tab[i] = &cml.Table{Name: info[i]}
- }
- infos = tab
- return
- }
- //getServerID get server id from addr
- func (s *Service) getServerID(addr string) (sid int64, err error) {
- ip := strings.Split(addr, ".")
- last := ip[len(ip)-1]
- port := strings.Split(last, ":")
- joint := fmt.Sprintf("%s%s%s", port[len(port)-1], ip[len(ip)-2], port[len(port)-2])
- sid, err = strconv.ParseInt(joint, 10, 64)
- return
- }
- //SendWechatMessage send wechat message
- func (s *Service) SendWechatMessage(c context.Context, addr, aType, result, sender, note string, receiver []string) (err error) {
- var (
- detail string
- users = []string{sender}
- )
- users = append(users, receiver...)
- if env.DeployEnv != "prod" {
- detail = fmt.Sprintf("http://%s-%s", env.DeployEnv, "%s")
- } else {
- detail = "http://%s"
- }
- switch aType {
- case cml.TypeMap[cml.TypeApply]:
- detail = fmt.Sprintf(detail, "sven.bilibili.co/#/canal/apply")
- case cml.TypeMap[cml.TypeReview]:
- detail = fmt.Sprintf(detail, "ops-log.bilibili.co/app/kibana 确认canal订阅消息")
- }
- msg := fmt.Sprintf("[sven系统抄送提示]\n发送方:%s\n事件: %s环境 DB:%s canal%s%s\n接收方:%s\n备注:%s\n详情:%s(请复制到浏览器打开)\n", sender, env.DeployEnv, addr, aType, result, strings.Join(receiver, ","), note, detail)
- if err = s.dao.SendWechatToUsers(c, users, msg); err != nil {
- log.Error("apmSvc.SendWechatMessage error(%v)", err)
- return
- }
- return
- }
- // UpdateProcessTag canal审核通过之后,调用/x/admin/config/canal/tag/update,同步到配置中心发版
- func (s *Service) UpdateProcessTag(c context.Context, configID int, cookie string) (err error) {
- client := &http.Client{}
- tokenURL := fmt.Sprintf("%s/x/admin/config/app/envs", conf.Conf.Canal.CANALSVENCo)
- tokenParams := url.Values{}
- tokenParams.Set("app_name", "main.common-arch.canal")
- tokenParams.Set("tree_id", "3766")
- tokenParams.Set("zone", env.Zone)
- tokenReq, err := http.NewRequest(http.MethodGet, tokenURL+"?"+tokenParams.Encode(), nil)
- if err != nil {
- err = ecode.RequestErr
- return
- }
- tokenReq.Header.Set("Content-Type", "application/json;charset=UTF-8")
- tokenReq.Header.Set("Cookie", cookie)
- tokenResp, err := client.Do(tokenReq)
- if err != nil {
- return
- }
- body, err := ioutil.ReadAll(tokenResp.Body)
- if err != nil {
- return
- }
- defer tokenResp.Body.Close()
- var tokenRespObj struct {
- Code int `json:"code"`
- Data []struct {
- Name string `json:"name"`
- NickName string `json:"nickname"`
- Token string `json:"token"`
- } `json:"data"`
- }
- err = json.Unmarshal(body, &tokenRespObj)
- if err != nil {
- return fmt.Errorf("json unmarshal error: %s, get body: %s", err, body)
- }
- if tokenRespObj.Code != 0 {
- log.Error(" tokenRespObj.Code: %d", tokenRespObj.Code)
- return fmt.Errorf("tokenRespObj.Code: %d", tokenRespObj.Code)
- }
- var token string
- for _, v := range tokenRespObj.Data {
- if v.Name == env.DeployEnv {
- token = v.Token
- break
- }
- }
- log.Info("tokenURL(%s), env.DeployEnv(%v), token(%v)", tokenURL+"?"+tokenParams.Encode(), env.DeployEnv, token)
- updateURL := fmt.Sprintf("%s/x/admin/config/canal/tag/update", conf.Conf.Canal.CANALSVENCo)
- params := url.Values{}
- params.Set("app_name", "main.common-arch.canal")
- params.Set("env", env.DeployEnv)
- params.Set("zone", env.Zone)
- params.Set("config_ids", fmt.Sprintf("%d", configID))
- params.Set("tree_id", "3766")
- params.Set("mark", "canal发版")
- params.Set("user", "canalApprovalProcess")
- params.Set("force", "1")
- if conf.Conf.Canal.BUILD != "" {
- params.Set("build", conf.Conf.Canal.BUILD)
- } else {
- params.Set("build", "docker-1")
- }
- log.Info("env:(%v), zone:(%v), build:(%v)", params.Get("env"), params.Get("zone"), params.Get("build"))
- params.Set("token", token)
- req, err := http.NewRequest(http.MethodPost, updateURL, strings.NewReader(params.Encode()))
- if err != nil {
- err = ecode.RequestErr
- return
- }
- req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
- req.Header.Set("Cookie", cookie)
- var res struct {
- Code int `json:"code"`
- Message string `json:"message"`
- }
- if err = s.client.Do(c, req, &res); err != nil || res.Code != 0 {
- log.Error("ApplyApprovalUpdateTag url(%s) err(%v), code(%v), message(%s)", updateURL+params.Encode(), err, res.Code, res.Message)
- err = ecode.RequestErr
- return
- }
- return
- }
|