hbase.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "time"
  7. "go-common/app/infra/canal/conf"
  8. "go-common/app/infra/canal/model"
  9. "go-common/library/log"
  10. "go-common/library/queue/databus"
  11. hbase "go-common/library/database/hbase.v2"
  12. "github.com/pkg/errors"
  13. "github.com/tsuna/gohbase/hrpc"
  14. )
  15. // HBaseInstance hbase canal instance
  16. type HBaseInstance struct {
  17. config *conf.HBaseInsConf
  18. client *hbase.Client
  19. // one instance can have lots of target different by schema and table
  20. targets []*databus.Databus
  21. hinfo *hbaseInfo
  22. // scan latest timestamp, be used check delay
  23. latestTimestamp int64
  24. err error
  25. closed bool
  26. }
  27. // NewHBaseInstance new canal instance
  28. func NewHBaseInstance(c *conf.HBaseInsConf) (ins *HBaseInstance, err error) {
  29. // new instance
  30. ins = &HBaseInstance{config: c}
  31. // check and modify config
  32. if c.MasterInfo == nil {
  33. err = errors.New("no masterinfo config")
  34. ins.err = err
  35. return
  36. }
  37. // new hbase info
  38. if ins.hinfo, err = newHBaseInfo(ins.config.Cluster, ins.config.MasterInfo); err != nil {
  39. log.Error("init hbase info error(%v)", err)
  40. ins.err = errors.Wrap(err, "init hbase info")
  41. return
  42. }
  43. // new hbase client
  44. ins.client = hbase.NewClient(&hbase.Config{Zookeeper: &hbase.ZKConfig{
  45. Root: c.Root,
  46. Addrs: c.Addrs,
  47. }})
  48. return
  49. }
  50. // Start start binlog receive
  51. func (ins *HBaseInstance) Start() {
  52. for _, db := range ins.config.Databases {
  53. databus := databus.New(db.Databus)
  54. ins.targets = append(ins.targets, databus)
  55. for _, tb := range db.Tables {
  56. go ins.scanproc(db, databus, tb)
  57. }
  58. }
  59. }
  60. func (ins *HBaseInstance) scanproc(c *conf.HBaseDatabase, databus *databus.Databus, table *conf.HBaseTable) {
  61. latestTs := ins.hinfo.LatestTs(table.Name)
  62. if latestTs == 0 {
  63. latestTs = uint64(time.Now().UnixNano() / 1e6)
  64. }
  65. AGAIN:
  66. for {
  67. time.Sleep(300 * time.Millisecond)
  68. nowTs := uint64(time.Now().UnixNano() / 1e6)
  69. // scan
  70. scanner, err := ins.client.Scan(context.TODO(), []byte(table.Name), hrpc.TimeRangeUint64(latestTs, nowTs))
  71. if err != nil {
  72. time.Sleep(time.Second)
  73. continue
  74. }
  75. ins.latestTimestamp = time.Now().Unix()
  76. for {
  77. res, err := scanner.Next()
  78. if err == io.EOF {
  79. latestTs = nowTs
  80. ins.hinfo.Save(table.Name, latestTs)
  81. continue AGAIN
  82. } else if err != nil {
  83. time.Sleep(time.Second)
  84. continue AGAIN
  85. }
  86. var key string
  87. data := map[string]interface{}{}
  88. for _, cell := range res.Cells {
  89. row := string(cell.Row)
  90. if key == "" {
  91. key = row
  92. }
  93. if _, ok := data[row]; !ok {
  94. data[row] = map[string]string{}
  95. }
  96. column := string(cell.Qualifier)
  97. omit := false
  98. for _, field := range table.OmitField {
  99. if field == column {
  100. omit = true
  101. break
  102. }
  103. }
  104. if !omit {
  105. cm := data[row].(map[string]string)
  106. cm[column] = string(cell.Value)
  107. }
  108. }
  109. real := &model.Data{Table: table.Name, New: data}
  110. databus.Send(context.TODO(), key, real)
  111. log.Info("hbase databus:group(%s)topic(%s) pub(key:%s, value:%+v) succeed", c.Databus.Group, c.Databus.Topic, key, real)
  112. }
  113. }
  114. }
  115. // Close close instance
  116. func (ins *HBaseInstance) Close() {
  117. if ins.err != nil {
  118. return
  119. }
  120. ins.client.Close()
  121. for _, t := range ins.targets {
  122. t.Close()
  123. }
  124. ins.closed = true
  125. ins.err = errors.New("canal hbase instance closed")
  126. }
  127. func (ins *HBaseInstance) String() string {
  128. return ins.config.Cluster
  129. }
  130. // Error returns instance error.
  131. func (ins *HBaseInstance) Error() string {
  132. if ins.err == nil {
  133. return ""
  134. }
  135. return fmt.Sprintf("+%v", ins.err)
  136. }
  137. func (ins *HBaseInstance) delay() int64 {
  138. return time.Now().Unix() - ins.latestTimestamp
  139. }