target.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. package service
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "fmt"
  6. "hash/crc32"
  7. "strings"
  8. "time"
  9. "go-common/app/infra/canal/conf"
  10. "go-common/app/infra/canal/infoc"
  11. "go-common/app/infra/canal/model"
  12. "go-common/library/log"
  13. "go-common/library/queue/databus"
  14. "github.com/pkg/errors"
  15. "github.com/siddontang/go-mysql/canal"
  16. )
  // errInvalidAction is returned by makeDatas for a rows event whose action is
  // not insert, update or delete.
  var errInvalidAction = errors.New("invalid rows action")

  // errInvalidUpdate is returned by makeDatas when an update event does not
  // carry an even number of rows (old/new image pairs).
  var errInvalidUpdate = errors.New("invalid update rows event")

  // errBinlogFormat is returned by makeDatas when the event's rows are empty or
  // do not match the table's column count.
  var errBinlogFormat = errors.New("binlog format failed")
  // producer is a sink that converted binlog rows (model.Data) are published
  // to. databusP and infocP are the implementations in this file.
  type producer interface {
  	// Name returns an identifier used in log lines and stats labels.
  	Name() string
  	// Rows is told how many rows are about to be sent for one event;
  	// implementations may ignore it.
  	Rows(rows int64)
  	// Send publishes data under key.
  	Send(ctx context.Context, key string, data interface{}) error
  	// Close releases the underlying client.
  	Close()
  }
  28. type databusP struct {
  29. group, topic string
  30. *databus.Databus
  31. }
  32. func (d *databusP) Rows(b int64) {
  33. // ignore
  34. }
  35. func (d *databusP) Send(c context.Context, key string, data interface{}) error {
  36. return d.Databus.Send(c, key, data)
  37. }
  38. func (d *databusP) Name() string {
  39. return fmt.Sprintf("databus:group(%s)topic(%s)", d.group, d.topic)
  40. }
  41. func (d *databusP) Close() {
  42. d.Databus.Close()
  43. }
  44. // infocP infoc producer
  45. type infocP struct {
  46. taskID string
  47. *infoc.Infoc
  48. }
  49. // Rows rows
  50. func (i *infocP) Rows(b int64) {
  51. i.Infoc.Rows(b)
  52. }
  53. // Send send msg
  54. func (i *infocP) Send(c context.Context, key string, data interface{}) error {
  55. return i.Infoc.Send(c, key, data)
  56. }
  57. // Name infoc name
  58. func (i *infocP) Name() string {
  59. return fmt.Sprintf("infoc(%s)", i.taskID)
  60. }
  61. // Close close infoc
  62. func (i *infocP) Close() {
  63. i.Infoc.Flush()
  64. i.Infoc.Close()
  65. }
  66. // Target databus target
  67. type Target struct {
  68. producers []producer
  69. eventLen uint32
  70. events []chan *canal.RowsEvent
  71. db *conf.Database
  72. closed bool
  73. }
  74. // NewTarget new databus target
  75. func NewTarget(db *conf.Database) (t *Target) {
  76. t = &Target{
  77. db: db,
  78. eventLen: uint32(len(db.CTables)),
  79. }
  80. t.events = make([]chan *canal.RowsEvent, t.eventLen)
  81. if db.Databus != nil {
  82. t.producers = append(t.producers, &databusP{group: db.Databus.Group, topic: db.Databus.Topic, Databus: databus.New(db.Databus)})
  83. }
  84. if db.Infoc != nil {
  85. t.producers = append(t.producers, &infocP{taskID: db.Infoc.TaskID, Infoc: infoc.New(db.Infoc)})
  86. }
  87. for i := 0; i < int(t.eventLen); i++ {
  88. ch := make(chan *canal.RowsEvent, 1024)
  89. t.events[i] = ch
  90. go t.proc(ch)
  91. }
  92. return
  93. }
  94. // compare check if the binlog event is needed
  95. // check the table name and schame
  96. func (t *Target) compare(schame, table, action string) bool {
  97. if t.db.Schema == schame {
  98. for _, ctb := range t.db.CTables {
  99. for _, tb := range ctb.Tables {
  100. if table == tb {
  101. for _, act := range ctb.OmitAction {
  102. if act == action { // NOTE: omit action
  103. return false
  104. }
  105. }
  106. return true
  107. }
  108. }
  109. }
  110. }
  111. return false
  112. }
  113. // send send rows event into event chans
  114. // and hash by table%concurrency.
  115. func (t *Target) send(ev *canal.RowsEvent) {
  116. yu := crc32.ChecksumIEEE([]byte(ev.Table.Name))
  117. t.events[yu%t.eventLen] <- ev
  118. }
  119. func (t *Target) close() {
  120. for _, p := range t.producers {
  121. p.Close()
  122. }
  123. t.closed = true
  124. }
  125. // proc aync method for transfer the binlog data
  126. // when connection is bad, just refresh it with retry
  127. func (t *Target) proc(ch chan *canal.RowsEvent) {
  128. type pData struct {
  129. datas []*model.Data
  130. producer producer
  131. }
  132. var (
  133. err error
  134. normalDatas []*pData
  135. errorDatas []*pData
  136. ev *canal.RowsEvent
  137. )
  138. for {
  139. if t.closed {
  140. return
  141. }
  142. if len(errorDatas) != 0 {
  143. normalDatas = errorDatas
  144. errorDatas = errorDatas[0:0]
  145. time.Sleep(time.Second)
  146. } else {
  147. ev = <-ch
  148. var datas []*model.Data
  149. if datas, err = makeDatas(ev, t.db.TableMap); err != nil {
  150. log.Error("makeData(%v) error(%v)", ev, err)
  151. continue
  152. }
  153. normalDatas = normalDatas[0:0]
  154. for _, p := range t.producers {
  155. p.Rows(int64(len(datas)))
  156. normalDatas = append(normalDatas, &pData{datas: datas, producer: p})
  157. if stats != nil {
  158. stats.Incr("send_counter", p.Name(), ev.Table.Schema, tblReplacer.ReplaceAllString(ev.Table.Name, ""), ev.Action)
  159. }
  160. }
  161. }
  162. for _, pd := range normalDatas {
  163. var eDatas []*model.Data
  164. for _, data := range pd.datas {
  165. if err = pd.producer.Send(context.TODO(), data.Key, data); err != nil {
  166. // retry pub error data
  167. eDatas = append(eDatas, data)
  168. continue
  169. }
  170. log.Info("%s pub(key:%s, value:%+v) succeed", pd.producer.Name(), data.Key, data)
  171. }
  172. if len(eDatas) > 0 {
  173. errorDatas = append(errorDatas, &pData{datas: eDatas, producer: pd.producer})
  174. if stats != nil && ev != nil {
  175. stats.Incr("retry_counter", pd.producer.Name(), ev.Table.Schema, tblReplacer.ReplaceAllString(ev.Table.Name, ""), ev.Action)
  176. }
  177. log.Error("%s scheme(%s) pub fail,add to retry", pd.producer.Name(), ev.Table.Schema)
  178. }
  179. }
  180. }
  181. }
  182. // makeDatas parse the binlog event and return the model.Data struct
  183. // a little bit cautious about the binlog type
  184. // if the type is update:
  185. // the old value and new value will alternate appearing in the event.Rows
  186. func makeDatas(e *canal.RowsEvent, tbMap map[string]*conf.Addition) (datas []*model.Data, err error) {
  187. var (
  188. rowsLen = len(e.Rows)
  189. firstRowLen = len(e.Rows[0])
  190. lenCol = len(e.Table.Columns)
  191. )
  192. if rowsLen == 0 || firstRowLen == 0 || firstRowLen != lenCol {
  193. log.Error("rows length(%d) first row length(%d) columns length(%d)", rowsLen, firstRowLen, lenCol)
  194. err = errBinlogFormat
  195. return
  196. }
  197. datas = make([]*model.Data, 0, rowsLen)
  198. switch e.Action {
  199. case canal.InsertAction, canal.DeleteAction:
  200. for _, values := range e.Rows {
  201. var keys []string
  202. data := &model.Data{
  203. Action: e.Action,
  204. Table: e.Table.Name,
  205. // the first primary key as the kafka key
  206. Key: fmt.Sprint(values[0]),
  207. New: make(map[string]interface{}, lenCol),
  208. }
  209. for i, c := range e.Table.Columns {
  210. if c.IsUnsigned {
  211. values[i] = unsignIntCase(values[i])
  212. }
  213. if strings.Contains(c.RawType, "binary") {
  214. if bs, ok := values[i].(string); ok {
  215. values[i] = base64.StdEncoding.EncodeToString([]byte(bs))
  216. }
  217. }
  218. data.New[c.Name] = values[i]
  219. }
  220. // set kafka key and remove omit columns data
  221. addition, ok := tbMap[e.Table.Name]
  222. if ok {
  223. for _, omit := range addition.OmitField {
  224. delete(data.New, omit)
  225. }
  226. for _, primary := range addition.PrimaryKey {
  227. if _, ok := data.New[primary]; ok {
  228. keys = append(keys, fmt.Sprint(data.New[primary]))
  229. }
  230. }
  231. }
  232. if len(keys) != 0 {
  233. data.Key = strings.Join(keys, ",")
  234. }
  235. datas = append(datas, data)
  236. }
  237. case canal.UpdateAction:
  238. if rowsLen%2 != 0 {
  239. err = errInvalidUpdate
  240. return
  241. }
  242. for i := 0; i < rowsLen; i += 2 {
  243. var keys []string
  244. data := &model.Data{
  245. Action: e.Action,
  246. Table: e.Table.Name,
  247. // the first primary key as the kafka key
  248. Key: fmt.Sprint(e.Rows[i][0]),
  249. Old: make(map[string]interface{}, lenCol),
  250. New: make(map[string]interface{}, lenCol),
  251. }
  252. for j, c := range e.Table.Columns {
  253. if c.IsUnsigned {
  254. e.Rows[i][j] = unsignIntCase(e.Rows[i][j])
  255. e.Rows[i+1][j] = unsignIntCase(e.Rows[i+1][j])
  256. }
  257. if strings.Contains(c.RawType, "binary") {
  258. if bs, ok := e.Rows[i][j].(string); ok {
  259. e.Rows[i][j] = base64.StdEncoding.EncodeToString([]byte(bs))
  260. }
  261. if bs, ok := e.Rows[i+1][j].(string); ok {
  262. e.Rows[i+1][j] = base64.StdEncoding.EncodeToString([]byte(bs))
  263. }
  264. }
  265. data.Old[c.Name] = e.Rows[i][j]
  266. data.New[c.Name] = e.Rows[i+1][j]
  267. }
  268. // set kafka key and remove omit columns data
  269. addition, ok := tbMap[e.Table.Name]
  270. if ok {
  271. for _, omit := range addition.OmitField {
  272. delete(data.New, omit)
  273. delete(data.Old, omit)
  274. }
  275. for _, primary := range addition.PrimaryKey {
  276. if _, ok := data.New[primary]; ok {
  277. keys = append(keys, fmt.Sprint(data.New[primary]))
  278. }
  279. }
  280. }
  281. if len(keys) != 0 {
  282. data.Key = strings.Join(keys, ",")
  283. }
  284. datas = append(datas, data)
  285. }
  286. default:
  287. err = errInvalidAction
  288. }
  289. return
  290. }
  // unsignIntCase reinterprets a signed integer decoded from an unsigned column
  // as the unsigned type of the same width, e.g. int8(-1) becomes uint8(255).
  // Any other value, including nil, is returned unchanged.
  func unsignIntCase(i interface{}) (v interface{}) {
  	switch n := i.(type) {
  	case int64:
  		return uint64(n)
  	case int32:
  		return uint32(n)
  	case int16:
  		return uint16(n)
  	case int8:
  		return uint8(n)
  	}
  	return i
  }