event.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487
  1. package replication
  2. import (
  3. "encoding/binary"
  4. //"encoding/hex"
  5. "fmt"
  6. "io"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "unicode"
  11. "github.com/juju/errors"
  12. "github.com/satori/go.uuid"
  13. . "github.com/siddontang/go-mysql/mysql"
  14. )
const (
	// EventHeaderSize is the length in bytes of the fixed event header
	// that EventHeader.Decode requires and parses.
	EventHeaderSize = 19
	// SidLength is the number of bytes GTIDEvent.Decode takes for the SID.
	SidLength = 16
	// LogicalTimestampTypeCode is the marker byte GTIDEvent.Decode looks
	// for before reading LastCommitted and SequenceNumber.
	LogicalTimestampTypeCode = 2
	// PartLogicalTimestampLength is how far GTIDEvent.Decode advances after
	// reading LastCommitted, i.e. the size in bytes of that field.
	PartLogicalTimestampLength = 8
)
  21. type BinlogEvent struct {
  22. // raw binlog data, including crc32 checksum if exists
  23. RawData []byte
  24. Header *EventHeader
  25. Event Event
  26. }
  27. func (e *BinlogEvent) Dump(w io.Writer) {
  28. e.Header.Dump(w)
  29. e.Event.Dump(w)
  30. }
  31. type Event interface {
  32. //Dump Event, format like python-mysql-replication
  33. Dump(w io.Writer)
  34. Decode(data []byte) error
  35. }
  36. type EventError struct {
  37. Header *EventHeader
  38. //Error message
  39. Err string
  40. //Event data
  41. Data []byte
  42. }
  43. func (e *EventError) Error() string {
  44. return e.Err
  45. }
  46. type EventHeader struct {
  47. Timestamp uint32
  48. EventType EventType
  49. ServerID uint32
  50. EventSize uint32
  51. LogPos uint32
  52. Flags uint16
  53. }
  54. func (h *EventHeader) Decode(data []byte) error {
  55. if len(data) < EventHeaderSize {
  56. return errors.Errorf("header size too short %d, must 19", len(data))
  57. }
  58. pos := 0
  59. h.Timestamp = binary.LittleEndian.Uint32(data[pos:])
  60. pos += 4
  61. h.EventType = EventType(data[pos])
  62. pos++
  63. h.ServerID = binary.LittleEndian.Uint32(data[pos:])
  64. pos += 4
  65. h.EventSize = binary.LittleEndian.Uint32(data[pos:])
  66. pos += 4
  67. h.LogPos = binary.LittleEndian.Uint32(data[pos:])
  68. pos += 4
  69. h.Flags = binary.LittleEndian.Uint16(data[pos:])
  70. pos += 2
  71. if h.EventSize < uint32(EventHeaderSize) {
  72. return errors.Errorf("invalid event size %d, must >= 19", h.EventSize)
  73. }
  74. return nil
  75. }
  76. func (h *EventHeader) Dump(w io.Writer) {
  77. fmt.Fprintf(w, "=== %s ===\n", EventType(h.EventType))
  78. fmt.Fprintf(w, "Date: %s\n", time.Unix(int64(h.Timestamp), 0).Format(TimeFormat))
  79. fmt.Fprintf(w, "Log position: %d\n", h.LogPos)
  80. fmt.Fprintf(w, "Event size: %d\n", h.EventSize)
  81. }
  82. var (
  83. checksumVersionSplitMysql []int = []int{5, 6, 1}
  84. checksumVersionProductMysql int = (checksumVersionSplitMysql[0]*256+checksumVersionSplitMysql[1])*256 + checksumVersionSplitMysql[2]
  85. checksumVersionSplitMariaDB []int = []int{5, 3, 0}
  86. checksumVersionProductMariaDB int = (checksumVersionSplitMariaDB[0]*256+checksumVersionSplitMariaDB[1])*256 + checksumVersionSplitMariaDB[2]
  87. )
  88. // server version format X.Y.Zabc, a is not . or number
  89. func splitServerVersion(server string) []int {
  90. seps := strings.Split(server, ".")
  91. if len(seps) < 3 {
  92. return []int{0, 0, 0}
  93. }
  94. x, _ := strconv.Atoi(seps[0])
  95. y, _ := strconv.Atoi(seps[1])
  96. index := 0
  97. for i, c := range seps[2] {
  98. if !unicode.IsNumber(c) {
  99. index = i
  100. break
  101. }
  102. }
  103. z, _ := strconv.Atoi(seps[2][0:index])
  104. return []int{x, y, z}
  105. }
  106. func calcVersionProduct(server string) int {
  107. versionSplit := splitServerVersion(server)
  108. return ((versionSplit[0]*256+versionSplit[1])*256 + versionSplit[2])
  109. }
  110. type FormatDescriptionEvent struct {
  111. Version uint16
  112. //len = 50
  113. ServerVersion []byte
  114. CreateTimestamp uint32
  115. EventHeaderLength uint8
  116. EventTypeHeaderLengths []byte
  117. // 0 is off, 1 is for CRC32, 255 is undefined
  118. ChecksumAlgorithm byte
  119. }
  120. func (e *FormatDescriptionEvent) Decode(data []byte) error {
  121. pos := 0
  122. e.Version = binary.LittleEndian.Uint16(data[pos:])
  123. pos += 2
  124. e.ServerVersion = make([]byte, 50)
  125. copy(e.ServerVersion, data[pos:])
  126. pos += 50
  127. e.CreateTimestamp = binary.LittleEndian.Uint32(data[pos:])
  128. pos += 4
  129. e.EventHeaderLength = data[pos]
  130. pos++
  131. if e.EventHeaderLength != byte(EventHeaderSize) {
  132. return errors.Errorf("invalid event header length %d, must 19", e.EventHeaderLength)
  133. }
  134. server := string(e.ServerVersion)
  135. checksumProduct := checksumVersionProductMysql
  136. if strings.Contains(strings.ToLower(server), "mariadb") {
  137. checksumProduct = checksumVersionProductMariaDB
  138. }
  139. if calcVersionProduct(string(e.ServerVersion)) >= checksumProduct {
  140. // here, the last 5 bytes is 1 byte check sum alg type and 4 byte checksum if exists
  141. e.ChecksumAlgorithm = data[len(data)-5]
  142. e.EventTypeHeaderLengths = data[pos : len(data)-5]
  143. } else {
  144. e.ChecksumAlgorithm = BINLOG_CHECKSUM_ALG_UNDEF
  145. e.EventTypeHeaderLengths = data[pos:]
  146. }
  147. return nil
  148. }
  149. func (e *FormatDescriptionEvent) Dump(w io.Writer) {
  150. fmt.Fprintf(w, "Version: %d\n", e.Version)
  151. fmt.Fprintf(w, "Server version: %s\n", e.ServerVersion)
  152. //fmt.Fprintf(w, "Create date: %s\n", time.Unix(int64(e.CreateTimestamp), 0).Format(TimeFormat))
  153. fmt.Fprintf(w, "Checksum algorithm: %d\n", e.ChecksumAlgorithm)
  154. //fmt.Fprintf(w, "Event header lengths: \n%s", hex.Dump(e.EventTypeHeaderLengths))
  155. fmt.Fprintln(w)
  156. }
  157. type RotateEvent struct {
  158. Position uint64
  159. NextLogName []byte
  160. }
  161. func (e *RotateEvent) Decode(data []byte) error {
  162. e.Position = binary.LittleEndian.Uint64(data[0:])
  163. e.NextLogName = data[8:]
  164. return nil
  165. }
  166. func (e *RotateEvent) Dump(w io.Writer) {
  167. fmt.Fprintf(w, "Position: %d\n", e.Position)
  168. fmt.Fprintf(w, "Next log name: %s\n", e.NextLogName)
  169. fmt.Fprintln(w)
  170. }
  171. type XIDEvent struct {
  172. XID uint64
  173. // in fact XIDEvent dosen't have the GTIDSet information, just for beneficial to use
  174. GSet GTIDSet
  175. }
  176. func (e *XIDEvent) Decode(data []byte) error {
  177. e.XID = binary.LittleEndian.Uint64(data)
  178. return nil
  179. }
  180. func (e *XIDEvent) Dump(w io.Writer) {
  181. fmt.Fprintf(w, "XID: %d\n", e.XID)
  182. if e.GSet != nil {
  183. fmt.Fprintf(w, "GTIDSet: %s\n", e.GSet.String())
  184. }
  185. fmt.Fprintln(w)
  186. }
  187. type QueryEvent struct {
  188. SlaveProxyID uint32
  189. ExecutionTime uint32
  190. ErrorCode uint16
  191. StatusVars []byte
  192. Schema []byte
  193. Query []byte
  194. // in fact QueryEvent dosen't have the GTIDSet information, just for beneficial to use
  195. GSet GTIDSet
  196. }
  197. func (e *QueryEvent) Decode(data []byte) error {
  198. pos := 0
  199. e.SlaveProxyID = binary.LittleEndian.Uint32(data[pos:])
  200. pos += 4
  201. e.ExecutionTime = binary.LittleEndian.Uint32(data[pos:])
  202. pos += 4
  203. schemaLength := uint8(data[pos])
  204. pos++
  205. e.ErrorCode = binary.LittleEndian.Uint16(data[pos:])
  206. pos += 2
  207. statusVarsLength := binary.LittleEndian.Uint16(data[pos:])
  208. pos += 2
  209. e.StatusVars = data[pos : pos+int(statusVarsLength)]
  210. pos += int(statusVarsLength)
  211. e.Schema = data[pos : pos+int(schemaLength)]
  212. pos += int(schemaLength)
  213. //skip 0x00
  214. pos++
  215. e.Query = data[pos:]
  216. return nil
  217. }
  218. func (e *QueryEvent) Dump(w io.Writer) {
  219. fmt.Fprintf(w, "Slave proxy ID: %d\n", e.SlaveProxyID)
  220. fmt.Fprintf(w, "Execution time: %d\n", e.ExecutionTime)
  221. fmt.Fprintf(w, "Error code: %d\n", e.ErrorCode)
  222. //fmt.Fprintf(w, "Status vars: \n%s", hex.Dump(e.StatusVars))
  223. fmt.Fprintf(w, "Schema: %s\n", e.Schema)
  224. fmt.Fprintf(w, "Query: %s\n", e.Query)
  225. if e.GSet != nil {
  226. fmt.Fprintf(w, "GTIDSet: %s\n", e.GSet.String())
  227. }
  228. fmt.Fprintln(w)
  229. }
  230. type GTIDEvent struct {
  231. CommitFlag uint8
  232. SID []byte
  233. GNO int64
  234. LastCommitted int64
  235. SequenceNumber int64
  236. }
  237. func (e *GTIDEvent) Decode(data []byte) error {
  238. pos := 0
  239. e.CommitFlag = uint8(data[pos])
  240. pos++
  241. e.SID = data[pos : pos+SidLength]
  242. pos += SidLength
  243. e.GNO = int64(binary.LittleEndian.Uint64(data[pos:]))
  244. pos += 8
  245. if len(data) >= 42 {
  246. if uint8(data[pos]) == LogicalTimestampTypeCode {
  247. pos++
  248. e.LastCommitted = int64(binary.LittleEndian.Uint64(data[pos:]))
  249. pos += PartLogicalTimestampLength
  250. e.SequenceNumber = int64(binary.LittleEndian.Uint64(data[pos:]))
  251. }
  252. }
  253. return nil
  254. }
  255. func (e *GTIDEvent) Dump(w io.Writer) {
  256. fmt.Fprintf(w, "Commit flag: %d\n", e.CommitFlag)
  257. u, _ := uuid.FromBytes(e.SID)
  258. fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO)
  259. fmt.Fprintf(w, "LAST_COMMITTED: %d\n", e.LastCommitted)
  260. fmt.Fprintf(w, "SEQUENCE_NUMBER: %d\n", e.SequenceNumber)
  261. fmt.Fprintln(w)
  262. }
  263. type BeginLoadQueryEvent struct {
  264. FileID uint32
  265. BlockData []byte
  266. }
  267. func (e *BeginLoadQueryEvent) Decode(data []byte) error {
  268. pos := 0
  269. e.FileID = binary.LittleEndian.Uint32(data[pos:])
  270. pos += 4
  271. e.BlockData = data[pos:]
  272. return nil
  273. }
  274. func (e *BeginLoadQueryEvent) Dump(w io.Writer) {
  275. fmt.Fprintf(w, "File ID: %d\n", e.FileID)
  276. fmt.Fprintf(w, "Block data: %s\n", e.BlockData)
  277. fmt.Fprintln(w)
  278. }
  279. type ExecuteLoadQueryEvent struct {
  280. SlaveProxyID uint32
  281. ExecutionTime uint32
  282. SchemaLength uint8
  283. ErrorCode uint16
  284. StatusVars uint16
  285. FileID uint32
  286. StartPos uint32
  287. EndPos uint32
  288. DupHandlingFlags uint8
  289. }
  290. func (e *ExecuteLoadQueryEvent) Decode(data []byte) error {
  291. pos := 0
  292. e.SlaveProxyID = binary.LittleEndian.Uint32(data[pos:])
  293. pos += 4
  294. e.ExecutionTime = binary.LittleEndian.Uint32(data[pos:])
  295. pos += 4
  296. e.SchemaLength = uint8(data[pos])
  297. pos++
  298. e.ErrorCode = binary.LittleEndian.Uint16(data[pos:])
  299. pos += 2
  300. e.StatusVars = binary.LittleEndian.Uint16(data[pos:])
  301. pos += 2
  302. e.FileID = binary.LittleEndian.Uint32(data[pos:])
  303. pos += 4
  304. e.StartPos = binary.LittleEndian.Uint32(data[pos:])
  305. pos += 4
  306. e.EndPos = binary.LittleEndian.Uint32(data[pos:])
  307. pos += 4
  308. e.DupHandlingFlags = uint8(data[pos])
  309. return nil
  310. }
  311. func (e *ExecuteLoadQueryEvent) Dump(w io.Writer) {
  312. fmt.Fprintf(w, "Slave proxy ID: %d\n", e.SlaveProxyID)
  313. fmt.Fprintf(w, "Execution time: %d\n", e.ExecutionTime)
  314. fmt.Fprintf(w, "Schame length: %d\n", e.SchemaLength)
  315. fmt.Fprintf(w, "Error code: %d\n", e.ErrorCode)
  316. fmt.Fprintf(w, "Status vars length: %d\n", e.StatusVars)
  317. fmt.Fprintf(w, "File ID: %d\n", e.FileID)
  318. fmt.Fprintf(w, "Start pos: %d\n", e.StartPos)
  319. fmt.Fprintf(w, "End pos: %d\n", e.EndPos)
  320. fmt.Fprintf(w, "Dup handling flags: %d\n", e.DupHandlingFlags)
  321. fmt.Fprintln(w)
  322. }
  323. // case MARIADB_ANNOTATE_ROWS_EVENT:
  324. // return "MariadbAnnotateRowsEvent"
  325. type MariadbAnnotateRowsEvent struct {
  326. Query []byte
  327. }
  328. func (e *MariadbAnnotateRowsEvent) Decode(data []byte) error {
  329. e.Query = data
  330. return nil
  331. }
  332. func (e *MariadbAnnotateRowsEvent) Dump(w io.Writer) {
  333. fmt.Fprintf(w, "Query: %s\n", e.Query)
  334. fmt.Fprintln(w)
  335. }
  336. type MariadbBinlogCheckPointEvent struct {
  337. Info []byte
  338. }
  339. func (e *MariadbBinlogCheckPointEvent) Decode(data []byte) error {
  340. e.Info = data
  341. return nil
  342. }
  343. func (e *MariadbBinlogCheckPointEvent) Dump(w io.Writer) {
  344. fmt.Fprintf(w, "Info: %s\n", e.Info)
  345. fmt.Fprintln(w)
  346. }
  347. type MariadbGTIDEvent struct {
  348. GTID MariadbGTID
  349. }
  350. func (e *MariadbGTIDEvent) Decode(data []byte) error {
  351. e.GTID.SequenceNumber = binary.LittleEndian.Uint64(data)
  352. e.GTID.DomainID = binary.LittleEndian.Uint32(data[8:])
  353. // we don't care commit id now, maybe later
  354. return nil
  355. }
  356. func (e *MariadbGTIDEvent) Dump(w io.Writer) {
  357. fmt.Fprintf(w, "GTID: %s\n", e.GTID)
  358. fmt.Fprintln(w)
  359. }
  360. type MariadbGTIDListEvent struct {
  361. GTIDs []MariadbGTID
  362. }
  363. func (e *MariadbGTIDListEvent) Decode(data []byte) error {
  364. pos := 0
  365. v := binary.LittleEndian.Uint32(data[pos:])
  366. pos += 4
  367. count := v & uint32((1<<28)-1)
  368. e.GTIDs = make([]MariadbGTID, count)
  369. for i := uint32(0); i < count; i++ {
  370. e.GTIDs[i].DomainID = binary.LittleEndian.Uint32(data[pos:])
  371. pos += 4
  372. e.GTIDs[i].ServerID = binary.LittleEndian.Uint32(data[pos:])
  373. pos += 4
  374. e.GTIDs[i].SequenceNumber = binary.LittleEndian.Uint64(data[pos:])
  375. }
  376. return nil
  377. }
  378. func (e *MariadbGTIDListEvent) Dump(w io.Writer) {
  379. fmt.Fprintf(w, "Lists: %v\n", e.GTIDs)
  380. fmt.Fprintln(w)
  381. }