canal.go 25 KB


  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "io/ioutil"
  8. "net/http"
  9. "net/url"
  10. "strconv"
  11. "strings"
  12. "go-common/app/admin/main/apm/conf"
  13. cml "go-common/app/admin/main/apm/model/canal"
  14. "go-common/app/admin/main/apm/model/user"
  15. cgm "go-common/app/admin/main/config/model"
  16. "go-common/library/conf/env"
  17. "go-common/library/ecode"
  18. "go-common/library/log"
  19. bm "go-common/library/net/http/blademaster"
  20. "github.com/BurntSushi/toml"
  21. )
// Fixed values written into every generated canal instance / databus
// configuration (see databusInfo and jointConfigInfo).
const (
	// Databus connection timeouts, copied verbatim into cml.Databus.
	_dialTimeout = "500ms"
	_readTimeout = "1s"
	_writeTimeout = "1s"
	_idleTimeout = "60s"
	// _flavor is the value stored in cml.Instance.Flavor.
	_flavor = "mysql"
	// Stored in cml.Instance.HeartbeatPeriod and cml.Instance.ReadTimeout.
	// NOTE(review): presumably seconds — confirm against the canal consumer.
	_heartbeatPeriod = 60
	_canreadTimeout = 90
)
// Endpoint format strings; the single %s verb is replaced with the target
// host via fmt.Sprintf before a request is issued.
var (
	getBuildIDAPI = "%s/x/admin/config/build/builds"
	getConfigValueAPI = "%s/x/admin/config/config/value"
	getConfigIDAPI = "%s/x/admin/config/config/configs"
	getAllErrorsAPI = "%s/x/internal/canal/errors"
	createConfigAPI = "%s/x/admin/config/canal/config/create"
	configByNameAPI = "%s/x/admin/config/canal/name/configs"
	updateConfigAPI = "%s/x/admin/config/home/config/update"
	checkMasterAPI = "%s/x/internal/canal/master/check"
	// ok is the response code that ConfigProxy treats as success.
	ok = 0
)
// result is the response envelope decoded by ConfigProxy and CheckMaster:
// a numeric code plus a payload that is left undecoded for the caller.
type result struct {
	Data json.RawMessage `json:"data"`
	Code int `json:"code"`
}
// list is the payload decoded by getBuildID: a sequence of objects of which
// only the "id" field is read.
type list []struct {
	ID int `json:"id"`
}
// configs is the payload decoded by getConfigID; only its "build_files"
// member is used.
type configs struct {
	BuildFiles fileList `json:"build_files"`
}
// fileList is a sequence of (id, name) pairs describing config files; it is
// matched by name in ScanByAddrFromConfig.
type fileList []struct {
	ID int `json:"id"`
	Name string `json:"name"`
}
// groupInfo is the row shape filled by getGroupInfo from the databus auth
// tables: the group name, its topic and the owning application id.
type groupInfo struct {
	Group string `json:"group"`
	Topic string `json:"topic"`
	AppID int `json:"app_id"`
}
// appInfo is the row shape filled by getAppInfo: an application id together
// with its key and secret.
type appInfo struct {
	ID int `json:"id"`
	AppKey string `json:"app_key"`
	AppSecret string `json:"app_secret"`
}
  66. // ConfigProxy config proxy
  67. func (s *Service) ConfigProxy(c context.Context, method string, uri string, params url.Values, cookie string, args ...interface{}) (data json.RawMessage, err error) {
  68. //common params
  69. if params != nil && args == nil {
  70. params.Set("app_name", "main.common-arch.canal")
  71. params.Set("tree_id", "3766")
  72. params.Set("zone", env.Zone)
  73. params.Set("env", env.DeployEnv)
  74. }
  75. res := result{}
  76. fmt.Println("ConfigProxy uri=", uri, "params=", params.Encode())
  77. req, err := s.client.NewRequest(method, uri, "", params)
  78. if err != nil {
  79. log.Error("s.client.NewRequest() error(%v)", err)
  80. }
  81. if cookie != "" {
  82. req.Header.Set("Cookie", cookie)
  83. }
  84. if err = s.client.Do(c, req, &res); err != nil {
  85. log.Error("canal.request get url:"+uri+" params:(%v) error(%v)", params.Encode(), err)
  86. return
  87. }
  88. if res.Code != ok {
  89. log.Error("canal.request get url:"+uri+" params:(%v) returnCode:(%v)", params.Encode(), res.Code)
  90. return
  91. }
  92. data = res.Data
  93. return
  94. }
  95. func (s *Service) getBuildID(c context.Context, cookie string) (dat list, err error) {
  96. var (
  97. params = url.Values{}
  98. data = json.RawMessage{}
  99. )
  100. uri := fmt.Sprintf(getBuildIDAPI, conf.Conf.Host.SVENCo)
  101. if data, err = s.ConfigProxy(c, "GET", uri, params, cookie); err != nil {
  102. log.Error("getBuildID() response errors: %d", err)
  103. return
  104. }
  105. if data == nil {
  106. return
  107. }
  108. if err = json.Unmarshal([]byte(data), &dat); err != nil {
  109. log.Error("getBuildID() json.Unmarshal errors: %s", err)
  110. return
  111. }
  112. return
  113. }
  114. func (s *Service) getConfigID(c context.Context, id string, cookie string) (dat fileList, err error) {
  115. var (
  116. params = url.Values{}
  117. data = json.RawMessage{}
  118. )
  119. params.Set("build_id", id)
  120. uri := fmt.Sprintf(getConfigIDAPI, conf.Conf.Host.SVENCo)
  121. if data, err = s.ConfigProxy(c, "GET", uri, params, cookie); err != nil {
  122. log.Error("getConfigID() response errors: %d", err)
  123. return
  124. }
  125. if data == nil {
  126. return
  127. }
  128. res := &configs{}
  129. if err = json.Unmarshal([]byte(data), res); err != nil {
  130. log.Error("getConfigID() json.Unmarshal errors: %s", err)
  131. return
  132. }
  133. dat = res.BuildFiles
  134. return
  135. }
  136. func (s *Service) getConfigValue(c context.Context, id string, cookie string) (res *cml.Conf, err error) {
  137. var (
  138. params = url.Values{}
  139. data = json.RawMessage{}
  140. )
  141. params.Set("config_id", id)
  142. uri := fmt.Sprintf(getConfigValueAPI, conf.Conf.Host.SVENCo)
  143. if data, err = s.ConfigProxy(c, "GET", uri, params, cookie); err != nil {
  144. log.Error("getConfigValue() response errors: %d", err)
  145. return
  146. }
  147. if data == nil {
  148. return
  149. }
  150. res = new(cml.Conf)
  151. if err = json.Unmarshal([]byte(data), &res); err != nil {
  152. log.Error("getConfigValue() json.Unmarshal errors: %s", err)
  153. return
  154. }
  155. return
  156. }
  157. //ScanByAddrFromConfig 根据addr查询配置信息
  158. func (s *Service) ScanByAddrFromConfig(c context.Context, addr string, cookie string) (res *cml.Conf, err error) {
  159. var (
  160. buildIDList list
  161. configIDList fileList
  162. )
  163. //find build_id
  164. if buildIDList, err = s.getBuildID(c, cookie); err != nil {
  165. return
  166. }
  167. fmt.Println("BuildID=", buildIDList)
  168. for _, v := range buildIDList {
  169. if v.ID != 0 {
  170. //find config_id
  171. if configIDList, err = s.getConfigID(c, strconv.Itoa(v.ID), cookie); err != nil {
  172. return
  173. }
  174. fmt.Println("configID=", configIDList)
  175. for _, val := range configIDList {
  176. if val.ID != 0 && val.Name == addr+".toml" {
  177. //find config
  178. res, err = s.getConfigValue(c, strconv.Itoa(val.ID), cookie)
  179. return
  180. }
  181. continue
  182. }
  183. }
  184. continue
  185. }
  186. return
  187. }
  188. //ApplyAdd canal apply
  189. func (s *Service) ApplyAdd(c *bm.Context, v *cml.Canal, username string) (err error) {
  190. cnt := 0
  191. f := strings.Contains(v.Addr, ":")
  192. if !f {
  193. err = ecode.CanalAddrFmtErr
  194. return
  195. }
  196. if err = s.DBCanal.Model(&cml.Canal{}).Where("addr=?", v.Addr).Count(&cnt).Error; err != nil {
  197. log.Error("apmSvc.CanalAdd count error(%v)", err)
  198. err = ecode.RequestErr
  199. return
  200. }
  201. if cnt > 0 {
  202. err = ecode.CanalAddrExist
  203. return
  204. }
  205. canal := &cml.Canal{
  206. Addr: v.Addr,
  207. Cluster: v.Cluster,
  208. Leader: v.Leader,
  209. BinName: v.BinName,
  210. BinPos: v.BinPos,
  211. Remark: v.Remark,
  212. }
  213. if err = s.DBCanal.Create(canal).Error; err != nil {
  214. log.Error("apmSvc.CanalAdd create error(%v)", err)
  215. return
  216. }
  217. s.SendLog(*c, username, 0, 1, canal.ID, "apmSvc.CanalAdd", canal)
  218. return
  219. }
  220. //ApplyDelete canal delete
  221. func (s *Service) ApplyDelete(c *bm.Context, v *cml.ScanReq, username string) (err error) {
  222. cc := &cml.Canal{}
  223. if err = s.DBCanal.Model(&cml.Canal{}).Where("addr=?", v.Addr).Find(cc).Error; err != nil {
  224. log.Error("apmSvc.ApplyDelete count error(%v)", err)
  225. err = ecode.RequestErr
  226. return
  227. }
  228. id := cc.ID
  229. if err = s.DBCanal.Model(&cml.Canal{}).Where("id = ?", id).Update("is_delete", 1).Error; err != nil {
  230. log.Error("apmSvc.canalDelete canalDelete error(%v)", err)
  231. return
  232. }
  233. sqlLog := &map[string]interface{}{
  234. "SQLType": "delete",
  235. "Value": v.Addr,
  236. }
  237. s.SendLog(*c, username, 0, 3, id, "apmSvc.canalDelete", sqlLog)
  238. return
  239. }
  240. //ApplyEdit canal edit
  241. func (s *Service) ApplyEdit(c *bm.Context, v *cml.EditReq, username string) (err error) {
  242. cc := &cml.Canal{}
  243. if err = s.DBCanal.Where("id = ?", v.ID).Find(cc).Error; err != nil {
  244. log.Error("apmSvc.CanalEdit find(%d) error(%v)", v.ID, err)
  245. return
  246. }
  247. ups := map[string]interface{}{}
  248. if _, ok := c.Request.Form["bin_name"]; ok {
  249. ups["bin_name"] = v.BinName
  250. }
  251. if _, ok := c.Request.Form["bin_pos"]; ok {
  252. ups["bin_pos"] = v.BinPos
  253. }
  254. if _, ok := c.Request.Form["remark"]; ok {
  255. ups["remark"] = v.Remark
  256. }
  257. if _, ok := c.Request.Form["project"]; ok {
  258. ups["cluster"] = v.Project
  259. }
  260. if _, ok := c.Request.Form["leader"]; ok {
  261. ups["leader"] = v.Leader
  262. }
  263. if err = s.DBCanal.Model(&cml.Canal{}).Where("id = ?", v.ID).Updates(ups).Error; err != nil {
  264. log.Error("apmSvc.CanalEdit updates error(%v)", err)
  265. return
  266. }
  267. sqlLog := &map[string]interface{}{
  268. "SQLType": "update",
  269. "Where": "id = ?",
  270. "Value1": v.ID,
  271. "Update": ups,
  272. "Old": cc,
  273. }
  274. s.SendLog(*c, username, 0, 2, v.ID, "apmSvc.CanalEdit", sqlLog)
  275. return
  276. }
  277. //GetScanInfo is
  278. func (s *Service) GetScanInfo(c context.Context, v *cml.ScanReq, username string, cookie string) (confData *cml.Results, err error) {
  279. if confData, err = s.getDocFromConf(c, v.Addr, cookie); err != nil {
  280. return
  281. }
  282. if confData == nil {
  283. return
  284. }
  285. if err = s.Permit(c, username, user.CanalEdit); err != nil {
  286. confData.Document.Instance.User = ""
  287. confData.Document.Instance.Password = ""
  288. }
  289. return confData, nil
  290. }
  291. //GetAllErrors 调用x/internal/canal/errors 查询错误信息
  292. func (s *Service) GetAllErrors(c context.Context) (errs map[string]string, err error) {
  293. var (
  294. data json.RawMessage
  295. host string
  296. )
  297. type v struct {
  298. Error string `json:"error"`
  299. InstanceError map[string]string `json:"instance_error"`
  300. }
  301. if host, err = s.getCanalInstance(c); err != nil {
  302. return
  303. }
  304. uri := fmt.Sprintf(getAllErrorsAPI, host)
  305. if data, err = s.ConfigProxy(c, "GET", uri, nil, ""); err != nil {
  306. return
  307. }
  308. res := new(v)
  309. if err = json.Unmarshal([]byte(data), &res); err != nil {
  310. return
  311. }
  312. errs = res.InstanceError
  313. return
  314. }
  315. func (s *Service) getCanalInstance(c context.Context) (host string, err error) {
  316. params := url.Values{}
  317. params.Set("appid", "main.common-arch.canal")
  318. params.Set("env", env.DeployEnv)
  319. params.Set("hostname", env.Hostname)
  320. params.Set("status", "3")
  321. var ins struct {
  322. ZoneInstances map[string][]struct {
  323. Addrs []string `json:"addrs"`
  324. } `json:"zone_instances"`
  325. }
  326. resp, err := s.DiscoveryProxy(c, "GET", "fetch", params)
  327. if err != nil {
  328. return
  329. }
  330. rb, err := json.Marshal(resp)
  331. if err != nil {
  332. return
  333. }
  334. json.Unmarshal(rb, &ins)
  335. inss := ins.ZoneInstances[env.Zone]
  336. for _, zone := range inss {
  337. for _, addr := range zone.Addrs {
  338. if strings.Contains(addr, "http://") {
  339. host = addr
  340. break
  341. }
  342. }
  343. }
  344. return
  345. }
  346. //GetConfigsByName obtain configs from configByNameAPI
  347. func (s *Service) getConfigsByName(c context.Context, name string, cookie string) (configs *cgm.Config, err error) {
  348. var (
  349. params = url.Values{}
  350. data = json.RawMessage{}
  351. result []*cgm.Config
  352. )
  353. params.Set("token", conf.Conf.AppToken)
  354. params.Set("name", name+".toml")
  355. uri := fmt.Sprintf(configByNameAPI, conf.Conf.Canal.CANALSVENCo)
  356. if data, err = s.ConfigProxy(c, "POST", uri, params, cookie); err != nil {
  357. err = ecode.GetConfigByNameErr
  358. return
  359. }
  360. if data == nil {
  361. return
  362. }
  363. if err = json.Unmarshal([]byte(data), &result); err != nil {
  364. log.Error("configByNameAPI() json.Unmarshal errors: %s", err)
  365. return
  366. }
  367. if len(result) == 0 {
  368. return
  369. }
  370. configs = result[0]
  371. return
  372. }
  373. //ProcessCanalList get canal list
  374. func (s *Service) ProcessCanalList(c context.Context, v *cml.ListReq) (listdata *cml.Paper, err error) {
  375. type errorCanal struct {
  376. cml.Canal
  377. Error string `json:"error"`
  378. }
  379. var (
  380. cc []*errorCanal
  381. count int
  382. errMap map[string]string
  383. )
  384. query := " is_delete = 0 "
  385. if v.Addr != "" {
  386. query += fmt.Sprintf("and addr = '%s' ", v.Addr)
  387. }
  388. if v.Project != "" {
  389. query += fmt.Sprintf("and cluster = '%s' ", v.Project)
  390. }
  391. err = s.DBCanal.Where(query).Order("id DESC").Offset((v.Pn - 1) * v.Ps).Limit(v.Ps).Find(&cc).Error
  392. if err != nil {
  393. log.Error("apmSvc.CanalList error(%v)", err)
  394. return
  395. }
  396. err = s.DBCanal.Model(&cml.Canal{}).Where(query).Count(&count).Error
  397. if err != nil {
  398. log.Error("apmSvc.CanalList count error(%v)", err)
  399. return
  400. }
  401. // add error info
  402. if count > 0 {
  403. if errMap, err = s.GetAllErrors(c); err != nil {
  404. log.Error("apmSvc.DBCanalApply GetAllErrors error(%v)", err)
  405. }
  406. for _, va := range cc {
  407. va.Error = errMap[va.Addr]
  408. }
  409. }
  410. listdata = &cml.Paper{
  411. Pn: v.Pn,
  412. Ps: v.Ps,
  413. Items: cc,
  414. Total: count,
  415. }
  416. return
  417. }
  418. //ProcessApplyList get apply list
  419. func (s *Service) ProcessApplyList(c context.Context, v *cml.ListReq) (listdata *cml.Paper, err error) {
  420. var (
  421. cc []*cml.Apply
  422. count int
  423. )
  424. query := " state !=3 "
  425. if v.Addr != "" {
  426. query += fmt.Sprintf("and addr = '%s' ", v.Addr)
  427. }
  428. if v.Project != "" {
  429. query += fmt.Sprintf("and cluster = '%s' ", v.Project)
  430. }
  431. if v.Status > 0 {
  432. query += fmt.Sprintf("and state = '%d' ", v.Status)
  433. }
  434. err = s.DBCanal.Model(&cml.Apply{}).Where(query).Count(&count).Error
  435. if err != nil {
  436. log.Error("apmSvc.ApplyList count error(%v)", err)
  437. return
  438. }
  439. err = s.DBCanal.Where(query).Order("id DESC").Offset((v.Pn - 1) * v.Ps).Limit(v.Ps).Find(&cc).Error
  440. if err != nil {
  441. log.Error("apmSvc.ApplyList error(%v)", err)
  442. return
  443. }
  444. listdata = &cml.Paper{
  445. Pn: v.Pn,
  446. Ps: v.Ps,
  447. Items: cc,
  448. Total: count,
  449. }
  450. return
  451. }
  452. //ProcessConfigInfo process info to config center
  453. func (s *Service) ProcessConfigInfo(c context.Context, v *cml.ConfigReq, cookie string, username string) (err error) {
  454. var (
  455. confs *cgm.Config
  456. comment string
  457. query []map[string]string
  458. data []byte
  459. )
  460. if comment, err = s.jointConfigInfo(c, v, cookie); err != nil {
  461. return
  462. }
  463. if confs, err = s.getConfigsByName(c, v.Addr, cookie); err != nil {
  464. return
  465. }
  466. if confs != nil {
  467. query = []map[string]string{{
  468. "name": v.Addr + ".toml",
  469. "comment": comment,
  470. "mark": v.Mark,
  471. }}
  472. data, err = json.Marshal(query)
  473. if err != nil {
  474. return
  475. }
  476. va := url.Values{}
  477. va.Set("data", string(data))
  478. va.Set("user", username)
  479. if _, err = s.updateConfig(c, va, cookie); err != nil {
  480. return
  481. }
  482. } else {
  483. //never register config's
  484. params := url.Values{}
  485. params.Set("comment", comment)
  486. params.Set("mark", v.Mark)
  487. params.Set("state", "2")
  488. params.Set("from", "0")
  489. params.Set("name", v.Addr+".toml")
  490. params.Set("user", username)
  491. if _, err = s.createConfig(c, params, cookie); err != nil {
  492. return
  493. }
  494. }
  495. if confs, err = s.getConfigsByName(c, v.Addr, cookie); confs == nil {
  496. return
  497. }
  498. if err = s.dao.SetConfigID(confs.ID, v.Addr); err != nil {
  499. return
  500. }
  501. return
  502. }
  503. //CheckMaster canal check
  504. func (s *Service) CheckMaster(c context.Context, v *cml.ConfigReq) (err error) {
  505. res := result{}
  506. host, _ := s.getCanalInstance(c)
  507. params := url.Values{}
  508. params.Set("user", v.User)
  509. params.Set("password", v.Password)
  510. params.Set("addr", v.Addr)
  511. uri := fmt.Sprintf(checkMasterAPI, host)
  512. req, err := s.client.NewRequest("POST", uri, "", params)
  513. if err != nil {
  514. err = ecode.CheckMasterErr
  515. return
  516. }
  517. if err = s.client.Do(c, req, &res); err != nil {
  518. err = ecode.CheckMasterErr
  519. return
  520. }
  521. if res.Code != 0 {
  522. err = ecode.CheckMasterErr
  523. return
  524. }
  525. return
  526. }
  527. //ProcessCanalInfo is
  528. func (s *Service) ProcessCanalInfo(c context.Context, v *cml.ConfigReq, username string) (err error) {
  529. var (
  530. cnt = 0
  531. info = &cml.Canal{}
  532. )
  533. if err = s.DBCanal.Select("addr,cluster,leader").Where("addr=?", v.Addr).Find(info).Error; err == nil {
  534. if v.Project == "" {
  535. v.Project = info.Cluster
  536. }
  537. if v.Leader == "" {
  538. v.Leader = info.Leader
  539. }
  540. }
  541. if cnt, err = s.dao.CanalApplyCounts(v); err != nil {
  542. return
  543. }
  544. if cnt > 0 {
  545. if err = s.dao.CanalApplyEdit(v, username); err != nil {
  546. return
  547. }
  548. } else {
  549. if err = s.dao.CanalApplyCreate(v, username); err != nil {
  550. return
  551. }
  552. }
  553. return
  554. }
  555. //getCommentFromConf get document info from configbyname
  556. func (s *Service) getDocFromConf(c context.Context, addr string, cookie string) (confData *cml.Results, err error) {
  557. var conf *cgm.Config
  558. if conf, err = s.getConfigsByName(c, addr, cookie); err != nil {
  559. return
  560. }
  561. if conf == nil {
  562. return
  563. }
  564. confData = new(cml.Results)
  565. if _, err = toml.Decode(conf.Comment, &confData.Document); err != nil {
  566. err = ecode.ConfigParseErr
  567. log.Error("comment toml.decode error(%v)", err)
  568. return
  569. }
  570. row := &cml.Apply{}
  571. err = s.DBCanal.Model(&cml.Apply{}).Select("`cluster`,`leader`").Where("addr=?", addr).Scan(row).Error
  572. if err == nil {
  573. confData.Cluster = row.Cluster
  574. confData.Leader = row.Leader
  575. } else {
  576. res := &cml.Canal{}
  577. err = s.DBCanal.Model(&cml.Canal{}).Select("`cluster`,`leader`").Where("addr=?", addr).Scan(res).Error
  578. if err != nil {
  579. log.Error("canalinfo get error(%v)", err)
  580. return
  581. }
  582. confData.Cluster = res.Cluster
  583. confData.Leader = res.Leader
  584. }
  585. confData.ID = conf.ID
  586. confData.Addr = addr
  587. return
  588. }
  589. //ProcessConfigInfo is
  590. func (s *Service) jointConfigInfo(c context.Context, v *cml.ConfigReq, cookie string) (comment string, err error) {
  591. var (
  592. buf *bytes.Buffer
  593. cfg *cml.Config
  594. dbs []*cml.DB
  595. di *cml.Databus
  596. confData *cml.Results
  597. sid int64
  598. schemas = make(map[string]bool)
  599. )
  600. //analysis request params
  601. if err = json.Unmarshal([]byte(v.Databases), &dbs); err != nil {
  602. log.Error("apmSvc.jointConfigInfo Unmarshal error(%v)", err)
  603. err = ecode.DatabasesUnmarshalErr
  604. return
  605. }
  606. for _, db := range dbs {
  607. if db.Databus == nil {
  608. continue
  609. }
  610. //Find duplicate
  611. if schemas[db.Schema+db.Databus.Group] {
  612. log.Error("jointConfigInfo find duplicate databus group (%v)", db.Databus.Group)
  613. err = ecode.DatabusDuplErr
  614. return
  615. }
  616. schemas[db.Schema+db.Databus.Group] = true
  617. //get databusinfo
  618. if di, err = s.databusInfo(db.Databus.Group, db.Databus.Addr, db.Schema, db.Table); err != nil {
  619. return
  620. }
  621. db.Databus = di
  622. }
  623. if sid, err = s.getServerID(v.Addr); err != nil {
  624. err = ecode.CanalAddrFmtErr
  625. return
  626. }
  627. ist := &cml.Instance{
  628. Caddr: v.Addr,
  629. MonitorPeriod: v.MonitorPeriod,
  630. ServerID: sid,
  631. Flavor: _flavor,
  632. HeartbeatPeriod: _heartbeatPeriod,
  633. ReadTimeout: _canreadTimeout,
  634. DB: dbs,
  635. }
  636. if confData, err = s.getDocFromConf(c, v.Addr, cookie); err != nil {
  637. return
  638. }
  639. if confData != nil && v.User == "" {
  640. ist.User = confData.Document.Instance.User
  641. } else {
  642. ist.User = v.User
  643. }
  644. if confData != nil && v.Password == "" {
  645. ist.Password = confData.Document.Instance.Password
  646. } else {
  647. ist.Password = v.Password
  648. }
  649. cfg = &cml.Config{
  650. Instance: ist,
  651. }
  652. buf = new(bytes.Buffer)
  653. if err = toml.NewEncoder(buf).Encode(cfg); err != nil {
  654. return
  655. }
  656. comment = buf.String()
  657. return
  658. }
  659. //CreateConfig send requests to createConfigAPI
  660. func (s *Service) createConfig(c context.Context, params url.Values, cookie string) (res map[string]interface{}, err error) {
  661. var (
  662. data = json.RawMessage{}
  663. )
  664. uri := fmt.Sprintf(createConfigAPI, conf.Conf.Canal.CANALSVENCo)
  665. params.Set("token", conf.Conf.AppToken)
  666. if data, err = s.ConfigProxy(c, "POST", uri, params, cookie); err != nil {
  667. err = ecode.ConfigCreateErr
  668. return
  669. }
  670. if data == nil {
  671. return
  672. }
  673. if err = json.Unmarshal([]byte(data), &res); err != nil {
  674. log.Error("updateConfigAPI() json.Unmarshal errors: %s", err)
  675. return
  676. }
  677. return
  678. }
  679. //UpdateConfig send requests to updateConfigAPI
  680. func (s *Service) updateConfig(c context.Context, params url.Values, cookie string) (res map[string]interface{}, err error) {
  681. var (
  682. data = json.RawMessage{}
  683. )
  684. uri := fmt.Sprintf(updateConfigAPI, conf.Conf.Canal.CANALSVENCo)
  685. params.Set("token", conf.Conf.AppToken)
  686. if data, err = s.ConfigProxy(c, "POST", uri, params, cookie); err != nil {
  687. err = ecode.ConfigUpdateErr
  688. return
  689. }
  690. if data == nil {
  691. return
  692. }
  693. if err = json.Unmarshal([]byte(data), &res); err != nil {
  694. log.Error("updateConfigAPI() json.Unmarshal errors: %s", err)
  695. return
  696. }
  697. return
  698. }
  699. //DatabusInfo joint databusinfo
  700. func (s *Service) databusInfo(group string, addr string, schema string, table []*cml.Table) (d *cml.Databus, err error) {
  701. var (
  702. ai appInfo
  703. gi groupInfo
  704. act string
  705. )
  706. if ai, err = s.getAppInfo(group); err != nil {
  707. err = ecode.DatabusAppErr
  708. return
  709. }
  710. if gi, _, err = s.getGroupInfo(group); err != nil {
  711. err = ecode.DatabusGroupErr
  712. return
  713. }
  714. act = s.getAction(group)
  715. name := "canal/" + schema
  716. d = &cml.Databus{
  717. Key: ai.AppKey,
  718. Secret: ai.AppSecret,
  719. Group: group,
  720. Topic: gi.Topic,
  721. Action: act,
  722. Name: name,
  723. Proto: "tcp",
  724. Addr: addr,
  725. Idle: 1,
  726. Active: len(table),
  727. DialTimeout: _dialTimeout,
  728. ReadTimeout: _readTimeout,
  729. WriteTimeout: _writeTimeout,
  730. IdleTimeout: _idleTimeout,
  731. }
  732. return
  733. }
  734. //getAppInfo according group get appinfo
  735. func (s *Service) getAppInfo(group string) (ai appInfo, err error) {
  736. var table string
  737. g, new, _ := s.getGroupInfo(group)
  738. if !new {
  739. table = "app"
  740. } else {
  741. table = "app2"
  742. }
  743. err = s.DBDatabus.Table(table).Select("`id`,`app_key`,`app_secret`").Where("`id`= ?", g.AppID).Find(&ai).Error
  744. if err != nil {
  745. log.Error("apmSvc.getAppInfo error(%v)", err)
  746. return
  747. }
  748. return
  749. }
  750. //getGroupInfo according group get groupinfo
  751. func (s *Service) getGroupInfo(group string) (gi groupInfo, new bool, err error) {
  752. err = s.DBDatabus.Table("auth2").Select("auth2.group,topic.topic,auth2.app_id").Joins("join topic on topic.id=auth2.topic_id").Where("auth2.group= ?", group).Scan(&gi).Error
  753. if err == nil {
  754. new = true
  755. return
  756. }
  757. err = s.DBDatabus.Table("auth").Select("group_name as `group`,topic,app_id").Where("group_name = ?", group).Find(&gi).Error
  758. if err != nil {
  759. log.Error("apmSvc.getGroupInfo error(%v", err)
  760. return
  761. }
  762. return
  763. }
  764. //getAction according group get action
  765. func (s *Service) getAction(group string) (action string) {
  766. if strings.HasSuffix(group, "-P") {
  767. action = "pub"
  768. } else if strings.HasSuffix(group, "-S") {
  769. action = "sub"
  770. } else {
  771. action = "notify"
  772. }
  773. return
  774. }
  775. //TableInfo get array table info
  776. func (s *Service) TableInfo(table string) (infos []*cml.Table, err error) {
  777. info := strings.Split(table, ",")
  778. tab := make([]*cml.Table, len(info))
  779. for i := range info {
  780. tab[i] = &cml.Table{Name: info[i]}
  781. }
  782. infos = tab
  783. return
  784. }
  785. //getServerID get server id from addr
  786. func (s *Service) getServerID(addr string) (sid int64, err error) {
  787. ip := strings.Split(addr, ".")
  788. last := ip[len(ip)-1]
  789. port := strings.Split(last, ":")
  790. joint := fmt.Sprintf("%s%s%s", port[len(port)-1], ip[len(ip)-2], port[len(port)-2])
  791. sid, err = strconv.ParseInt(joint, 10, 64)
  792. return
  793. }
  794. //SendWechatMessage send wechat message
  795. func (s *Service) SendWechatMessage(c context.Context, addr, aType, result, sender, note string, receiver []string) (err error) {
  796. var (
  797. detail string
  798. users = []string{sender}
  799. )
  800. users = append(users, receiver...)
  801. if env.DeployEnv != "prod" {
  802. detail = fmt.Sprintf("http://%s-%s", env.DeployEnv, "%s")
  803. } else {
  804. detail = "http://%s"
  805. }
  806. switch aType {
  807. case cml.TypeMap[cml.TypeApply]:
  808. detail = fmt.Sprintf(detail, "sven.bilibili.co/#/canal/apply")
  809. case cml.TypeMap[cml.TypeReview]:
  810. detail = fmt.Sprintf(detail, "ops-log.bilibili.co/app/kibana 确认canal订阅消息")
  811. }
  812. msg := fmt.Sprintf("[sven系统抄送提示]\n发送方:%s\n事件: %s环境 DB:%s canal%s%s\n接收方:%s\n备注:%s\n详情:%s(请复制到浏览器打开)\n", sender, env.DeployEnv, addr, aType, result, strings.Join(receiver, ","), note, detail)
  813. if err = s.dao.SendWechatToUsers(c, users, msg); err != nil {
  814. log.Error("apmSvc.SendWechatMessage error(%v)", err)
  815. return
  816. }
  817. return
  818. }
  819. // UpdateProcessTag canal审核通过之后,调用/x/admin/config/canal/tag/update,同步到配置中心发版
  820. func (s *Service) UpdateProcessTag(c context.Context, configID int, cookie string) (err error) {
  821. client := &http.Client{}
  822. tokenURL := fmt.Sprintf("%s/x/admin/config/app/envs", conf.Conf.Canal.CANALSVENCo)
  823. tokenParams := url.Values{}
  824. tokenParams.Set("app_name", "main.common-arch.canal")
  825. tokenParams.Set("tree_id", "3766")
  826. tokenParams.Set("zone", env.Zone)
  827. tokenReq, err := http.NewRequest(http.MethodGet, tokenURL+"?"+tokenParams.Encode(), nil)
  828. if err != nil {
  829. err = ecode.RequestErr
  830. return
  831. }
  832. tokenReq.Header.Set("Content-Type", "application/json;charset=UTF-8")
  833. tokenReq.Header.Set("Cookie", cookie)
  834. tokenResp, err := client.Do(tokenReq)
  835. if err != nil {
  836. return
  837. }
  838. body, err := ioutil.ReadAll(tokenResp.Body)
  839. if err != nil {
  840. return
  841. }
  842. defer tokenResp.Body.Close()
  843. var tokenRespObj struct {
  844. Code int `json:"code"`
  845. Data []struct {
  846. Name string `json:"name"`
  847. NickName string `json:"nickname"`
  848. Token string `json:"token"`
  849. } `json:"data"`
  850. }
  851. err = json.Unmarshal(body, &tokenRespObj)
  852. if err != nil {
  853. return fmt.Errorf("json unmarshal error: %s, get body: %s", err, body)
  854. }
  855. if tokenRespObj.Code != 0 {
  856. log.Error(" tokenRespObj.Code: %d", tokenRespObj.Code)
  857. return fmt.Errorf("tokenRespObj.Code: %d", tokenRespObj.Code)
  858. }
  859. var token string
  860. for _, v := range tokenRespObj.Data {
  861. if v.Name == env.DeployEnv {
  862. token = v.Token
  863. break
  864. }
  865. }
  866. log.Info("tokenURL(%s), env.DeployEnv(%v), token(%v)", tokenURL+"?"+tokenParams.Encode(), env.DeployEnv, token)
  867. updateURL := fmt.Sprintf("%s/x/admin/config/canal/tag/update", conf.Conf.Canal.CANALSVENCo)
  868. params := url.Values{}
  869. params.Set("app_name", "main.common-arch.canal")
  870. params.Set("env", env.DeployEnv)
  871. params.Set("zone", env.Zone)
  872. params.Set("config_ids", fmt.Sprintf("%d", configID))
  873. params.Set("tree_id", "3766")
  874. params.Set("mark", "canal发版")
  875. params.Set("user", "canalApprovalProcess")
  876. params.Set("force", "1")
  877. if conf.Conf.Canal.BUILD != "" {
  878. params.Set("build", conf.Conf.Canal.BUILD)
  879. } else {
  880. params.Set("build", "docker-1")
  881. }
  882. log.Info("env:(%v), zone:(%v), build:(%v)", params.Get("env"), params.Get("zone"), params.Get("build"))
  883. params.Set("token", token)
  884. req, err := http.NewRequest(http.MethodPost, updateURL, strings.NewReader(params.Encode()))
  885. if err != nil {
  886. err = ecode.RequestErr
  887. return
  888. }
  889. req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
  890. req.Header.Set("Cookie", cookie)
  891. var res struct {
  892. Code int `json:"code"`
  893. Message string `json:"message"`
  894. }
  895. if err = s.client.Do(c, req, &res); err != nil || res.Code != 0 {
  896. log.Error("ApplyApprovalUpdateTag url(%s) err(%v), code(%v), message(%s)", updateURL+params.Encode(), err, res.Code, res.Message)
  897. err = ecode.RequestErr
  898. return
  899. }
  900. return
  901. }