canal.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "net/url"
  7. "regexp"
  8. "strings"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "go-common/app/infra/canal/conf"
  13. "go-common/app/infra/canal/dao"
  14. "go-common/library/conf/env"
  15. "go-common/library/log"
  16. xhttp "go-common/library/net/http/blademaster"
  17. "go-common/library/net/netutil"
  18. "go-common/library/stat/prom"
  19. "github.com/siddontang/go-mysql/client"
  20. )
  21. var (
  22. stats = prom.New().WithState("go_canal_counter", []string{"type", "addr", "scheme", "table", "action"})
  23. tblReplacer = regexp.MustCompile("[0-9]+") // NOTE: replace number of sub-table name to space
  24. )
  25. // Canal is canal.
  26. type Canal struct {
  27. dao *dao.Dao
  28. instances map[string]*Instance
  29. hbaseInstances map[string]*HBaseInstance
  30. tidbInstances map[string]*tidbInstance
  31. insl sync.RWMutex
  32. tidbInsl sync.RWMutex
  33. err error
  34. backoff netutil.Backoff
  35. errCount int64
  36. lastErr time.Time
  37. }
  38. // NewCanal load config and start canal instance.
  39. func NewCanal(config *conf.Config) (c *Canal) {
  40. c = &Canal{
  41. dao: dao.New(config),
  42. }
  43. cfg, err := conf.LoadCanalConf()
  44. if err != nil {
  45. panic(err)
  46. }
  47. c.instances = make(map[string]*Instance, len(cfg.Instances))
  48. c.hbaseInstances = make(map[string]*HBaseInstance, len(cfg.HBaseInstances))
  49. c.tidbInstances = make(map[string]*tidbInstance, len(cfg.TiDBInstances))
  50. for _, insc := range cfg.Instances {
  51. ins, err := NewInstance(insc)
  52. if err != nil {
  53. log.Error("new instance error(%+v)", err)
  54. }
  55. c.insl.Lock()
  56. c.instances[insc.Addr] = ins
  57. c.insl.Unlock()
  58. if err == nil {
  59. go ins.Start()
  60. log.Info("start canal instance(%s) success", ins.String())
  61. }
  62. }
  63. c.backoff = &netutil.BackoffConfig{
  64. MaxDelay: 120 * time.Second,
  65. BaseDelay: 1.0 * time.Second,
  66. Factor: 1.6,
  67. Jitter: 0.2,
  68. }
  69. c.lastErr = time.Now()
  70. go c.eventproc()
  71. // hbase
  72. for _, insc := range cfg.HBaseInstances {
  73. ins, err := NewHBaseInstance(insc)
  74. if err != nil {
  75. log.Error("new hbase instance error(%+v)", err)
  76. }
  77. c.insl.Lock()
  78. c.hbaseInstances[insc.Cluster] = ins
  79. c.insl.Unlock()
  80. if err == nil {
  81. ins.Start()
  82. log.Info("start hbase instance(%s) success", ins.String())
  83. }
  84. }
  85. go c.hbaseeventproc()
  86. // tidb
  87. for _, insc := range cfg.TiDBInstances {
  88. ins, err := newTiDBInstance(c, insc)
  89. if err != nil {
  90. log.Error("new instance error(%+v)", err)
  91. c.sendWx(fmt.Sprintf("new tidb canal instance(%s) failed error(%v)", ins.String(), err))
  92. continue
  93. }
  94. c.tidbInsl.Lock()
  95. c.tidbInstances[insc.Name] = ins
  96. c.tidbInsl.Unlock()
  97. if err == nil {
  98. go ins.start()
  99. log.Info("start tidb canal instance(%s) success", ins.String())
  100. }
  101. }
  102. go c.tidbEventproc()
  103. go c.monitorproc()
  104. // errproc
  105. go c.errproc()
  106. return
  107. }
  108. // CheckMaster check master status.
  109. func (c *Canal) CheckMaster(addr, user, pwd string) (name string, pos int64, err error) {
  110. conn, err := client.Connect(addr, user, pwd, "")
  111. if err != nil {
  112. return
  113. }
  114. rr, err := conn.Execute("SHOW MASTER STATUS")
  115. if err != nil {
  116. return
  117. }
  118. name, _ = rr.GetString(0, 0)
  119. pos, _ = rr.GetInt(0, 1)
  120. if name != "" && pos > 0 {
  121. return
  122. }
  123. return "", 0, fmt.Errorf("check master no name|pos")
  124. }
  125. // PosSync sync newewst bin_pos.
  126. func (c *Canal) PosSync(addr string) (err error) {
  127. c.insl.Lock()
  128. old, ok := c.instances[addr]
  129. c.insl.Unlock()
  130. if !ok {
  131. return
  132. }
  133. pos, err := old.GetMasterPos()
  134. if err != nil {
  135. return
  136. }
  137. old.Close()
  138. old.OnPosSynced(pos, true)
  139. ins, _ := NewInstance(old.config)
  140. c.insl.Lock()
  141. c.instances[addr] = ins
  142. c.insl.Unlock()
  143. ins.Start()
  144. return
  145. }
  146. // Errors returns instance errors.
  147. func (c *Canal) Errors() (ies map[string]string) {
  148. ies = map[string]string{}
  149. c.insl.RLock()
  150. for _, i := range c.instances {
  151. ies[i.String()] = i.Error()
  152. }
  153. for _, i := range c.hbaseInstances {
  154. ies[i.String()] = i.Error()
  155. }
  156. c.insl.RUnlock()
  157. return
  158. }
  159. // Error returns canal error.
  160. func (c *Canal) Error() string {
  161. if c.err == nil {
  162. return ""
  163. }
  164. return c.err.Error()
  165. }
  166. // errproc check errors.
  167. func (c *Canal) errproc() {
  168. for {
  169. time.Sleep(10 * time.Second)
  170. es := c.Error()
  171. if es != "" {
  172. c.sendWx(fmt.Sprintf("canal occur error(%s)", es))
  173. }
  174. ies := c.Errors()
  175. for k, v := range ies {
  176. if v != "" {
  177. c.sendWx(fmt.Sprintf("canal instance(%s) occur error(%s)", k, v))
  178. }
  179. }
  180. }
  181. }
  182. // Close close canal instance
  183. func (c *Canal) Close() {
  184. c.insl.RLock()
  185. defer c.insl.RUnlock()
  186. for _, ins := range c.instances {
  187. ins.Close()
  188. log.Info("close canal instance(%s) success", ins.String())
  189. }
  190. for _, ins := range c.hbaseInstances {
  191. ins.Close()
  192. log.Info("close hbase instance(%s) success", ins.String())
  193. }
  194. for _, ins := range c.tidbInstances {
  195. ins.close()
  196. log.Info("close tidb instance(%s) success", ins.String())
  197. }
  198. }
  199. func (c *Canal) eventproc() {
  200. ech := conf.Event()
  201. for {
  202. insc := <-ech
  203. if insc == nil {
  204. continue
  205. }
  206. ins, err := NewInstance(insc)
  207. if err != nil {
  208. log.Error("new instance error(%v)", err)
  209. c.sendWx(fmt.Sprintf("reload canal instance(%s) failed error(%v)", ins.String(), err))
  210. c.insl.Lock()
  211. if old, ok := c.instances[insc.Addr]; ok {
  212. old.Close()
  213. }
  214. c.instances[insc.Addr] = ins
  215. c.insl.Unlock()
  216. continue
  217. }
  218. c.insl.Lock()
  219. if old, ok := c.instances[insc.Addr]; ok {
  220. old.Close()
  221. }
  222. c.instances[insc.Addr] = ins
  223. c.insl.Unlock()
  224. go ins.Start()
  225. log.Info("reload canal instance(%s) success", ins.String())
  226. // c.sendWx(fmt.Sprintf("reload canal instance(%s) success", ins.String()))
  227. }
  228. }
  229. func (c *Canal) hbaseeventproc() {
  230. ech := conf.HBaseEvent()
  231. for {
  232. insc := <-ech
  233. if insc == nil {
  234. continue
  235. }
  236. ins, err := NewHBaseInstance(insc)
  237. if err != nil {
  238. log.Error("new instance error(%v)", err)
  239. c.insl.Lock()
  240. if old, ok := c.hbaseInstances[insc.Cluster]; ok {
  241. old.Close()
  242. }
  243. c.hbaseInstances[insc.Cluster] = ins
  244. c.insl.Unlock()
  245. continue
  246. }
  247. c.insl.Lock()
  248. if old, ok := c.hbaseInstances[insc.Cluster]; ok {
  249. old.Close()
  250. }
  251. c.hbaseInstances[insc.Cluster] = ins
  252. c.insl.Unlock()
  253. ins.Start()
  254. log.Info("reload hbase instance(%s) success", ins.String())
  255. }
  256. }
  257. // monitorproc monitor instance delay.
  258. func (c *Canal) monitorproc() {
  259. if env.DeployEnv != env.DeployEnvProd {
  260. return
  261. }
  262. const delay = 2 * time.Minute
  263. for {
  264. time.Sleep(delay)
  265. c.insl.RLock()
  266. for _, ins := range c.instances {
  267. if ins.closed {
  268. continue
  269. }
  270. threshold := int64(time.Duration(ins.config.MonitorPeriod) / time.Second)
  271. if threshold <= 0 {
  272. threshold = int64(delay / time.Second)
  273. }
  274. dt := ins.delay()
  275. if ins.config != nil && !ins.config.MonitorOff && dt > threshold {
  276. for _, db := range ins.config.Databases {
  277. c.sendWx(fmt.Sprintf("canal env(%s) 数据库(%s)地址(%s) 同步延迟时间超过阈值:%d秒 当前超过:%d秒", env.DeployEnv, db.Schema, ins.config.Addr, threshold, dt))
  278. }
  279. }
  280. }
  281. for _, ins := range c.hbaseInstances {
  282. if ins.closed {
  283. continue
  284. }
  285. threshold := int64(time.Duration(ins.config.MonitorPeriod) / time.Second)
  286. if threshold <= 0 {
  287. threshold = int64(delay / time.Second)
  288. }
  289. dt := ins.delay()
  290. if ins.config != nil && !ins.config.MonitorOff && dt > threshold {
  291. c.sendWx(fmt.Sprintf("canal env(%s) hbase集群(%s) 同步延迟时间超过阈值:%d秒 当前超过:%d秒", env.DeployEnv, ins.config.Cluster, threshold, dt))
  292. }
  293. }
  294. c.insl.RUnlock()
  295. c.tidbInsl.RLock()
  296. for _, ins := range c.tidbInstances {
  297. if ins.closed {
  298. continue
  299. }
  300. threshold := int64(time.Duration(ins.config.MonitorPeriod) / time.Second)
  301. if threshold <= 0 {
  302. threshold = int64(delay / time.Second)
  303. }
  304. dt := ins.delay()
  305. if ins.config != nil && dt > threshold {
  306. for _, db := range ins.config.Databases {
  307. c.sendWx(fmt.Sprintf("tidb canal env(%s) 数据库(%s)地址(%s) 同步延迟时间超过阈值:%d秒 当前超过:%d秒", env.DeployEnv, db.Schema, ins.config.Name, threshold, dt))
  308. }
  309. }
  310. }
  311. c.tidbInsl.RUnlock()
  312. }
  313. }
  314. func (c *Canal) sendWx(msg string) {
  315. count := atomic.LoadInt64(&c.errCount)
  316. atomic.AddInt64(&c.errCount, 1)
  317. duration := c.backoff.Backoff(int(count))
  318. if time.Since(c.lastErr) < duration {
  319. return
  320. }
  321. c.lastErr = time.Now()
  322. sendWechat(msg, conf.Conf.Monitor.User)
  323. }
  324. func sendWechat(msg string, user string) {
  325. params := url.Values{}
  326. params.Set("Action", "CreateWechatMessage")
  327. params.Set("PublicKey", "9c178e51a7d4dc8aa1dbef0c790b06e7574c4d0etracehubtuhui@bilibili.com")
  328. params.Set("UserName", user)
  329. params.Set("Content", msg)
  330. params.Set("TreeId", "bilibili.main.common-arch.canal")
  331. params.Set("Title", "canal 监控报警")
  332. params.Set("Signature", "1")
  333. req, _ := http.NewRequest("POST", "http://merak.bilibili.co", strings.NewReader(params.Encode()))
  334. req.Header.Add("content-type", "application/x-www-form-urlencoded; charset=UTF-8")
  335. var v struct {
  336. ReqID string `json:"reqId"`
  337. Status int64 `json:"status"`
  338. Response struct {
  339. status int
  340. } `json:"Response"`
  341. }
  342. if err := xhttp.NewClient(conf.Conf.HTTPClient).Do(context.TODO(), req, &v); err != nil {
  343. log.Error("send wechat monitor status(%d) msg(%v,%v) error(%v)", v.Status, v.Response, v.Response.status, err)
  344. }
  345. }