json_binary.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. package replication
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "math"
  6. "github.com/juju/errors"
  7. . "github.com/siddontang/go-mysql/mysql"
  8. "github.com/siddontang/go/hack"
  9. )
  // Type tags of JSON binary values. A tag is the first byte of a whole
// document (decodeJsonBinary) and the first byte of every value entry inside
// an object or array (decodeObjectOrArray). Tags 0x0d and 0x0e are not
// handled by this decoder.
const (
	JSONB_SMALL_OBJECT byte = 0x00 // small JSON object
	JSONB_LARGE_OBJECT byte = 0x01 // large JSON object
	JSONB_SMALL_ARRAY  byte = 0x02 // small JSON array
	JSONB_LARGE_ARRAY  byte = 0x03 // large JSON array
	JSONB_LITERAL      byte = 0x04 // literal (true/false/null)
	JSONB_INT16        byte = 0x05 // int16
	JSONB_UINT16       byte = 0x06 // uint16
	JSONB_INT32        byte = 0x07 // int32
	JSONB_UINT32       byte = 0x08 // uint32
	JSONB_INT64        byte = 0x09 // int64
	JSONB_UINT64       byte = 0x0a // uint64
	JSONB_DOUBLE       byte = 0x0b // double
	JSONB_STRING       byte = 0x0c // string
	JSONB_OPAQUE       byte = 0x0f // custom data (any MySQL data type)
)

// Payload byte of a JSONB_LITERAL value, as interpreted by decodeLiteral.
const (
	JSONB_NULL_LITERAL  byte = 0x00
	JSONB_TRUE_LITERAL  byte = 0x01
	JSONB_FALSE_LITERAL byte = 0x02
)
  // Byte widths of the fixed-size fields in an object/array container.
// "Small" containers use 2-byte counts, sizes and offsets; "large" ones 4-byte.
const (
	jsonbSmallOffsetSize = 2
	jsonbLargeOffsetSize = 4

	// A key entry is a key offset followed by a 2-byte key length.
	jsonbKeyEntrySizeSmall = 2 + jsonbSmallOffsetSize
	jsonbKeyEntrySizeLarge = 2 + jsonbLargeOffsetSize

	// A value entry is a 1-byte type tag followed by an offset-sized slot
	// that holds either an offset or an inlined value.
	jsonbValueEntrySizeSmall = 1 + jsonbSmallOffsetSize
	jsonbValueEntrySizeLarge = 1 + jsonbLargeOffsetSize
)

// jsonbGetOffsetSize returns the width of count/size/offset fields.
func jsonbGetOffsetSize(isSmall bool) int {
	width := jsonbLargeOffsetSize
	if isSmall {
		width = jsonbSmallOffsetSize
	}
	return width
}

// jsonbGetKeyEntrySize returns the width of one key entry in an object header.
func jsonbGetKeyEntrySize(isSmall bool) int {
	width := jsonbKeyEntrySizeLarge
	if isSmall {
		width = jsonbKeyEntrySizeSmall
	}
	return width
}

// jsonbGetValueEntrySize returns the width of one value entry in a container header.
func jsonbGetValueEntrySize(isSmall bool) int {
	width := jsonbValueEntrySizeLarge
	if isSmall {
		width = jsonbValueEntrySizeSmall
	}
	return width
}
  57. // decodeJsonBinary decodes the JSON binary encoding data and returns
  58. // the common JSON encoding data.
  59. func (e *RowsEvent) decodeJsonBinary(data []byte) ([]byte, error) {
  60. d := jsonBinaryDecoder{useDecimal: e.useDecimal}
  61. if d.isDataShort(data, 1) {
  62. return nil, d.err
  63. }
  64. v := d.decodeValue(data[0], data[1:])
  65. if d.err != nil {
  66. return nil, d.err
  67. }
  68. return json.Marshal(v)
  69. }
  // jsonBinaryDecoder carries the state shared by the decode* methods while
