tidb_proc.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "go-common/app/infra/canal/conf"
  7. "go-common/library/log"
  8. )
  9. func (ins *tidbInstance) proc(ch chan *msg) {
  10. defer ins.waitTable.Done()
  11. for {
  12. msg, ok := <-ch
  13. if !ok {
  14. return
  15. }
  16. data, err := tidbMakeData(msg)
  17. if err != nil {
  18. log.Error("tidb MakeData(%+v) err: %+v", msg, err)
  19. continue
  20. }
  21. if data == nil {
  22. continue
  23. }
  24. for _, target := range ins.targets[msg.db] {
  25. for {
  26. if err = target.Send(context.TODO(), data.Key, data); err == nil {
  27. stats.Incr("send_counter", target.Name(), msg.db, msg.tableRegexp, data.Action)
  28. break
  29. }
  30. stats.Incr("retry_counter", target.Name(), msg.db, msg.tableRegexp, data.Action)
  31. log.Error("tidb %s scheme(%s) pub fail,add to retry", target.Name(), msg.db)
  32. time.Sleep(time.Second)
  33. }
  34. log.Info("tidb %s pub(key:%s, value:%+v) succeed", target.Name(), data.Key, data)
  35. }
  36. }
  37. }
  38. func (ins *tidbInstance) syncproc() {
  39. defer ins.waitConsume.Done()
  40. for {
  41. if ins.closed {
  42. return
  43. }
  44. time.Sleep(time.Second * 5)
  45. ins.sync()
  46. }
  47. }
  48. func (c *Canal) tidbEventproc() {
  49. ech := conf.TiDBEvent()
  50. for {
  51. insc := <-ech
  52. if insc == nil {
  53. continue
  54. }
  55. c.tidbInsl.Lock()
  56. if old, ok := c.tidbInstances[insc.Name]; ok {
  57. old.close()
  58. }
  59. c.tidbInsl.Unlock()
  60. ins, err := newTiDBInstance(c, insc)
  61. if err != nil {
  62. log.Error("new instance error(%v)", err)
  63. c.sendWx(fmt.Sprintf("reload tidb canal instance(%s) failed error(%v)", ins.String(), err))
  64. continue
  65. }
  66. c.tidbInsl.Lock()
  67. c.tidbInstances[insc.Name] = ins
  68. c.tidbInsl.Unlock()
  69. go ins.start()
  70. log.Info("reload tidb canal instance(%s) success", ins.String())
  71. c.sendWx(fmt.Sprintf("reload tidb canal instance(%s) success", ins.String()))
  72. }
  73. }