points.go 54 KB


  1. // Package models implements basic objects used throughout the TICK stack.
  2. package models // import "github.com/influxdata/influxdb/models"
  3. import (
  4. "bytes"
  5. "encoding/binary"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "math"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "unicode"
  15. "unicode/utf8"
  16. "github.com/influxdata/influxdb/pkg/escape"
  17. "github.com/influxdata/platform/models"
  18. )
  19. type escapeSet struct {
  20. k [1]byte
  21. esc [2]byte
  22. }
  23. var (
  24. measurementEscapeCodes = [...]escapeSet{
  25. {k: [1]byte{','}, esc: [2]byte{'\\', ','}},
  26. {k: [1]byte{' '}, esc: [2]byte{'\\', ' '}},
  27. }
  28. tagEscapeCodes = [...]escapeSet{
  29. {k: [1]byte{','}, esc: [2]byte{'\\', ','}},
  30. {k: [1]byte{' '}, esc: [2]byte{'\\', ' '}},
  31. {k: [1]byte{'='}, esc: [2]byte{'\\', '='}},
  32. }
  33. // ErrPointMustHaveAField is returned when operating on a point that does not have any fields.
  34. ErrPointMustHaveAField = errors.New("point without fields is unsupported")
  35. // ErrInvalidNumber is returned when a number is expected but not provided.
  36. ErrInvalidNumber = errors.New("invalid number")
  37. // ErrInvalidPoint is returned when a point cannot be parsed correctly.
  38. ErrInvalidPoint = errors.New("point is invalid")
  39. )
  40. const (
  41. // MaxKeyLength is the largest allowed size of the combined measurement and tag keys.
  42. MaxKeyLength = 65535
  43. )
  44. // enableUint64Support will enable uint64 support if set to true.
  45. var enableUint64Support = false
  46. // EnableUintSupport manually enables uint support for the point parser.
  47. // This function will be removed in the future and only exists for unit tests during the
  48. // transition.
  49. func EnableUintSupport() {
  50. enableUint64Support = true
  51. }
  52. // Point defines the values that will be written to the database.
  53. type Point interface {
  54. // Name return the measurement name for the point.
  55. Name() []byte
  56. // SetName updates the measurement name for the point.
  57. SetName(string)
  58. // Tags returns the tag set for the point.
  59. Tags() Tags
  60. // ForEachTag iterates over each tag invoking fn. If fn return false, iteration stops.
  61. ForEachTag(fn func(k, v []byte) bool)
  62. // AddTag adds or replaces a tag value for a point.
  63. AddTag(key, value string)
  64. // SetTags replaces the tags for the point.
  65. SetTags(tags Tags)
  66. // HasTag returns true if the tag exists for the point.
  67. HasTag(tag []byte) bool
  68. // Fields returns the fields for the point.
  69. Fields() (Fields, error)
  70. // Time return the timestamp for the point.
  71. Time() time.Time
  72. // SetTime updates the timestamp for the point.
  73. SetTime(t time.Time)
  74. // UnixNano returns the timestamp of the point as nanoseconds since Unix epoch.
  75. UnixNano() int64
  76. // HashID returns a non-cryptographic checksum of the point's key.
  77. HashID() uint64
  78. // Key returns the key (measurement joined with tags) of the point.
  79. Key() []byte
  80. // String returns a string representation of the point. If there is a
  81. // timestamp associated with the point then it will be specified with the default
  82. // precision of nanoseconds.
  83. String() string
  84. // MarshalBinary returns a binary representation of the point.
  85. MarshalBinary() ([]byte, error)
  86. // PrecisionString returns a string representation of the point. If there
  87. // is a timestamp associated with the point then it will be specified in the
  88. // given unit.
  89. PrecisionString(precision string) string
  90. // RoundedString returns a string representation of the point. If there
  91. // is a timestamp associated with the point, then it will be rounded to the
  92. // given duration.
  93. RoundedString(d time.Duration) string
  94. // Split will attempt to return multiple points with the same timestamp whose
  95. // string representations are no longer than size. Points with a single field or
  96. // a point without a timestamp may exceed the requested size.
  97. Split(size int) []Point
  98. // Round will round the timestamp of the point to the given duration.
  99. Round(d time.Duration)
  100. // StringSize returns the length of the string that would be returned by String().
  101. StringSize() int
  102. // AppendString appends the result of String() to the provided buffer and returns
  103. // the result, potentially reducing string allocations.
  104. AppendString(buf []byte) []byte
  105. // FieldIterator retuns a FieldIterator that can be used to traverse the
  106. // fields of a point without constructing the in-memory map.
  107. FieldIterator() FieldIterator
  108. }
  109. // FieldType represents the type of a field.
  110. type FieldType int
  111. const (
  112. // Integer indicates the field's type is integer.
  113. Integer FieldType = iota
  114. // Float indicates the field's type is float.
  115. Float
  116. // Boolean indicates the field's type is boolean.
  117. Boolean
  118. // String indicates the field's type is string.
  119. String
  120. // Empty is used to indicate that there is no field.
  121. Empty
  122. // Unsigned indicates the field's type is an unsigned integer.
  123. Unsigned
  124. )
  125. // FieldIterator provides a low-allocation interface to iterate through a point's fields.
  126. type FieldIterator interface {
  127. // Next indicates whether there any fields remaining.
  128. Next() bool
  129. // FieldKey returns the key of the current field.
  130. FieldKey() []byte
  131. // Type returns the FieldType of the current field.
  132. Type() FieldType
  133. // StringValue returns the string value of the current field.
  134. StringValue() string
  135. // IntegerValue returns the integer value of the current field.
  136. IntegerValue() (int64, error)
  137. // UnsignedValue returns the unsigned value of the current field.
  138. UnsignedValue() (uint64, error)
  139. // BooleanValue returns the boolean value of the current field.
  140. BooleanValue() (bool, error)
  141. // FloatValue returns the float value of the current field.
  142. FloatValue() (float64, error)
  143. // Reset resets the iterator to its initial state.
  144. Reset()
  145. }
  146. // Points represents a sortable list of points by timestamp.
  147. type Points []Point
  148. // Len implements sort.Interface.
  149. func (a Points) Len() int { return len(a) }
  150. // Less implements sort.Interface.
  151. func (a Points) Less(i, j int) bool { return a[i].Time().Before(a[j].Time()) }
  152. // Swap implements sort.Interface.
  153. func (a Points) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  154. // point is the default implementation of Point.
  155. type point struct {
  156. time time.Time
  157. // text encoding of measurement and tags
  158. // key must always be stored sorted by tags, if the original line was not sorted,
  159. // we need to resort it
  160. key []byte
  161. // text encoding of field data
  162. fields []byte
  163. // text encoding of timestamp
  164. ts []byte
  165. // cached version of parsed fields from data
  166. cachedFields map[string]interface{}
  167. // cached version of parsed name from key
  168. cachedName string
  169. // cached version of parsed tags
  170. cachedTags Tags
  171. it fieldIterator
  172. }
  173. // type assertions
  174. var (
  175. _ Point = (*point)(nil)
  176. _ FieldIterator = (*point)(nil)
  177. )
  178. const (
  179. // the number of characters for the largest possible int64 (9223372036854775807)
  180. maxInt64Digits = 19
  181. // the number of characters for the smallest possible int64 (-9223372036854775808)
  182. minInt64Digits = 20
  183. // the number of characters for the largest possible uint64 (18446744073709551615)
  184. maxUint64Digits = 20
  185. // the number of characters required for the largest float64 before a range check
  186. // would occur during parsing
  187. maxFloat64Digits = 25
  188. // the number of characters required for smallest float64 before a range check occur
  189. // would occur during parsing
  190. minFloat64Digits = 27
  191. )
  192. // ParsePoints returns a slice of Points from a text representation of a point
  193. // with each point separated by newlines. If any points fail to parse, a non-nil error
  194. // will be returned in addition to the points that parsed successfully.
  195. func ParsePoints(buf []byte) ([]Point, error) {
  196. return ParsePointsWithPrecision(buf, time.Now().UTC(), "n")
  197. }
  198. // ParsePointsString is identical to ParsePoints but accepts a string.
  199. func ParsePointsString(buf string) ([]Point, error) {
  200. return ParsePoints([]byte(buf))
  201. }
  202. // ParseKey returns the measurement name and tags from a point.
  203. //
  204. // NOTE: to minimize heap allocations, the returned Tags will refer to subslices of buf.
  205. // This can have the unintended effect preventing buf from being garbage collected.
  206. func ParseKey(buf []byte) (string, Tags) {
  207. name, tags := ParseKeyBytes(buf)
  208. return string(name), tags
  209. }
  210. func ParseKeyBytes(buf []byte) ([]byte, Tags) {
  211. return ParseKeyBytesWithTags(buf, nil)
  212. }
  213. func ParseKeyBytesWithTags(buf []byte, tags Tags) ([]byte, Tags) {
  214. // Ignore the error because scanMeasurement returns "missing fields" which we ignore
  215. // when just parsing a key
  216. state, i, _ := scanMeasurement(buf, 0)
  217. var name []byte
  218. if state == tagKeyState {
  219. tags = parseTags(buf, tags)
  220. // scanMeasurement returns the location of the comma if there are tags, strip that off
  221. name = buf[:i-1]
  222. } else {
  223. name = buf[:i]
  224. }
  225. return unescapeMeasurement(name), tags
  226. }
  227. func ParseTags(buf []byte) Tags {
  228. return parseTags(buf, nil)
  229. }
  230. func ParseName(buf []byte) []byte {
  231. // Ignore the error because scanMeasurement returns "missing fields" which we ignore
  232. // when just parsing a key
  233. state, i, _ := scanMeasurement(buf, 0)
  234. var name []byte
  235. if state == tagKeyState {
  236. name = buf[:i-1]
  237. } else {
  238. name = buf[:i]
  239. }
  240. return unescapeMeasurement(name)
  241. }
  242. // ParsePointsWithPrecision is similar to ParsePoints, but allows the
  243. // caller to provide a precision for time.
  244. //
  245. // NOTE: to minimize heap allocations, the returned Points will refer to subslices of buf.
  246. // This can have the unintended effect preventing buf from being garbage collected.
  247. func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) {
  248. points := make([]Point, 0, bytes.Count(buf, []byte{'\n'})+1)
  249. var (
  250. pos int
  251. block []byte
  252. failed []string
  253. )
  254. for pos < len(buf) {
  255. pos, block = scanLine(buf, pos)
  256. pos++
  257. if len(block) == 0 {
  258. continue
  259. }
  260. // lines which start with '#' are comments
  261. start := skipWhitespace(block, 0)
  262. // If line is all whitespace, just skip it
  263. if start >= len(block) {
  264. continue
  265. }
  266. if block[start] == '#' {
  267. continue
  268. }
  269. // strip the newline if one is present
  270. if block[len(block)-1] == '\n' {
  271. block = block[:len(block)-1]
  272. }
  273. pt, err := parsePoint(block[start:], defaultTime, precision)
  274. if err != nil {
  275. failed = append(failed, fmt.Sprintf("unable to parse '%s': %v", string(block[start:]), err))
  276. } else {
  277. points = append(points, pt)
  278. }
  279. }
  280. if len(failed) > 0 {
  281. return points, fmt.Errorf("%s", strings.Join(failed, "\n"))
  282. }
  283. return points, nil
  284. }
  285. func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, error) {
  286. // scan the first block which is measurement[,tag1=value1,tag2=value=2...]
  287. pos, key, err := scanKey(buf, 0)
  288. if err != nil {
  289. return nil, err
  290. }
  291. // measurement name is required
  292. if len(key) == 0 {
  293. return nil, fmt.Errorf("missing measurement")
  294. }
  295. if len(key) > MaxKeyLength {
  296. return nil, fmt.Errorf("max key length exceeded: %v > %v", len(key), MaxKeyLength)
  297. }
  298. // scan the second block is which is field1=value1[,field2=value2,...]
  299. pos, fields, err := scanFields(buf, pos)
  300. if err != nil {
  301. return nil, err
  302. }
  303. // at least one field is required
  304. if len(fields) == 0 {
  305. return nil, fmt.Errorf("missing fields")
  306. }
  307. var maxKeyErr error
  308. err = walkFields(fields, func(k, v []byte) bool {
  309. if sz := seriesKeySize(key, k); sz > MaxKeyLength {
  310. maxKeyErr = fmt.Errorf("max key length exceeded: %v > %v", sz, MaxKeyLength)
  311. return false
  312. }
  313. return true
  314. })
  315. if err != nil {
  316. return nil, err
  317. }
  318. if maxKeyErr != nil {
  319. return nil, maxKeyErr
  320. }
  321. // scan the last block which is an optional integer timestamp
  322. pos, ts, err := scanTime(buf, pos)
  323. if err != nil {
  324. return nil, err
  325. }
  326. pt := &point{
  327. key: key,
  328. fields: fields,
  329. ts: ts,
  330. }
  331. if len(ts) == 0 {
  332. pt.time = defaultTime
  333. pt.SetPrecision(precision)
  334. } else {
  335. ts, err := parseIntBytes(ts, 10, 64)
  336. if err != nil {
  337. return nil, err
  338. }
  339. pt.time, err = SafeCalcTime(ts, precision)
  340. if err != nil {
  341. return nil, err
  342. }
  343. // Determine if there are illegal non-whitespace characters after the
  344. // timestamp block.
  345. for pos < len(buf) {
  346. if buf[pos] != ' ' {
  347. return nil, ErrInvalidPoint
  348. }
  349. pos++
  350. }
  351. }
  352. return pt, nil
  353. }
  354. // GetPrecisionMultiplier will return a multiplier for the precision specified.
  355. func GetPrecisionMultiplier(precision string) int64 {
  356. d := time.Nanosecond
  357. switch precision {
  358. case "u":
  359. d = time.Microsecond
  360. case "ms":
  361. d = time.Millisecond
  362. case "s":
  363. d = time.Second
  364. case "m":
  365. d = time.Minute
  366. case "h":
  367. d = time.Hour
  368. }
  369. return int64(d)
  370. }
  371. // scanKey scans buf starting at i for the measurement and tag portion of the point.
  372. // It returns the ending position and the byte slice of key within buf. If there
  373. // are tags, they will be sorted if they are not already.
  374. func scanKey(buf []byte, i int) (int, []byte, error) {
  375. start := skipWhitespace(buf, i)
  376. i = start
  377. // Determines whether the tags are sort, assume they are
  378. sorted := true
  379. // indices holds the indexes within buf of the start of each tag. For example,
  380. // a buf of 'cpu,host=a,region=b,zone=c' would have indices slice of [4,11,20]
  381. // which indicates that the first tag starts at buf[4], seconds at buf[11], and
  382. // last at buf[20]
  383. indices := make([]int, 100)
  384. // tracks how many commas we've seen so we know how many values are indices.
  385. // Since indices is an arbitrarily large slice,
  386. // we need to know how many values in the buffer are in use.
  387. commas := 0
  388. // First scan the Point's measurement.
  389. state, i, err := scanMeasurement(buf, i)
  390. if err != nil {
  391. return i, buf[start:i], err
  392. }
  393. // Optionally scan tags if needed.
  394. if state == tagKeyState {
  395. i, commas, indices, err = scanTags(buf, i, indices)
  396. if err != nil {
  397. return i, buf[start:i], err
  398. }
  399. }
  400. // Now we know where the key region is within buf, and the location of tags, we
  401. // need to determine if duplicate tags exist and if the tags are sorted. This iterates
  402. // over the list comparing each tag in the sequence with each other.
  403. for j := 0; j < commas-1; j++ {
  404. // get the left and right tags
  405. _, left := scanTo(buf[indices[j]:indices[j+1]-1], 0, '=')
  406. _, right := scanTo(buf[indices[j+1]:indices[j+2]-1], 0, '=')
  407. // If left is greater than right, the tags are not sorted. We do not have to
  408. // continue because the short path no longer works.
  409. // If the tags are equal, then there are duplicate tags, and we should abort.
  410. // If the tags are not sorted, this pass may not find duplicate tags and we
  411. // need to do a more exhaustive search later.
  412. if cmp := bytes.Compare(left, right); cmp > 0 {
  413. sorted = false
  414. break
  415. } else if cmp == 0 {
  416. return i, buf[start:i], fmt.Errorf("duplicate tags")
  417. }
  418. }
  419. // If the tags are not sorted, then sort them. This sort is inline and
  420. // uses the tag indices we created earlier. The actual buffer is not sorted, the
  421. // indices are using the buffer for value comparison. After the indices are sorted,
  422. // the buffer is reconstructed from the sorted indices.
  423. if !sorted && commas > 0 {
  424. // Get the measurement name for later
  425. measurement := buf[start : indices[0]-1]
  426. // Sort the indices
  427. indices := indices[:commas]
  428. insertionSort(0, commas, buf, indices)
  429. // Create a new key using the measurement and sorted indices
  430. b := make([]byte, len(buf[start:i]))
  431. pos := copy(b, measurement)
  432. for _, i := range indices {
  433. b[pos] = ','
  434. pos++
  435. _, v := scanToSpaceOr(buf, i, ',')
  436. pos += copy(b[pos:], v)
  437. }
  438. // Check again for duplicate tags now that the tags are sorted.
  439. for j := 0; j < commas-1; j++ {
  440. // get the left and right tags
  441. _, left := scanTo(buf[indices[j]:], 0, '=')
  442. _, right := scanTo(buf[indices[j+1]:], 0, '=')
  443. // If the tags are equal, then there are duplicate tags, and we should abort.
  444. // If the tags are not sorted, this pass may not find duplicate tags and we
  445. // need to do a more exhaustive search later.
  446. if bytes.Equal(left, right) {
  447. return i, b, fmt.Errorf("duplicate tags")
  448. }
  449. }
  450. return i, b, nil
  451. }
  452. return i, buf[start:i], nil
  453. }
  454. // The following constants allow us to specify which state to move to
  455. // next, when scanning sections of a Point.
  456. const (
  457. tagKeyState = iota
  458. tagValueState
  459. fieldsState
  460. )
  461. // scanMeasurement examines the measurement part of a Point, returning
  462. // the next state to move to, and the current location in the buffer.
  463. func scanMeasurement(buf []byte, i int) (int, int, error) {
  464. // Check first byte of measurement, anything except a comma is fine.
  465. // It can't be a space, since whitespace is stripped prior to this
  466. // function call.
  467. if i >= len(buf) || buf[i] == ',' {
  468. return -1, i, fmt.Errorf("missing measurement")
  469. }
  470. for {
  471. i++
  472. if i >= len(buf) {
  473. // cpu
  474. return -1, i, fmt.Errorf("missing fields")
  475. }
  476. if buf[i-1] == '\\' {
  477. // Skip character (it's escaped).
  478. continue
  479. }
  480. // Unescaped comma; move onto scanning the tags.
  481. if buf[i] == ',' {
  482. return tagKeyState, i + 1, nil
  483. }
  484. // Unescaped space; move onto scanning the fields.
  485. if buf[i] == ' ' {
  486. // cpu value=1.0
  487. return fieldsState, i, nil
  488. }
  489. }
  490. }
  491. // scanTags examines all the tags in a Point, keeping track of and
  492. // returning the updated indices slice, number of commas and location
  493. // in buf where to start examining the Point fields.
  494. func scanTags(buf []byte, i int, indices []int) (int, int, []int, error) {
  495. var (
  496. err error
  497. commas int
  498. state = tagKeyState
  499. )
  500. for {
  501. switch state {
  502. case tagKeyState:
  503. // Grow our indices slice if we have too many tags.
  504. if commas >= len(indices) {
  505. newIndics := make([]int, cap(indices)*2)
  506. copy(newIndics, indices)
  507. indices = newIndics
  508. }
  509. indices[commas] = i
  510. commas++
  511. i, err = scanTagsKey(buf, i)
  512. state = tagValueState // tag value always follows a tag key
  513. case tagValueState:
  514. state, i, err = scanTagsValue(buf, i)
  515. case fieldsState:
  516. indices[commas] = i + 1
  517. return i, commas, indices, nil
  518. }
  519. if err != nil {
  520. return i, commas, indices, err
  521. }
  522. }
  523. }
  524. // scanTagsKey scans each character in a tag key.
  525. func scanTagsKey(buf []byte, i int) (int, error) {
  526. // First character of the key.
  527. if i >= len(buf) || buf[i] == ' ' || buf[i] == ',' || buf[i] == '=' {
  528. // cpu,{'', ' ', ',', '='}
  529. return i, fmt.Errorf("missing tag key")
  530. }
  531. // Examine each character in the tag key until we hit an unescaped
  532. // equals (the tag value), or we hit an error (i.e., unescaped
  533. // space or comma).
  534. for {
  535. i++
  536. // Either we reached the end of the buffer or we hit an
  537. // unescaped comma or space.
  538. if i >= len(buf) ||
  539. ((buf[i] == ' ' || buf[i] == ',') && buf[i-1] != '\\') {
  540. // cpu,tag{'', ' ', ','}
  541. return i, fmt.Errorf("missing tag value")
  542. }
  543. if buf[i] == '=' && buf[i-1] != '\\' {
  544. // cpu,tag=
  545. return i + 1, nil
  546. }
  547. }
  548. }
  549. // scanTagsValue scans each character in a tag value.
  550. func scanTagsValue(buf []byte, i int) (int, int, error) {
  551. // Tag value cannot be empty.
  552. if i >= len(buf) || buf[i] == ',' || buf[i] == ' ' {
  553. // cpu,tag={',', ' '}
  554. return -1, i, fmt.Errorf("missing tag value")
  555. }
  556. // Examine each character in the tag value until we hit an unescaped
  557. // comma (move onto next tag key), an unescaped space (move onto
  558. // fields), or we error out.
  559. for {
  560. i++
  561. if i >= len(buf) {
  562. // cpu,tag=value
  563. return -1, i, fmt.Errorf("missing fields")
  564. }
  565. // An unescaped equals sign is an invalid tag value.
  566. if buf[i] == '=' && buf[i-1] != '\\' {
  567. // cpu,tag={'=', 'fo=o'}
  568. return -1, i, fmt.Errorf("invalid tag format")
  569. }
  570. if buf[i] == ',' && buf[i-1] != '\\' {
  571. // cpu,tag=foo,
  572. return tagKeyState, i + 1, nil
  573. }
  574. // cpu,tag=foo value=1.0
  575. // cpu, tag=foo\= value=1.0
  576. if buf[i] == ' ' && buf[i-1] != '\\' {
  577. return fieldsState, i, nil
  578. }
  579. }
  580. }
  581. func insertionSort(l, r int, buf []byte, indices []int) {
  582. for i := l + 1; i < r; i++ {
  583. for j := i; j > l && less(buf, indices, j, j-1); j-- {
  584. indices[j], indices[j-1] = indices[j-1], indices[j]
  585. }
  586. }
  587. }
  588. func less(buf []byte, indices []int, i, j int) bool {
  589. // This grabs the tag names for i & j, it ignores the values
  590. _, a := scanTo(buf, indices[i], '=')
  591. _, b := scanTo(buf, indices[j], '=')
  592. return bytes.Compare(a, b) < 0
  593. }
  594. // scanFields scans buf, starting at i for the fields section of a point. It returns
  595. // the ending position and the byte slice of the fields within buf.
  596. func scanFields(buf []byte, i int) (int, []byte, error) {
  597. start := skipWhitespace(buf, i)
  598. i = start
  599. quoted := false
  600. // tracks how many '=' we've seen
  601. equals := 0
  602. // tracks how many commas we've seen
  603. commas := 0
  604. for {
  605. // reached the end of buf?
  606. if i >= len(buf) {
  607. break
  608. }
  609. // escaped characters?
  610. if buf[i] == '\\' && i+1 < len(buf) {
  611. i += 2
  612. continue
  613. }
  614. // If the value is quoted, scan until we get to the end quote
  615. // Only quote values in the field value since quotes are not significant
  616. // in the field key
  617. if buf[i] == '"' && equals > commas {
  618. quoted = !quoted
  619. i++
  620. continue
  621. }
  622. // If we see an =, ensure that there is at least on char before and after it
  623. if buf[i] == '=' && !quoted {
  624. equals++
  625. // check for "... =123" but allow "a\ =123"
  626. if buf[i-1] == ' ' && buf[i-2] != '\\' {
  627. return i, buf[start:i], fmt.Errorf("missing field key")
  628. }
  629. // check for "...a=123,=456" but allow "a=123,a\,=456"
  630. if buf[i-1] == ',' && buf[i-2] != '\\' {
  631. return i, buf[start:i], fmt.Errorf("missing field key")
  632. }
  633. // check for "... value="
  634. if i+1 >= len(buf) {
  635. return i, buf[start:i], fmt.Errorf("missing field value")
  636. }
  637. // check for "... value=,value2=..."
  638. if buf[i+1] == ',' || buf[i+1] == ' ' {
  639. return i, buf[start:i], fmt.Errorf("missing field value")
  640. }
  641. if isNumeric(buf[i+1]) || buf[i+1] == '-' || buf[i+1] == 'N' || buf[i+1] == 'n' {
  642. var err error
  643. i, err = scanNumber(buf, i+1)
  644. if err != nil {
  645. return i, buf[start:i], err
  646. }
  647. continue
  648. }
  649. // If next byte is not a double-quote, the value must be a boolean
  650. if buf[i+1] != '"' {
  651. var err error
  652. i, _, err = scanBoolean(buf, i+1)
  653. if err != nil {
  654. return i, buf[start:i], err
  655. }
  656. continue
  657. }
  658. }
  659. if buf[i] == ',' && !quoted {
  660. commas++
  661. }
  662. // reached end of block?
  663. if buf[i] == ' ' && !quoted {
  664. break
  665. }
  666. i++
  667. }
  668. if quoted {
  669. return i, buf[start:i], fmt.Errorf("unbalanced quotes")
  670. }
  671. // check that all field sections had key and values (e.g. prevent "a=1,b"
  672. if equals == 0 || commas != equals-1 {
  673. return i, buf[start:i], fmt.Errorf("invalid field format")
  674. }
  675. return i, buf[start:i], nil
  676. }
  677. // scanTime scans buf, starting at i for the time section of a point. It
  678. // returns the ending position and the byte slice of the timestamp within buf
  679. // and and error if the timestamp is not in the correct numeric format.
  680. func scanTime(buf []byte, i int) (int, []byte, error) {
  681. start := skipWhitespace(buf, i)
  682. i = start
  683. for {
  684. // reached the end of buf?
  685. if i >= len(buf) {
  686. break
  687. }
  688. // Reached end of block or trailing whitespace?
  689. if buf[i] == '\n' || buf[i] == ' ' {
  690. break
  691. }
  692. // Handle negative timestamps
  693. if i == start && buf[i] == '-' {
  694. i++
  695. continue
  696. }
  697. // Timestamps should be integers, make sure they are so we don't need
  698. // to actually parse the timestamp until needed.
  699. if buf[i] < '0' || buf[i] > '9' {
  700. return i, buf[start:i], fmt.Errorf("bad timestamp")
  701. }
  702. i++
  703. }
  704. return i, buf[start:i], nil
  705. }
  706. func isNumeric(b byte) bool {
  707. return (b >= '0' && b <= '9') || b == '.'
  708. }
  709. // scanNumber returns the end position within buf, start at i after
  710. // scanning over buf for an integer, or float. It returns an
  711. // error if a invalid number is scanned.
  712. func scanNumber(buf []byte, i int) (int, error) {
  713. start := i
  714. var isInt, isUnsigned bool
  715. // Is negative number?
  716. if i < len(buf) && buf[i] == '-' {
  717. i++
  718. // There must be more characters now, as just '-' is illegal.
  719. if i == len(buf) {
  720. return i, ErrInvalidNumber
  721. }
  722. }
  723. // how many decimal points we've see
  724. decimal := false
  725. // indicates the number is float in scientific notation
  726. scientific := false
  727. for {
  728. if i >= len(buf) {
  729. break
  730. }
  731. if buf[i] == ',' || buf[i] == ' ' {
  732. break
  733. }
  734. if buf[i] == 'i' && i > start && !(isInt || isUnsigned) {
  735. isInt = true
  736. i++
  737. continue
  738. } else if buf[i] == 'u' && i > start && !(isInt || isUnsigned) {
  739. isUnsigned = true
  740. i++
  741. continue
  742. }
  743. if buf[i] == '.' {
  744. // Can't have more than 1 decimal (e.g. 1.1.1 should fail)
  745. if decimal {
  746. return i, ErrInvalidNumber
  747. }
  748. decimal = true
  749. }
  750. // `e` is valid for floats but not as the first char
  751. if i > start && (buf[i] == 'e' || buf[i] == 'E') {
  752. scientific = true
  753. i++
  754. continue
  755. }
  756. // + and - are only valid at this point if they follow an e (scientific notation)
  757. if (buf[i] == '+' || buf[i] == '-') && (buf[i-1] == 'e' || buf[i-1] == 'E') {
  758. i++
  759. continue
  760. }
  761. // NaN is an unsupported value
  762. if i+2 < len(buf) && (buf[i] == 'N' || buf[i] == 'n') {
  763. return i, ErrInvalidNumber
  764. }
  765. if !isNumeric(buf[i]) {
  766. return i, ErrInvalidNumber
  767. }
  768. i++
  769. }
  770. if (isInt || isUnsigned) && (decimal || scientific) {
  771. return i, ErrInvalidNumber
  772. }
  773. numericDigits := i - start
  774. if isInt {
  775. numericDigits--
  776. }
  777. if decimal {
  778. numericDigits--
  779. }
  780. if buf[start] == '-' {
  781. numericDigits--
  782. }
  783. if numericDigits == 0 {
  784. return i, ErrInvalidNumber
  785. }
  786. // It's more common that numbers will be within min/max range for their type but we need to prevent
  787. // out or range numbers from being parsed successfully. This uses some simple heuristics to decide
  788. // if we should parse the number to the actual type. It does not do it all the time because it incurs
  789. // extra allocations and we end up converting the type again when writing points to disk.
  790. if isInt {
  791. // Make sure the last char is an 'i' for integers (e.g. 9i10 is not valid)
  792. if buf[i-1] != 'i' {
  793. return i, ErrInvalidNumber
  794. }
  795. // Parse the int to check bounds the number of digits could be larger than the max range
  796. // We subtract 1 from the index to remove the `i` from our tests
  797. if len(buf[start:i-1]) >= maxInt64Digits || len(buf[start:i-1]) >= minInt64Digits {
  798. if _, err := parseIntBytes(buf[start:i-1], 10, 64); err != nil {
  799. return i, fmt.Errorf("unable to parse integer %s: %s", buf[start:i-1], err)
  800. }
  801. }
  802. } else if isUnsigned {
  803. // Return an error if uint64 support has not been enabled.
  804. if !enableUint64Support {
  805. return i, ErrInvalidNumber
  806. }
  807. // Make sure the last char is a 'u' for unsigned
  808. if buf[i-1] != 'u' {
  809. return i, ErrInvalidNumber
  810. }
  811. // Make sure the first char is not a '-' for unsigned
  812. if buf[start] == '-' {
  813. return i, ErrInvalidNumber
  814. }
  815. // Parse the uint to check bounds the number of digits could be larger than the max range
  816. // We subtract 1 from the index to remove the `u` from our tests
  817. if len(buf[start:i-1]) >= maxUint64Digits {
  818. if _, err := parseUintBytes(buf[start:i-1], 10, 64); err != nil {
  819. return i, fmt.Errorf("unable to parse unsigned %s: %s", buf[start:i-1], err)
  820. }
  821. }
  822. } else {
  823. // Parse the float to check bounds if it's scientific or the number of digits could be larger than the max range
  824. if scientific || len(buf[start:i]) >= maxFloat64Digits || len(buf[start:i]) >= minFloat64Digits {
  825. if _, err := parseFloatBytes(buf[start:i], 10); err != nil {
  826. return i, fmt.Errorf("invalid float")
  827. }
  828. }
  829. }
  830. return i, nil
  831. }
  // scanBoolean scans a boolean token in buf beginning at position i. The token
  // extends up to (but not including) the next ',' or ' ', or to the end of buf.
  //
  // Accepted spellings are t, T, true, True, TRUE, f, F, false, False and FALSE.
  //
  // It returns the position just past the token, the token bytes, and an
  // "invalid boolean" error when the token is not one of the accepted spellings.
  // When i is already at or past the end of buf there is nothing to scan, so an
  // error is returned with a nil token instead of slicing beyond the buffer.
  func scanBoolean(buf []byte, i int) (int, []byte, error) {
  	start := i

  	// Guard against an empty remainder: without this the code below would
  	// advance i past len(buf) and slice out of range.
  	if i >= len(buf) {
  		return i, nil, fmt.Errorf("invalid boolean")
  	}

  	// A boolean must start with one of t, f, T or F.
  	switch buf[i] {
  	case 't', 'f', 'T', 'F':
  	default:
  		return i, buf[start:i], fmt.Errorf("invalid boolean")
  	}

  	// Consume the rest of the token up to the next field or section separator.
  	i++
  	for i < len(buf) && buf[i] != ',' && buf[i] != ' ' {
  		i++
  	}

  	tok := buf[start:i]
  	switch string(tok) {
  	case "t", "T", "f", "F",
  		"true", "True", "TRUE",
  		"false", "False", "FALSE":
  		return i, tok, nil
  	}
  	return i, tok, fmt.Errorf("invalid boolean")
  }
  // skipWhitespace advances from position i over any run of spaces, tabs and
  // NUL bytes in buf and returns the position of the first byte that is none of
  // those (or the unchanged/ending position when no such byte remains).
  func skipWhitespace(buf []byte, i int) int {
  	for ; i < len(buf); i++ {
  		switch buf[i] {
  		case ' ', '\t', 0:
  			// Still inside the run being skipped.
  			continue
  		}
  		return i
  	}
  	return i
  }
  // scanLine scans buf from position i up to the next line terminator and
  // returns the position of that terminator (or len(buf)) together with the
  // bytes of the line, excluding the terminator.
  //
  // A backslash followed by at least two more bytes causes the backslash and the
  // byte after it to be skipped. Once the first unescaped space has been seen the
  // scanner is in the field section, where it counts unquoted '=' and ','
  // (duplicating part of what scanFields does) so that a double quote only
  // toggles quoting while a field value is open. A newline inside a quoted
  // value does not end the line.
  func scanLine(buf []byte, i int) (int, []byte) {
  	var (
  		start    = i
  		inQuotes bool
  		inFields bool
  		numEq    int
  		numComma int
  	)

  	for i < len(buf) {
  		c := buf[i]

  		// Skip over an escaped character.
  		if c == '\\' && i+2 < len(buf) {
  			i += 2
  			continue
  		}

  		if c == ' ' {
  			inFields = true
  		}

  		if inFields {
  			switch {
  			case !inQuotes && c == '=':
  				numEq++
  			case !inQuotes && c == ',':
  				numComma++
  			case c == '"' && numEq > numComma:
  				// More '=' than ',' means a field value is currently open.
  				inQuotes = !inQuotes
  			}
  		}

  		if c == '\n' && !inQuotes {
  			break
  		}
  		i++
  	}

  	return i, buf[start:i]
  }
  // scanTo scans buf from position i until it reaches an occurrence of stop that
  // is not immediately preceded by a backslash, or the end of buf. It returns
  // the position where scanning stopped and the bytes between the starting
  // position and that point (the stop byte itself is not included).
  func scanTo(buf []byte, i int, stop byte) (int, []byte) {
  	start := i
  	for ; i < len(buf); i++ {
  		if buf[i] != stop {
  			continue
  		}
  		// Only an unescaped stop byte terminates the scan.
  		if i == 0 || buf[i-1] != '\\' {
  			break
  		}
  	}
  	return i, buf[start:i]
  }
  // scanToSpaceOr scans buf from position i until it reaches the stop byte or a
  // space, or the end of buf. It returns the position where scanning stopped
  // and the bytes between the starting position and that point.
  //
  // A byte that directly follows a backslash is never treated as a terminator.
  // If i is at or past the end of buf, i and a nil slice are returned. A
  // backslash as the very last byte ends the scan at len(buf) rather than
  // reading past the buffer.
  func scanToSpaceOr(buf []byte, i int, stop byte) (int, []byte) {
  	// Nothing to scan; avoid indexing past the buffer.
  	if i >= len(buf) {
  		return i, nil
  	}

  	start := i
  	if buf[i] == stop || buf[i] == ' ' {
  		return i, buf[start:i]
  	}

  	for {
  		i++

  		// Reached the end of buf? This is checked before looking at the
  		// previous byte so that a trailing backslash cannot push i out of range.
  		if i >= len(buf) {
  			return i, buf[start:i]
  		}

  		// The byte after a backslash is escaped and cannot end the block.
  		if buf[i-1] == '\\' {
  			continue
  		}

  		// Reached end of block?
  		if buf[i] == stop || buf[i] == ' ' {
  			return i, buf[start:i]
  		}
  	}
  }
  // scanTagValue scans buf from position i until it reaches a ',' that is not
  // immediately preceded by a backslash, or the end of buf. It returns the
  // position where scanning stopped and the scanned bytes.
  //
  // If i lies beyond the end of buf, i and a nil slice are returned.
  func scanTagValue(buf []byte, i int) (int, []byte) {
  	start := i
  	for i < len(buf) {
  		// The i == 0 guard keeps the escape look-behind inside the buffer
  		// when the scan starts on a comma at the very first byte.
  		if buf[i] == ',' && (i == 0 || buf[i-1] != '\\') {
  			break
  		}
  		i++
  	}
  	if i > len(buf) {
  		return i, nil
  	}
  	return i, buf[start:i]
  }
  // scanFieldValue scans a single field value in buf starting at position i. It
  // stops at the first ',' that is outside double quotes, or at the end of buf,
  // and returns the stop position together with the scanned bytes.
  //
  // Within a value, a backslash followed by a double quote or another backslash
  // is an escape sequence and both bytes are skipped as a unit. Every other
  // double quote toggles the quoted state.
  //
  // If i lies beyond the end of buf, i and a nil slice are returned, matching
  // scanTagValue, instead of slicing out of range.
  func scanFieldValue(buf []byte, i int) (int, []byte) {
  	// Callers pass the position after '=', which is past the end when the
  	// input contains no '=' at all.
  	if i > len(buf) {
  		return i, nil
  	}

  	start := i
  	quoted := false
  	for i < len(buf) {
  		// Only escape char for a field value is a double-quote and backslash
  		if buf[i] == '\\' && i+1 < len(buf) && (buf[i+1] == '"' || buf[i+1] == '\\') {
  			i += 2
  			continue
  		}

  		// Quoted value? (e.g. string)
  		if buf[i] == '"' {
  			i++
  			quoted = !quoted
  			continue
  		}

  		if buf[i] == ',' && !quoted {
  			break
  		}
  		i++
  	}
  	return i, buf[start:i]
  }
  1018. func EscapeMeasurement(in []byte) []byte {
  1019. for _, c := range measurementEscapeCodes {
  1020. if bytes.IndexByte(in, c.k[0]) != -1 {
  1021. in = bytes.Replace(in, c.k[:], c.esc[:], -1)
  1022. }
  1023. }
  1024. return in
  1025. }
  1026. func unescapeMeasurement(in []byte) []byte {
  1027. if bytes.IndexByte(in, '\\') == -1 {
  1028. return in
  1029. }
  1030. for i := range measurementEscapeCodes {
  1031. c := &measurementEscapeCodes[i]
  1032. if bytes.IndexByte(in, c.k[0]) != -1 {
  1033. in = bytes.Replace(in, c.esc[:], c.k[:], -1)
  1034. }
  1035. }
  1036. return in
  1037. }
  1038. func escapeTag(in []byte) []byte {
  1039. for i := range tagEscapeCodes {
  1040. c := &tagEscapeCodes[i]
  1041. if bytes.IndexByte(in, c.k[0]) != -1 {
  1042. in = bytes.Replace(in, c.k[:], c.esc[:], -1)
  1043. }
  1044. }
  1045. return in
  1046. }
  1047. func unescapeTag(in []byte) []byte {
  1048. if bytes.IndexByte(in, '\\') == -1 {
  1049. return in
  1050. }
  1051. for i := range tagEscapeCodes {
  1052. c := &tagEscapeCodes[i]
  1053. if bytes.IndexByte(in, c.k[0]) != -1 {
  1054. in = bytes.Replace(in, c.esc[:], c.k[:], -1)
  1055. }
  1056. }
  1057. return in
  1058. }
  // escapeStringFieldReplacer prefixes every double quote and every backslash
  // with a backslash.
  //
  // As of Go 1.7 this approach benchmarked better, in both allocations and CPU
  // time, than walking the string byte-by-byte while appending to a new slice,
  // than two calls to strings.Replace, and than (*Regex).ReplaceAllString.
  var escapeStringFieldReplacer = strings.NewReplacer(
  	"\"", "\\\"",
  	"\\", "\\\\",
  )

  // EscapeStringField returns a copy of in in which every double quote and
  // every backslash is preceded by a backslash.
  func EscapeStringField(in string) string {
  	escaped := escapeStringFieldReplacer.Replace(in)
  	return escaped
  }
  // unescapeStringField returns a copy of in where each `\\` is collapsed to a
  // single backslash and each `\"` to a double quote. A backslash followed by
  // any other byte, or a trailing backslash, is kept verbatim. Input without
  // any backslash is returned unchanged.
  func unescapeStringField(in string) string {
  	if strings.IndexByte(in, '\\') < 0 {
  		return in
  	}

  	out := make([]byte, 0, len(in))
  	for i := 0; i < len(in); i++ {
  		c := in[i]
  		if c == '\\' && i+1 < len(in) {
  			// Emit the escaped character and step over the backslash.
  			if next := in[i+1]; next == '\\' || next == '"' {
  				c = next
  				i++
  			}
  		}
  		out = append(out, c)
  	}
  	return string(out)
  }
  1099. // NewPoint returns a new point with the given measurement name, tags, fields and timestamp. If
  1100. // an unsupported field value (NaN, or +/-Inf) or out of range time is passed, this function
  1101. // returns an error.
  1102. func NewPoint(name string, tags Tags, fields Fields, t time.Time) (Point, error) {
  1103. key, err := pointKey(name, tags, fields, t)
  1104. if err != nil {
  1105. return nil, err
  1106. }
  1107. return &point{
  1108. key: key,
  1109. time: t,
  1110. fields: fields.MarshalBinary(),
  1111. }, nil
  1112. }
  1113. // pointKey checks some basic requirements for valid points, and returns the
  1114. // key, along with an possible error.
  1115. func pointKey(measurement string, tags Tags, fields Fields, t time.Time) ([]byte, error) {
  1116. if len(fields) == 0 {
  1117. return nil, ErrPointMustHaveAField
  1118. }
  1119. if !t.IsZero() {
  1120. if err := CheckTime(t); err != nil {
  1121. return nil, err
  1122. }
  1123. }
  1124. for key, value := range fields {
  1125. switch value := value.(type) {
  1126. case float64:
  1127. // Ensure the caller validates and handles invalid field values
  1128. if math.IsInf(value, 0) {
  1129. return nil, fmt.Errorf("+/-Inf is an unsupported value for field %s", key)
  1130. }
  1131. if math.IsNaN(value) {
  1132. return nil, fmt.Errorf("NaN is an unsupported value for field %s", key)
  1133. }
  1134. case float32:
  1135. // Ensure the caller validates and handles invalid field values
  1136. if math.IsInf(float64(value), 0) {
  1137. return nil, fmt.Errorf("+/-Inf is an unsupported value for field %s", key)
  1138. }
  1139. if math.IsNaN(float64(value)) {
  1140. return nil, fmt.Errorf("NaN is an unsupported value for field %s", key)
  1141. }
  1142. }
  1143. if len(key) == 0 {
  1144. return nil, fmt.Errorf("all fields must have non-empty names")
  1145. }
  1146. }
  1147. key := MakeKey([]byte(measurement), tags)
  1148. for field := range fields {
  1149. sz := seriesKeySize(key, []byte(field))
  1150. if sz > MaxKeyLength {
  1151. return nil, fmt.Errorf("max key length exceeded: %v > %v", sz, MaxKeyLength)
  1152. }
  1153. }
  1154. return key, nil
  1155. }
  // seriesKeySize returns the combined length of key, the field key separator
  // and field.
  func seriesKeySize(key, field []byte) int {
  	// Length of the tsm1.fieldKeySeparator constant, inlined here to avoid a
  	// circular dependency.
  	const fieldKeySeparatorLen = 4
  	return len(key) + fieldKeySeparatorLen + len(field)
  }
  1161. // NewPointFromBytes returns a new Point from a marshalled Point.
  1162. func NewPointFromBytes(b []byte) (Point, error) {
  1163. p := &point{}
  1164. if err := p.UnmarshalBinary(b); err != nil {
  1165. return nil, err
  1166. }
  1167. // This does some basic validation to ensure there are fields and they
  1168. // can be unmarshalled as well.
  1169. iter := p.FieldIterator()
  1170. var hasField bool
  1171. for iter.Next() {
  1172. if len(iter.FieldKey()) == 0 {
  1173. continue
  1174. }
  1175. hasField = true
  1176. switch iter.Type() {
  1177. case Float:
  1178. _, err := iter.FloatValue()
  1179. if err != nil {
  1180. return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err)
  1181. }
  1182. case Integer:
  1183. _, err := iter.IntegerValue()
  1184. if err != nil {
  1185. return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err)
  1186. }
  1187. case Unsigned:
  1188. _, err := iter.UnsignedValue()
  1189. if err != nil {
  1190. return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err)
  1191. }
  1192. case String:
  1193. // Skip since this won't return an error
  1194. case Boolean:
  1195. _, err := iter.BooleanValue()
  1196. if err != nil {
  1197. return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err)
  1198. }
  1199. }
  1200. }
  1201. if !hasField {
  1202. return nil, ErrPointMustHaveAField
  1203. }
  1204. return p, nil
  1205. }
  1206. // MustNewPoint returns a new point with the given measurement name, tags, fields and timestamp. If
  1207. // an unsupported field value (NaN) is passed, this function panics.
  1208. func MustNewPoint(name string, tags Tags, fields Fields, time time.Time) Point {
  1209. pt, err := NewPoint(name, tags, fields, time)
  1210. if err != nil {
  1211. panic(err.Error())
  1212. }
  1213. return pt
  1214. }
  1215. // Key returns the key (measurement joined with tags) of the point.
  1216. func (p *point) Key() []byte {
  1217. return p.key
  1218. }
  1219. func (p *point) name() []byte {
  1220. _, name := scanTo(p.key, 0, ',')
  1221. return name
  1222. }
  1223. func (p *point) Name() []byte {
  1224. return escape.Unescape(p.name())
  1225. }
  1226. // SetName updates the measurement name for the point.
  1227. func (p *point) SetName(name string) {
  1228. p.cachedName = ""
  1229. p.key = MakeKey([]byte(name), p.Tags())
  1230. }
  1231. // Time return the timestamp for the point.
  1232. func (p *point) Time() time.Time {
  1233. return p.time
  1234. }
  1235. // SetTime updates the timestamp for the point.
  1236. func (p *point) SetTime(t time.Time) {
  1237. p.time = t
  1238. }
  1239. // Round will round the timestamp of the point to the given duration.
  1240. func (p *point) Round(d time.Duration) {
  1241. p.time = p.time.Round(d)
  1242. }
  1243. // Tags returns the tag set for the point.
  1244. func (p *point) Tags() Tags {
  1245. if p.cachedTags != nil {
  1246. return p.cachedTags
  1247. }
  1248. p.cachedTags = parseTags(p.key, nil)
  1249. return p.cachedTags
  1250. }
  1251. func (p *point) ForEachTag(fn func(k, v []byte) bool) {
  1252. walkTags(p.key, fn)
  1253. }
  1254. func (p *point) HasTag(tag []byte) bool {
  1255. if len(p.key) == 0 {
  1256. return false
  1257. }
  1258. var exists bool
  1259. walkTags(p.key, func(key, value []byte) bool {
  1260. if bytes.Equal(tag, key) {
  1261. exists = true
  1262. return false
  1263. }
  1264. return true
  1265. })
  1266. return exists
  1267. }
  1268. func walkTags(buf []byte, fn func(key, value []byte) bool) {
  1269. if len(buf) == 0 {
  1270. return
  1271. }
  1272. pos, name := scanTo(buf, 0, ',')
  1273. // it's an empty key, so there are no tags
  1274. if len(name) == 0 {
  1275. return
  1276. }
  1277. hasEscape := bytes.IndexByte(buf, '\\') != -1
  1278. i := pos + 1
  1279. var key, value []byte
  1280. for {
  1281. if i >= len(buf) {
  1282. break
  1283. }
  1284. i, key = scanTo(buf, i, '=')
  1285. i, value = scanTagValue(buf, i+1)
  1286. if len(value) == 0 {
  1287. continue
  1288. }
  1289. if hasEscape {
  1290. if !fn(unescapeTag(key), unescapeTag(value)) {
  1291. return
  1292. }
  1293. } else {
  1294. if !fn(key, value) {
  1295. return
  1296. }
  1297. }
  1298. i++
  1299. }
  1300. }
  1301. // walkFields walks each field key and value via fn. If fn returns false, the iteration
  1302. // is stopped. The values are the raw byte slices and not the converted types.
  1303. func walkFields(buf []byte, fn func(key, value []byte) bool) error {
  1304. var i int
  1305. var key, val []byte
  1306. for len(buf) > 0 {
  1307. i, key = scanTo(buf, 0, '=')
  1308. if i > len(buf)-2 {
  1309. return fmt.Errorf("invalid value: field-key=%s", key)
  1310. }
  1311. buf = buf[i+1:]
  1312. i, val = scanFieldValue(buf, 0)
  1313. buf = buf[i:]
  1314. if !fn(key, val) {
  1315. break
  1316. }
  1317. // slice off comma
  1318. if len(buf) > 0 {
  1319. buf = buf[1:]
  1320. }
  1321. }
  1322. return nil
  1323. }
  1324. // parseTags parses buf into the provided destination tags, returning destination
  1325. // Tags, which may have a different length and capacity.
  1326. func parseTags(buf []byte, dst Tags) Tags {
  1327. if len(buf) == 0 {
  1328. return nil
  1329. }
  1330. n := bytes.Count(buf, []byte(","))
  1331. if cap(dst) < n {
  1332. dst = make(Tags, n)
  1333. } else {
  1334. dst = dst[:n]
  1335. }
  1336. // Ensure existing behaviour when point has no tags and nil slice passed in.
  1337. if dst == nil {
  1338. dst = Tags{}
  1339. }
  1340. // Series keys can contain escaped commas, therefore the number of commas
  1341. // in a series key only gives an estimation of the upper bound on the number
  1342. // of tags.
  1343. var i int
  1344. walkTags(buf, func(key, value []byte) bool {
  1345. dst[i].Key, dst[i].Value = key, value
  1346. i++
  1347. return true
  1348. })
  1349. return dst[:i]
  1350. }
  1351. // MakeKey creates a key for a set of tags.
  1352. func MakeKey(name []byte, tags Tags) []byte {
  1353. return AppendMakeKey(nil, name, tags)
  1354. }
  1355. // AppendMakeKey appends the key derived from name and tags to dst and returns the extended buffer.
  1356. func AppendMakeKey(dst []byte, name []byte, tags Tags) []byte {
  1357. // unescape the name and then re-escape it to avoid double escaping.
  1358. // The key should always be stored in escaped form.
  1359. dst = append(dst, EscapeMeasurement(unescapeMeasurement(name))...)
  1360. dst = tags.AppendHashKey(dst)
  1361. return dst
  1362. }
  1363. // SetTags replaces the tags for the point.
  1364. func (p *point) SetTags(tags Tags) {
  1365. p.key = MakeKey(p.Name(), tags)
  1366. p.cachedTags = tags
  1367. }
  1368. // AddTag adds or replaces a tag value for a point.
  1369. func (p *point) AddTag(key, value string) {
  1370. tags := p.Tags()
  1371. tags = append(tags, Tag{Key: []byte(key), Value: []byte(value)})
  1372. sort.Sort(tags)
  1373. p.cachedTags = tags
  1374. p.key = MakeKey(p.Name(), tags)
  1375. }
  1376. // Fields returns the fields for the point.
  1377. func (p *point) Fields() (Fields, error) {
  1378. if p.cachedFields != nil {
  1379. return p.cachedFields, nil
  1380. }
  1381. cf, err := p.unmarshalBinary()
  1382. if err != nil {
  1383. return nil, err
  1384. }
  1385. p.cachedFields = cf
  1386. return p.cachedFields, nil
  1387. }
  1388. // SetPrecision will round a time to the specified precision.
  1389. func (p *point) SetPrecision(precision string) {
  1390. switch precision {
  1391. case "n":
  1392. case "u":
  1393. p.SetTime(p.Time().Truncate(time.Microsecond))
  1394. case "ms":
  1395. p.SetTime(p.Time().Truncate(time.Millisecond))
  1396. case "s":
  1397. p.SetTime(p.Time().Truncate(time.Second))
  1398. case "m":
  1399. p.SetTime(p.Time().Truncate(time.Minute))
  1400. case "h":
  1401. p.SetTime(p.Time().Truncate(time.Hour))
  1402. }
  1403. }
  1404. // String returns the string representation of the point.
  1405. func (p *point) String() string {
  1406. if p.Time().IsZero() {
  1407. return string(p.Key()) + " " + string(p.fields)
  1408. }
  1409. return string(p.Key()) + " " + string(p.fields) + " " + strconv.FormatInt(p.UnixNano(), 10)
  1410. }
  1411. // AppendString appends the string representation of the point to buf.
  1412. func (p *point) AppendString(buf []byte) []byte {
  1413. buf = append(buf, p.key...)
  1414. buf = append(buf, ' ')
  1415. buf = append(buf, p.fields...)
  1416. if !p.time.IsZero() {
  1417. buf = append(buf, ' ')
  1418. buf = strconv.AppendInt(buf, p.UnixNano(), 10)
  1419. }
  1420. return buf
  1421. }
  1422. // StringSize returns the length of the string that would be returned by String().
  1423. func (p *point) StringSize() int {
  1424. size := len(p.key) + len(p.fields) + 1
  1425. if !p.time.IsZero() {
  1426. digits := 1 // even "0" has one digit
  1427. t := p.UnixNano()
  1428. if t < 0 {
  1429. // account for negative sign, then negate
  1430. digits++
  1431. t = -t
  1432. }
  1433. for t > 9 { // already accounted for one digit
  1434. digits++
  1435. t /= 10
  1436. }
  1437. size += digits + 1 // digits and a space
  1438. }
  1439. return size
  1440. }
  1441. // MarshalBinary returns a binary representation of the point.
  1442. func (p *point) MarshalBinary() ([]byte, error) {
  1443. if len(p.fields) == 0 {
  1444. return nil, ErrPointMustHaveAField
  1445. }
  1446. tb, err := p.time.MarshalBinary()
  1447. if err != nil {
  1448. return nil, err
  1449. }
  1450. b := make([]byte, 8+len(p.key)+len(p.fields)+len(tb))
  1451. i := 0
  1452. binary.BigEndian.PutUint32(b[i:], uint32(len(p.key)))
  1453. i += 4
  1454. i += copy(b[i:], p.key)
  1455. binary.BigEndian.PutUint32(b[i:i+4], uint32(len(p.fields)))
  1456. i += 4
  1457. i += copy(b[i:], p.fields)
  1458. copy(b[i:], tb)
  1459. return b, nil
  1460. }
  1461. // UnmarshalBinary decodes a binary representation of the point into a point struct.
  1462. func (p *point) UnmarshalBinary(b []byte) error {
  1463. var n int
  1464. // Read key length.
  1465. if len(b) < 4 {
  1466. return io.ErrShortBuffer
  1467. }
  1468. n, b = int(binary.BigEndian.Uint32(b[:4])), b[4:]
  1469. // Read key.
  1470. if len(b) < n {
  1471. return io.ErrShortBuffer
  1472. }
  1473. p.key, b = b[:n], b[n:]
  1474. // Read fields length.
  1475. if len(b) < 4 {
  1476. return io.ErrShortBuffer
  1477. }
  1478. n, b = int(binary.BigEndian.Uint32(b[:4])), b[4:]
  1479. // Read fields.
  1480. if len(b) < n {
  1481. return io.ErrShortBuffer
  1482. }
  1483. p.fields, b = b[:n], b[n:]
  1484. // Read timestamp.
  1485. return p.time.UnmarshalBinary(b)
  1486. }
  1487. // PrecisionString returns a string representation of the point. If there
  1488. // is a timestamp associated with the point then it will be specified in the
  1489. // given unit.
  1490. func (p *point) PrecisionString(precision string) string {
  1491. if p.Time().IsZero() {
  1492. return fmt.Sprintf("%s %s", p.Key(), string(p.fields))
  1493. }
  1494. return fmt.Sprintf("%s %s %d", p.Key(), string(p.fields),
  1495. p.UnixNano()/GetPrecisionMultiplier(precision))
  1496. }
  1497. // RoundedString returns a string representation of the point. If there
  1498. // is a timestamp associated with the point, then it will be rounded to the
  1499. // given duration.
  1500. func (p *point) RoundedString(d time.Duration) string {
  1501. if p.Time().IsZero() {
  1502. return fmt.Sprintf("%s %s", p.Key(), string(p.fields))
  1503. }
  1504. return fmt.Sprintf("%s %s %d", p.Key(), string(p.fields),
  1505. p.time.Round(d).UnixNano())
  1506. }
  1507. func (p *point) unmarshalBinary() (Fields, error) {
  1508. iter := p.FieldIterator()
  1509. fields := make(Fields, 8)
  1510. for iter.Next() {
  1511. if len(iter.FieldKey()) == 0 {
  1512. continue
  1513. }
  1514. switch iter.Type() {
  1515. case Float:
  1516. v, err := iter.FloatValue()
  1517. if err != nil {
  1518. return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err)
  1519. }
  1520. fields[string(iter.FieldKey())] = v
  1521. case Integer:
  1522. v, err := iter.IntegerValue()
  1523. if err != nil {
  1524. return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err)
  1525. }
  1526. fields[string(iter.FieldKey())] = v
  1527. case Unsigned:
  1528. v, err := iter.UnsignedValue()
  1529. if err != nil {
  1530. return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err)
  1531. }
  1532. fields[string(iter.FieldKey())] = v
  1533. case String:
  1534. fields[string(iter.FieldKey())] = iter.StringValue()
  1535. case Boolean:
  1536. v, err := iter.BooleanValue()
  1537. if err != nil {
  1538. return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err)
  1539. }
  1540. fields[string(iter.FieldKey())] = v
  1541. }
  1542. }
  1543. return fields, nil
  1544. }
  1545. // HashID returns a non-cryptographic checksum of the point's key.
  1546. func (p *point) HashID() uint64 {
  1547. h := NewInlineFNV64a()
  1548. h.Write(p.key)
  1549. sum := h.Sum64()
  1550. return sum
  1551. }
  1552. // UnixNano returns the timestamp of the point as nanoseconds since Unix epoch.
  1553. func (p *point) UnixNano() int64 {
  1554. return p.Time().UnixNano()
  1555. }
  1556. // Split will attempt to return multiple points with the same timestamp whose
  1557. // string representations are no longer than size. Points with a single field or
  1558. // a point without a timestamp may exceed the requested size.
  1559. func (p *point) Split(size int) []Point {
  1560. if p.time.IsZero() || p.StringSize() <= size {
  1561. return []Point{p}
  1562. }
  1563. // key string, timestamp string, spaces
  1564. size -= len(p.key) + len(strconv.FormatInt(p.time.UnixNano(), 10)) + 2
  1565. var points []Point
  1566. var start, cur int
  1567. for cur < len(p.fields) {
  1568. end, _ := scanTo(p.fields, cur, '=')
  1569. end, _ = scanFieldValue(p.fields, end+1)
  1570. if cur > start && end-start > size {
  1571. points = append(points, &point{
  1572. key: p.key,
  1573. time: p.time,
  1574. fields: p.fields[start : cur-1],
  1575. })
  1576. start = cur
  1577. }
  1578. cur = end + 1
  1579. }
  1580. points = append(points, &point{
  1581. key: p.key,
  1582. time: p.time,
  1583. fields: p.fields[start:],
  1584. })
  1585. return points
  1586. }
  1587. // Tag represents a single key/value tag pair.
  1588. type Tag = models.Tag
  1589. // NewTag returns a new Tag.
  1590. func NewTag(key, value []byte) Tag {
  1591. return Tag{
  1592. Key: key,
  1593. Value: value,
  1594. }
  1595. }
  1596. // Tags represents a sorted list of tags.
  1597. type Tags = models.Tags
  1598. // NewTags returns a new Tags from a map.
  1599. func NewTags(m map[string]string) Tags {
  1600. if len(m) == 0 {
  1601. return nil
  1602. }
  1603. a := make(Tags, 0, len(m))
  1604. for k, v := range m {
  1605. a = append(a, NewTag([]byte(k), []byte(v)))
  1606. }
  1607. sort.Sort(a)
  1608. return a
  1609. }
  1610. // CompareTags returns -1 if a < b, 1 if a > b, and 0 if a == b.
  1611. func CompareTags(a, b Tags) int {
  1612. // Compare each key & value until a mismatch.
  1613. for i := 0; i < len(a) && i < len(b); i++ {
  1614. if cmp := bytes.Compare(a[i].Key, b[i].Key); cmp != 0 {
  1615. return cmp
  1616. }
  1617. if cmp := bytes.Compare(a[i].Value, b[i].Value); cmp != 0 {
  1618. return cmp
  1619. }
  1620. }
  1621. // If all tags are equal up to this point then return shorter tagset.
  1622. if len(a) < len(b) {
  1623. return -1
  1624. } else if len(a) > len(b) {
  1625. return 1
  1626. }
  1627. // All tags are equal.
  1628. return 0
  1629. }
  1630. // CopyTags returns a shallow copy of tags.
  1631. func CopyTags(a Tags) Tags {
  1632. other := make(Tags, len(a))
  1633. copy(other, a)
  1634. return other
  1635. }
  1636. // DeepCopyTags returns a deep copy of tags.
  1637. func DeepCopyTags(a Tags) Tags {
  1638. // Calculate size of keys/values in bytes.
  1639. var n int
  1640. for _, t := range a {
  1641. n += len(t.Key) + len(t.Value)
  1642. }
  1643. // Build single allocation for all key/values.
  1644. buf := make([]byte, n)
  1645. // Copy tags to new set.
  1646. other := make(Tags, len(a))
  1647. for i, t := range a {
  1648. copy(buf, t.Key)
  1649. other[i].Key, buf = buf[:len(t.Key)], buf[len(t.Key):]
  1650. copy(buf, t.Value)
  1651. other[i].Value, buf = buf[:len(t.Value)], buf[len(t.Value):]
  1652. }
  1653. return other
  1654. }
  // Fields represents a mapping between a Point's field names and their
  // values. Values are held as interface{}; how each Go type is written out is
  // decided by appendField when the map is serialized with MarshalBinary.
  type Fields map[string]interface{}
  1658. // FieldIterator retuns a FieldIterator that can be used to traverse the
  1659. // fields of a point without constructing the in-memory map.
  1660. func (p *point) FieldIterator() FieldIterator {
  1661. p.Reset()
  1662. return p
  1663. }
  1664. type fieldIterator struct {
  1665. start, end int
  1666. key, keybuf []byte
  1667. valueBuf []byte
  1668. fieldType FieldType
  1669. }
  1670. // Next indicates whether there any fields remaining.
  1671. func (p *point) Next() bool {
  1672. p.it.start = p.it.end
  1673. if p.it.start >= len(p.fields) {
  1674. return false
  1675. }
  1676. p.it.end, p.it.key = scanTo(p.fields, p.it.start, '=')
  1677. if escape.IsEscaped(p.it.key) {
  1678. p.it.keybuf = escape.AppendUnescaped(p.it.keybuf[:0], p.it.key)
  1679. p.it.key = p.it.keybuf
  1680. }
  1681. p.it.end, p.it.valueBuf = scanFieldValue(p.fields, p.it.end+1)
  1682. p.it.end++
  1683. if len(p.it.valueBuf) == 0 {
  1684. p.it.fieldType = Empty
  1685. return true
  1686. }
  1687. c := p.it.valueBuf[0]
  1688. if c == '"' {
  1689. p.it.fieldType = String
  1690. return true
  1691. }
  1692. if strings.IndexByte(`0123456789-.nNiIu`, c) >= 0 {
  1693. if p.it.valueBuf[len(p.it.valueBuf)-1] == 'i' {
  1694. p.it.fieldType = Integer
  1695. p.it.valueBuf = p.it.valueBuf[:len(p.it.valueBuf)-1]
  1696. } else if p.it.valueBuf[len(p.it.valueBuf)-1] == 'u' {
  1697. p.it.fieldType = Unsigned
  1698. p.it.valueBuf = p.it.valueBuf[:len(p.it.valueBuf)-1]
  1699. } else {
  1700. p.it.fieldType = Float
  1701. }
  1702. return true
  1703. }
  1704. // to keep the same behavior that currently exists, default to boolean
  1705. p.it.fieldType = Boolean
  1706. return true
  1707. }
  1708. // FieldKey returns the key of the current field.
  1709. func (p *point) FieldKey() []byte {
  1710. return p.it.key
  1711. }
  1712. // Type returns the FieldType of the current field.
  1713. func (p *point) Type() FieldType {
  1714. return p.it.fieldType
  1715. }
  1716. // StringValue returns the string value of the current field.
  1717. func (p *point) StringValue() string {
  1718. return unescapeStringField(string(p.it.valueBuf[1 : len(p.it.valueBuf)-1]))
  1719. }
  1720. // IntegerValue returns the integer value of the current field.
  1721. func (p *point) IntegerValue() (int64, error) {
  1722. n, err := parseIntBytes(p.it.valueBuf, 10, 64)
  1723. if err != nil {
  1724. return 0, fmt.Errorf("unable to parse integer value %q: %v", p.it.valueBuf, err)
  1725. }
  1726. return n, nil
  1727. }
  1728. // UnsignedValue returns the unsigned value of the current field.
  1729. func (p *point) UnsignedValue() (uint64, error) {
  1730. n, err := parseUintBytes(p.it.valueBuf, 10, 64)
  1731. if err != nil {
  1732. return 0, fmt.Errorf("unable to parse unsigned value %q: %v", p.it.valueBuf, err)
  1733. }
  1734. return n, nil
  1735. }
  1736. // BooleanValue returns the boolean value of the current field.
  1737. func (p *point) BooleanValue() (bool, error) {
  1738. b, err := parseBoolBytes(p.it.valueBuf)
  1739. if err != nil {
  1740. return false, fmt.Errorf("unable to parse bool value %q: %v", p.it.valueBuf, err)
  1741. }
  1742. return b, nil
  1743. }
  1744. // FloatValue returns the float value of the current field.
  1745. func (p *point) FloatValue() (float64, error) {
  1746. f, err := parseFloatBytes(p.it.valueBuf, 64)
  1747. if err != nil {
  1748. return 0, fmt.Errorf("unable to parse floating point value %q: %v", p.it.valueBuf, err)
  1749. }
  1750. return f, nil
  1751. }
  1752. // Reset resets the iterator to its initial state.
  1753. func (p *point) Reset() {
  1754. p.it.fieldType = Empty
  1755. p.it.key = nil
  1756. p.it.valueBuf = nil
  1757. p.it.start = 0
  1758. p.it.end = 0
  1759. }
  1760. // MarshalBinary encodes all the fields to their proper type and returns the binary
  1761. // represenation
  1762. // NOTE: uint64 is specifically not supported due to potential overflow when we decode
  1763. // again later to an int64
  1764. // NOTE2: uint is accepted, and may be 64 bits, and is for some reason accepted...
  1765. func (p Fields) MarshalBinary() []byte {
  1766. var b []byte
  1767. keys := make([]string, 0, len(p))
  1768. for k := range p {
  1769. keys = append(keys, k)
  1770. }
  1771. // Not really necessary, can probably be removed.
  1772. sort.Strings(keys)
  1773. for i, k := range keys {
  1774. if i > 0 {
  1775. b = append(b, ',')
  1776. }
  1777. b = appendField(b, k, p[k])
  1778. }
  1779. return b
  1780. }
  1781. func appendField(b []byte, k string, v interface{}) []byte {
  1782. b = append(b, []byte(escape.String(k))...)
  1783. b = append(b, '=')
  1784. // check popular types first
  1785. switch v := v.(type) {
  1786. case float64:
  1787. b = strconv.AppendFloat(b, v, 'f', -1, 64)
  1788. case int64:
  1789. b = strconv.AppendInt(b, v, 10)
  1790. b = append(b, 'i')
  1791. case string:
  1792. b = append(b, '"')
  1793. b = append(b, []byte(EscapeStringField(v))...)
  1794. b = append(b, '"')
  1795. case bool:
  1796. b = strconv.AppendBool(b, v)
  1797. case int32:
  1798. b = strconv.AppendInt(b, int64(v), 10)
  1799. b = append(b, 'i')
  1800. case int16:
  1801. b = strconv.AppendInt(b, int64(v), 10)
  1802. b = append(b, 'i')
  1803. case int8:
  1804. b = strconv.AppendInt(b, int64(v), 10)
  1805. b = append(b, 'i')
  1806. case int:
  1807. b = strconv.AppendInt(b, int64(v), 10)
  1808. b = append(b, 'i')
  1809. case uint64:
  1810. b = strconv.AppendUint(b, v, 10)
  1811. b = append(b, 'u')
  1812. case uint32:
  1813. b = strconv.AppendInt(b, int64(v), 10)
  1814. b = append(b, 'i')
  1815. case uint16:
  1816. b = strconv.AppendInt(b, int64(v), 10)
  1817. b = append(b, 'i')
  1818. case uint8:
  1819. b = strconv.AppendInt(b, int64(v), 10)
  1820. b = append(b, 'i')
  1821. case uint:
  1822. // TODO: 'uint' should be converted to writing as an unsigned integer,
  1823. // but we cannot since that would break backwards compatibility.
  1824. b = strconv.AppendInt(b, int64(v), 10)
  1825. b = append(b, 'i')
  1826. case float32:
  1827. b = strconv.AppendFloat(b, float64(v), 'f', -1, 32)
  1828. case []byte:
  1829. b = append(b, v...)
  1830. case nil:
  1831. // skip
  1832. default:
  1833. // Can't determine the type, so convert to string
  1834. b = append(b, '"')
  1835. b = append(b, []byte(EscapeStringField(fmt.Sprintf("%v", v)))...)
  1836. b = append(b, '"')
  1837. }
  1838. return b
  1839. }
  1840. // ValidKeyToken returns true if the token used for measurement, tag key, or tag
  1841. // value is a valid unicode string and only contains printable, non-replacement characters.
  1842. func ValidKeyToken(s string) bool {
  1843. if !utf8.ValidString(s) {
  1844. return false
  1845. }
  1846. for _, r := range s {
  1847. if !unicode.IsPrint(r) || r == unicode.ReplacementChar {
  1848. return false
  1849. }
  1850. }
  1851. return true
  1852. }
  1853. // ValidKeyTokens returns true if the measurement name and all tags are valid.
  1854. func ValidKeyTokens(name string, tags Tags) bool {
  1855. if !ValidKeyToken(name) {
  1856. return false
  1857. }
  1858. for _, tag := range tags {
  1859. if !ValidKeyToken(string(tag.Key)) || !ValidKeyToken(string(tag.Value)) {
  1860. return false
  1861. }
  1862. }
  1863. return true
  1864. }