mysql_gtid.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. package mysql
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "sort"
  8. "strconv"
  9. "strings"
  10. "github.com/juju/errors"
  11. "github.com/satori/go.uuid"
  12. "github.com/siddontang/go/hack"
  13. )
// Interval is a half-open range [Start, Stop) of GTID numbers: closed on the
// left and open on the right, like the MySQL GTID Interval struct.
// See MySQL rpl_gtid.h.
type Interval struct {
	// Start is the first GID of this interval.
	Start int64
	// Stop is the first GID after this interval.
	Stop int64
}
  22. // Interval is [start, stop), but the GTID string's format is [n] or [n1-n2], closed interval
  23. func parseInterval(str string) (i Interval, err error) {
  24. p := strings.Split(str, "-")
  25. switch len(p) {
  26. case 1:
  27. i.Start, err = strconv.ParseInt(p[0], 10, 64)
  28. i.Stop = i.Start + 1
  29. case 2:
  30. i.Start, err = strconv.ParseInt(p[0], 10, 64)
  31. i.Stop, err = strconv.ParseInt(p[1], 10, 64)
  32. i.Stop = i.Stop + 1
  33. default:
  34. err = errors.Errorf("invalid interval format, must n[-n]")
  35. }
  36. if err != nil {
  37. return
  38. }
  39. if i.Stop <= i.Start {
  40. err = errors.Errorf("invalid interval format, must n[-n] and the end must >= start")
  41. }
  42. return
  43. }
  44. func (i Interval) String() string {
  45. if i.Stop == i.Start+1 {
  46. return fmt.Sprintf("%d", i.Start)
  47. } else {
  48. return fmt.Sprintf("%d-%d", i.Start, i.Stop-1)
  49. }
  50. }
// IntervalSlice is a list of intervals.
type IntervalSlice []Interval
// Len returns the number of intervals in s.
func (s IntervalSlice) Len() int {
	return len(s)
}
  55. func (s IntervalSlice) Less(i, j int) bool {
  56. if s[i].Start < s[j].Start {
  57. return true
  58. } else if s[i].Start > s[j].Start {
  59. return false
  60. } else {
  61. return s[i].Stop < s[j].Stop
  62. }
  63. }
// Swap exchanges the intervals at indexes i and j in place.
func (s IntervalSlice) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}
// Sort sorts s in place with sort.Sort, using the order defined by Less.
func (s IntervalSlice) Sort() {
	sort.Sort(s)
}
  70. func (s IntervalSlice) Normalize() IntervalSlice {
  71. var n IntervalSlice
  72. if len(s) == 0 {
  73. return n
  74. }
  75. s.Sort()
  76. n = append(n, s[0])
  77. for i := 1; i < len(s); i++ {
  78. last := n[len(n)-1]
  79. if s[i].Start > last.Stop {
  80. n = append(n, s[i])
  81. continue
  82. } else {
  83. stop := s[i].Stop
  84. if last.Stop > stop {
  85. stop = last.Stop
  86. }
  87. n[len(n)-1] = Interval{last.Start, stop}
  88. }
  89. }
  90. return n
  91. }
  92. // Return true if sub in s
  93. func (s IntervalSlice) Contain(sub IntervalSlice) bool {
  94. j := 0
  95. for i := 0; i < len(sub); i++ {
  96. for ; j < len(s); j++ {
  97. if sub[i].Start > s[j].Stop {
  98. continue
  99. } else {
  100. break
  101. }
  102. }
  103. if j == len(s) {
  104. return false
  105. }
  106. if sub[i].Start < s[j].Start || sub[i].Stop > s[j].Stop {
  107. return false
  108. }
  109. }
  110. return true
  111. }
  112. func (s IntervalSlice) Equal(o IntervalSlice) bool {
  113. if len(s) != len(o) {
  114. return false
  115. }
  116. for i := 0; i < len(s); i++ {
  117. if s[i].Start != o[i].Start || s[i].Stop != o[i].Stop {
  118. return false
  119. }
  120. }
  121. return true
  122. }
  123. func (s IntervalSlice) Compare(o IntervalSlice) int {
  124. if s.Equal(o) {
  125. return 0
  126. } else if s.Contain(o) {
  127. return 1
  128. } else {
  129. return -1
  130. }
  131. }
