123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487 |
- package replication
- import (
- "encoding/binary"
- //"encoding/hex"
- "fmt"
- "io"
- "strconv"
- "strings"
- "time"
- "unicode"
- "github.com/juju/errors"
- "github.com/satori/go.uuid"
- . "github.com/siddontang/go-mysql/mysql"
- )
// Sizes and marker values used while decoding binlog events in this file.
const (
	// EventHeaderSize is the number of bytes EventHeader.Decode requires.
	EventHeaderSize = 19

	// SidLength is the number of bytes GTIDEvent.Decode takes for the SID.
	SidLength = 16

	// LogicalTimestampTypeCode is the marker byte GTIDEvent.Decode expects
	// in front of the logical timestamps.
	LogicalTimestampTypeCode = 2

	// PartLogicalTimestampLength is the size in bytes of one logical
	// timestamp part (last committed / sequence number).
	PartLogicalTimestampLength = 8
)
- type BinlogEvent struct {
- // raw binlog data, including crc32 checksum if exists
- RawData []byte
- Header *EventHeader
- Event Event
- }
- func (e *BinlogEvent) Dump(w io.Writer) {
- e.Header.Dump(w)
- e.Event.Dump(w)
- }
// Event is the contract shared by the event body types in this file.
type Event interface {
	// Dump writes the event to w, in a format like python-mysql-replication.
	Dump(w io.Writer)

	// Decode fills the event from data and reports malformed input as an error.
	Decode(data []byte) error
}
- type EventError struct {
- Header *EventHeader
- //Error message
- Err string
- //Event data
- Data []byte
- }
- func (e *EventError) Error() string {
- return e.Err
- }
- type EventHeader struct {
- Timestamp uint32
- EventType EventType
- ServerID uint32
- EventSize uint32
- LogPos uint32
- Flags uint16
- }
- func (h *EventHeader) Decode(data []byte) error {
- if len(data) < EventHeaderSize {
- return errors.Errorf("header size too short %d, must 19", len(data))
- }
- pos := 0
- h.Timestamp = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- h.EventType = EventType(data[pos])
- pos++
- h.ServerID = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- h.EventSize = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- h.LogPos = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- h.Flags = binary.LittleEndian.Uint16(data[pos:])
- pos += 2
- if h.EventSize < uint32(EventHeaderSize) {
- return errors.Errorf("invalid event size %d, must >= 19", h.EventSize)
- }
- return nil
- }
- func (h *EventHeader) Dump(w io.Writer) {
- fmt.Fprintf(w, "=== %s ===\n", EventType(h.EventType))
- fmt.Fprintf(w, "Date: %s\n", time.Unix(int64(h.Timestamp), 0).Format(TimeFormat))
- fmt.Fprintf(w, "Log position: %d\n", h.LogPos)
- fmt.Fprintf(w, "Event size: %d\n", h.EventSize)
- }
// Version thresholds compared against calcVersionProduct in
// FormatDescriptionEvent.Decode: servers at or above them are treated as
// appending checksum information to the format description event.
//
// Each product packs the three version components as (x*256+y)*256+z.
var (
	checksumVersionSplitMysql   = []int{5, 6, 1}
	checksumVersionProductMysql = (checksumVersionSplitMysql[0]*256+checksumVersionSplitMysql[1])*256 +
		checksumVersionSplitMysql[2]

	checksumVersionSplitMariaDB   = []int{5, 3, 0}
	checksumVersionProductMariaDB = (checksumVersionSplitMariaDB[0]*256+checksumVersionSplitMariaDB[1])*256 +
		checksumVersionSplitMariaDB[2]
)
// splitServerVersion parses a server version of the form "X.Y.Zabc", where
// "abc" is any suffix that does not start with an ASCII digit, and returns
// the three numeric components as []int{X, Y, Z}.
//
// A string with fewer than three dot-separated parts yields {0, 0, 0}. Parts
// beyond the third are ignored. A component that is not a valid integer is
// reported as 0 rather than as an error, so callers always get three values.
func splitServerVersion(server string) []int {
	seps := strings.Split(server, ".")
	if len(seps) < 3 {
		return []int{0, 0, 0}
	}

	// Atoi errors are deliberately ignored: an unparsable part counts as 0.
	x, _ := strconv.Atoi(seps[0])
	y, _ := strconv.Atoi(seps[1])

	// The numeric prefix of the third part ends at the first rune that is not
	// an ASCII digit. It defaults to the whole part, so that a purely numeric
	// component such as the "10" in "5.6.10" is kept instead of collapsing to
	// an empty string.
	patch := seps[2]
	end := len(patch)
	for i, c := range patch {
		if c > unicode.MaxASCII || !unicode.IsDigit(c) {
			end = i
			break
		}
	}
	z, _ := strconv.Atoi(patch[:end])

	return []int{x, y, z}
}
- func calcVersionProduct(server string) int {
- versionSplit := splitServerVersion(server)
- return ((versionSplit[0]*256+versionSplit[1])*256 + versionSplit[2])
- }
- type FormatDescriptionEvent struct {
- Version uint16
- //len = 50
- ServerVersion []byte
- CreateTimestamp uint32
- EventHeaderLength uint8
- EventTypeHeaderLengths []byte
- // 0 is off, 1 is for CRC32, 255 is undefined
- ChecksumAlgorithm byte
- }
- func (e *FormatDescriptionEvent) Decode(data []byte) error {
- pos := 0
- e.Version = binary.LittleEndian.Uint16(data[pos:])
- pos += 2
- e.ServerVersion = make([]byte, 50)
- copy(e.ServerVersion, data[pos:])
- pos += 50
- e.CreateTimestamp = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- e.EventHeaderLength = data[pos]
- pos++
- if e.EventHeaderLength != byte(EventHeaderSize) {
- return errors.Errorf("invalid event header length %d, must 19", e.EventHeaderLength)
- }
- server := string(e.ServerVersion)
- checksumProduct := checksumVersionProductMysql
- if strings.Contains(strings.ToLower(server), "mariadb") {
- checksumProduct = checksumVersionProductMariaDB
- }
- if calcVersionProduct(string(e.ServerVersion)) >= checksumProduct {
- // here, the last 5 bytes is 1 byte check sum alg type and 4 byte checksum if exists
- e.ChecksumAlgorithm = data[len(data)-5]
- e.EventTypeHeaderLengths = data[pos : len(data)-5]
- } else {
- e.ChecksumAlgorithm = BINLOG_CHECKSUM_ALG_UNDEF
- e.EventTypeHeaderLengths = data[pos:]
- }
- return nil
- }
- func (e *FormatDescriptionEvent) Dump(w io.Writer) {
- fmt.Fprintf(w, "Version: %d\n", e.Version)
- fmt.Fprintf(w, "Server version: %s\n", e.ServerVersion)
- //fmt.Fprintf(w, "Create date: %s\n", time.Unix(int64(e.CreateTimestamp), 0).Format(TimeFormat))
- fmt.Fprintf(w, "Checksum algorithm: %d\n", e.ChecksumAlgorithm)
- //fmt.Fprintf(w, "Event header lengths: \n%s", hex.Dump(e.EventTypeHeaderLengths))
- fmt.Fprintln(w)
- }
// RotateEvent is the decoded body of a rotate event: a position followed by
// the name of the next log.
type RotateEvent struct {
	Position    uint64
	NextLogName []byte
}

