canal.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. package http
  2. import (
  3. "context"
  4. "strings"
  5. "go-common/app/admin/main/apm/conf"
  6. "go-common/app/admin/main/apm/model/canal"
  7. "go-common/app/admin/main/apm/model/user"
  8. "go-common/library/ecode"
  9. "go-common/library/log"
  10. bm "go-common/library/net/http/blademaster"
  11. )
  12. const (
  13. _upsertMasterInfo = "INSERT INTO master_info (addr,remark,leader,cluster) VALUES (?,?,?,?) ON DUPLICATE KEY UPDATE remark=?,leader=?,cluster=?"
  14. )
  15. //canalList get canalinfo list
  16. func canalList(c *bm.Context) {
  17. var err error
  18. v := new(canal.ListReq)
  19. if err = c.Bind(v); err != nil {
  20. return
  21. }
  22. data, _ := apmSvc.ProcessCanalList(c, v)
  23. c.JSON(data, nil)
  24. }
  25. //canalList get canalinfo add
  26. func canalAdd(c *bm.Context) {
  27. v := new(canal.Canal)
  28. var err error
  29. if err = c.Bind(v); err != nil {
  30. return
  31. }
  32. username := name(c)
  33. err = apmSvc.ApplyAdd(c, v, username)
  34. if err != nil {
  35. c.JSON(nil, err)
  36. return
  37. }
  38. c.JSON(nil, err)
  39. }
  40. //canalList get canalinfo edit
  41. func canalEdit(c *bm.Context) {
  42. v := new(canal.EditReq)
  43. var err error
  44. if err = c.Bind(v); err != nil {
  45. return
  46. }
  47. username := name(c)
  48. if err = apmSvc.ApplyEdit(c, v, username); err != nil {
  49. c.JSON(nil, err)
  50. return
  51. }
  52. c.JSON(nil, err)
  53. }
  54. //canalDelete 根据addr查询对应id进行软删除
  55. func canalDelete(c *bm.Context) {
  56. var err error
  57. v := new(canal.ScanReq)
  58. if err = c.Bind(v); err != nil {
  59. return
  60. }
  61. username := name(c)
  62. if err = apmSvc.ApplyDelete(c, v, username); err != nil {
  63. c.JSON(nil, err)
  64. return
  65. }
  66. c.JSON(nil, err)
  67. }
  68. //canalScanByAddrFromConfig 根据Addr查询对应有效的配置
  69. func canalScanByAddrFromConfig(c *bm.Context) {
  70. var confData *canal.Results
  71. cookie := c.Request.Header.Get("Cookie")
  72. v := new(canal.ScanReq)
  73. var err error
  74. if err = c.Bind(v); err != nil {
  75. return
  76. }
  77. username := name(c)
  78. if confData, err = apmSvc.GetScanInfo(c, v, username, cookie); err != nil {
  79. c.JSON(nil, err)
  80. return
  81. }
  82. c.JSON(confData, nil)
  83. }
  84. func canalApplyList(c *bm.Context) {
  85. v := new(canal.ListReq)
  86. var err error
  87. if err = c.Bind(v); err != nil {
  88. return
  89. }
  90. data, err := apmSvc.ProcessApplyList(c, v)
  91. if err != nil {
  92. c.JSON(nil, err)
  93. return
  94. }
  95. c.JSON(data, nil)
  96. }
  97. //canalApplyDetailToConfig is
  98. func canalApplyDetailToConfig(c *bm.Context) {
  99. var (
  100. username string
  101. err error
  102. v = new(canal.ConfigReq)
  103. cookie = c.Request.Header.Get("Cookie")
  104. )
  105. if u, err := c.Request.Cookie("username"); err == nil {
  106. username = u.Value
  107. } else {
  108. username = "third"
  109. }
  110. if err = c.Bind(v); err != nil {
  111. return
  112. }
  113. //judge legal params
  114. f := strings.Contains(v.Addr, ":")
  115. if !f {
  116. log.Error("canalApplyAdd addr not standard error(%v)", err)
  117. c.JSON(nil, ecode.CanalAddrFmtErr)
  118. return
  119. }
  120. if v.User != "" && v.Password != "" {
  121. if err = apmSvc.CheckMaster(c, v); err != nil {
  122. c.JSON(nil, err)
  123. return
  124. }
  125. }
  126. if err = apmSvc.ProcessCanalInfo(c, v, username); err != nil {
  127. c.JSON(nil, err)
  128. return
  129. }
  130. if err = apmSvc.ProcessConfigInfo(c, v, cookie, username); err != nil {
  131. c.JSON(nil, err)
  132. return
  133. }
  134. go apmSvc.SendWechatMessage(context.Background(), v.Addr, canal.TypeMap[canal.TypeApply], "", username, v.Mark, conf.Conf.Canal.Reviewer)
  135. c.JSON(nil, err)
  136. }
  137. //canalApplyConfigEdit is
  138. func canalApplyConfigEdit(c *bm.Context) {
  139. cookie := c.Request.Header.Get("Cookie")
  140. v := new(canal.ConfigReq)
  141. if err := c.Bind(v); err != nil {
  142. return
  143. }
  144. ap := &canal.Apply{}
  145. err := apmSvc.DBCanal.Model(&canal.Apply{}).Select("`operator`").Where("addr=?", v.Addr).Find(ap).Error
  146. if err != nil {
  147. log.Error("no apply error", err)
  148. err = ecode.CanalAddrNotFound
  149. c.JSON(nil, err)
  150. return
  151. }
  152. username := name(c)
  153. err = apmSvc.Permit(c, username, user.CanalView)
  154. err0 := apmSvc.Permit(c, username, user.CanalEdit)
  155. if err != nil || (username != ap.Operator && err0 != nil) {
  156. log.Error("permit(%v, %s,%s)", username, err)
  157. c.JSON(nil, ecode.AccessDenied)
  158. return
  159. }
  160. if v.User != "" && v.Password != "" {
  161. if err = apmSvc.CheckMaster(c, v); err != nil {
  162. c.JSON(nil, err)
  163. return
  164. }
  165. }
  166. if err0 != nil {
  167. v = &canal.ConfigReq{
  168. Addr: v.Addr,
  169. MonitorPeriod: v.MonitorPeriod,
  170. Databases: v.Databases,
  171. Project: v.Project,
  172. Leader: v.Leader,
  173. Mark: v.Mark,
  174. }
  175. }
  176. //judge legal params
  177. f := strings.Contains(v.Addr, ":")
  178. if !f {
  179. log.Error("canalApplyAdd addr not standard error(%v)", err)
  180. c.JSON(nil, ecode.CanalAddrFmtErr)
  181. return
  182. }
  183. if err = apmSvc.ProcessCanalInfo(c, v, username); err != nil {
  184. c.JSON(nil, err)
  185. return
  186. }
  187. if err = apmSvc.ProcessConfigInfo(c, v, cookie, username); err != nil {
  188. c.JSON(nil, err)
  189. return
  190. }
  191. c.JSON(nil, err)
  192. }
  193. //canalAddrAll get canal all addr info
  194. func canalAddrAll(c *bm.Context) {
  195. var (
  196. list []string
  197. items []*canal.Canal
  198. )
  199. err := apmSvc.DBCanal.Model(&canal.Canal{}).Where("is_delete= 0").Select("`addr`").Scan(&items).Error
  200. if err != nil {
  201. log.Error("canalAddrAll get addr error(%v)", err)
  202. c.JSON(nil, err)
  203. return
  204. }
  205. for _, v := range items {
  206. list = append(list, v.Addr)
  207. }
  208. c.JSON(list, nil)
  209. }
  210. // canal 审核
  211. func canalApplyApprovalProcess(c *bm.Context) {
  212. var (
  213. err error
  214. )
  215. cookie := c.Request.Header.Get("Cookie")
  216. res := map[string]interface{}{}
  217. v := new(struct {
  218. ID int `form:"id" validate:"required"`
  219. State int8 `form:"state" validate:"required"`
  220. })
  221. if err = c.Bind(v); err != nil {
  222. return
  223. }
  224. username := name(c)
  225. apply := &canal.Apply{}
  226. if err = apmSvc.DBCanal.Where("id = ?", v.ID).First(apply).Error; err != nil {
  227. log.Error("canalApplyApprovalProcess id error(%v)", err)
  228. c.JSON(nil, err)
  229. return
  230. }
  231. if !(apply.State == 1 || apply.State == 2) {
  232. log.Error("canalApplyApprovalProcess apply.state error(%v)", apply.State)
  233. res["message"] = "只有申请中和打回才可审核"
  234. c.JSONMap(res, ecode.RequestErr)
  235. return
  236. }
  237. if !(v.State == 2 || v.State == 3 || v.State == 4) {
  238. log.Error("canalApplyApprovalProcess v.state error(%v)", v.State)
  239. res["message"] = "state值范围2,3,4"
  240. c.JSONMap(res, ecode.RequestErr)
  241. return
  242. }
  243. ups := map[string]interface{}{
  244. "state": v.State,
  245. }
  246. if v.State == 3 && apply.State != 3 {
  247. if err = apmSvc.UpdateProcessTag(c, apply.ConfID, cookie); err != nil {
  248. log.Error("apmSvc.UpdateProcessTag error(%v)", apply.ID)
  249. c.JSON(nil, err)
  250. return
  251. }
  252. if err = apmSvc.DBCanal.Exec(_upsertMasterInfo, apply.Addr, apply.Remark, apply.Leader, apply.Cluster, apply.Remark, apply.Leader, apply.Cluster).Error; err != nil {
  253. log.Error("canalProcess update master_info error(%v)", err)
  254. c.JSONMap(nil, err)
  255. return
  256. }
  257. }
  258. // 更新apply
  259. if err = apmSvc.DBCanal.Model(apply).Where("id = ?", apply.ID).Update(ups).Error; err != nil {
  260. log.Error("canalApplyApprovalProcess update error(%v)", apply.ID)
  261. res["message"] = "修改状态失败"
  262. c.JSONMap(res, err)
  263. return
  264. }
  265. go apmSvc.SendWechatMessage(context.Background(), apply.Addr, canal.TypeMap[canal.TypeReview], canal.TypeMap[v.State], username, "", []string{apply.Operator})
  266. c.JSON(nil, err)
  267. }