row_event.go 19 KB


  1. package replication
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "encoding/hex"
  6. "fmt"
  7. "io"
  8. "strconv"
  9. "time"
  10. "github.com/juju/errors"
  11. "github.com/shopspring/decimal"
  12. . "github.com/siddontang/go-mysql/mysql"
  13. "github.com/siddontang/go/hack"
  14. log "github.com/sirupsen/logrus"
  15. )
// errMissingTableMapEvent is a distinct error type wrapping a plain error.
// RowsEvent.Decode returns it when a rows event references a table id and the
// decoder's table map is completely empty, which lets callers tell that case
// apart from an unknown id in an otherwise populated map.
type errMissingTableMapEvent error
// TableMapEvent holds the fields that Decode extracts from the body of a
// binlog table map event.
type TableMapEvent struct {
	// tableIDSize is the number of bytes Decode reads for the table id.
	tableIDSize int
	// TableID is the integer read from the first tableIDSize bytes.
	TableID uint64
	// Flags is the little-endian uint16 that follows the table id.
	Flags uint16
	// Schema and Table are length-prefixed names; Decode stores them as
	// sub-slices of its input buffer (no copy is made).
	Schema []byte
	Table []byte
	// ColumnCount is read as a length-encoded integer.
	ColumnCount uint64
	// ColumnType holds one type byte per column.
	ColumnType []byte
	// ColumnMeta holds one metadata value per column, filled by decodeMeta;
	// it is 0 for column types that carry no metadata.
	ColumnMeta []uint16
	//len = (ColumnCount + 7) / 8
	NullBitmap []byte
}
  29. func (e *TableMapEvent) Decode(data []byte) error {
  30. pos := 0
  31. e.TableID = FixedLengthInt(data[0:e.tableIDSize])
  32. pos += e.tableIDSize
  33. e.Flags = binary.LittleEndian.Uint16(data[pos:])
  34. pos += 2
  35. schemaLength := data[pos]
  36. pos++
  37. e.Schema = data[pos : pos+int(schemaLength)]
  38. pos += int(schemaLength)
  39. //skip 0x00
  40. pos++
  41. tableLength := data[pos]
  42. pos++
  43. e.Table = data[pos : pos+int(tableLength)]
  44. pos += int(tableLength)
  45. //skip 0x00
  46. pos++
  47. var n int
  48. e.ColumnCount, _, n = LengthEncodedInt(data[pos:])
  49. pos += n
  50. e.ColumnType = data[pos : pos+int(e.ColumnCount)]
  51. pos += int(e.ColumnCount)
  52. var err error
  53. var metaData []byte
  54. if metaData, _, n, err = LengthEnodedString(data[pos:]); err != nil {
  55. return errors.Trace(err)
  56. }
  57. if err = e.decodeMeta(metaData); err != nil {
  58. return errors.Trace(err)
  59. }
  60. pos += n
  61. if len(data[pos:]) != bitmapByteSize(int(e.ColumnCount)) {
  62. return io.EOF
  63. }
  64. e.NullBitmap = data[pos:]
  65. return nil
  66. }
  67. func bitmapByteSize(columnCount int) int {
  68. return int(columnCount+7) / 8
  69. }
  70. // see mysql sql/log_event.h
  71. /*
  72. 0 byte
  73. MYSQL_TYPE_DECIMAL
  74. MYSQL_TYPE_TINY
  75. MYSQL_TYPE_SHORT
  76. MYSQL_TYPE_LONG
  77. MYSQL_TYPE_NULL
  78. MYSQL_TYPE_TIMESTAMP
  79. MYSQL_TYPE_LONGLONG
  80. MYSQL_TYPE_INT24
  81. MYSQL_TYPE_DATE
  82. MYSQL_TYPE_TIME
  83. MYSQL_TYPE_DATETIME
  84. MYSQL_TYPE_YEAR
  85. 1 byte
  86. MYSQL_TYPE_FLOAT
  87. MYSQL_TYPE_DOUBLE
  88. MYSQL_TYPE_BLOB
  89. MYSQL_TYPE_GEOMETRY
  90. //maybe
  91. MYSQL_TYPE_TIME2
  92. MYSQL_TYPE_DATETIME2
  93. MYSQL_TYPE_TIMESTAMP2
  94. 2 byte
  95. MYSQL_TYPE_VARCHAR
  96. MYSQL_TYPE_BIT
  97. MYSQL_TYPE_NEWDECIMAL
  98. MYSQL_TYPE_VAR_STRING
  99. MYSQL_TYPE_STRING
  100. This enumeration value is only used internally and cannot exist in a binlog.
  101. MYSQL_TYPE_NEWDATE
  102. MYSQL_TYPE_ENUM
  103. MYSQL_TYPE_SET
  104. MYSQL_TYPE_TINY_BLOB
  105. MYSQL_TYPE_MEDIUM_BLOB
  106. MYSQL_TYPE_LONG_BLOB
  107. */
  108. func (e *TableMapEvent) decodeMeta(data []byte) error {
  109. pos := 0
  110. e.ColumnMeta = make([]uint16, e.ColumnCount)
  111. for i, t := range e.ColumnType {
  112. switch t {
  113. case MYSQL_TYPE_STRING:
  114. var x uint16 = uint16(data[pos]) << 8 //real type
  115. x += uint16(data[pos+1]) //pack or field length
  116. e.ColumnMeta[i] = x
  117. pos += 2
  118. case MYSQL_TYPE_NEWDECIMAL:
  119. var x uint16 = uint16(data[pos]) << 8 //precision
  120. x += uint16(data[pos+1]) //decimals
  121. e.ColumnMeta[i] = x
  122. pos += 2
  123. case MYSQL_TYPE_VAR_STRING,
  124. MYSQL_TYPE_VARCHAR,
  125. MYSQL_TYPE_BIT:
  126. e.ColumnMeta[i] = binary.LittleEndian.Uint16(data[pos:])
  127. pos += 2
  128. case MYSQL_TYPE_BLOB,
  129. MYSQL_TYPE_DOUBLE,
  130. MYSQL_TYPE_FLOAT,
  131. MYSQL_TYPE_GEOMETRY,
  132. MYSQL_TYPE_JSON:
  133. e.ColumnMeta[i] = uint16(data[pos])
  134. pos++
  135. case MYSQL_TYPE_TIME2,
  136. MYSQL_TYPE_DATETIME2,
  137. MYSQL_TYPE_TIMESTAMP2:
  138. e.ColumnMeta[i] = uint16(data[pos])
  139. pos++
  140. case MYSQL_TYPE_NEWDATE,
  141. MYSQL_TYPE_ENUM,
  142. MYSQL_TYPE_SET,
  143. MYSQL_TYPE_TINY_BLOB,
  144. MYSQL_TYPE_MEDIUM_BLOB,
  145. MYSQL_TYPE_LONG_BLOB:
  146. return errors.Errorf("unsupport type in binlog %d", t)
  147. default:
  148. e.ColumnMeta[i] = 0
  149. }
  150. }
  151. return nil
  152. }
  153. func (e *TableMapEvent) Dump(w io.Writer) {
  154. fmt.Fprintf(w, "TableID: %d\n", e.TableID)
  155. fmt.Fprintf(w, "TableID size: %d\n", e.tableIDSize)
  156. fmt.Fprintf(w, "Flags: %d\n", e.Flags)
  157. fmt.Fprintf(w, "Schema: %s\n", e.Schema)
  158. fmt.Fprintf(w, "Table: %s\n", e.Table)
  159. fmt.Fprintf(w, "Column count: %d\n", e.ColumnCount)
  160. fmt.Fprintf(w, "Column type: \n%s", hex.Dump(e.ColumnType))
  161. fmt.Fprintf(w, "NULL bitmap: \n%s", hex.Dump(e.NullBitmap))
  162. fmt.Fprintln(w)
  163. }
