123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- package service
- import (
- "context"
- "fmt"
- "io"
- "time"
- "go-common/app/infra/canal/conf"
- "go-common/app/infra/canal/model"
- "go-common/library/log"
- "go-common/library/queue/databus"
- hbase "go-common/library/database/hbase.v2"
- "github.com/pkg/errors"
- "github.com/tsuna/gohbase/hrpc"
- )
- // HBaseInstance hbase canal instance
- type HBaseInstance struct {
- config *conf.HBaseInsConf
- client *hbase.Client
- // one instance can have lots of target different by schema and table
- targets []*databus.Databus
- hinfo *hbaseInfo
- // scan latest timestamp, be used check delay
- latestTimestamp int64
- err error
- closed bool
- }
- // NewHBaseInstance new canal instance
- func NewHBaseInstance(c *conf.HBaseInsConf) (ins *HBaseInstance, err error) {
- // new instance
- ins = &HBaseInstance{config: c}
- // check and modify config
- if c.MasterInfo == nil {
- err = errors.New("no masterinfo config")
- ins.err = err
- return
- }
- // new hbase info
- if ins.hinfo, err = newHBaseInfo(ins.config.Cluster, ins.config.MasterInfo); err != nil {
- log.Error("init hbase info error(%v)", err)
- ins.err = errors.Wrap(err, "init hbase info")
- return
- }
- // new hbase client
- ins.client = hbase.NewClient(&hbase.Config{Zookeeper: &hbase.ZKConfig{
- Root: c.Root,
- Addrs: c.Addrs,
- }})
- return
- }
- // Start start binlog receive
- func (ins *HBaseInstance) Start() {
- for _, db := range ins.config.Databases {
- databus := databus.New(db.Databus)
- ins.targets = append(ins.targets, databus)
- for _, tb := range db.Tables {
- go ins.scanproc(db, databus, tb)
- }
- }
- }
- func (ins *HBaseInstance) scanproc(c *conf.HBaseDatabase, databus *databus.Databus, table *conf.HBaseTable) {
- latestTs := ins.hinfo.LatestTs(table.Name)
- if latestTs == 0 {
- latestTs = uint64(time.Now().UnixNano() / 1e6)
- }
- AGAIN:
- for {
- time.Sleep(300 * time.Millisecond)
- nowTs := uint64(time.Now().UnixNano() / 1e6)
- // scan
- scanner, err := ins.client.Scan(context.TODO(), []byte(table.Name), hrpc.TimeRangeUint64(latestTs, nowTs))
- if err != nil {
- time.Sleep(time.Second)
- continue
- }
- ins.latestTimestamp = time.Now().Unix()
- for {
- res, err := scanner.Next()
- if err == io.EOF {
- latestTs = nowTs
- ins.hinfo.Save(table.Name, latestTs)
- continue AGAIN
- } else if err != nil {
- time.Sleep(time.Second)
- continue AGAIN
- }
- var key string
- data := map[string]interface{}{}
- for _, cell := range res.Cells {
- row := string(cell.Row)
- if key == "" {
- key = row
- }
- if _, ok := data[row]; !ok {
- data[row] = map[string]string{}
- }
- column := string(cell.Qualifier)
- omit := false
- for _, field := range table.OmitField {
- if field == column {
- omit = true
- break
- }
- }
- if !omit {
- cm := data[row].(map[string]string)
- cm[column] = string(cell.Value)
- }
- }
- real := &model.Data{Table: table.Name, New: data}
- databus.Send(context.TODO(), key, real)
- log.Info("hbase databus:group(%s)topic(%s) pub(key:%s, value:%+v) succeed", c.Databus.Group, c.Databus.Topic, key, real)
- }
- }
- }
- // Close close instance
- func (ins *HBaseInstance) Close() {
- if ins.err != nil {
- return
- }
- ins.client.Close()
- for _, t := range ins.targets {
- t.Close()
- }
- ins.closed = true
- ins.err = errors.New("canal hbase instance closed")
- }
- func (ins *HBaseInstance) String() string {
- return ins.config.Cluster
- }
- // Error returns instance error.
- func (ins *HBaseInstance) Error() string {
- if ins.err == nil {
- return ""
- }
- return fmt.Sprintf("+%v", ins.err)
- }
- func (ins *HBaseInstance) delay() int64 {
- return time.Now().Unix() - ins.latestTimestamp
- }
|