master.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package service
  2. import (
  3. "sync"
  4. "time"
  5. "go-common/app/infra/canal/conf"
  6. "go-common/library/log"
  7. "github.com/juju/errors"
  8. "github.com/siddontang/go-mysql/client"
  9. "github.com/siddontang/go-mysql/mysql"
  10. )
  11. type dbMasterInfo struct {
  12. c *conf.MasterInfoConfig
  13. addr string
  14. binName string
  15. binPos uint32
  16. l sync.RWMutex
  17. lastSaveTime time.Time
  18. }
  19. func newDBMasterInfo(addr string, c *conf.MasterInfoConfig) (*dbMasterInfo, error) {
  20. m := &dbMasterInfo{c: c, addr: addr}
  21. conn, err := client.Connect(c.Addr, c.User, c.Password, c.DBName)
  22. if err != nil {
  23. log.Error("db master info client error(%v)", err)
  24. return nil, errors.Trace(err)
  25. }
  26. defer conn.Close()
  27. if m.c.Timeout > 0 {
  28. conn.SetDeadline(time.Now().Add(m.c.Timeout * time.Second))
  29. }
  30. r, err := conn.Execute("SELECT addr,bin_name,bin_pos FROM master_info WHERE addr=?", addr)
  31. if err != nil {
  32. log.Error("new db load master.info error(%v)", err)
  33. return nil, errors.Trace(err)
  34. }
  35. if r.RowNumber() == 0 {
  36. if m.c.Timeout > 0 {
  37. conn.SetDeadline(time.Now().Add(m.c.Timeout * time.Second))
  38. }
  39. if _, err = conn.Execute("INSERT INTO master_info (addr,bin_name,bin_pos) VALUE (?,'',0)", addr); err != nil {
  40. log.Error("insert master.info error(%v)", err)
  41. return nil, errors.Trace(err)
  42. }
  43. } else {
  44. m.addr, _ = r.GetStringByName(0, "addr")
  45. m.binName, _ = r.GetStringByName(0, "bin_name")
  46. bpos, _ := r.GetIntByName(0, "bin_pos")
  47. m.binPos = uint32(bpos)
  48. }
  49. return m, nil
  50. }
  51. func (m *dbMasterInfo) Save(pos mysql.Position, force bool) error {
  52. n := time.Now()
  53. if !force && n.Sub(m.lastSaveTime) < 2*time.Second {
  54. return nil
  55. }
  56. conn, err := client.Connect(m.c.Addr, m.c.User, m.c.Password, m.c.DBName)
  57. if err != nil {
  58. log.Error("db master info client error(%v)", err)
  59. return errors.Trace(err)
  60. }
  61. defer conn.Close()
  62. if m.c.Timeout > 0 {
  63. conn.SetDeadline(time.Now().Add(m.c.Timeout * time.Second))
  64. }
  65. if _, err = conn.Execute("UPDATE master_info SET bin_name=?,bin_pos=? WHERE addr=?", pos.Name, pos.Pos, m.addr); err != nil {
  66. log.Error("db save master info error(%v)", err)
  67. return errors.Trace(err)
  68. }
  69. m.lastSaveTime = n
  70. return nil
  71. }
  72. func (m *dbMasterInfo) Pos() mysql.Position {
  73. var pos mysql.Position
  74. m.l.RLock()
  75. pos.Name = m.binName
  76. pos.Pos = m.binPos
  77. m.l.RUnlock()
  78. return pos
  79. }
  80. type hbaseInfo struct {
  81. c *conf.MasterInfoConfig
  82. name string
  83. }
  84. func newHBaseInfo(name string, c *conf.MasterInfoConfig) (*hbaseInfo, error) {
  85. m := &hbaseInfo{c: c, name: name}
  86. return m, nil
  87. }
  88. func (m *hbaseInfo) LatestTs(table string) (lts uint64) {
  89. conn, err := client.Connect(m.c.Addr, m.c.User, m.c.Password, m.c.DBName)
  90. if err != nil {
  91. log.Error("db hbase info client error(%v)", err)
  92. return
  93. }
  94. defer conn.Close()
  95. if m.c.Timeout > 0 {
  96. conn.SetDeadline(time.Now().Add(m.c.Timeout * time.Second))
  97. }
  98. r, err := conn.Execute("SELECT latest_ts FROM hbase_info WHERE cluster_name=? AND table_name=?", m.name, table)
  99. if err != nil {
  100. log.Error("new db load hbase.info error(%v)", err)
  101. return
  102. }
  103. if r.RowNumber() == 0 {
  104. if m.c.Timeout > 0 {
  105. conn.SetDeadline(time.Now().Add(m.c.Timeout * time.Second))
  106. }
  107. if _, err = conn.Execute("INSERT INTO hbase_info (cluster_name,table_name,latest_ts) VALUE (?,?,0)", m.name, table); err != nil {
  108. log.Error("insert hbase.info error(%v)", err)
  109. return
  110. }
  111. } else {
  112. lts, _ = r.GetUintByName(0, "latest_ts")
  113. }
  114. return 0
  115. }
  116. func (m *hbaseInfo) Save(table string, latestTs uint64) {
  117. conn, err := client.Connect(m.c.Addr, m.c.User, m.c.Password, m.c.DBName)
  118. if err != nil {
  119. log.Error("save hbase info client error(%v)", err)
  120. return
  121. }
  122. defer conn.Close()
  123. if m.c.Timeout > 0 {
  124. conn.SetDeadline(time.Now().Add(m.c.Timeout * time.Second))
  125. }
  126. if _, err = conn.Execute("UPDATE hbase_info SET latest_ts=? WHERE cluster_name=? AND table_name=?", latestTs, m.name, table); err != nil {
  127. log.Error("save hbase info error(%v)", err)
  128. }
  129. }