// RowsEventStmtEndFlag is the flag bit that is set on the rows event at the
// end of a statement.
// NOTE(review): presumably meant to be tested against RowsEvent.Flags; it is
// not referenced in the code visible here - confirm against callers.
const RowsEventStmtEndFlag = 0x01
// RowsEvent holds the decoded content of a binlog rows event together with
// the decoder settings that Decode and its helpers consult.
type RowsEvent struct {
	//0, 1, 2
	Version int
	// tableIDSize is the number of bytes Decode reads for the table id.
	tableIDSize int
	// tables maps table ids to table map events; Decode looks TableID up here.
	tables map[uint64]*TableMapEvent
	// needBitmap2 makes Decode read ColumnBitmap2 and decode a second row
	// image after every row.
	needBitmap2 bool
	// Table is the entry of tables that matches TableID; set by Decode.
	Table *TableMapEvent
	TableID uint64
	Flags uint16
	//if version == 2
	ExtraData []byte
	//lenenc_int
	ColumnCount uint64
	//len = (ColumnCount + 7) / 8
	ColumnBitmap1 []byte
	//if UPDATE_ROWS_EVENTv1 or v2
	//len = (ColumnCount + 7) / 8
	ColumnBitmap2 []byte
	// Rows holds one slice per decoded row image, appended by decodeRows.
	// The original note listed the element types as int64, float64, bool,
	// []byte, string (it said "invalid", presumably a typo for "valid");
	// decodeValue also produces nil and other types - TODO confirm the list.
	Rows [][]interface{}
	// parseTime selects whether fractional time values are returned as
	// time values (true) or as formatted strings (false); see parseFracTime.
	parseTime bool
	// useDecimal is passed to decodeDecimal to choose between a
	// decimal.Decimal and a float64 result.
	useDecimal bool
}
  189. func (e *RowsEvent) Decode(data []byte) error {
  190. pos := 0
  191. e.TableID = FixedLengthInt(data[0:e.tableIDSize])
  192. pos += e.tableIDSize
  193. e.Flags = binary.LittleEndian.Uint16(data[pos:])
  194. pos += 2
  195. if e.Version == 2 {
  196. dataLen := binary.LittleEndian.Uint16(data[pos:])
  197. pos += 2
  198. e.ExtraData = data[pos : pos+int(dataLen-2)]
  199. pos += int(dataLen - 2)
  200. }
  201. var n int
  202. e.ColumnCount, _, n = LengthEncodedInt(data[pos:])
  203. pos += n
  204. bitCount := bitmapByteSize(int(e.ColumnCount))
  205. e.ColumnBitmap1 = data[pos : pos+bitCount]
  206. pos += bitCount
  207. if e.needBitmap2 {
  208. e.ColumnBitmap2 = data[pos : pos+bitCount]
  209. pos += bitCount
  210. }
  211. var ok bool
  212. e.Table, ok = e.tables[e.TableID]
  213. if !ok {
  214. if len(e.tables) > 0 {
  215. return errors.Errorf("invalid table id %d, no corresponding table map event", e.TableID)
  216. } else {
  217. return errMissingTableMapEvent(errors.Errorf("invalid table id %d, no corresponding table map event", e.TableID))
  218. }
  219. }
  220. var err error
  221. // ... repeat rows until event-end
  222. defer func() {
  223. if r := recover(); r != nil {
  224. log.Fatalf("parse rows event panic %v, data %q, parsed rows %#v, table map %#v\n%s", r, data, e, e.Table, Pstack())
  225. }
  226. }()
  227. for pos < len(data) {
  228. if n, err = e.decodeRows(data[pos:], e.Table, e.ColumnBitmap1); err != nil {
  229. return errors.Trace(err)
  230. }
  231. pos += n
  232. if e.needBitmap2 {
  233. if n, err = e.decodeRows(data[pos:], e.Table, e.ColumnBitmap2); err != nil {
  234. return errors.Trace(err)
  235. }
  236. pos += n
  237. }
  238. }
  239. return nil
  240. }
  241. func isBitSet(bitmap []byte, i int) bool {
  242. return bitmap[i>>3]&(1<<(uint(i)&7)) > 0
  243. }
  244. func (e *RowsEvent) decodeRows(data []byte, table *TableMapEvent, bitmap []byte) (int, error) {
  245. row := make([]interface{}, e.ColumnCount)
  246. pos := 0
  247. // refer: https://github.com/alibaba/canal/blob/c3e38e50e269adafdd38a48c63a1740cde304c67/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java#L63
  248. count := 0
  249. for i := 0; i < int(e.ColumnCount); i++ {
  250. if isBitSet(bitmap, i) {
  251. count++
  252. }
  253. }
  254. count = (count + 7) / 8
  255. nullBitmap := data[pos : pos+count]
  256. pos += count
  257. nullbitIndex := 0
  258. var n int
  259. var err error
  260. for i := 0; i < int(e.ColumnCount); i++ {
  261. if !isBitSet(bitmap, i) {
  262. continue
  263. }
  264. isNull := (uint32(nullBitmap[nullbitIndex/8]) >> uint32(nullbitIndex%8)) & 0x01
  265. nullbitIndex++
  266. if isNull > 0 {
  267. row[i] = nil
  268. continue
  269. }
  270. row[i], n, err = e.decodeValue(data[pos:], table.ColumnType[i], table.ColumnMeta[i])
  271. if err != nil {
  272. return 0, err
  273. }
  274. pos += n
  275. }
  276. e.Rows = append(e.Rows, row)
  277. return pos, nil
  278. }
  279. func (e *RowsEvent) parseFracTime(t interface{}) interface{} {
  280. v, ok := t.(fracTime)
  281. if !ok {
  282. return t
  283. }
  284. if !e.parseTime {
  285. // Don't parse time, return string directly
  286. return v.String()
  287. }
  288. // return Golang time directly
  289. return v.Time
  290. }
  291. // see mysql sql/log_event.cc log_event_print_value
  292. func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{}, n int, err error) {
  293. var length int = 0
  294. if tp == MYSQL_TYPE_STRING {
  295. if meta >= 256 {
  296. b0 := uint8(meta >> 8)
  297. b1 := uint8(meta & 0xFF)
  298. if b0&0x30 != 0x30 {
  299. length = int(uint16(b1) | (uint16((b0&0x30)^0x30) << 4))
  300. tp = byte(b0 | 0x30)
  301. } else {
  302. length = int(meta & 0xFF)
  303. tp = b0
  304. }
  305. } else {
  306. length = int(meta)
  307. }
  308. }
  309. switch tp {
  310. case MYSQL_TYPE_NULL:
  311. return nil, 0, nil
  312. case MYSQL_TYPE_LONG:
  313. n = 4
  314. v = ParseBinaryInt32(data)
  315. case MYSQL_TYPE_TINY:
  316. n = 1
  317. v = ParseBinaryInt8(data)
  318. case MYSQL_TYPE_SHORT:
  319. n = 2
  320. v = ParseBinaryInt16(data)
  321. case MYSQL_TYPE_INT24:
  322. n = 3
  323. v = ParseBinaryInt24(data)
  324. case MYSQL_TYPE_LONGLONG:
  325. n = 8
  326. v = ParseBinaryInt64(data)
  327. case MYSQL_TYPE_NEWDECIMAL:
  328. prec := uint8(meta >> 8)
  329. scale := uint8(meta & 0xFF)
  330. v, n, err = decodeDecimal(data, int(prec), int(scale), e.useDecimal)
  331. case MYSQL_TYPE_FLOAT:
  332. n = 4
  333. v = ParseBinaryFloat32(data)
  334. case MYSQL_TYPE_DOUBLE:
  335. n = 8
  336. v = ParseBinaryFloat64(data)
  337. case MYSQL_TYPE_BIT:
  338. nbits := ((meta >> 8) * 8) + (meta & 0xFF)
  339. n = int(nbits+7) / 8
  340. //use int64 for bit
  341. v, err = decodeBit(data, int(nbits), int(n))
  342. case MYSQL_TYPE_TIMESTAMP:
  343. n = 4
  344. t := binary.LittleEndian.Uint32(data)
  345. v = e.parseFracTime(fracTime{time.Unix(int64(t), 0), 0})
  346. case MYSQL_TYPE_TIMESTAMP2:
  347. v, n, err = decodeTimestamp2(data, meta)
  348. v = e.parseFracTime(v)
  349. case MYSQL_TYPE_DATETIME:
  350. n = 8
  351. i64 := binary.LittleEndian.Uint64(data)
  352. d := i64 / 1000000
  353. t := i64 % 1000000
  354. v = e.parseFracTime(fracTime{time.Date(int(d/10000),
  355. time.Month((d%10000)/100),
  356. int(d%100),
  357. int(t/10000),
  358. int((t%10000)/100),
  359. int(t%100),
  360. 0,
  361. time.UTC), 0})
  362. case MYSQL_TYPE_DATETIME2:
  363. v, n, err = decodeDatetime2(data, meta)
  364. v = e.parseFracTime(v)
  365. case MYSQL_TYPE_TIME:
  366. n = 3
  367. i32 := uint32(FixedLengthInt(data[0:3]))
  368. if i32 == 0 {
  369. v = "00:00:00"
  370. } else {
  371. sign := ""
  372. if i32 < 0 {
  373. sign = "-"
  374. }
  375. v = fmt.Sprintf("%s%02d:%02d:%02d", sign, i32/10000, (i32%10000)/100, i32%100)
  376. }
  377. case MYSQL_TYPE_TIME2:
  378. v, n, err = decodeTime2(data, meta)
  379. case MYSQL_TYPE_DATE:
  380. n = 3
  381. i32 := uint32(FixedLengthInt(data[0:3]))
  382. if i32 == 0 {
  383. v = "0000-00-00"
  384. } else {
  385. v = fmt.Sprintf("%04d-%02d-%02d", i32/(16*32), i32/32%16, i32%32)
  386. }
  387. case MYSQL_TYPE_YEAR:
  388. n = 1
  389. v = int(data[0]) + 1900
  390. case MYSQL_TYPE_ENUM:
  391. l := meta & 0xFF
  392. switch l {
  393. case 1:
  394. v = int64(data[0])
  395. n = 1
  396. case 2:
  397. v = int64(binary.BigEndian.Uint16(data))
  398. n = 2
  399. default:
  400. err = fmt.Errorf("Unknown ENUM packlen=%d", l)
  401. }
  402. case MYSQL_TYPE_SET:
  403. n = int(meta & 0xFF)
  404. nbits := n * 8
  405. v, err = decodeBit(data, nbits, n)
  406. case MYSQL_TYPE_BLOB:
  407. v, n, err = decodeBlob(data, meta)
  408. case MYSQL_TYPE_VARCHAR,
  409. MYSQL_TYPE_VAR_STRING:
  410. length = int(meta)
  411. v, n = decodeString(data, length)
  412. case MYSQL_TYPE_STRING:
  413. v, n = decodeString(data, length)
  414. case MYSQL_TYPE_JSON:
  415. // Refer: https://github.com/shyiko/mysql-binlog-connector-java/blob/master/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java#L404
  416. length = int(FixedLengthInt(data[0:meta]))
  417. n = length + int(meta)
  418. v, err = e.decodeJsonBinary(data[meta:n])
  419. case MYSQL_TYPE_GEOMETRY:
  420. // MySQL saves Geometry as Blob in binlog
  421. // Seem that the binary format is SRID (4 bytes) + WKB, outer can use
  422. // MySQL GeoFromWKB or others to create the geometry data.
  423. // Refer https://dev.mysql.com/doc/refman/5.7/en/gis-wkb-functions.html
  424. // I also find some go libs to handle WKB if possible
  425. // see https://github.com/twpayne/go-geom or https://github.com/paulmach/go.geo
  426. v, n, err = decodeBlob(data, meta)
  427. default:
  428. err = fmt.Errorf("unsupport type %d in binlog and don't know how to handle", tp)
  429. }
  430. return
  431. }
  432. func decodeString(data []byte, length int) (v string, n int) {
  433. if length < 256 {
  434. length = int(data[0])
  435. n = int(length) + 1
  436. v = hack.String(data[1:n])
  437. } else {
  438. length = int(binary.LittleEndian.Uint16(data[0:]))
  439. n = length + 2
  440. v = hack.String(data[2:n])
  441. }
  442. return
  443. }
