12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203 |
- // Package models implements basic objects used throughout the TICK stack.
- package models // import "github.com/influxdata/influxdb/models"
- import (
- "bytes"
- "encoding/binary"
- "errors"
- "fmt"
- "io"
- "math"
- "sort"
- "strconv"
- "strings"
- "time"
- "unicode"
- "unicode/utf8"
- "github.com/influxdata/influxdb/pkg/escape"
- "github.com/influxdata/platform/models"
- )
// escapeSet pairs a byte that needs escaping with its backslash-escaped form.
type escapeSet struct {
	// k is the raw (unescaped) byte.
	k [1]byte
	// esc is the two-byte escaped replacement for k: a backslash followed by k.
	esc [2]byte
}

var (
	// measurementEscapeCodes is the escape table for measurement names:
	// ',' and ' ' are escaped with a leading backslash. (Its consumers are
	// outside this view.)
	measurementEscapeCodes = [...]escapeSet{
		{k: [1]byte{','}, esc: [2]byte{'\\', ','}},
		{k: [1]byte{' '}, esc: [2]byte{'\\', ' '}},
	}

	// tagEscapeCodes is the escape table for tag keys and values: ',', ' '
	// and '=' are escaped with a leading backslash. (Its consumers are
	// outside this view.)
	tagEscapeCodes = [...]escapeSet{
		{k: [1]byte{','}, esc: [2]byte{'\\', ','}},
		{k: [1]byte{' '}, esc: [2]byte{'\\', ' '}},
		{k: [1]byte{'='}, esc: [2]byte{'\\', '='}},
	}

	// ErrPointMustHaveAField is returned when operating on a point that does not have any fields.
	ErrPointMustHaveAField = errors.New("point without fields is unsupported")

	// ErrInvalidNumber is returned when a number is expected but not provided.
	ErrInvalidNumber = errors.New("invalid number")

	// ErrInvalidPoint is returned when a point cannot be parsed correctly,
	// e.g. when non-space characters follow the timestamp.
	ErrInvalidPoint = errors.New("point is invalid")
)

const (
	// MaxKeyLength is the largest allowed size of the combined measurement and tag keys.
	// parsePoint also applies it to the series key formed with each field key.
	MaxKeyLength = 65535
)
// enableUint64Support will enable uint64 support if set to true.
// It defaults to false; scanNumber consults it when it meets an unsigned
// ("u"-suffixed) value.
var enableUint64Support = false

// EnableUintSupport manually enables uint support for the point parser.
// This function will be removed in the future and only exists for unit tests during the
// transition.
//
// It sets a package-level flag without any synchronization, so it should be
// called before any concurrent parsing starts.
func EnableUintSupport() {
	enableUint64Support = true
}
// Point defines the values that will be written to the database.
type Point interface {
	// Name returns the measurement name for the point.
	Name() []byte

	// SetName updates the measurement name for the point.
	SetName(string)

	// Tags returns the tag set for the point.
	Tags() Tags

	// ForEachTag iterates over each tag invoking fn. If fn returns false, iteration stops.
	ForEachTag(fn func(k, v []byte) bool)

	// AddTag adds or replaces a tag value for a point.
	AddTag(key, value string)

	// SetTags replaces the tags for the point.
	SetTags(tags Tags)

	// HasTag returns true if the tag exists for the point.
	HasTag(tag []byte) bool

	// Fields returns the fields for the point.
	Fields() (Fields, error)

	// Time returns the timestamp for the point.
	Time() time.Time

	// SetTime updates the timestamp for the point.
	SetTime(t time.Time)

	// UnixNano returns the timestamp of the point as nanoseconds since Unix epoch.
	UnixNano() int64

	// HashID returns a non-cryptographic checksum of the point's key.
	HashID() uint64

	// Key returns the key (measurement joined with tags) of the point.
	Key() []byte

	// String returns a string representation of the point. If there is a
	// timestamp associated with the point then it will be specified with the default
	// precision of nanoseconds.
	String() string

	// MarshalBinary returns a binary representation of the point.
	MarshalBinary() ([]byte, error)

	// PrecisionString returns a string representation of the point. If there
	// is a timestamp associated with the point then it will be specified in the
	// given unit.
	PrecisionString(precision string) string

	// RoundedString returns a string representation of the point. If there
	// is a timestamp associated with the point, then it will be rounded to the
	// given duration.
	RoundedString(d time.Duration) string

	// Split will attempt to return multiple points with the same timestamp whose
	// string representations are no longer than size. Points with a single field or
	// a point without a timestamp may exceed the requested size.
	Split(size int) []Point

	// Round will round the timestamp of the point to the given duration.
	Round(d time.Duration)

	// StringSize returns the length of the string that would be returned by String().
	StringSize() int

	// AppendString appends the result of String() to the provided buffer and returns
	// the result, potentially reducing string allocations.
	AppendString(buf []byte) []byte

	// FieldIterator returns a FieldIterator that can be used to traverse the
	// fields of a point without constructing the in-memory map.
	FieldIterator() FieldIterator
}
// FieldType represents the type of a field.
type FieldType int

// The field types are numbered with iota, so their values depend on
// declaration order: Integer..Empty are 0..4 and Unsigned, declared last,
// is 5. Reordering these constants would change their numeric values.
const (
	// Integer indicates the field's type is integer.
	Integer FieldType = iota
	// Float indicates the field's type is float.
	Float
	// Boolean indicates the field's type is boolean.
	Boolean
	// String indicates the field's type is string.
	String
	// Empty is used to indicate that there is no field.
	Empty
	// Unsigned indicates the field's type is an unsigned integer.
	Unsigned
)

// FieldIterator provides a low-allocation interface to iterate through a point's fields.
type FieldIterator interface {
	// Next indicates whether there are any fields remaining.
	Next() bool

	// FieldKey returns the key of the current field.
	FieldKey() []byte

	// Type returns the FieldType of the current field.
	Type() FieldType

	// StringValue returns the string value of the current field.
	StringValue() string

	// IntegerValue returns the integer value of the current field.
	IntegerValue() (int64, error)

	// UnsignedValue returns the unsigned value of the current field.
	UnsignedValue() (uint64, error)

	// BooleanValue returns the boolean value of the current field.
	BooleanValue() (bool, error)

	// FloatValue returns the float value of the current field.
	FloatValue() (float64, error)

	// Reset resets the iterator to its initial state.
	Reset()
}
- // Points represents a sortable list of points by timestamp.
- type Points []Point
- // Len implements sort.Interface.
- func (a Points) Len() int { return len(a) }
- // Less implements sort.Interface.
- func (a Points) Less(i, j int) bool { return a[i].Time().Before(a[j].Time()) }
- // Swap implements sort.Interface.
- func (a Points) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// point is the default implementation of Point.
type point struct {
	// time is the point's timestamp. parsePoint sets it from the parsed
	// timestamp or, when the line has none, from the caller's default time.
	time time.Time

	// text encoding of measurement and tags
	// key must always be stored sorted by tags, if the original line was not sorted,
	// we need to re-sort it
	key []byte

	// text encoding of field data
	fields []byte

	// text encoding of timestamp; empty when the parsed line had no timestamp
	ts []byte

	// cached version of parsed fields from data
	cachedFields map[string]interface{}

	// cached version of parsed name from key
	cachedName string

	// cached version of parsed tags
	cachedTags Tags

	// NOTE(review): presumably holds the iteration state behind point's
	// FieldIterator methods (implemented outside this view) — confirm.
	it fieldIterator
}

// type assertions: compile-time checks that *point satisfies both Point and
// FieldIterator.
var (
	_ Point         = (*point)(nil)
	_ FieldIterator = (*point)(nil)
)

