package service

import (
	"context"
	"encoding/base64"
	"fmt"
	"hash/crc32"
	"strings"
	"time"

	"go-common/app/infra/canal/conf"
	"go-common/app/infra/canal/infoc"
	"go-common/app/infra/canal/model"
	"go-common/library/log"
	"go-common/library/queue/databus"

	"github.com/pkg/errors"
	"github.com/siddontang/go-mysql/canal"
)

var (
	errInvalidAction = errors.New("invalid rows action")
	errInvalidUpdate = errors.New("invalid update rows event")
	errBinlogFormat  = errors.New("binlog format failed")
)

// producer abstracts a downstream sink for binlog rows (databus or infoc).
type producer interface {
	Rows(int64)
	Send(context.Context, string, interface{}) error
	Close()
	Name() string
}

// databusP is the databus-backed producer.
type databusP struct {
	group, topic string
	*databus.Databus
}

// Rows is a no-op: databus does not track row counts.
func (d *databusP) Rows(b int64) {}

// Send publishes one message to databus.
func (d *databusP) Send(c context.Context, key string, data interface{}) error {
	return d.Databus.Send(c, key, data)
}

// Name identifies this producer by group and topic.
func (d *databusP) Name() string {
	return fmt.Sprintf("databus:group(%s)topic(%s)", d.group, d.topic)
}

// Close closes the underlying databus client.
func (d *databusP) Close() {
	d.Databus.Close()
}

// infocP is the infoc-backed producer.
type infocP struct {
	taskID string
	*infoc.Infoc
}

// Rows reports the number of processed rows to infoc.
func (i *infocP) Rows(b int64) {
	i.Infoc.Rows(b)
}

// Send publishes one message to infoc.
func (i *infocP) Send(c context.Context, key string, data interface{}) error {
	return i.Infoc.Send(c, key, data)
}

// Name identifies this producer by task ID.
func (i *infocP) Name() string {
	return fmt.Sprintf("infoc(%s)", i.taskID)
}

// Close flushes buffered data and closes infoc.
func (i *infocP) Close() {
	i.Infoc.Flush()
	i.Infoc.Close()
}

// Target fans binlog rows events out to the configured producers.
type Target struct {
	producers []producer
	eventLen  uint32
	events    []chan *canal.RowsEvent
	db        *conf.Database
	closed    bool
}

// NewTarget creates a target for one database config, starting one worker
// goroutine per configured table group.
func NewTarget(db *conf.Database) (t *Target) {
	t = &Target{
		db:       db,
		eventLen: uint32(len(db.CTables)),
	}
	t.events = make([]chan *canal.RowsEvent, t.eventLen)
	if db.Databus != nil {
		t.producers = append(t.producers, &databusP{group: db.Databus.Group, topic: db.Databus.Topic, Databus: databus.New(db.Databus)})
	}
	if db.Infoc != nil {
		t.producers = append(t.producers, &infocP{taskID: db.Infoc.TaskID, Infoc: infoc.New(db.Infoc)})
	}
	for i := 0; i < int(t.eventLen); i++ {
		ch := make(chan *canal.RowsEvent, 1024)
		t.events[i] = ch
		go t.proc(ch)
	}
	return
}
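
// exampleTargetLoop is an illustrative sketch, not part of the original
// service: it shows how a caller could drive a Target once events arrive
// from the canal event handler. The evs channel is hypothetical; in the
// real service delivery happens through canal's callback.
func exampleTargetLoop(db *conf.Database, evs <-chan *canal.RowsEvent) {
	t := NewTarget(db) // starts one proc goroutine per table group
	for ev := range evs {
		// drop events for schemas, tables, or actions outside the config
		if t.compare(ev.Table.Schema, ev.Table.Name, ev.Action) {
			t.send(ev) // hash-routed so one table always hits one worker
		}
	}
	t.close()
}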

// compare reports whether a binlog event is wanted: the schema and table
// must be configured, and the action must not be in the omit list.
func (t *Target) compare(schema, table, action string) bool {
	if t.db.Schema == schema {
		for _, ctb := range t.db.CTables {
			for _, tb := range ctb.Tables {
				if table == tb {
					for _, act := range ctb.OmitAction {
						if act == action { // NOTE: omit action
							return false
					}
					}
					return true
				}
			}
		}
	}
	return false
}
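
// For illustration, with a hypothetical config of Schema "member" and one
// table group {Tables: ["user"], OmitAction: ["delete"]}:
//
//	compare("member", "user", "insert")   // true
//	compare("member", "user", "delete")   // false: action omitted
//	compare("member", "addr", "insert")   // false: table not configured
//	compare("passport", "user", "insert") // false: schema mismatch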

// send routes a rows event into one of the event channels, chosen by
// crc32(table name) % concurrency, so all events for the same table are
// handled in order by the same worker.
func (t *Target) send(ev *canal.RowsEvent) {
	yu := crc32.ChecksumIEEE([]byte(ev.Table.Name))
	t.events[yu%t.eventLen] <- ev
}
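
// routeIndex is an illustrative restatement of the routing rule above (not
// used by the service), factored out so the invariant is explicit: the
// chosen index depends only on the table name, never on the event payload.
func routeIndex(table string, n uint32) uint32 {
	return crc32.ChecksumIEEE([]byte(table)) % n
}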

func (t *Target) close() {
	for _, p := range t.producers {
		p.Close()
	}
	t.closed = true
}

// proc is the async worker that transfers binlog data to the producers.
// Messages that fail to publish are parked and retried after a short pause
// instead of being dropped.
func (t *Target) proc(ch chan *canal.RowsEvent) {
	type pData struct {
		datas    []*model.Data
		producer producer
	}
	var (
		err         error
		normalDatas []*pData
		errorDatas  []*pData
		ev          *canal.RowsEvent
	)
	for {
		if t.closed {
			return
		}
		if len(errorDatas) != 0 {
			// drain the retry queue first, after a one-second back-off
			normalDatas = errorDatas
			errorDatas = errorDatas[0:0]
			time.Sleep(time.Second)
		} else {
			ev = <-ch
			var datas []*model.Data
			if datas, err = makeDatas(ev, t.db.TableMap); err != nil {
				log.Error("makeDatas(%v) error(%v)", ev, err)
				continue
			}
			normalDatas = normalDatas[0:0]
			for _, p := range t.producers {
				p.Rows(int64(len(datas)))
				normalDatas = append(normalDatas, &pData{datas: datas, producer: p})
				if stats != nil {
					stats.Incr("send_counter", p.Name(), ev.Table.Schema, tblReplacer.ReplaceAllString(ev.Table.Name, ""), ev.Action)
				}
			}
		}
		for _, pd := range normalDatas {
			var eDatas []*model.Data
			for _, data := range pd.datas {
				if err = pd.producer.Send(context.TODO(), data.Key, data); err != nil {
					// keep the failed message for the next retry pass
					eDatas = append(eDatas, data)
					continue
				}
				log.Info("%s pub(key:%s, value:%+v) succeed", pd.producer.Name(), data.Key, data)
			}
			if len(eDatas) > 0 {
				errorDatas = append(errorDatas, &pData{datas: eDatas, producer: pd.producer})
				if stats != nil && ev != nil {
					stats.Incr("retry_counter", pd.producer.Name(), ev.Table.Schema, tblReplacer.ReplaceAllString(ev.Table.Name, ""), ev.Action)
				}
				log.Error("%s schema(%s) pub failed, added to retry queue", pd.producer.Name(), ev.Table.Schema)
			}
		}
	}
}
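
// retrySketch is a stripped-down sketch of the retry pattern proc uses,
// shown on plain strings so the control flow stands alone; the names here
// are hypothetical and the function is not part of the original service.
func retrySketch(work <-chan string, send func(string) error) {
	var retry []string
	for {
		var batch []string
		if len(retry) > 0 {
			// failed items get another pass after a pause
			batch, retry = retry, nil
			time.Sleep(time.Second)
		} else {
			batch = append(batch, <-work)
		}
		for _, w := range batch {
			if err := send(w); err != nil {
				retry = append(retry, w)
			}
		}
	}
}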

// makeDatas parses a binlog rows event into model.Data structs.
// Take care with the update action: the old and new values of each row
// alternate in event.Rows (old, new, old, new, ...).
func makeDatas(e *canal.RowsEvent, tbMap map[string]*conf.Addition) (datas []*model.Data, err error) {
	rowsLen := len(e.Rows)
	lenCol := len(e.Table.Columns)
	// check rowsLen before touching e.Rows[0], otherwise an empty event
	// would panic on the index
	if rowsLen == 0 {
		log.Error("rows length(%d) columns length(%d)", rowsLen, lenCol)
		err = errBinlogFormat
		return
	}
	firstRowLen := len(e.Rows[0])
	if firstRowLen == 0 || firstRowLen != lenCol {
		log.Error("rows length(%d) first row length(%d) columns length(%d)", rowsLen, firstRowLen, lenCol)
		err = errBinlogFormat
		return
	}
	datas = make([]*model.Data, 0, rowsLen)
	switch e.Action {
	case canal.InsertAction, canal.DeleteAction:
		for _, values := range e.Rows {
			var keys []string
			data := &model.Data{
				Action: e.Action,
				Table:  e.Table.Name,
				// default kafka key: the first column (usually the primary key)
				Key: fmt.Sprint(values[0]),
				New: make(map[string]interface{}, lenCol),
			}
			for i, c := range e.Table.Columns {
				if c.IsUnsigned {
					values[i] = unsignIntCase(values[i])
				}
				if strings.Contains(c.RawType, "binary") {
					// binary columns are not valid UTF-8, so base64 them
					if bs, ok := values[i].(string); ok {
						values[i] = base64.StdEncoding.EncodeToString([]byte(bs))
					}
				}
				data.New[c.Name] = values[i]
			}
			// set the kafka key and remove omitted columns
			addition, ok := tbMap[e.Table.Name]
			if ok {
				for _, omit := range addition.OmitField {
					delete(data.New, omit)
				}
				for _, primary := range addition.PrimaryKey {
					if _, ok := data.New[primary]; ok {
						keys = append(keys, fmt.Sprint(data.New[primary]))
					}
				}
			}
			if len(keys) != 0 {
				data.Key = strings.Join(keys, ",")
			}
			datas = append(datas, data)
		}
	case canal.UpdateAction:
		// update rows come in pairs: Rows[i] is the old row, Rows[i+1] the new
		if rowsLen%2 != 0 {
			err = errInvalidUpdate
			return
		}
		for i := 0; i < rowsLen; i += 2 {
			var keys []string
			data := &model.Data{
				Action: e.Action,
				Table:  e.Table.Name,
				// default kafka key: the first column (usually the primary key)
				Key: fmt.Sprint(e.Rows[i][0]),
				Old: make(map[string]interface{}, lenCol),
				New: make(map[string]interface{}, lenCol),
			}
			for j, c := range e.Table.Columns {
				if c.IsUnsigned {
					e.Rows[i][j] = unsignIntCase(e.Rows[i][j])
					e.Rows[i+1][j] = unsignIntCase(e.Rows[i+1][j])
				}
				if strings.Contains(c.RawType, "binary") {
					if bs, ok := e.Rows[i][j].(string); ok {
						e.Rows[i][j] = base64.StdEncoding.EncodeToString([]byte(bs))
					}
					if bs, ok := e.Rows[i+1][j].(string); ok {
						e.Rows[i+1][j] = base64.StdEncoding.EncodeToString([]byte(bs))
					}
				}
				data.Old[c.Name] = e.Rows[i][j]
				data.New[c.Name] = e.Rows[i+1][j]
			}
			// set the kafka key and remove omitted columns
			addition, ok := tbMap[e.Table.Name]
			if ok {
				for _, omit := range addition.OmitField {
					delete(data.New, omit)
					delete(data.Old, omit)
				}
				for _, primary := range addition.PrimaryKey {
					if _, ok := data.New[primary]; ok {
						keys = append(keys, fmt.Sprint(data.New[primary]))
					}
				}
			}
			if len(keys) != 0 {
				data.Key = strings.Join(keys, ",")
			}
			datas = append(datas, data)
		}
	default:
		err = errInvalidAction
	}
	return
}
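
// splitUpdateRows is an illustrative walk over the alternating layout that
// makeDatas relies on for updates: Rows holds [old0, new0, old1, new1, ...].
// It is a sketch only and not used by the service.
func splitUpdateRows(rows [][]interface{}) (olds, news [][]interface{}) {
	for i := 0; i+1 < len(rows); i += 2 {
		olds = append(olds, rows[i])
		news = append(news, rows[i+1])
	}
	return
}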

// unsignIntCase reinterprets a signed integer from the binlog decoder as
// its unsigned counterpart of the same width; other types pass through.
func unsignIntCase(i interface{}) (v interface{}) {
	switch si := i.(type) {
	case int8:
		v = uint8(si)
	case int16:
		v = uint16(si)
	case int32:
		v = uint32(si)
	case int64:
		v = uint64(si)
	default:
		v = i
	}
	return
}
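
// exampleUnsign illustrates why the reinterpretation matters: the binlog
// decoder returns signed Go integers even for unsigned MySQL columns, so an
// unsigned TINYINT holding 255 arrives as int8(-1). Sketch only.
func exampleUnsign() {
	fmt.Println(unsignIntCase(int8(-1)))  // 255
	fmt.Println(unsignIntCase(int64(-1))) // 18446744073709551615
}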