// digitsPerInteger is the number of decimal digits decodeDecimal treats as
// one full group; each full group occupies 4 bytes in the binary form.
const digitsPerInteger int = 9

// compressedBytes[d] is the number of bytes used to store a group of d
// leftover decimal digits (0 <= d <= 9) that does not fill a whole group.
var compressedBytes = []int{0, 1, 1, 2, 2, 3, 3, 4, 4, 4}
  446. func decodeDecimalDecompressValue(compIndx int, data []byte, mask uint8) (size int, value uint32) {
  447. size = compressedBytes[compIndx]
  448. databuff := make([]byte, size)
  449. for i := 0; i < size; i++ {
  450. databuff[i] = data[i] ^ mask
  451. }
  452. value = uint32(BFixedLengthInt(databuff))
  453. return
  454. }
  455. func decodeDecimal(data []byte, precision int, decimals int, useDecimal bool) (interface{}, int, error) {
  456. //see python mysql replication and https://github.com/jeremycole/mysql_binlog
  457. integral := (precision - decimals)
  458. uncompIntegral := int(integral / digitsPerInteger)
  459. uncompFractional := int(decimals / digitsPerInteger)
  460. compIntegral := integral - (uncompIntegral * digitsPerInteger)
  461. compFractional := decimals - (uncompFractional * digitsPerInteger)
  462. binSize := uncompIntegral*4 + compressedBytes[compIntegral] +
  463. uncompFractional*4 + compressedBytes[compFractional]
  464. buf := make([]byte, binSize)
  465. copy(buf, data[:binSize])
  466. //must copy the data for later change
  467. data = buf
  468. // Support negative
  469. // The sign is encoded in the high bit of the the byte
  470. // But this bit can also be used in the value
  471. value := uint32(data[0])
  472. var res bytes.Buffer
  473. var mask uint32 = 0
  474. if value&0x80 == 0 {
  475. mask = uint32((1 << 32) - 1)
  476. res.WriteString("-")
  477. }
  478. //clear sign
  479. data[0] ^= 0x80
  480. pos, value := decodeDecimalDecompressValue(compIntegral, data, uint8(mask))
  481. res.WriteString(fmt.Sprintf("%d", value))
  482. for i := 0; i < uncompIntegral; i++ {
  483. value = binary.BigEndian.Uint32(data[pos:]) ^ mask
  484. pos += 4
  485. res.WriteString(fmt.Sprintf("%09d", value))
  486. }
  487. res.WriteString(".")
  488. for i := 0; i < uncompFractional; i++ {
  489. value = binary.BigEndian.Uint32(data[pos:]) ^ mask
  490. pos += 4
  491. res.WriteString(fmt.Sprintf("%09d", value))
  492. }
  493. if size, value := decodeDecimalDecompressValue(compFractional, data[pos:], uint8(mask)); size > 0 {
  494. res.WriteString(fmt.Sprintf("%0*d", compFractional, value))
  495. pos += size
  496. }
  497. if useDecimal {
  498. f, err := decimal.NewFromString(hack.String(res.Bytes()))
  499. return f, pos, err
  500. }
  501. f, err := strconv.ParseFloat(hack.String(res.Bytes()), 64)
  502. return f, pos, err
  503. }
  504. func decodeBit(data []byte, nbits int, length int) (value int64, err error) {
  505. if nbits > 1 {
  506. switch length {
  507. case 1:
  508. value = int64(data[0])
  509. case 2:
  510. value = int64(binary.BigEndian.Uint16(data))
  511. case 3:
  512. value = int64(BFixedLengthInt(data[0:3]))
  513. case 4:
  514. value = int64(binary.BigEndian.Uint32(data))
  515. case 5:
  516. value = int64(BFixedLengthInt(data[0:5]))
  517. case 6:
  518. value = int64(BFixedLengthInt(data[0:6]))
  519. case 7:
  520. value = int64(BFixedLengthInt(data[0:7]))
  521. case 8:
  522. value = int64(binary.BigEndian.Uint64(data))
  523. default:
  524. err = fmt.Errorf("invalid bit length %d", length)
  525. }
  526. } else {
  527. if length != 1 {
  528. err = fmt.Errorf("invalid bit length %d", length)
  529. } else {
  530. value = int64(data[0])
  531. }
  532. }
  533. return
  534. }
  535. func decodeTimestamp2(data []byte, dec uint16) (interface{}, int, error) {
  536. //get timestamp binary length
  537. n := int(4 + (dec+1)/2)
  538. sec := int64(binary.BigEndian.Uint32(data[0:4]))
  539. usec := int64(0)
  540. switch dec {
  541. case 1, 2:
  542. usec = int64(data[4]) * 10000
  543. case 3, 4:
  544. usec = int64(binary.BigEndian.Uint16(data[4:])) * 100
  545. case 5, 6:
  546. usec = int64(BFixedLengthInt(data[4:7]))
  547. }
  548. if sec == 0 {
  549. return formatZeroTime(int(usec), int(dec)), n, nil
  550. }
  551. return fracTime{time.Unix(sec, usec*1000), int(dec)}, n, nil
  552. }
  553. const DATETIMEF_INT_OFS int64 = 0x8000000000
  554. func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) {
  555. //get datetime binary length
  556. n := int(5 + (dec+1)/2)
  557. intPart := int64(BFixedLengthInt(data[0:5])) - DATETIMEF_INT_OFS
  558. var frac int64 = 0
  559. switch dec {
  560. case 1, 2:
  561. frac = int64(data[5]) * 10000
  562. case 3, 4:
  563. frac = int64(binary.BigEndian.Uint16(data[5:7])) * 100
  564. case 5, 6:
  565. frac = int64(BFixedLengthInt(data[5:8]))
  566. }
  567. if intPart == 0 {
  568. return formatZeroTime(int(frac), int(dec)), n, nil
  569. }
  570. tmp := intPart<<24 + frac
  571. //handle sign???
  572. if tmp < 0 {
  573. tmp = -tmp
  574. }
  575. // var secPart int64 = tmp % (1 << 24)
  576. ymdhms := tmp >> 24
  577. ymd := ymdhms >> 17
  578. ym := ymd >> 5
  579. hms := ymdhms % (1 << 17)
  580. day := int(ymd % (1 << 5))
  581. month := int(ym % 13)
  582. year := int(ym / 13)
  583. second := int(hms % (1 << 6))
  584. minute := int((hms >> 6) % (1 << 6))
  585. hour := int((hms >> 12))
  586. return fracTime{time.Date(year, time.Month(month), day, hour, minute, second, int(frac*1000), time.UTC), int(dec)}, n, nil
  587. }
  588. const TIMEF_OFS int64 = 0x800000000000
  589. const TIMEF_INT_OFS int64 = 0x800000
  590. func decodeTime2(data []byte, dec uint16) (string, int, error) {
  591. //time binary length
  592. n := int(3 + (dec+1)/2)
  593. tmp := int64(0)
  594. intPart := int64(0)
  595. frac := int64(0)
  596. switch dec {
  597. case 1:
  598. case 2:
  599. intPart = int64(BFixedLengthInt(data[0:3])) - TIMEF_INT_OFS
  600. frac = int64(data[3])
  601. if intPart < 0 && frac > 0 {
  602. /*
  603. Negative values are stored with reverse fractional part order,
  604. for binary sort compatibility.
  605. Disk value intpart frac Time value Memory value
  606. 800000.00 0 0 00:00:00.00 0000000000.000000
  607. 7FFFFF.FF -1 255 -00:00:00.01 FFFFFFFFFF.FFD8F0
  608. 7FFFFF.9D -1 99 -00:00:00.99 FFFFFFFFFF.F0E4D0
  609. 7FFFFF.00 -1 0 -00:00:01.00 FFFFFFFFFF.000000
  610. 7FFFFE.FF -1 255 -00:00:01.01 FFFFFFFFFE.FFD8F0
  611. 7FFFFE.F6 -2 246 -00:00:01.10 FFFFFFFFFE.FE7960
  612. Formula to convert fractional part from disk format
  613. (now stored in "frac" variable) to absolute value: "0x100 - frac".
  614. To reconstruct in-memory value, we shift
  615. to the next integer value and then substruct fractional part.
  616. */
  617. intPart++ /* Shift to the next integer value */
  618. frac -= 0x100 /* -(0x100 - frac) */
  619. }
  620. tmp = intPart<<24 + frac*10000
  621. case 3:
  622. case 4:
  623. intPart = int64(BFixedLengthInt(data[0:3])) - TIMEF_INT_OFS
  624. frac = int64(binary.BigEndian.Uint16(data[3:5]))
  625. if intPart < 0 && frac > 0 {
  626. /*
  627. Fix reverse fractional part order: "0x10000 - frac".
  628. See comments for FSP=1 and FSP=2 above.
  629. */
  630. intPart++ /* Shift to the next integer value */
  631. frac -= 0x10000 /* -(0x10000-frac) */
  632. }
  633. tmp = intPart<<24 + frac*100
  634. case 5:
  635. case 6:
  636. tmp = int64(BFixedLengthInt(data[0:6])) - TIMEF_OFS
  637. default:
  638. intPart = int64(BFixedLengthInt(data[0:3])) - TIMEF_INT_OFS
  639. tmp = intPart << 24
  640. }
  641. if intPart == 0 {
  642. return "00:00:00", n, nil
  643. }
  644. hms := int64(0)
  645. sign := ""
  646. if tmp < 0 {
  647. tmp = -tmp
  648. sign = "-"
  649. }
  650. hms = tmp >> 24
  651. hour := (hms >> 12) % (1 << 10) /* 10 bits starting at 12th */
  652. minute := (hms >> 6) % (1 << 6) /* 6 bits starting at 6th */
  653. second := hms % (1 << 6) /* 6 bits starting at 0th */
  654. secPart := tmp % (1 << 24)
  655. if secPart != 0 {
  656. return fmt.Sprintf("%s%02d:%02d:%02d.%06d", sign, hour, minute, second, secPart), n, nil
  657. }
  658. return fmt.Sprintf("%s%02d:%02d:%02d", sign, hour, minute, second), n, nil
  659. }
  660. func decodeBlob(data []byte, meta uint16) (v []byte, n int, err error) {
  661. var length int
  662. switch meta {
  663. case 1:
  664. length = int(data[0])
  665. v = data[1 : 1+length]
  666. n = length + 1
  667. case 2:
  668. length = int(binary.LittleEndian.Uint16(data))
  669. v = data[2 : 2+length]
  670. n = length + 2
  671. case 3:
  672. length = int(FixedLengthInt(data[0:3]))
  673. v = data[3 : 3+length]
  674. n = length + 3
  675. case 4:
  676. length = int(binary.LittleEndian.Uint32(data))
  677. v = data[4 : 4+length]
  678. n = length + 4
  679. default:
  680. err = fmt.Errorf("invalid blob packlen = %d", meta)
  681. }
  682. return
  683. }
  684. func (e *RowsEvent) Dump(w io.Writer) {
  685. fmt.Fprintf(w, "TableID: %d\n", e.TableID)
  686. fmt.Fprintf(w, "Flags: %d\n", e.Flags)
  687. fmt.Fprintf(w, "Column count: %d\n", e.ColumnCount)
  688. fmt.Fprintf(w, "Values:\n")
  689. for _, rows := range e.Rows {
  690. fmt.Fprintf(w, "--\n")
  691. for j, d := range rows {
  692. if _, ok := d.([]byte); ok {
  693. fmt.Fprintf(w, "%d:%q\n", j, d)
  694. } else {
  695. fmt.Fprintf(w, "%d:%#v\n", j, d)
  696. }
  697. }
  698. }
  699. fmt.Fprintln(w)
  700. }
  701. type RowsQueryEvent struct {
  702. Query []byte
  703. }
  704. func (e *RowsQueryEvent) Decode(data []byte) error {
  705. //ignore length byte 1
  706. e.Query = data[1:]
  707. return nil
  708. }
  709. func (e *RowsQueryEvent) Dump(w io.Writer) {
  710. fmt.Fprintf(w, "Query: %s\n", e.Query)
  711. fmt.Fprintln(w)
  712. }