// Digit-count thresholds for numeric field values. scanNumber uses the int64
// pair to decide when a literal is long enough that it must be fully parsed
// to check it is in range.
const (
	// the number of characters for the largest possible int64 (9223372036854775807)
	maxInt64Digits = 19

	// the number of characters for the smallest possible int64 (-9223372036854775808),
	// including the leading '-'
	minInt64Digits = 20

	// the number of characters for the largest possible uint64 (18446744073709551615)
	maxUint64Digits = 20

	// the number of characters required for the largest float64 before a range check
	// would occur during parsing
	maxFloat64Digits = 25

	// the number of characters required for the smallest float64 before a range check
	// would occur during parsing
	minFloat64Digits = 27
)
- // ParsePoints returns a slice of Points from a text representation of a point
- // with each point separated by newlines. If any points fail to parse, a non-nil error
- // will be returned in addition to the points that parsed successfully.
- func ParsePoints(buf []byte) ([]Point, error) {
- return ParsePointsWithPrecision(buf, time.Now().UTC(), "n")
- }
- // ParsePointsString is identical to ParsePoints but accepts a string.
- func ParsePointsString(buf string) ([]Point, error) {
- return ParsePoints([]byte(buf))
- }
- // ParseKey returns the measurement name and tags from a point.
- //
- // NOTE: to minimize heap allocations, the returned Tags will refer to subslices of buf.
- // This can have the unintended effect preventing buf from being garbage collected.
- func ParseKey(buf []byte) (string, Tags) {
- name, tags := ParseKeyBytes(buf)
- return string(name), tags
- }
- func ParseKeyBytes(buf []byte) ([]byte, Tags) {
- return ParseKeyBytesWithTags(buf, nil)
- }
- func ParseKeyBytesWithTags(buf []byte, tags Tags) ([]byte, Tags) {
- // Ignore the error because scanMeasurement returns "missing fields" which we ignore
- // when just parsing a key
- state, i, _ := scanMeasurement(buf, 0)
- var name []byte
- if state == tagKeyState {
- tags = parseTags(buf, tags)
- // scanMeasurement returns the location of the comma if there are tags, strip that off
- name = buf[:i-1]
- } else {
- name = buf[:i]
- }
- return unescapeMeasurement(name), tags
- }
- func ParseTags(buf []byte) Tags {
- return parseTags(buf, nil)
- }
- func ParseName(buf []byte) []byte {
- // Ignore the error because scanMeasurement returns "missing fields" which we ignore
- // when just parsing a key
- state, i, _ := scanMeasurement(buf, 0)
- var name []byte
- if state == tagKeyState {
- name = buf[:i-1]
- } else {
- name = buf[:i]
- }
- return unescapeMeasurement(name)
- }
- // ParsePointsWithPrecision is similar to ParsePoints, but allows the
- // caller to provide a precision for time.
- //
- // NOTE: to minimize heap allocations, the returned Points will refer to subslices of buf.
- // This can have the unintended effect preventing buf from being garbage collected.
- func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) {
- points := make([]Point, 0, bytes.Count(buf, []byte{'\n'})+1)
- var (
- pos int
- block []byte
- failed []string
- )
- for pos < len(buf) {
- pos, block = scanLine(buf, pos)
- pos++
- if len(block) == 0 {
- continue
- }
- // lines which start with '#' are comments
- start := skipWhitespace(block, 0)
- // If line is all whitespace, just skip it
- if start >= len(block) {
- continue
- }
- if block[start] == '#' {
- continue
- }
- // strip the newline if one is present
- if block[len(block)-1] == '\n' {
- block = block[:len(block)-1]
- }
- pt, err := parsePoint(block[start:], defaultTime, precision)
- if err != nil {
- failed = append(failed, fmt.Sprintf("unable to parse '%s': %v", string(block[start:]), err))
- } else {
- points = append(points, pt)
- }
- }
- if len(failed) > 0 {
- return points, fmt.Errorf("%s", strings.Join(failed, "\n"))
- }
- return points, nil
- }
- func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, error) {
- // scan the first block which is measurement[,tag1=value1,tag2=value=2...]
- pos, key, err := scanKey(buf, 0)
- if err != nil {
- return nil, err
- }
- // measurement name is required
- if len(key) == 0 {
- return nil, fmt.Errorf("missing measurement")
- }
- if len(key) > MaxKeyLength {
- return nil, fmt.Errorf("max key length exceeded: %v > %v", len(key), MaxKeyLength)
- }
- // scan the second block is which is field1=value1[,field2=value2,...]
- pos, fields, err := scanFields(buf, pos)
- if err != nil {
- return nil, err
- }
- // at least one field is required
- if len(fields) == 0 {
- return nil, fmt.Errorf("missing fields")
- }
- var maxKeyErr error
- err = walkFields(fields, func(k, v []byte) bool {
- if sz := seriesKeySize(key, k); sz > MaxKeyLength {
- maxKeyErr = fmt.Errorf("max key length exceeded: %v > %v", sz, MaxKeyLength)
- return false
- }
- return true
- })
- if err != nil {
- return nil, err
- }
- if maxKeyErr != nil {
- return nil, maxKeyErr
- }
- // scan the last block which is an optional integer timestamp
- pos, ts, err := scanTime(buf, pos)
- if err != nil {
- return nil, err
- }
- pt := &point{
- key: key,
- fields: fields,
- ts: ts,
- }
- if len(ts) == 0 {
- pt.time = defaultTime
- pt.SetPrecision(precision)
- } else {
- ts, err := parseIntBytes(ts, 10, 64)
- if err != nil {
- return nil, err
- }
- pt.time, err = SafeCalcTime(ts, precision)
- if err != nil {
- return nil, err
- }
- // Determine if there are illegal non-whitespace characters after the
- // timestamp block.
- for pos < len(buf) {
- if buf[pos] != ' ' {
- return nil, ErrInvalidPoint
- }
- pos++
- }
- }
- return pt, nil
- }
// GetPrecisionMultiplier returns how many nanoseconds make up one unit of the
// given precision: "u" (microsecond), "ms", "s", "m" (minute) or "h". Any
// other value, including "n" and "", means nanoseconds and yields 1.
func GetPrecisionMultiplier(precision string) int64 {
	switch precision {
	case "u":
		return int64(time.Microsecond)
	case "ms":
		return int64(time.Millisecond)
	case "s":
		return int64(time.Second)
	case "m":
		return int64(time.Minute)
	case "h":
		return int64(time.Hour)
	default:
		return int64(time.Nanosecond)
	}
}
// scanKey scans buf starting at i for the measurement and tag portion of the point.
// It returns the ending position and the byte slice of key within buf. If there
// are tags, they will be sorted if they are not already.
//
// When the tags are already in sorted order the returned key is a subslice of
// buf. Otherwise a newly allocated key is built with the tags in sorted order;
// buf itself is not modified. Duplicate tag keys produce a "duplicate tags"
// error.
func scanKey(buf []byte, i int) (int, []byte, error) {
	start := skipWhitespace(buf, i)

	i = start

	// Determines whether the tags are sorted; assume they are until a pair
	// proves otherwise.
	sorted := true

	// indices holds the indexes within buf of the start of each tag. For example,
	// a buf of 'cpu,host=a,region=b,zone=c' would have indices slice of [4,11,20]
	// which indicates that the first tag starts at buf[4], second at buf[11], and
	// last at buf[20]. scanTags additionally stores a sentinel at indices[commas]:
	// the position just past the space that ends the tag set. The adjacency loop
	// below relies on it (indices[j+2] reaches indices[commas]).
	indices := make([]int, 100)

	// tracks how many commas we've seen so we know how many values are indices.
	// Since indices is an arbitrarily large slice,
	// we need to know how many values in the buffer are in use.
	commas := 0

	// First scan the Point's measurement.
	state, i, err := scanMeasurement(buf, i)
	if err != nil {
		return i, buf[start:i], err
	}

	// Optionally scan tags if needed.
	if state == tagKeyState {
		i, commas, indices, err = scanTags(buf, i, indices)
		if err != nil {
			return i, buf[start:i], err
		}
	}

	// Now we know where the key region is within buf, and the location of tags, we
	// need to determine if duplicate tags exist and if the tags are sorted. This
	// compares each adjacent pair of tag keys in order.
	for j := 0; j < commas-1; j++ {
		// get the left and right tags (each slice ends just before the
		// separator that follows the tag, and scanTo cuts it at '=')
		_, left := scanTo(buf[indices[j]:indices[j+1]-1], 0, '=')
		_, right := scanTo(buf[indices[j+1]:indices[j+2]-1], 0, '=')

		// If left is greater than right, the tags are not sorted. We do not have to
		// continue because the short path no longer works.
		// If the tags are equal, then there are duplicate tags, and we should abort.
		// If the tags are not sorted, this pass may not find duplicate tags and we
		// need to do a more exhaustive search later.
		if cmp := bytes.Compare(left, right); cmp > 0 {
			sorted = false
			break
		} else if cmp == 0 {
			return i, buf[start:i], fmt.Errorf("duplicate tags")
		}
	}

	// If the tags are not sorted, then sort them. This sort is inline and
	// uses the tag indices we created earlier. The actual buffer is not sorted, the
	// indices are using the buffer for value comparison. After the indices are sorted,
	// a new key is assembled from the sorted indices.
	if !sorted && commas > 0 {
		// Get the measurement name for later (everything before the first tag's comma).
		measurement := buf[start : indices[0]-1]

		// Sort the indices (the sentinel entry is excluded).
		indices := indices[:commas]
		insertionSort(0, commas, buf, indices)

		// Create a new key using the measurement and sorted indices.
		// The rebuilt key has the same length as the original key region.
		b := make([]byte, len(buf[start:i]))
		pos := copy(b, measurement)
		// NOTE: the loop variable i shadows the scan position; the outer i is
		// what gets returned after the loop.
		for _, i := range indices {
			b[pos] = ','
			pos++
			_, v := scanToSpaceOr(buf, i, ',')
			pos += copy(b[pos:], v)
		}

		// Check again for duplicate tags now that the tags are sorted.
		for j := 0; j < commas-1; j++ {
			// get the left and right tags
			_, left := scanTo(buf[indices[j]:], 0, '=')
			_, right := scanTo(buf[indices[j+1]:], 0, '=')

			// After sorting, equal tag keys are adjacent, so comparing
			// neighbours is enough to detect any duplicate.
			if bytes.Equal(left, right) {
				return i, b, fmt.Errorf("duplicate tags")
			}
		}

		return i, b, nil
	}

	return i, buf[start:i], nil
}
// The following constants allow us to specify which state to move to
// next, when scanning sections of a Point. The scanners return -1 instead
// of one of these states when they fail.
const (
	tagKeyState   = iota // a tag key follows (after an unescaped ',')
	tagValueState        // a tag value follows (after an unescaped '=')
	fieldsState          // the field set follows (after an unescaped ' ')
)
- // scanMeasurement examines the measurement part of a Point, returning
- // the next state to move to, and the current location in the buffer.
- func scanMeasurement(buf []byte, i int) (int, int, error) {
- // Check first byte of measurement, anything except a comma is fine.
- // It can't be a space, since whitespace is stripped prior to this
- // function call.
- if i >= len(buf) || buf[i] == ',' {
- return -1, i, fmt.Errorf("missing measurement")
- }
- for {
- i++
- if i >= len(buf) {
- // cpu
- return -1, i, fmt.Errorf("missing fields")
- }
- if buf[i-1] == '\\' {
- // Skip character (it's escaped).
- continue
- }
- // Unescaped comma; move onto scanning the tags.
- if buf[i] == ',' {
- return tagKeyState, i + 1, nil
- }
- // Unescaped space; move onto scanning the fields.
- if buf[i] == ' ' {
- // cpu value=1.0
- return fieldsState, i, nil
- }
- }
- }
- // scanTags examines all the tags in a Point, keeping track of and
- // returning the updated indices slice, number of commas and location
- // in buf where to start examining the Point fields.
- func scanTags(buf []byte, i int, indices []int) (int, int, []int, error) {
- var (
- err error
- commas int
- state = tagKeyState
- )
- for {
- switch state {
- case tagKeyState:
- // Grow our indices slice if we have too many tags.
- if commas >= len(indices) {
- newIndics := make([]int, cap(indices)*2)
- copy(newIndics, indices)
- indices = newIndics
- }
- indices[commas] = i
- commas++
- i, err = scanTagsKey(buf, i)
- state = tagValueState // tag value always follows a tag key
- case tagValueState:
- state, i, err = scanTagsValue(buf, i)
- case fieldsState:
- indices[commas] = i + 1
- return i, commas, indices, nil
- }
- if err != nil {
- return i, commas, indices, err
- }
- }
- }
// scanTagsKey scans the tag key that starts at buf[i]. On success it returns
// the index just past the unescaped '=' that ends the key.
//
// It fails with "missing tag key" when the key is empty, i.e. buf[i] is
// absent or is ' ', ',' or '='. It fails with "missing tag value" when the
// input ends, or an unescaped ' ' or ',' appears, before the '='.
func scanTagsKey(buf []byte, i int) (int, error) {
	if i >= len(buf) {
		return i, fmt.Errorf("missing tag key")
	}
	switch buf[i] {
	case ' ', ',', '=':
		// e.g. "cpu,=v" or "cpu, x"
		return i, fmt.Errorf("missing tag key")
	}

	for i++; ; i++ {
		if i >= len(buf) {
			// e.g. "cpu,tag": the input ended inside the key.
			return i, fmt.Errorf("missing tag value")
		}
		if buf[i-1] == '\\' {
			// buf[i] is escaped and cannot end the key.
			continue
		}
		switch buf[i] {
		case ' ', ',':
			// e.g. "cpu,tag value=1": a key without "=value".
			return i, fmt.Errorf("missing tag value")
		case '=':
			// e.g. "cpu,tag=": the value starts right after the '='.
			return i + 1, nil
		}
	}
}
- // scanTagsValue scans each character in a tag value.
- func scanTagsValue(buf []byte, i int) (int, int, error) {
- // Tag value cannot be empty.
- if i >= len(buf) || buf[i] == ',' || buf[i] == ' ' {
- // cpu,tag={',', ' '}
- return -1, i, fmt.Errorf("missing tag value")
- }
- // Examine each character in the tag value until we hit an unescaped
- // comma (move onto next tag key), an unescaped space (move onto
- // fields), or we error out.
- for {
- i++
- if i >= len(buf) {
- // cpu,tag=value
- return -1, i, fmt.Errorf("missing fields")
- }
- // An unescaped equals sign is an invalid tag value.
- if buf[i] == '=' && buf[i-1] != '\\' {
- // cpu,tag={'=', 'fo=o'}
- return -1, i, fmt.Errorf("invalid tag format")
- }
- if buf[i] == ',' && buf[i-1] != '\\' {
- // cpu,tag=foo,
- return tagKeyState, i + 1, nil
- }
- // cpu,tag=foo value=1.0
- // cpu, tag=foo\= value=1.0
- if buf[i] == ' ' && buf[i-1] != '\\' {
- return fieldsState, i, nil
- }
- }
- }
- func insertionSort(l, r int, buf []byte, indices []int) {
- for i := l + 1; i < r; i++ {
- for j := i; j > l && less(buf, indices, j, j-1); j-- {
- indices[j], indices[j-1] = indices[j-1], indices[j]
- }
- }
- }
- func less(buf []byte, indices []int, i, j int) bool {
- // This grabs the tag names for i & j, it ignores the values
- _, a := scanTo(buf, indices[i], '=')
- _, b := scanTo(buf, indices[j], '=')
- return bytes.Compare(a, b) < 0
- }
// scanFields scans buf, starting at i for the fields section of a point. It returns
// the ending position and the byte slice of the fields within buf.
//
// Leading whitespace is skipped first. Scanning stops at the first unquoted
// space or at the end of buf. Numeric and boolean values are handed to
// scanNumber / scanBoolean, and their errors are returned. String values are
// only tracked to know whether a double quote opens or closes them. Other errors:
//   - "missing field key" / "missing field value" for an empty key or value;
//   - "unbalanced quotes" for an unterminated string;
//   - "invalid field format" when the '=' and ',' counts do not line up (e.g. "a=1,b").
func scanFields(buf []byte, i int) (int, []byte, error) {
	start := skipWhitespace(buf, i)
	i = start
	quoted := false

	// tracks how many '=' we've seen
	equals := 0

	// tracks how many commas we've seen
	commas := 0

	for {
		// reached the end of buf?
		if i >= len(buf) {
			break
		}

		// escaped characters? skip the backslash and the byte it escapes
		if buf[i] == '\\' && i+1 < len(buf) {
			i += 2
			continue
		}

		// If the value is quoted, scan until we get to the end quote
		// Only quote values in the field value since quotes are not significant
		// in the field key. equals > commas means the current field's '=' has
		// already been seen, i.e. we are inside its value.
		if buf[i] == '"' && equals > commas {
			quoted = !quoted
			i++
			continue
		}

		// If we see an =, ensure that there is at least one char before and after it
		if buf[i] == '=' && !quoted {
			equals++

			// check for "... =123" but allow "a\ =123"
			// NOTE(review): buf[i-2] assumes two bytes precede i; parsePoint only
			// calls this after a non-empty key, which presumably guarantees it.
			if buf[i-1] == ' ' && buf[i-2] != '\\' {
				return i, buf[start:i], fmt.Errorf("missing field key")
			}

			// check for "...a=123,=456" but allow "a=123,a\,=456"
			if buf[i-1] == ',' && buf[i-2] != '\\' {
				return i, buf[start:i], fmt.Errorf("missing field key")
			}

			// check for "... value="
			if i+1 >= len(buf) {
				return i, buf[start:i], fmt.Errorf("missing field value")
			}

			// check for "... value=,value2=..."
			if buf[i+1] == ',' || buf[i+1] == ' ' {
				return i, buf[start:i], fmt.Errorf("missing field value")
			}

			// A value starting with a digit, '.', '-' or 'N'/'n' is scanned as a
			// number; scanNumber rejects the N/n (NaN) case.
			if isNumeric(buf[i+1]) || buf[i+1] == '-' || buf[i+1] == 'N' || buf[i+1] == 'n' {
				var err error
				i, err = scanNumber(buf, i+1)
				if err != nil {
					return i, buf[start:i], err
				}
				continue
			}
			// If next byte is not a double-quote, the value must be a boolean
			if buf[i+1] != '"' {
				var err error
				i, _, err = scanBoolean(buf, i+1)
				if err != nil {
					return i, buf[start:i], err
				}
				continue
			}
		}

		if buf[i] == ',' && !quoted {
			commas++
		}

		// reached end of block?
		if buf[i] == ' ' && !quoted {
			break
		}
		i++
	}

	if quoted {
		return i, buf[start:i], fmt.Errorf("unbalanced quotes")
	}

	// check that all field sections had key and values (e.g. prevent "a=1,b")
	if equals == 0 || commas != equals-1 {
		return i, buf[start:i], fmt.Errorf("invalid field format")
	}

	return i, buf[start:i], nil
}
- // scanTime scans buf, starting at i for the time section of a point. It
- // returns the ending position and the byte slice of the timestamp within buf
- // and and error if the timestamp is not in the correct numeric format.
- func scanTime(buf []byte, i int) (int, []byte, error) {
- start := skipWhitespace(buf, i)
- i = start
- for {
- // reached the end of buf?
- if i >= len(buf) {
- break
- }
- // Reached end of block or trailing whitespace?
- if buf[i] == '\n' || buf[i] == ' ' {
- break
- }
- // Handle negative timestamps
- if i == start && buf[i] == '-' {
- i++
- continue
- }
- // Timestamps should be integers, make sure they are so we don't need
- // to actually parse the timestamp until needed.
- if buf[i] < '0' || buf[i] > '9' {
- return i, buf[start:i], fmt.Errorf("bad timestamp")
- }
- i++
- }
- return i, buf[start:i], nil
- }
// isNumeric reports whether b may appear in the body of a number literal:
// an ASCII decimal digit or the '.' decimal point.
func isNumeric(b byte) bool {
	switch {
	case b == '.':
		return true
	case b < '0', b > '9':
		return false
	default:
		return true
	}
}
- // scanNumber returns the end position within buf, start at i after
- // scanning over buf for an integer, or float. It returns an
- // error if a invalid number is scanned.
- func scanNumber(buf []byte, i int) (int, error) {
- start := i
- var isInt, isUnsigned bool
- // Is negative number?
- if i < len(buf) && buf[i] == '-' {
- i++
- // There must be more characters now, as just '-' is illegal.
- if i == len(buf) {
- return i, ErrInvalidNumber
- }
- }
- // how many decimal points we've see
- decimal := false
- // indicates the number is float in scientific notation
- scientific := false
- for {
- if i >= len(buf) {
- break
- }
- if buf[i] == ',' || buf[i] == ' ' {
- break
- }
- if buf[i] == 'i' && i > start && !(isInt || isUnsigned) {
- isInt = true
- i++
- continue
- } else if buf[i] == 'u' && i > start && !(isInt || isUnsigned) {
- isUnsigned = true
- i++
- continue
- }
- if buf[i] == '.' {
- // Can't have more than 1 decimal (e.g. 1.1.1 should fail)
- if decimal {
- return i, ErrInvalidNumber
- }
- decimal = true
- }
- // `e` is valid for floats but not as the first char
- if i > start && (buf[i] == 'e' || buf[i] == 'E') {
- scientific = true
- i++
- continue
- }
- // + and - are only valid at this point if they follow an e (scientific notation)
- if (buf[i] == '+' || buf[i] == '-') && (buf[i-1] == 'e' || buf[i-1] == 'E') {
- i++
- continue
- }
- // NaN is an unsupported value
- if i+2 < len(buf) && (buf[i] == 'N' || buf[i] == 'n') {
- return i, ErrInvalidNumber
- }
- if !isNumeric(buf[i]) {
- return i, ErrInvalidNumber
- }
- i++
- }
- if (isInt || isUnsigned) && (decimal || scientific) {
- return i, ErrInvalidNumber
- }
- numericDigits := i - start
- if isInt {
- numericDigits--
- }
- if decimal {
- numericDigits--
- }
- if buf[start] == '-' {
- numericDigits--
- }
- if numericDigits == 0 {
- return i, ErrInvalidNumber
- }
- // It's more common that numbers will be within min/max range for their type but we need to prevent
- // out or range numbers from being parsed successfully. This uses some simple heuristics to decide
- // if we should parse the number to the actual type. It does not do it all the time because it incurs
- // extra allocations and we end up converting the type again when writing points to disk.
- if isInt {
- // Make sure the last char is an 'i' for integers (e.g. 9i10 is not valid)
- if buf[i-1] != 'i' {
- return i, ErrInvalidNumber
- }
- // Parse the int to check bounds the number of digits could be larger than the max range
- // We subtract 1 from the index to remove the `i` from our tests
- if len(buf[start:i-1]) >= maxInt64Digits || len(buf[start:i-1]) >= minInt64Digits {
- if _, err := parseIntBytes(buf[start:i-1], 10, 64); err != nil {
- return i, fmt.Errorf("unable to parse integer %s: %s", buf[start:i-1], err)
- }
- }
- } else if isUnsigned {
- // Return an error if uint64 support has not been enabled.
- if !enableUint64Support {
- return i, ErrInvalidNumber
- }
- // Make sure the last char is a 'u' for unsigned
- if buf[i-1] != 'u' {
- return i, ErrInvalidNumber
- }
- // Make sure the first char is not a '-' for unsigned
- if buf[start] == '-' {
- return i, ErrInvalidNumber
- }
- // Parse the uint to check bounds the number of digits could be larger than the max range
- // We subtract 1 from the index to remove the `u` from our tests
- if len(buf[start:i-1]) >= maxUint64Digits {
- if _, err := parseUintBytes(buf[start:i-1], 10, 64); err != nil {
- return i, fmt.Errorf("unable to parse unsigned %s: %s", buf[start:i-1], err)
- }
- }
- } else {
- // Parse the float to check bounds if it's scientific or the number of digits could be larger than the max range
- if scientific || len(buf[start:i]) >= maxFloat64Digits || len(buf[start:i]) >= minFloat64Digits {
- if _, err := parseFloatBytes(buf[start:i], 10); err != nil {
- return i, fmt.Errorf("invalid float")
- }
- }
- }
- return i, nil
- }
// scanBoolean scans buf, starting at i, over a boolean field value. It returns
// the position just past the value, the value's bytes within buf, and an error
// if the value is not a valid boolean. A value ends at the end of buf, a ','
// or a ' '. Valid values are t, T, true, True, TRUE, f, F, false, False and
// FALSE.
func scanBoolean(buf []byte, i int) (int, []byte, error) {
	start := i

	// An empty value is not a boolean. Without this check the single-char
	// case below would accept it and slice past the end of buf.
	if i >= len(buf) {
		return i, nil, fmt.Errorf("invalid boolean")
	}

	if buf[i] != 't' && buf[i] != 'f' && buf[i] != 'T' && buf[i] != 'F' {
		return i, buf[start:i], fmt.Errorf("invalid boolean")
	}

	i++
	for i < len(buf) && buf[i] != ',' && buf[i] != ' ' {
		i++
	}

	// Single char bool (t, T, f, F) is ok
	if i-start == 1 {
		return i, buf[start:i], nil
	}

	// Otherwise only the lower-, title- and upper-case spellings are accepted.
	switch string(buf[start:i]) {
	case "true", "True", "TRUE", "false", "False", "FALSE":
		return i, buf[start:i], nil
	}
	return i, buf[start:i], fmt.Errorf("invalid boolean")
}
// skipWhitespace returns the index of the first byte in buf at or after i that
// is not a space, a tab or a NUL byte. If every remaining byte is one of those,
// it returns len(buf). If i is already at or past the end, i is returned.
func skipWhitespace(buf []byte, i int) int {
	for ; i < len(buf); i++ {
		switch buf[i] {
		case ' ', '\t', 0:
			continue
		}
		return i
	}
	return i
}
// scanLine scans buf, starting at i, up to the end of the current line and
// returns the end position and the line's bytes (without the newline).
//
// A newline ends the line only when it is outside a double-quoted field value.
// Once a space has been seen (the start of the field section), '=' and ','
// outside quotes are counted. A '"' toggles quoting only when more '=' than ','
// have been seen, i.e. while positioned inside a field value. A backslash
// skips itself and the following byte when at least two more bytes follow it.
// This duplicates some of the functionality in scanFields.
func scanLine(buf []byte, i int) (int, []byte) {
	begin := i
	var inFields, inQuotes bool
	var nEquals, nCommas int

	for i < len(buf) {
		c := buf[i]

		// skip past escaped characters
		if c == '\\' && i+2 < len(buf) {
			i += 2
			continue
		}

		if c == ' ' {
			inFields = true
		}

		if inFields {
			switch {
			case !inQuotes && c == '=':
				nEquals++
				i++
				continue
			case !inQuotes && c == ',':
				nCommas++
				i++
				continue
			case c == '"' && nEquals > nCommas:
				inQuotes = !inQuotes
				i++
				continue
			}
		}

		if c == '\n' && !inQuotes {
			break
		}
		i++
	}
	return i, buf[begin:i]
}
// scanTo returns the end position in buf and the next consecutive block of
// bytes. The block starts at i and ends just before the first stop byte that
// is not immediately preceded by a backslash. If there is no such byte, the
// block runs to the end of buf.
//
// Leading spaces are NOT skipped; they are part of the returned block. The
// escape check consults buf[i-1] even at the starting position, unless i is 0.
func scanTo(buf []byte, i int, stop byte) (int, []byte) {
	start := i
	for i < len(buf) {
		// Reached unescaped stop value?
		if buf[i] == stop && (i == 0 || buf[i-1] != '\\') {
			break
		}
		i++
	}
	return i, buf[start:i]
}
// scanToSpaceOr returns the end position in buf and the next consecutive block
// of bytes. The block starts at i and ends just before the first space or stop
// byte that is not preceded by a backslash; if there is none, the block runs
// to the end of buf. The byte following a backslash never ends the block.
//
// Leading spaces are not skipped: if buf[i] is a space or stop, an empty block
// is returned. If i is at or past the end of buf, i and a nil block are
// returned instead of indexing out of range.
func scanToSpaceOr(buf []byte, i int, stop byte) (int, []byte) {
	start := i
	if i >= len(buf) {
		return i, nil
	}
	if buf[i] == stop || buf[i] == ' ' {
		return i, buf[start:i]
	}
	for {
		i++
		// Check for the end of buf first, so a trailing backslash cannot make
		// the loop step past it.
		if i >= len(buf) {
			return i, buf[start:i]
		}
		// The byte after a backslash is escaped; it can never end the block.
		if buf[i-1] == '\\' {
			continue
		}
		// reached end of block?
		if buf[i] == stop || buf[i] == ' ' {
			return i, buf[start:i]
		}
	}
}
// scanTagValue scans buf, starting at i, over a tag value and returns the end
// position and the value's bytes. The value ends at the end of buf or at the
// first ',' not immediately preceded by a backslash. If i is past the end of
// buf, i and a nil value are returned.
func scanTagValue(buf []byte, i int) (int, []byte) {
	start := i
	for i < len(buf) {
		// Guard i == 0 so a leading ',' cannot index buf[-1]; this matches the
		// escape check in scanTo.
		if buf[i] == ',' && (i == 0 || buf[i-1] != '\\') {
			break
		}
		i++
	}
	if i > len(buf) {
		return i, nil
	}
	return i, buf[start:i]
}
// scanFieldValue scans buf, starting at i, over a single field value and
// returns the end position and the value's bytes. The value ends at the end of
// buf or at the first ',' outside a double-quoted section. Inside a value, a
// backslash escapes only a following '"' or '\\'.
func scanFieldValue(buf []byte, i int) (int, []byte) {
	begin, inQuotes := i, false
	for i < len(buf) {
		c := buf[i]
		next := i + 1
		switch {
		case c == '\\' && next < len(buf) && (buf[next] == '"' || buf[next] == '\\'):
			// Step over the backslash and the character it protects.
			i += 2
			continue
		case c == '"':
			// Quoted value? (e.g. string)
			inQuotes = !inQuotes
		case c == ',' && !inQuotes:
			return i, buf[begin:i]
		}
		i++
	}
	return i, buf[begin:i]
}
- func EscapeMeasurement(in []byte) []byte {
- for _, c := range measurementEscapeCodes {
- if bytes.IndexByte(in, c.k[0]) != -1 {
- in = bytes.Replace(in, c.k[:], c.esc[:], -1)
- }
- }
- return in
- }
- func unescapeMeasurement(in []byte) []byte {
- if bytes.IndexByte(in, '\\') == -1 {
- return in
- }
- for i := range measurementEscapeCodes {
- c := &measurementEscapeCodes[i]
- if bytes.IndexByte(in, c.k[0]) != -1 {
- in = bytes.Replace(in, c.esc[:], c.k[:], -1)
- }
- }
- return in
- }
- func escapeTag(in []byte) []byte {
- for i := range tagEscapeCodes {
- c := &tagEscapeCodes[i]
- if bytes.IndexByte(in, c.k[0]) != -1 {
- in = bytes.Replace(in, c.k[:], c.esc[:], -1)
- }
- }
- return in
- }
- func unescapeTag(in []byte) []byte {
- if bytes.IndexByte(in, '\\') == -1 {
- return in
- }
- for i := range tagEscapeCodes {
- c := &tagEscapeCodes[i]
- if bytes.IndexByte(in, c.k[0]) != -1 {
- in = bytes.Replace(in, c.esc[:], c.k[:], -1)
- }
- }
- return in
- }
// escapeStringFieldReplacer puts a backslash in front of every double quote
// and backslash. As of Go 1.7 a strings.Replacer benchmarked better, in both
// allocations and CPU time, than three alternatives: appending byte-by-byte to
// a new slice, calling strings.Replace twice, and
// (*regexp.Regexp).ReplaceAllString.
var escapeStringFieldReplacer = strings.NewReplacer(
	"\"", "\\\"",
	"\\", "\\\\",
)