// Decode reads the 8-byte little-endian position from the start of data and
// treats everything after it as the next log name.
//
// NextLogName is a sub-slice of data and shares its storage. An error is
// returned when data is shorter than 8 bytes.
func (e *RotateEvent) Decode(data []byte) error {
	const positionSize = 8

	// Reject truncated input instead of panicking on the reads below.
	if len(data) < positionSize {
		return fmt.Errorf("rotate event too short %d, must >= %d", len(data), positionSize)
	}

	e.Position = binary.LittleEndian.Uint64(data)
	e.NextLogName = data[positionSize:]

	return nil
}

// Dump writes the position and next log name to w, followed by a blank line.
func (e *RotateEvent) Dump(w io.Writer) {
	fmt.Fprintf(w, "Position: %d\n", e.Position)
	fmt.Fprintf(w, "Next log name: %s\n", e.NextLogName)
	fmt.Fprintln(w)
}
- type XIDEvent struct {
- XID uint64
- // in fact XIDEvent dosen't have the GTIDSet information, just for beneficial to use
- GSet GTIDSet
- }
- func (e *XIDEvent) Decode(data []byte) error {
- e.XID = binary.LittleEndian.Uint64(data)
- return nil
- }
- func (e *XIDEvent) Dump(w io.Writer) {
- fmt.Fprintf(w, "XID: %d\n", e.XID)
- if e.GSet != nil {
- fmt.Fprintf(w, "GTIDSet: %s\n", e.GSet.String())
- }
- fmt.Fprintln(w)
- }
- type QueryEvent struct {
- SlaveProxyID uint32
- ExecutionTime uint32
- ErrorCode uint16
- StatusVars []byte
- Schema []byte
- Query []byte
- // in fact QueryEvent dosen't have the GTIDSet information, just for beneficial to use
- GSet GTIDSet
- }
- func (e *QueryEvent) Decode(data []byte) error {
- pos := 0
- e.SlaveProxyID = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- e.ExecutionTime = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- schemaLength := uint8(data[pos])
- pos++
- e.ErrorCode = binary.LittleEndian.Uint16(data[pos:])
- pos += 2
- statusVarsLength := binary.LittleEndian.Uint16(data[pos:])
- pos += 2
- e.StatusVars = data[pos : pos+int(statusVarsLength)]
- pos += int(statusVarsLength)
- e.Schema = data[pos : pos+int(schemaLength)]
- pos += int(schemaLength)
- //skip 0x00
- pos++
- e.Query = data[pos:]
- return nil
- }
- func (e *QueryEvent) Dump(w io.Writer) {
- fmt.Fprintf(w, "Slave proxy ID: %d\n", e.SlaveProxyID)
- fmt.Fprintf(w, "Execution time: %d\n", e.ExecutionTime)
- fmt.Fprintf(w, "Error code: %d\n", e.ErrorCode)
- //fmt.Fprintf(w, "Status vars: \n%s", hex.Dump(e.StatusVars))
- fmt.Fprintf(w, "Schema: %s\n", e.Schema)
- fmt.Fprintf(w, "Query: %s\n", e.Query)
- if e.GSet != nil {
- fmt.Fprintf(w, "GTIDSet: %s\n", e.GSet.String())
- }
- fmt.Fprintln(w)
- }
- type GTIDEvent struct {
- CommitFlag uint8
- SID []byte
- GNO int64
- LastCommitted int64
- SequenceNumber int64
- }
- func (e *GTIDEvent) Decode(data []byte) error {
- pos := 0
- e.CommitFlag = uint8(data[pos])
- pos++
- e.SID = data[pos : pos+SidLength]
- pos += SidLength
- e.GNO = int64(binary.LittleEndian.Uint64(data[pos:]))
- pos += 8
- if len(data) >= 42 {
- if uint8(data[pos]) == LogicalTimestampTypeCode {
- pos++
- e.LastCommitted = int64(binary.LittleEndian.Uint64(data[pos:]))
- pos += PartLogicalTimestampLength
- e.SequenceNumber = int64(binary.LittleEndian.Uint64(data[pos:]))
- }
- }
- return nil
- }
- func (e *GTIDEvent) Dump(w io.Writer) {
- fmt.Fprintf(w, "Commit flag: %d\n", e.CommitFlag)
- u, _ := uuid.FromBytes(e.SID)
- fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO)
- fmt.Fprintf(w, "LAST_COMMITTED: %d\n", e.LastCommitted)
- fmt.Fprintf(w, "SEQUENCE_NUMBER: %d\n", e.SequenceNumber)
- fmt.Fprintln(w)
- }
// BeginLoadQueryEvent is the decoded body of a begin-load-query event: a
// file ID followed by a block of data.
type BeginLoadQueryEvent struct {
	FileID    uint32
	BlockData []byte
}