// UUIDSet pairs a UUID (SID) with the GTID intervals recorded for it.
// Refer http://dev.mysql.com/doc/refman/5.6/en/replication-gtids-concepts.html
type UUIDSet struct {
	// SID is the UUID the intervals belong to.
	SID uuid.UUID

	// Intervals holds the GTID ranges for SID. The constructors and
	// AddInterval in this file store it normalized.
	Intervals IntervalSlice
}
  137. func ParseUUIDSet(str string) (*UUIDSet, error) {
  138. str = strings.TrimSpace(str)
  139. sep := strings.Split(str, ":")
  140. if len(sep) < 2 {
  141. return nil, errors.Errorf("invalid GTID format, must UUID:interval[:interval]")
  142. }
  143. var err error
  144. s := new(UUIDSet)
  145. if s.SID, err = uuid.FromString(sep[0]); err != nil {
  146. return nil, errors.Trace(err)
  147. }
  148. // Handle interval
  149. for i := 1; i < len(sep); i++ {
  150. if in, err := parseInterval(sep[i]); err != nil {
  151. return nil, errors.Trace(err)
  152. } else {
  153. s.Intervals = append(s.Intervals, in)
  154. }
  155. }
  156. s.Intervals = s.Intervals.Normalize()
  157. return s, nil
  158. }
  159. func NewUUIDSet(sid uuid.UUID, in ...Interval) *UUIDSet {
  160. s := new(UUIDSet)
  161. s.SID = sid
  162. s.Intervals = in
  163. s.Intervals = s.Intervals.Normalize()
  164. return s
  165. }
  166. func (s *UUIDSet) Contain(sub *UUIDSet) bool {
  167. if !bytes.Equal(s.SID.Bytes(), sub.SID.Bytes()) {
  168. return false
  169. }
  170. return s.Intervals.Contain(sub.Intervals)
  171. }
  172. func (s *UUIDSet) Bytes() []byte {
  173. var buf bytes.Buffer
  174. buf.WriteString(s.SID.String())
  175. for _, i := range s.Intervals {
  176. buf.WriteString(":")
  177. buf.WriteString(i.String())
  178. }
  179. return buf.Bytes()
  180. }
  181. func (s *UUIDSet) AddInterval(in IntervalSlice) {
  182. s.Intervals = append(s.Intervals, in...)
  183. s.Intervals = s.Intervals.Normalize()
  184. }
// String returns the output of Bytes converted to a string with hack.String.
func (s *UUIDSet) String() string {
	return hack.String(s.Bytes())
}
  188. func (s *UUIDSet) encode(w io.Writer) {
  189. w.Write(s.SID.Bytes())
  190. n := int64(len(s.Intervals))
  191. binary.Write(w, binary.LittleEndian, n)
  192. for _, i := range s.Intervals {
  193. binary.Write(w, binary.LittleEndian, i.Start)
  194. binary.Write(w, binary.LittleEndian, i.Stop)
  195. }
  196. }
  197. func (s *UUIDSet) Encode() []byte {
  198. var buf bytes.Buffer
  199. s.encode(&buf)
  200. return buf.Bytes()
  201. }
  202. func (s *UUIDSet) decode(data []byte) (int, error) {
  203. if len(data) < 24 {
  204. return 0, errors.Errorf("invalid uuid set buffer, less 24")
  205. }
  206. pos := 0
  207. var err error
  208. if s.SID, err = uuid.FromBytes(data[0:16]); err != nil {
  209. return 0, err
  210. }
  211. pos += 16
  212. n := int64(binary.LittleEndian.Uint64(data[pos : pos+8]))
  213. pos += 8
  214. if len(data) < int(16*n)+pos {
  215. return 0, errors.Errorf("invalid uuid set buffer, must %d, but %d", pos+int(16*n), len(data))
  216. }
  217. s.Intervals = make([]Interval, 0, n)
  218. var in Interval
  219. for i := int64(0); i < n; i++ {
  220. in.Start = int64(binary.LittleEndian.Uint64(data[pos : pos+8]))
  221. pos += 8
  222. in.Stop = int64(binary.LittleEndian.Uint64(data[pos : pos+8]))
  223. pos += 8
  224. s.Intervals = append(s.Intervals, in)
  225. }
  226. return pos, nil
  227. }
  228. func (s *UUIDSet) Decode(data []byte) error {
  229. n, err := s.decode(data)
  230. if n != len(data) {
  231. return errors.Errorf("invalid uuid set buffer, must %d, but %d", n, len(data))
  232. }
  233. return err
  234. }