// EscapeStringField returns a copy of in in which every double quote and
// backslash is preceded by a backslash.
func EscapeStringField(in string) string {
	escaped := escapeStringFieldReplacer.Replace(in)
	return escaped
}
// unescapeStringField returns a copy of in in which every escaped double
// quote (\") and escaped backslash (\\) is replaced by the bare character. Any
// other backslash is kept as-is. in is returned unchanged when it contains no
// backslash.
func unescapeStringField(in string) string {
	if strings.IndexByte(in, '\\') == -1 {
		return in
	}

	// The result is never longer than the input, so size the buffer once
	// instead of growing it from nil.
	out := make([]byte, 0, len(in))
	for i := 0; i < len(in); i++ {
		if in[i] == '\\' && i+1 < len(in) && (in[i+1] == '\\' || in[i+1] == '"') {
			// Drop the escaping backslash; the escaped byte is appended below.
			i++
		}
		out = append(out, in[i])
	}
	return string(out)
}
- // NewPoint returns a new point with the given measurement name, tags, fields and timestamp. If
- // an unsupported field value (NaN, or +/-Inf) or out of range time is passed, this function
- // returns an error.
- func NewPoint(name string, tags Tags, fields Fields, t time.Time) (Point, error) {
- key, err := pointKey(name, tags, fields, t)
- if err != nil {
- return nil, err
- }
- return &point{
- key: key,
- time: t,
- fields: fields.MarshalBinary(),
- }, nil
- }
- // pointKey checks some basic requirements for valid points, and returns the
- // key, along with an possible error.
- func pointKey(measurement string, tags Tags, fields Fields, t time.Time) ([]byte, error) {
- if len(fields) == 0 {
- return nil, ErrPointMustHaveAField
- }
- if !t.IsZero() {
- if err := CheckTime(t); err != nil {
- return nil, err
- }
- }
- for key, value := range fields {
- switch value := value.(type) {
- case float64:
- // Ensure the caller validates and handles invalid field values
- if math.IsInf(value, 0) {
- return nil, fmt.Errorf("+/-Inf is an unsupported value for field %s", key)
- }
- if math.IsNaN(value) {
- return nil, fmt.Errorf("NaN is an unsupported value for field %s", key)
- }
- case float32:
- // Ensure the caller validates and handles invalid field values
- if math.IsInf(float64(value), 0) {
- return nil, fmt.Errorf("+/-Inf is an unsupported value for field %s", key)
- }
- if math.IsNaN(float64(value)) {
- return nil, fmt.Errorf("NaN is an unsupported value for field %s", key)
- }
- }
- if len(key) == 0 {
- return nil, fmt.Errorf("all fields must have non-empty names")
- }
- }
- key := MakeKey([]byte(measurement), tags)
- for field := range fields {
- sz := seriesKeySize(key, []byte(field))
- if sz > MaxKeyLength {
- return nil, fmt.Errorf("max key length exceeded: %v > %v", sz, MaxKeyLength)
- }
- }
- return key, nil
- }
// seriesKeySize returns the length of key, followed by the field key
// separator, followed by field. The separator is tsm1's fieldKeySeparator,
// which is 4 bytes long; its length is inlined here to avoid a circular
// dependency.
func seriesKeySize(key, field []byte) int {
	const fieldKeySeparatorLen = 4
	return len(key) + len(field) + fieldKeySeparatorLen
}
- // NewPointFromBytes returns a new Point from a marshalled Point.
- func NewPointFromBytes(b []byte) (Point, error) {
- p := &point{}
- if err := p.UnmarshalBinary(b); err != nil {
- return nil, err
- }
- // This does some basic validation to ensure there are fields and they
- // can be unmarshalled as well.
- iter := p.FieldIterator()
- var hasField bool
- for iter.Next() {
- if len(iter.FieldKey()) == 0 {
- continue
- }
- hasField = true
- switch iter.Type() {
- case Float:
- _, err := iter.FloatValue()
- if err != nil {
- return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err)
- }
- case Integer:
- _, err := iter.IntegerValue()
- if err != nil {
- return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err)
- }
- case Unsigned:
- _, err := iter.UnsignedValue()
- if err != nil {
- return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err)
- }
- case String:
- // Skip since this won't return an error
- case Boolean:
- _, err := iter.BooleanValue()
- if err != nil {
- return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err)
- }
- }
- }
- if !hasField {
- return nil, ErrPointMustHaveAField
- }
- return p, nil
- }
- // MustNewPoint returns a new point with the given measurement name, tags, fields and timestamp. If
- // an unsupported field value (NaN) is passed, this function panics.
- func MustNewPoint(name string, tags Tags, fields Fields, time time.Time) Point {
- pt, err := NewPoint(name, tags, fields, time)
- if err != nil {
- panic(err.Error())
- }
- return pt
- }
- // Key returns the key (measurement joined with tags) of the point.
- func (p *point) Key() []byte {
- return p.key
- }
- func (p *point) name() []byte {
- _, name := scanTo(p.key, 0, ',')
- return name
- }
- func (p *point) Name() []byte {
- return escape.Unescape(p.name())
- }
- // SetName updates the measurement name for the point.
- func (p *point) SetName(name string) {
- p.cachedName = ""
- p.key = MakeKey([]byte(name), p.Tags())
- }
- // Time return the timestamp for the point.
- func (p *point) Time() time.Time {
- return p.time
- }
- // SetTime updates the timestamp for the point.
- func (p *point) SetTime(t time.Time) {
- p.time = t
- }
- // Round will round the timestamp of the point to the given duration.
- func (p *point) Round(d time.Duration) {
- p.time = p.time.Round(d)
- }
- // Tags returns the tag set for the point.
- func (p *point) Tags() Tags {
- if p.cachedTags != nil {
- return p.cachedTags
- }
- p.cachedTags = parseTags(p.key, nil)
- return p.cachedTags
- }
- func (p *point) ForEachTag(fn func(k, v []byte) bool) {
- walkTags(p.key, fn)
- }
- func (p *point) HasTag(tag []byte) bool {
- if len(p.key) == 0 {
- return false
- }
- var exists bool
- walkTags(p.key, func(key, value []byte) bool {
- if bytes.Equal(tag, key) {
- exists = true
- return false
- }
- return true
- })
- return exists
- }
- func walkTags(buf []byte, fn func(key, value []byte) bool) {
- if len(buf) == 0 {
- return
- }
- pos, name := scanTo(buf, 0, ',')
- // it's an empty key, so there are no tags
- if len(name) == 0 {
- return
- }
- hasEscape := bytes.IndexByte(buf, '\\') != -1
- i := pos + 1
- var key, value []byte
- for {
- if i >= len(buf) {
- break
- }
- i, key = scanTo(buf, i, '=')
- i, value = scanTagValue(buf, i+1)
- if len(value) == 0 {
- continue
- }
- if hasEscape {
- if !fn(unescapeTag(key), unescapeTag(value)) {
- return
- }
- } else {
- if !fn(key, value) {
- return
- }
- }
- i++
- }
- }
- // walkFields walks each field key and value via fn. If fn returns false, the iteration
- // is stopped. The values are the raw byte slices and not the converted types.
- func walkFields(buf []byte, fn func(key, value []byte) bool) error {
- var i int
- var key, val []byte
- for len(buf) > 0 {
- i, key = scanTo(buf, 0, '=')
- if i > len(buf)-2 {
- return fmt.Errorf("invalid value: field-key=%s", key)
- }
- buf = buf[i+1:]
- i, val = scanFieldValue(buf, 0)
- buf = buf[i:]
- if !fn(key, val) {
- break
- }
- // slice off comma
- if len(buf) > 0 {
- buf = buf[1:]
- }
- }
- return nil
- }
- // parseTags parses buf into the provided destination tags, returning destination
- // Tags, which may have a different length and capacity.
- func parseTags(buf []byte, dst Tags) Tags {
- if len(buf) == 0 {
- return nil
- }
- n := bytes.Count(buf, []byte(","))
- if cap(dst) < n {
- dst = make(Tags, n)
- } else {
- dst = dst[:n]
- }
- // Ensure existing behaviour when point has no tags and nil slice passed in.
- if dst == nil {
- dst = Tags{}
- }
- // Series keys can contain escaped commas, therefore the number of commas
- // in a series key only gives an estimation of the upper bound on the number
- // of tags.
- var i int
- walkTags(buf, func(key, value []byte) bool {
- dst[i].Key, dst[i].Value = key, value
- i++
- return true
- })
- return dst[:i]
- }
- // MakeKey creates a key for a set of tags.
- func MakeKey(name []byte, tags Tags) []byte {
- return AppendMakeKey(nil, name, tags)
- }
- // AppendMakeKey appends the key derived from name and tags to dst and returns the extended buffer.
- func AppendMakeKey(dst []byte, name []byte, tags Tags) []byte {
- // unescape the name and then re-escape it to avoid double escaping.
- // The key should always be stored in escaped form.
- dst = append(dst, EscapeMeasurement(unescapeMeasurement(name))...)
- dst = tags.AppendHashKey(dst)
- return dst
- }
- // SetTags replaces the tags for the point.
- func (p *point) SetTags(tags Tags) {
- p.key = MakeKey(p.Name(), tags)
- p.cachedTags = tags
- }
- // AddTag adds or replaces a tag value for a point.
- func (p *point) AddTag(key, value string) {
- tags := p.Tags()
- tags = append(tags, Tag{Key: []byte(key), Value: []byte(value)})
- sort.Sort(tags)
- p.cachedTags = tags
- p.key = MakeKey(p.Name(), tags)
- }
- // Fields returns the fields for the point.
- func (p *point) Fields() (Fields, error) {
- if p.cachedFields != nil {
- return p.cachedFields, nil
- }
- cf, err := p.unmarshalBinary()
- if err != nil {
- return nil, err
- }
- p.cachedFields = cf
- return p.cachedFields, nil
- }
- // SetPrecision will round a time to the specified precision.
- func (p *point) SetPrecision(precision string) {
- switch precision {
- case "n":
- case "u":
- p.SetTime(p.Time().Truncate(time.Microsecond))
- case "ms":
- p.SetTime(p.Time().Truncate(time.Millisecond))
- case "s":
- p.SetTime(p.Time().Truncate(time.Second))
- case "m":
- p.SetTime(p.Time().Truncate(time.Minute))
- case "h":
- p.SetTime(p.Time().Truncate(time.Hour))
- }
- }
- // String returns the string representation of the point.
- func (p *point) String() string {
- if p.Time().IsZero() {
- return string(p.Key()) + " " + string(p.fields)
- }
- return string(p.Key()) + " " + string(p.fields) + " " + strconv.FormatInt(p.UnixNano(), 10)
- }
- // AppendString appends the string representation of the point to buf.
- func (p *point) AppendString(buf []byte) []byte {
- buf = append(buf, p.key...)
- buf = append(buf, ' ')
- buf = append(buf, p.fields...)
- if !p.time.IsZero() {
- buf = append(buf, ' ')
- buf = strconv.AppendInt(buf, p.UnixNano(), 10)
- }
- return buf
- }
- // StringSize returns the length of the string that would be returned by String().
- func (p *point) StringSize() int {
- size := len(p.key) + len(p.fields) + 1
- if !p.time.IsZero() {
- digits := 1 // even "0" has one digit
- t := p.UnixNano()
- if t < 0 {
- // account for negative sign, then negate
- digits++
- t = -t
- }
- for t > 9 { // already accounted for one digit
- digits++
- t /= 10
- }
- size += digits + 1 // digits and a space
- }
- return size
- }
- // MarshalBinary returns a binary representation of the point.
- func (p *point) MarshalBinary() ([]byte, error) {
- if len(p.fields) == 0 {
- return nil, ErrPointMustHaveAField
- }
- tb, err := p.time.MarshalBinary()
- if err != nil {
- return nil, err
- }
- b := make([]byte, 8+len(p.key)+len(p.fields)+len(tb))
- i := 0
- binary.BigEndian.PutUint32(b[i:], uint32(len(p.key)))
- i += 4
- i += copy(b[i:], p.key)
- binary.BigEndian.PutUint32(b[i:i+4], uint32(len(p.fields)))
- i += 4
- i += copy(b[i:], p.fields)
- copy(b[i:], tb)
- return b, nil
- }
- // UnmarshalBinary decodes a binary representation of the point into a point struct.
- func (p *point) UnmarshalBinary(b []byte) error {
- var n int
- // Read key length.
- if len(b) < 4 {
- return io.ErrShortBuffer
- }
- n, b = int(binary.BigEndian.Uint32(b[:4])), b[4:]
- // Read key.
- if len(b) < n {
- return io.ErrShortBuffer
- }
- p.key, b = b[:n], b[n:]
- // Read fields length.
- if len(b) < 4 {
- return io.ErrShortBuffer
- }
- n, b = int(binary.BigEndian.Uint32(b[:4])), b[4:]
- // Read fields.
- if len(b) < n {
- return io.ErrShortBuffer
- }
- p.fields, b = b[:n], b[n:]
- // Read timestamp.
- return p.time.UnmarshalBinary(b)
- }
- // PrecisionString returns a string representation of the point. If there
- // is a timestamp associated with the point then it will be specified in the
- // given unit.
- func (p *point) PrecisionString(precision string) string {
- if p.Time().IsZero() {
- return fmt.Sprintf("%s %s", p.Key(), string(p.fields))
- }
- return fmt.Sprintf("%s %s %d", p.Key(), string(p.fields),
- p.UnixNano()/GetPrecisionMultiplier(precision))
- }
- // RoundedString returns a string representation of the point. If there
- // is a timestamp associated with the point, then it will be rounded to the
- // given duration.
- func (p *point) RoundedString(d time.Duration) string {
- if p.Time().IsZero() {
- return fmt.Sprintf("%s %s", p.Key(), string(p.fields))
- }
- return fmt.Sprintf("%s %s %d", p.Key(), string(p.fields),
- p.time.Round(d).UnixNano())
- }
- func (p *point) unmarshalBinary() (Fields, error) {
- iter := p.FieldIterator()
- fields := make(Fields, 8)
- for iter.Next() {
- if len(iter.FieldKey()) == 0 {
- continue
- }
- switch iter.Type() {
- case Float:
- v, err := iter.FloatValue()
- if err != nil {
- return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err)
- }
- fields[string(iter.FieldKey())] = v
- case Integer:
- v, err := iter.IntegerValue()
- if err != nil {
- return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err)
- }
- fields[string(iter.FieldKey())] = v
- case Unsigned:
- v, err := iter.UnsignedValue()
- if err != nil {
- return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err)
- }
- fields[string(iter.FieldKey())] = v
- case String:
- fields[string(iter.FieldKey())] = iter.StringValue()
- case Boolean:
- v, err := iter.BooleanValue()
- if err != nil {
- return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err)
- }
- fields[string(iter.FieldKey())] = v
- }
- }
- return fields, nil
- }
- // HashID returns a non-cryptographic checksum of the point's key.
- func (p *point) HashID() uint64 {
- h := NewInlineFNV64a()
- h.Write(p.key)
- sum := h.Sum64()
- return sum
- }
- // UnixNano returns the timestamp of the point as nanoseconds since Unix epoch.
- func (p *point) UnixNano() int64 {
- return p.Time().UnixNano()
- }
// Split will attempt to return multiple points with the same timestamp whose
// string representations are no longer than size. Points with a single field or
// a point without a timestamp may exceed the requested size.
//
// The point itself is returned as the only element when it has a zero time or
// already fits within size. Every returned point shares p's key and time, and
// its fields are a sub-slice of p's field bytes rather than a copy.
func (p *point) Split(size int) []Point {
	if p.time.IsZero() || p.StringSize() <= size {
		return []Point{p}
	}

	// key string, timestamp string, spaces
	// After this, size is the byte budget left for each chunk of fields.
	size -= len(p.key) + len(strconv.FormatInt(p.time.UnixNano(), 10)) + 2

	var points []Point
	// start is the offset of the first field in the chunk being built; cur is
	// the offset of the field currently being examined.
	var start, cur int

	for cur < len(p.fields) {
		// end is the offset just past the current field's value.
		end, _ := scanTo(p.fields, cur, '=')
		end, _ = scanFieldValue(p.fields, end+1)

		// Adding this field would exceed the budget. Emit the fields seen so
		// far (excluding the comma before cur) and start a new chunk at cur.
		// The cur > start guard means a chunk always keeps at least one field,
		// even when that field alone is too big.
		if cur > start && end-start > size {
			points = append(points, &point{
				key:    p.key,
				time:   p.time,
				fields: p.fields[start : cur-1],
			})
			start = cur
		}

		// Skip the comma separating this field from the next.
		cur = end + 1
	}

	// The remaining fields form the final chunk.
	points = append(points, &point{
		key:    p.key,
		time:   p.time,
		fields: p.fields[start:],
	})

	return points
}
- // Tag represents a single key/value tag pair.
- type Tag = models.Tag
- // NewTag returns a new Tag.
- func NewTag(key, value []byte) Tag {
- return Tag{
- Key: key,
- Value: value,
- }
- }
- // Tags represents a sorted list of tags.
- type Tags = models.Tags
- // NewTags returns a new Tags from a map.
- func NewTags(m map[string]string) Tags {
- if len(m) == 0 {
- return nil
- }
- a := make(Tags, 0, len(m))
- for k, v := range m {
- a = append(a, NewTag([]byte(k), []byte(v)))
- }
- sort.Sort(a)
- return a
- }
- // CompareTags returns -1 if a < b, 1 if a > b, and 0 if a == b.
- func CompareTags(a, b Tags) int {
- // Compare each key & value until a mismatch.
- for i := 0; i < len(a) && i < len(b); i++ {
- if cmp := bytes.Compare(a[i].Key, b[i].Key); cmp != 0 {
- return cmp
- }
- if cmp := bytes.Compare(a[i].Value, b[i].Value); cmp != 0 {
- return cmp
- }
- }
- // If all tags are equal up to this point then return shorter tagset.
- if len(a) < len(b) {
- return -1
- } else if len(a) > len(b) {
- return 1
- }
- // All tags are equal.
- return 0
- }
- // CopyTags returns a shallow copy of tags.
- func CopyTags(a Tags) Tags {
- other := make(Tags, len(a))
- copy(other, a)
- return other
- }
- // DeepCopyTags returns a deep copy of tags.
- func DeepCopyTags(a Tags) Tags {
- // Calculate size of keys/values in bytes.
- var n int
- for _, t := range a {
- n += len(t.Key) + len(t.Value)
- }
- // Build single allocation for all key/values.
- buf := make([]byte, n)
- // Copy tags to new set.
- other := make(Tags, len(a))
- for i, t := range a {
- copy(buf, t.Key)
- other[i].Key, buf = buf[:len(t.Key)], buf[len(t.Key):]
- copy(buf, t.Value)
- other[i].Value, buf = buf[:len(t.Value)], buf[len(t.Value):]
- }
- return other
- }
- // Fields represents a mapping between a Point's field names and their
- // values.
- type Fields map[string]interface{}
- // FieldIterator retuns a FieldIterator that can be used to traverse the
- // fields of a point without constructing the in-memory map.
- func (p *point) FieldIterator() FieldIterator {
- p.Reset()
- return p
- }
- type fieldIterator struct {
- start, end int
- key, keybuf []byte
- valueBuf []byte
- fieldType FieldType
- }
// Next advances the point's field iterator to the next field and reports
// whether there was one. It records three things:
//   - the field key, unescaped into keybuf when escape.IsEscaped reports an
//     escape;
//   - the raw value bytes;
//   - a type guessed from the value's first and last bytes.
func (p *point) Next() bool {
	p.it.start = p.it.end
	if p.it.start >= len(p.fields) {
		return false
	}

	p.it.end, p.it.key = scanTo(p.fields, p.it.start, '=')
	if escape.IsEscaped(p.it.key) {
		// Reuse keybuf's storage; the previous key is overwritten.
		p.it.keybuf = escape.AppendUnescaped(p.it.keybuf[:0], p.it.key)
		p.it.key = p.it.keybuf
	}

	p.it.end, p.it.valueBuf = scanFieldValue(p.fields, p.it.end+1)
	// Step past the comma that separates this field from the next one.
	p.it.end++

	if len(p.it.valueBuf) == 0 {
		p.it.fieldType = Empty
		return true
	}

	c := p.it.valueBuf[0]

	// A leading double quote marks a string value.
	if c == '"' {
		p.it.fieldType = String
		return true
	}

	// A value starting with a digit, '-', '.' or one of n/N/i/I/u is numeric.
	// A trailing 'i' or 'u' selects Integer or Unsigned and is stripped from
	// valueBuf; anything else is treated as Float.
	if strings.IndexByte(`0123456789-.nNiIu`, c) >= 0 {
		if p.it.valueBuf[len(p.it.valueBuf)-1] == 'i' {
			p.it.fieldType = Integer
			p.it.valueBuf = p.it.valueBuf[:len(p.it.valueBuf)-1]
		} else if p.it.valueBuf[len(p.it.valueBuf)-1] == 'u' {
			p.it.fieldType = Unsigned
			p.it.valueBuf = p.it.valueBuf[:len(p.it.valueBuf)-1]
		} else {
			p.it.fieldType = Float
		}
		return true
	}

	// to keep the same behavior that currently exists, default to boolean
	p.it.fieldType = Boolean
	return true
}
- // FieldKey returns the key of the current field.
- func (p *point) FieldKey() []byte {
- return p.it.key
- }
- // Type returns the FieldType of the current field.
- func (p *point) Type() FieldType {
- return p.it.fieldType
- }
- // StringValue returns the string value of the current field.
- func (p *point) StringValue() string {
- return unescapeStringField(string(p.it.valueBuf[1 : len(p.it.valueBuf)-1]))
- }
- // IntegerValue returns the integer value of the current field.
- func (p *point) IntegerValue() (int64, error) {
- n, err := parseIntBytes(p.it.valueBuf, 10, 64)
- if err != nil {
- return 0, fmt.Errorf("unable to parse integer value %q: %v", p.it.valueBuf, err)
- }
- return n, nil
- }
- // UnsignedValue returns the unsigned value of the current field.
- func (p *point) UnsignedValue() (uint64, error) {
- n, err := parseUintBytes(p.it.valueBuf, 10, 64)
- if err != nil {
- return 0, fmt.Errorf("unable to parse unsigned value %q: %v", p.it.valueBuf, err)
- }
- return n, nil
- }
- // BooleanValue returns the boolean value of the current field.
- func (p *point) BooleanValue() (bool, error) {
- b, err := parseBoolBytes(p.it.valueBuf)
- if err != nil {
- return false, fmt.Errorf("unable to parse bool value %q: %v", p.it.valueBuf, err)
- }
- return b, nil
- }
- // FloatValue returns the float value of the current field.
- func (p *point) FloatValue() (float64, error) {
- f, err := parseFloatBytes(p.it.valueBuf, 64)
- if err != nil {
- return 0, fmt.Errorf("unable to parse floating point value %q: %v", p.it.valueBuf, err)
- }
- return f, nil
- }
- // Reset resets the iterator to its initial state.
- func (p *point) Reset() {
- p.it.fieldType = Empty
- p.it.key = nil
- p.it.valueBuf = nil
- p.it.start = 0
- p.it.end = 0
- }
- // MarshalBinary encodes all the fields to their proper type and returns the binary
- // represenation
- // NOTE: uint64 is specifically not supported due to potential overflow when we decode
- // again later to an int64
- // NOTE2: uint is accepted, and may be 64 bits, and is for some reason accepted...
- func (p Fields) MarshalBinary() []byte {
- var b []byte
- keys := make([]string, 0, len(p))
- for k := range p {
- keys = append(keys, k)
- }
- // Not really necessary, can probably be removed.
- sort.Strings(keys)
- for i, k := range keys {
- if i > 0 {
- b = append(b, ',')
- }
- b = appendField(b, k, p[k])
- }
- return b
- }
- func appendField(b []byte, k string, v interface{}) []byte {
- b = append(b, []byte(escape.String(k))...)
- b = append(b, '=')
- // check popular types first
- switch v := v.(type) {
- case float64:
- b = strconv.AppendFloat(b, v, 'f', -1, 64)
- case int64:
- b = strconv.AppendInt(b, v, 10)
- b = append(b, 'i')
- case string:
- b = append(b, '"')
- b = append(b, []byte(EscapeStringField(v))...)
- b = append(b, '"')
- case bool:
- b = strconv.AppendBool(b, v)
- case int32:
- b = strconv.AppendInt(b, int64(v), 10)
- b = append(b, 'i')
- case int16:
- b = strconv.AppendInt(b, int64(v), 10)
- b = append(b, 'i')
- case int8:
- b = strconv.AppendInt(b, int64(v), 10)
- b = append(b, 'i')
- case int:
- b = strconv.AppendInt(b, int64(v), 10)
- b = append(b, 'i')
- case uint64:
- b = strconv.AppendUint(b, v, 10)
- b = append(b, 'u')
- case uint32:
- b = strconv.AppendInt(b, int64(v), 10)
- b = append(b, 'i')
- case uint16:
- b = strconv.AppendInt(b, int64(v), 10)
- b = append(b, 'i')
- case uint8:
- b = strconv.AppendInt(b, int64(v), 10)
- b = append(b, 'i')
- case uint:
- // TODO: 'uint' should be converted to writing as an unsigned integer,
- // but we cannot since that would break backwards compatibility.
- b = strconv.AppendInt(b, int64(v), 10)
- b = append(b, 'i')
- case float32:
- b = strconv.AppendFloat(b, float64(v), 'f', -1, 32)
- case []byte:
- b = append(b, v...)
- case nil:
- // skip
- default:
- // Can't determine the type, so convert to string
- b = append(b, '"')
- b = append(b, []byte(EscapeStringField(fmt.Sprintf("%v", v)))...)
- b = append(b, '"')
- }
- return b
- }
// ValidKeyToken reports whether s, used as a measurement, tag key or tag
// value, is valid UTF-8. Every rune must also be printable according to
// unicode.IsPrint and must not be the replacement character U+FFFD.
func ValidKeyToken(s string) bool {
	if !utf8.ValidString(s) {
		return false
	}
	for pos := 0; pos < len(s); {
		r, width := utf8.DecodeRuneInString(s[pos:])
		if r == unicode.ReplacementChar || !unicode.IsPrint(r) {
			return false
		}
		pos += width
	}
	return true
}
- // ValidKeyTokens returns true if the measurement name and all tags are valid.
- func ValidKeyTokens(name string, tags Tags) bool {
- if !ValidKeyToken(name) {
- return false
- }
- for _, tag := range tags {
- if !ValidKeyToken(string(tag.Key)) || !ValidKeyToken(string(tag.Value)) {
- return false
- }
- }
- return true
- }
|