// Decode reads the 4-byte little-endian file ID from the start of data and
// treats everything after it as the block data.
//
// BlockData is a sub-slice of data and shares its storage. An error is
// returned when data is shorter than 4 bytes.
func (e *BeginLoadQueryEvent) Decode(data []byte) error {
	const fileIDSize = 4

	// Reject truncated input instead of panicking on the reads below.
	if len(data) < fileIDSize {
		return fmt.Errorf("begin load query event too short %d, must >= %d", len(data), fileIDSize)
	}

	e.FileID = binary.LittleEndian.Uint32(data)
	e.BlockData = data[fileIDSize:]

	return nil
}

// Dump writes the file ID and block data to w, followed by a blank line.
func (e *BeginLoadQueryEvent) Dump(w io.Writer) {
	fmt.Fprintf(w, "File ID: %d\n", e.FileID)
	fmt.Fprintf(w, "Block data: %s\n", e.BlockData)
	fmt.Fprintln(w)
}
// ExecuteLoadQueryEvent holds the fixed-size fields of an execute-load-query
// event. Note that StatusVars stores the 2-byte value that Dump labels
// "Status vars length", not the status variables themselves.
type ExecuteLoadQueryEvent struct {
	SlaveProxyID     uint32
	ExecutionTime    uint32
	SchemaLength     uint8
	ErrorCode        uint16
	StatusVars       uint16
	FileID           uint32
	StartPos         uint32
	EndPos           uint32
	DupHandlingFlags uint8
}