// walking one JSON binary document.
type jsonBinaryDecoder struct {
	// useDecimal is forwarded to the package-level decodeDecimal helper when
	// an opaque MYSQL_TYPE_NEWDECIMAL value is decoded.
	useDecimal bool
	// err holds the first decoding error. Once it is set, decodeValue and
	// isDataShort short-circuit, so later decode steps return zero values.
	err error
}
  74. func (d *jsonBinaryDecoder) decodeValue(tp byte, data []byte) interface{} {
  75. if d.err != nil {
  76. return nil
  77. }
  78. switch tp {
  79. case JSONB_SMALL_OBJECT:
  80. return d.decodeObjectOrArray(data, true, true)
  81. case JSONB_LARGE_OBJECT:
  82. return d.decodeObjectOrArray(data, false, true)
  83. case JSONB_SMALL_ARRAY:
  84. return d.decodeObjectOrArray(data, true, false)
  85. case JSONB_LARGE_ARRAY:
  86. return d.decodeObjectOrArray(data, false, false)
  87. case JSONB_LITERAL:
  88. return d.decodeLiteral(data)
  89. case JSONB_INT16:
  90. return d.decodeInt16(data)
  91. case JSONB_UINT16:
  92. return d.decodeUint16(data)
  93. case JSONB_INT32:
  94. return d.decodeInt32(data)
  95. case JSONB_UINT32:
  96. return d.decodeUint32(data)
  97. case JSONB_INT64:
  98. return d.decodeInt64(data)
  99. case JSONB_UINT64:
  100. return d.decodeUint64(data)
  101. case JSONB_DOUBLE:
  102. return d.decodeDouble(data)
  103. case JSONB_STRING:
  104. return d.decodeString(data)
  105. case JSONB_OPAQUE:
  106. return d.decodeOpaque(data)
  107. default:
  108. d.err = errors.Errorf("invalid json type %d", tp)
  109. }
  110. return nil
  111. }
  112. func (d *jsonBinaryDecoder) decodeObjectOrArray(data []byte, isSmall bool, isObject bool) interface{} {
  113. offsetSize := jsonbGetOffsetSize(isSmall)
  114. if d.isDataShort(data, 2*offsetSize) {
  115. return nil
  116. }
  117. count := d.decodeCount(data, isSmall)
  118. size := d.decodeCount(data[offsetSize:], isSmall)
  119. if d.isDataShort(data, int(size)) {
  120. return nil
  121. }
  122. keyEntrySize := jsonbGetKeyEntrySize(isSmall)
  123. valueEntrySize := jsonbGetValueEntrySize(isSmall)
  124. headerSize := 2*offsetSize + count*valueEntrySize
  125. if isObject {
  126. headerSize += count * keyEntrySize
  127. }
  128. if headerSize > size {
  129. d.err = errors.Errorf("header size %d > size %d", headerSize, size)
  130. return nil
  131. }
  132. var keys []string
  133. if isObject {
  134. keys = make([]string, count)
  135. for i := 0; i < count; i++ {
  136. // decode key
  137. entryOffset := 2*offsetSize + keyEntrySize*i
  138. keyOffset := d.decodeCount(data[entryOffset:], isSmall)
  139. keyLength := int(d.decodeUint16(data[entryOffset+offsetSize:]))
  140. // Key must start after value entry
  141. if keyOffset < headerSize {
  142. d.err = errors.Errorf("invalid key offset %d, must > %d", keyOffset, headerSize)
  143. return nil
  144. }
  145. if d.isDataShort(data, keyOffset+keyLength) {
  146. return nil
  147. }
  148. keys[i] = hack.String(data[keyOffset : keyOffset+keyLength])
  149. }
  150. }
  151. if d.err != nil {
  152. return nil
  153. }
  154. values := make([]interface{}, count)
  155. for i := 0; i < count; i++ {
  156. // decode value
  157. entryOffset := 2*offsetSize + valueEntrySize*i
  158. if isObject {
  159. entryOffset += keyEntrySize * count
  160. }
  161. tp := data[entryOffset]
  162. if isInlineValue(tp, isSmall) {
  163. values[i] = d.decodeValue(tp, data[entryOffset+1:entryOffset+valueEntrySize])
  164. continue
  165. }
  166. valueOffset := d.decodeCount(data[entryOffset+1:], isSmall)
  167. if d.isDataShort(data, valueOffset) {
  168. return nil
  169. }
  170. values[i] = d.decodeValue(tp, data[valueOffset:])
  171. }
  172. if d.err != nil {
  173. return nil
  174. }
  175. if !isObject {
  176. return values
  177. }
  178. m := make(map[string]interface{}, count)
  179. for i := 0; i < count; i++ {
  180. m[keys[i]] = values[i]
  181. }
  182. return m
  183. }
  184. func isInlineValue(tp byte, isSmall bool) bool {
  185. switch tp {
  186. case JSONB_INT16, JSONB_UINT16, JSONB_LITERAL:
  187. return true
  188. case JSONB_INT32, JSONB_UINT32:
  189. return !isSmall
  190. }
  191. return false
  192. }
  193. func (d *jsonBinaryDecoder) decodeLiteral(data []byte) interface{} {
  194. if d.isDataShort(data, 1) {
  195. return nil
  196. }
  197. tp := data[0]
  198. switch tp {
  199. case JSONB_NULL_LITERAL:
  200. return nil
  201. case JSONB_TRUE_LITERAL:
  202. return true
  203. case JSONB_FALSE_LITERAL:
  204. return false
  205. }
  206. d.err = errors.Errorf("invalid literal %c", tp)
  207. return nil
  208. }
  209. func (d *jsonBinaryDecoder) isDataShort(data []byte, expected int) bool {
  210. if d.err != nil {
  211. return true
  212. }
  213. if len(data) < expected {
  214. d.err = errors.Errorf("data len %d < expected %d", len(data), expected)
  215. }
  216. return d.err != nil
  217. }
  218. func (d *jsonBinaryDecoder) decodeInt16(data []byte) int16 {
  219. if d.isDataShort(data, 2) {
  220. return 0
  221. }
  222. v := ParseBinaryInt16(data[0:2])
  223. return v
  224. }
  225. func (d *jsonBinaryDecoder) decodeUint16(data []byte) uint16 {
  226. if d.isDataShort(data, 2) {
  227. return 0
  228. }
  229. v := ParseBinaryUint16(data[0:2])
  230. return v
  231. }
  232. func (d *jsonBinaryDecoder) decodeInt32(data []byte) int32 {
  233. if d.isDataShort(data, 4) {
  234. return 0
  235. }
  236. v := ParseBinaryInt32(data[0:4])
  237. return v
  238. }
  239. func (d *jsonBinaryDecoder) decodeUint32(data []byte) uint32 {
  240. if d.isDataShort(data, 4) {
  241. return 0
  242. }
  243. v := ParseBinaryUint32(data[0:4])
  244. return v
  245. }
  246. func (d *jsonBinaryDecoder) decodeInt64(data []byte) int64 {
  247. if d.isDataShort(data, 8) {
  248. return 0
  249. }
  250. v := ParseBinaryInt64(data[0:8])
  251. return v
  252. }
  253. func (d *jsonBinaryDecoder) decodeUint64(data []byte) uint64 {
  254. if d.isDataShort(data, 8) {
  255. return 0
  256. }
  257. v := ParseBinaryUint64(data[0:8])
  258. return v
  259. }
  260. func (d *jsonBinaryDecoder) decodeDouble(data []byte) float64 {
  261. if d.isDataShort(data, 8) {
  262. return 0
  263. }
  264. v := ParseBinaryFloat64(data[0:8])
  265. return v
  266. }
  267. func (d *jsonBinaryDecoder) decodeString(data []byte) string {
  268. if d.err != nil {
  269. return ""
  270. }
  271. l, n := d.decodeVariableLength(data)
  272. if d.isDataShort(data, l+n) {
  273. return ""
  274. }
  275. data = data[n:]
  276. v := hack.String(data[0:l])
  277. return v
  278. }
  279. func (d *jsonBinaryDecoder) decodeOpaque(data []byte) interface{} {
  280. if d.isDataShort(data, 1) {
  281. return nil
  282. }
  283. tp := data[0]
  284. data = data[1:]
  285. l, n := d.decodeVariableLength(data)
  286. if d.isDataShort(data, l+n) {
  287. return nil
  288. }
  289. data = data[n : l+n]
  290. switch tp {
  291. case MYSQL_TYPE_NEWDECIMAL:
  292. return d.decodeDecimal(data)
  293. case MYSQL_TYPE_TIME:
  294. return d.decodeTime(data)
  295. case MYSQL_TYPE_DATE, MYSQL_TYPE_DATETIME, MYSQL_TYPE_TIMESTAMP:
  296. return d.decodeDateTime(data)
  297. default:
  298. return hack.String(data)
  299. }
  300. return nil
  301. }
  302. func (d *jsonBinaryDecoder) decodeDecimal(data []byte) interface{} {
  303. precision := int(data[0])
  304. scale := int(data[1])
  305. v, _, err := decodeDecimal(data[2:], precision, scale, d.useDecimal)
  306. d.err = err
  307. return v
  308. }
  309. func (d *jsonBinaryDecoder) decodeTime(data []byte) interface{} {
  310. v := d.decodeInt64(data)
  311. if v == 0 {
  312. return "00:00:00"
  313. }
  314. sign := ""
  315. if v < 0 {
  316. sign = "-"
  317. v = -v
  318. }
  319. intPart := v >> 24
  320. hour := (intPart >> 12) % (1 << 10)
  321. min := (intPart >> 6) % (1 << 6)
  322. sec := intPart % (1 << 6)
  323. frac := v % (1 << 24)
  324. return fmt.Sprintf("%s%02d:%02d:%02d.%06d", sign, hour, min, sec, frac)
  325. }
  326. func (d *jsonBinaryDecoder) decodeDateTime(data []byte) interface{} {
  327. v := d.decodeInt64(data)
  328. if v == 0 {
  329. return "0000-00-00 00:00:00"
  330. }
  331. // handle negative?
  332. if v < 0 {
  333. v = -v
  334. }
  335. intPart := v >> 24
  336. ymd := intPart >> 17
  337. ym := ymd >> 5
  338. hms := intPart % (1 << 17)
  339. year := ym / 13
  340. month := ym % 13
  341. day := ymd % (1 << 5)
  342. hour := (hms >> 12)
  343. minute := (hms >> 6) % (1 << 6)
  344. second := hms % (1 << 6)
  345. frac := v % (1 << 24)
  346. return fmt.Sprintf("%04d-%02d-%02d %02d:%02d:%02d.%06d", year, month, day, hour, minute, second, frac)
  347. }
  348. func (d *jsonBinaryDecoder) decodeCount(data []byte, isSmall bool) int {
  349. if isSmall {
  350. v := d.decodeUint16(data)
  351. return int(v)
  352. }
  353. return int(d.decodeUint32(data))
  354. }
  355. func (d *jsonBinaryDecoder) decodeVariableLength(data []byte) (int, int) {
  356. // The max size for variable length is math.MaxUint32, so
  357. // here we can use 5 bytes to save it.
  358. maxCount := 5
  359. if len(data) < maxCount {
  360. maxCount = len(data)
  361. }
  362. pos := 0
  363. length := uint64(0)
  364. for ; pos < maxCount; pos++ {
  365. v := data[pos]
  366. length |= uint64(v&0x7F) << uint(7*pos)
  367. if v&0x80 == 0 {
  368. if length > math.MaxUint32 {
  369. d.err = errors.Errorf("variable length %d must <= %d", length, math.MaxUint32)
  370. return 0, 0
  371. }
  372. pos += 1
  373. // TODO: should consider length overflow int here.
  374. return int(length), pos
  375. }
  376. }
  377. d.err = errors.New("decode variable length failed")
  378. return 0, 0
  379. }