parser.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. package replication
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "os"
  7. "sync/atomic"
  8. "github.com/juju/errors"
  9. )
  10. type BinlogParser struct {
  11. format *FormatDescriptionEvent
  12. tables map[uint64]*TableMapEvent
  13. // for rawMode, we only parse FormatDescriptionEvent and RotateEvent
  14. rawMode bool
  15. parseTime bool
  16. // used to start/stop processing
  17. stopProcessing uint32
  18. useDecimal bool
  19. }
  20. func NewBinlogParser() *BinlogParser {
  21. p := new(BinlogParser)
  22. p.tables = make(map[uint64]*TableMapEvent)
  23. // p.stop = make(uint32)
  24. return p
  25. }
  26. func (p *BinlogParser) Stop() {
  27. atomic.StoreUint32(&p.stopProcessing, 1)
  28. }
  29. func (p *BinlogParser) Resume() {
  30. atomic.StoreUint32(&p.stopProcessing, 0)
  31. }
  32. func (p *BinlogParser) Reset() {
  33. p.format = nil
  34. }
  35. type OnEventFunc func(*BinlogEvent) error
  36. func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc) error {
  37. f, err := os.Open(name)
  38. if err != nil {
  39. return errors.Trace(err)
  40. }
  41. defer f.Close()
  42. b := make([]byte, 4)
  43. if _, err = f.Read(b); err != nil {
  44. return errors.Trace(err)
  45. } else if !bytes.Equal(b, BinLogFileHeader) {
  46. return errors.Errorf("%s is not a valid binlog file, head 4 bytes must fe'bin' ", name)
  47. }
  48. if offset < 4 {
  49. offset = 4
  50. } else if offset > 4 {
  51. // FORMAT_DESCRIPTION event should be read by default always (despite that fact passed offset may be higher than 4)
  52. if _, err = f.Seek(4, os.SEEK_SET); err != nil {
  53. return errors.Errorf("seek %s to %d error %v", name, offset, err)
  54. }
  55. p.getFormatDescriptionEvent(f, onEvent)
  56. }
  57. if _, err = f.Seek(offset, os.SEEK_SET); err != nil {
  58. return errors.Errorf("seek %s to %d error %v", name, offset, err)
  59. }
  60. return p.ParseReader(f, onEvent)
  61. }
  62. func (p *BinlogParser) getFormatDescriptionEvent(r io.Reader, onEvent OnEventFunc) error {
  63. _, err := p.parseSingleEvent(&r, onEvent)
  64. return err
  65. }
  66. func (p *BinlogParser) parseSingleEvent(r *io.Reader, onEvent OnEventFunc) (bool, error) {
  67. var err error
  68. var n int64
  69. headBuf := make([]byte, EventHeaderSize)
  70. if _, err = io.ReadFull(*r, headBuf); err == io.EOF {
  71. return true, nil
  72. } else if err != nil {
  73. return false, errors.Trace(err)
  74. }
  75. var h *EventHeader
  76. h, err = p.parseHeader(headBuf)
  77. if err != nil {
  78. return false, errors.Trace(err)
  79. }
  80. if h.EventSize <= uint32(EventHeaderSize) {
  81. return false, errors.Errorf("invalid event header, event size is %d, too small", h.EventSize)
  82. }
  83. var buf bytes.Buffer
  84. if n, err = io.CopyN(&buf, *r, int64(h.EventSize)-int64(EventHeaderSize)); err != nil {
  85. return false, errors.Errorf("get event body err %v, need %d - %d, but got %d", err, h.EventSize, EventHeaderSize, n)
  86. }
  87. data := buf.Bytes()
  88. rawData := data
  89. eventLen := int(h.EventSize) - EventHeaderSize
  90. if len(data) != eventLen {
  91. return false, errors.Errorf("invalid data size %d in event %s, less event length %d", len(data), h.EventType, eventLen)
  92. }
  93. var e Event
  94. e, err = p.parseEvent(h, data)
  95. if err != nil {
  96. if _, ok := err.(errMissingTableMapEvent); ok {
  97. return false, nil
  98. }
  99. return false, errors.Trace(err)
  100. }
  101. if err = onEvent(&BinlogEvent{rawData, h, e}); err != nil {
  102. return false, errors.Trace(err)
  103. }
  104. return false, nil
  105. }
  106. func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error {
  107. for {
  108. if atomic.LoadUint32(&p.stopProcessing) == 1 {
  109. break
  110. }
  111. done, err := p.parseSingleEvent(&r, onEvent)
  112. if err != nil {
  113. if _, ok := err.(errMissingTableMapEvent); ok {
  114. continue
  115. }
  116. return errors.Trace(err)
  117. }
  118. if done {
  119. break
  120. }
  121. }
  122. return nil
  123. }
  124. func (p *BinlogParser) SetRawMode(mode bool) {
  125. p.rawMode = mode
  126. }
  127. func (p *BinlogParser) SetParseTime(parseTime bool) {
  128. p.parseTime = parseTime
  129. }
  130. func (p *BinlogParser) SetUseDecimal(useDecimal bool) {
  131. p.useDecimal = useDecimal
  132. }
  133. func (p *BinlogParser) parseHeader(data []byte) (*EventHeader, error) {
  134. h := new(EventHeader)
  135. err := h.Decode(data)
  136. if err != nil {
  137. return nil, err
  138. }
  139. return h, nil
  140. }
  141. func (p *BinlogParser) parseEvent(h *EventHeader, data []byte) (Event, error) {
  142. var e Event
  143. if h.EventType == FORMAT_DESCRIPTION_EVENT {
  144. p.format = &FormatDescriptionEvent{}
  145. e = p.format
  146. } else {
  147. if p.format != nil && p.format.ChecksumAlgorithm == BINLOG_CHECKSUM_ALG_CRC32 {
  148. data = data[0 : len(data)-4]
  149. }
  150. if h.EventType == ROTATE_EVENT {
  151. e = &RotateEvent{}
  152. } else if !p.rawMode {
  153. switch h.EventType {
  154. case QUERY_EVENT:
  155. e = &QueryEvent{}
  156. case XID_EVENT:
  157. e = &XIDEvent{}
  158. case TABLE_MAP_EVENT:
  159. te := &TableMapEvent{}
  160. if p.format.EventTypeHeaderLengths[TABLE_MAP_EVENT-1] == 6 {
  161. te.tableIDSize = 4
  162. } else {
  163. te.tableIDSize = 6
  164. }
  165. e = te
  166. case WRITE_ROWS_EVENTv0,
  167. UPDATE_ROWS_EVENTv0,
  168. DELETE_ROWS_EVENTv0,
  169. WRITE_ROWS_EVENTv1,
  170. DELETE_ROWS_EVENTv1,
  171. UPDATE_ROWS_EVENTv1,
  172. WRITE_ROWS_EVENTv2,
  173. UPDATE_ROWS_EVENTv2,
  174. DELETE_ROWS_EVENTv2:
  175. e = p.newRowsEvent(h)
  176. case ROWS_QUERY_EVENT:
  177. e = &RowsQueryEvent{}
  178. case GTID_EVENT:
  179. e = &GTIDEvent{}
  180. case BEGIN_LOAD_QUERY_EVENT:
  181. e = &BeginLoadQueryEvent{}
  182. case EXECUTE_LOAD_QUERY_EVENT:
  183. e = &ExecuteLoadQueryEvent{}
  184. case MARIADB_ANNOTATE_ROWS_EVENT:
  185. e = &MariadbAnnotateRowsEvent{}
  186. case MARIADB_BINLOG_CHECKPOINT_EVENT:
  187. e = &MariadbBinlogCheckPointEvent{}
  188. case MARIADB_GTID_LIST_EVENT:
  189. e = &MariadbGTIDListEvent{}
  190. case MARIADB_GTID_EVENT:
  191. ee := &MariadbGTIDEvent{}
  192. ee.GTID.ServerID = h.ServerID
  193. e = ee
  194. default:
  195. e = &GenericEvent{}
  196. }
  197. } else {
  198. e = &GenericEvent{}
  199. }
  200. }
  201. if err := e.Decode(data); err != nil {
  202. return nil, &EventError{h, err.Error(), data}
  203. }
  204. if te, ok := e.(*TableMapEvent); ok {
  205. p.tables[te.TableID] = te
  206. }
  207. if re, ok := e.(*RowsEvent); ok {
  208. if (re.Flags & RowsEventStmtEndFlag) > 0 {
  209. // Refer https://github.com/alibaba/canal/blob/38cc81b7dab29b51371096fb6763ca3a8432ffee/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogEvent.java#L176
  210. p.tables = make(map[uint64]*TableMapEvent)
  211. }
  212. }
  213. return e, nil
  214. }
  215. // Given the bytes for a a binary log event: return the decoded event.
  216. // With the exception of the FORMAT_DESCRIPTION_EVENT event type
  217. // there must have previously been passed a FORMAT_DESCRIPTION_EVENT
  218. // into the parser for this to work properly on any given event.
  219. // Passing a new FORMAT_DESCRIPTION_EVENT into the parser will replace
  220. // an existing one.
  221. func (p *BinlogParser) Parse(data []byte) (*BinlogEvent, error) {
  222. rawData := data
  223. h, err := p.parseHeader(data)
  224. if err != nil {
  225. return nil, err
  226. }
  227. data = data[EventHeaderSize:]
  228. eventLen := int(h.EventSize) - EventHeaderSize
  229. if len(data) != eventLen {
  230. return nil, fmt.Errorf("invalid data size %d in event %s, less event length %d", len(data), h.EventType, eventLen)
  231. }
  232. e, err := p.parseEvent(h, data)
  233. if err != nil {
  234. return nil, err
  235. }
  236. return &BinlogEvent{rawData, h, e}, nil
  237. }
  238. func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent {
  239. e := &RowsEvent{}
  240. if p.format.EventTypeHeaderLengths[h.EventType-1] == 6 {
  241. e.tableIDSize = 4
  242. } else {
  243. e.tableIDSize = 6
  244. }
  245. e.needBitmap2 = false
  246. e.tables = p.tables
  247. e.parseTime = p.parseTime
  248. e.useDecimal = p.useDecimal
  249. switch h.EventType {
  250. case WRITE_ROWS_EVENTv0:
  251. e.Version = 0
  252. case UPDATE_ROWS_EVENTv0:
  253. e.Version = 0
  254. case DELETE_ROWS_EVENTv0:
  255. e.Version = 0
  256. case WRITE_ROWS_EVENTv1:
  257. e.Version = 1
  258. case DELETE_ROWS_EVENTv1:
  259. e.Version = 1
  260. case UPDATE_ROWS_EVENTv1:
  261. e.Version = 1
  262. e.needBitmap2 = true
  263. case WRITE_ROWS_EVENTv2:
  264. e.Version = 2
  265. case UPDATE_ROWS_EVENTv2:
  266. e.Version = 2
  267. e.needBitmap2 = true
  268. case DELETE_ROWS_EVENTv2:
  269. e.Version = 2
  270. }
  271. return e
  272. }