// Decode reads the fixed little-endian fields from the start of data. Bytes
// after the fixed part are ignored. An error is returned when data is shorter
// than the 26-byte fixed part.
func (e *ExecuteLoadQueryEvent) Decode(data []byte) error {
	// slave proxy id(4) + execution time(4) + schema length(1) +
	// error code(2) + status vars length(2) + file id(4) + start pos(4) +
	// end pos(4) + dup handling flags(1)
	const fixedSize = 4 + 4 + 1 + 2 + 2 + 4 + 4 + 4 + 1

	// Reject truncated input instead of panicking on the reads below.
	if len(data) < fixedSize {
		return fmt.Errorf("execute load query event too short %d, must >= %d", len(data), fixedSize)
	}

	pos := 0
	e.SlaveProxyID = binary.LittleEndian.Uint32(data[pos:])
	pos += 4

	e.ExecutionTime = binary.LittleEndian.Uint32(data[pos:])
	pos += 4

	e.SchemaLength = data[pos]
	pos++

	e.ErrorCode = binary.LittleEndian.Uint16(data[pos:])
	pos += 2

	e.StatusVars = binary.LittleEndian.Uint16(data[pos:])
	pos += 2

	e.FileID = binary.LittleEndian.Uint32(data[pos:])
	pos += 4

	e.StartPos = binary.LittleEndian.Uint32(data[pos:])
	pos += 4

	e.EndPos = binary.LittleEndian.Uint32(data[pos:])
	pos += 4

	e.DupHandlingFlags = data[pos]

	return nil
}

// Dump writes every decoded field to w, one per line, followed by a blank line.
func (e *ExecuteLoadQueryEvent) Dump(w io.Writer) {
	fmt.Fprintf(w, "Slave proxy ID: %d\n", e.SlaveProxyID)
	fmt.Fprintf(w, "Execution time: %d\n", e.ExecutionTime)
	fmt.Fprintf(w, "Schame length: %d\n", e.SchemaLength)
	fmt.Fprintf(w, "Error code: %d\n", e.ErrorCode)
	fmt.Fprintf(w, "Status vars length: %d\n", e.StatusVars)
	fmt.Fprintf(w, "File ID: %d\n", e.FileID)
	fmt.Fprintf(w, "Start pos: %d\n", e.StartPos)
	fmt.Fprintf(w, "End pos: %d\n", e.EndPos)
	fmt.Fprintf(w, "Dup handling flags: %d\n", e.DupHandlingFlags)
	fmt.Fprintln(w)
}
- // case MARIADB_ANNOTATE_ROWS_EVENT:
- // return "MariadbAnnotateRowsEvent"
// MariadbAnnotateRowsEvent keeps the event payload verbatim as the query text.
type MariadbAnnotateRowsEvent struct {
	Query []byte
}

// Decode stores data as the query without copying it, so Query shares its
// storage with data. It never fails.
func (e *MariadbAnnotateRowsEvent) Decode(data []byte) error {
	e.Query = data
	return nil
}

// Dump writes the query to w, followed by a blank line.
func (e *MariadbAnnotateRowsEvent) Dump(w io.Writer) {
	fmt.Fprintln(w, "Query:", string(e.Query))
	fmt.Fprintln(w)
}
// MariadbBinlogCheckPointEvent keeps the event payload verbatim in Info.
type MariadbBinlogCheckPointEvent struct {
	Info []byte
}

// Decode stores data as Info without copying it, so Info shares its storage
// with data. It never fails.
func (e *MariadbBinlogCheckPointEvent) Decode(data []byte) error {
	e.Info = data
	return nil
}

// Dump writes Info to w, followed by a blank line.
func (e *MariadbBinlogCheckPointEvent) Dump(w io.Writer) {
	fmt.Fprint(w, "Info: ", string(e.Info), "\n")
	fmt.Fprintln(w)
}
- type MariadbGTIDEvent struct {
- GTID MariadbGTID
- }
- func (e *MariadbGTIDEvent) Decode(data []byte) error {
- e.GTID.SequenceNumber = binary.LittleEndian.Uint64(data)
- e.GTID.DomainID = binary.LittleEndian.Uint32(data[8:])
- // we don't care commit id now, maybe later
- return nil
- }
- func (e *MariadbGTIDEvent) Dump(w io.Writer) {
- fmt.Fprintf(w, "GTID: %s\n", e.GTID)
- fmt.Fprintln(w)
- }
- type MariadbGTIDListEvent struct {
- GTIDs []MariadbGTID
- }
- func (e *MariadbGTIDListEvent) Decode(data []byte) error {
- pos := 0
- v := binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- count := v & uint32((1<<28)-1)
- e.GTIDs = make([]MariadbGTID, count)
- for i := uint32(0); i < count; i++ {
- e.GTIDs[i].DomainID = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- e.GTIDs[i].ServerID = binary.LittleEndian.Uint32(data[pos:])
- pos += 4
- e.GTIDs[i].SequenceNumber = binary.LittleEndian.Uint64(data[pos:])
- }
- return nil
- }
- func (e *MariadbGTIDListEvent) Dump(w io.Writer) {
- fmt.Fprintf(w, "Lists: %v\n", e.GTIDs)
- fmt.Fprintln(w)
- }
|