// MysqlGTIDSet is a collection of UUIDSets.
type MysqlGTIDSet struct {
	// Sets holds the UUIDSets. AddSet keys each entry by the string form of
	// the set's SID.
	Sets map[string]*UUIDSet
}
  238. func ParseMysqlGTIDSet(str string) (GTIDSet, error) {
  239. s := new(MysqlGTIDSet)
  240. s.Sets = make(map[string]*UUIDSet)
  241. if str == "" {
  242. return s, nil
  243. }
  244. sp := strings.Split(str, ",")
  245. //todo, handle redundant same uuid
  246. for i := 0; i < len(sp); i++ {
  247. if set, err := ParseUUIDSet(sp[i]); err != nil {
  248. return nil, errors.Trace(err)
  249. } else {
  250. s.AddSet(set)
  251. }
  252. }
  253. return s, nil
  254. }
  255. func DecodeMysqlGTIDSet(data []byte) (*MysqlGTIDSet, error) {
  256. s := new(MysqlGTIDSet)
  257. if len(data) < 8 {
  258. return nil, errors.Errorf("invalid gtid set buffer, less 4")
  259. }
  260. n := int(binary.LittleEndian.Uint64(data))
  261. s.Sets = make(map[string]*UUIDSet, n)
  262. pos := 8
  263. for i := 0; i < n; i++ {
  264. set := new(UUIDSet)
  265. if n, err := set.decode(data[pos:]); err != nil {
  266. return nil, errors.Trace(err)
  267. } else {
  268. pos += n
  269. s.AddSet(set)
  270. }
  271. }
  272. return s, nil
  273. }
  274. func (s *MysqlGTIDSet) AddSet(set *UUIDSet) {
  275. if set == nil {
  276. return
  277. }
  278. sid := set.SID.String()
  279. o, ok := s.Sets[sid]
  280. if ok {
  281. o.AddInterval(set.Intervals)
  282. } else {
  283. s.Sets[sid] = set
  284. }
  285. }
  286. func (s *MysqlGTIDSet) Update(GTIDStr string) error {
  287. uuidSet, err := ParseUUIDSet(GTIDStr)
  288. if err != nil {
  289. return err
  290. }
  291. s.AddSet(uuidSet)
  292. return nil
  293. }
  294. func (s *MysqlGTIDSet) Contain(o GTIDSet) bool {
  295. sub, ok := o.(*MysqlGTIDSet)
  296. if !ok {
  297. return false
  298. }
  299. for key, set := range sub.Sets {
  300. o, ok := s.Sets[key]
  301. if !ok {
  302. return false
  303. }
  304. if !o.Contain(set) {
  305. return false
  306. }
  307. }
  308. return true
  309. }
  310. func (s *MysqlGTIDSet) Equal(o GTIDSet) bool {
  311. sub, ok := o.(*MysqlGTIDSet)
  312. if !ok {
  313. return false
  314. }
  315. for key, set := range sub.Sets {
  316. o, ok := s.Sets[key]
  317. if !ok {
  318. return false
  319. }
  320. if !o.Intervals.Equal(set.Intervals) {
  321. return false
  322. }
  323. }
  324. return true
  325. }
  326. func (s *MysqlGTIDSet) String() string {
  327. var buf bytes.Buffer
  328. sep := ""
  329. for _, set := range s.Sets {
  330. buf.WriteString(sep)
  331. buf.WriteString(set.String())
  332. sep = ","
  333. }
  334. return hack.String(buf.Bytes())
  335. }
  336. func (s *MysqlGTIDSet) Encode() []byte {
  337. var buf bytes.Buffer
  338. binary.Write(&buf, binary.LittleEndian, uint64(len(s.Sets)))
  339. for i, _ := range s.Sets {
  340. s.Sets[i].encode(&buf)
  341. }
  342. return buf.Bytes()
  343. }