udp.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package client
  2. import (
  3. "fmt"
  4. "io"
  5. "net"
  6. "time"
  7. )
  8. const (
  9. // UDPPayloadSize is a reasonable default payload size for UDP packets that
  10. // could be travelling over the internet.
  11. UDPPayloadSize = 512
  12. )
  13. // UDPConfig is the config data needed to create a UDP Client.
  14. type UDPConfig struct {
  15. // Addr should be of the form "host:port"
  16. // or "[ipv6-host%zone]:port".
  17. Addr string
  18. // PayloadSize is the maximum size of a UDP client message, optional
  19. // Tune this based on your network. Defaults to UDPPayloadSize.
  20. PayloadSize int
  21. }
  22. // NewUDPClient returns a client interface for writing to an InfluxDB UDP
  23. // service from the given config.
  24. func NewUDPClient(conf UDPConfig) (Client, error) {
  25. var udpAddr *net.UDPAddr
  26. udpAddr, err := net.ResolveUDPAddr("udp", conf.Addr)
  27. if err != nil {
  28. return nil, err
  29. }
  30. conn, err := net.DialUDP("udp", nil, udpAddr)
  31. if err != nil {
  32. return nil, err
  33. }
  34. payloadSize := conf.PayloadSize
  35. if payloadSize == 0 {
  36. payloadSize = UDPPayloadSize
  37. }
  38. return &udpclient{
  39. conn: conn,
  40. payloadSize: payloadSize,
  41. }, nil
  42. }
  43. // Close releases the udpclient's resources.
  44. func (uc *udpclient) Close() error {
  45. return uc.conn.Close()
  46. }
  47. type udpclient struct {
  48. conn io.WriteCloser
  49. payloadSize int
  50. }
  51. func (uc *udpclient) Write(bp BatchPoints) error {
  52. var b = make([]byte, 0, uc.payloadSize) // initial buffer size, it will grow as needed
  53. var d, _ = time.ParseDuration("1" + bp.Precision())
  54. var delayedError error
  55. var checkBuffer = func(n int) {
  56. if len(b) > 0 && len(b)+n > uc.payloadSize {
  57. if _, err := uc.conn.Write(b); err != nil {
  58. delayedError = err
  59. }
  60. b = b[:0]
  61. }
  62. }
  63. for _, p := range bp.Points() {
  64. p.pt.Round(d)
  65. pointSize := p.pt.StringSize() + 1 // include newline in size
  66. //point := p.pt.RoundedString(d) + "\n"
  67. checkBuffer(pointSize)
  68. if p.Time().IsZero() || pointSize <= uc.payloadSize {
  69. b = p.pt.AppendString(b)
  70. b = append(b, '\n')
  71. continue
  72. }
  73. points := p.pt.Split(uc.payloadSize - 1) // account for newline character
  74. for _, sp := range points {
  75. checkBuffer(sp.StringSize() + 1)
  76. b = sp.AppendString(b)
  77. b = append(b, '\n')
  78. }
  79. }
  80. if len(b) > 0 {
  81. if _, err := uc.conn.Write(b); err != nil {
  82. return err
  83. }
  84. }
  85. return delayedError
  86. }
  87. func (uc *udpclient) Query(q Query) (*Response, error) {
  88. return nil, fmt.Errorf("Querying via UDP is not supported")
  89. }
  90. func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) {
  91. return 0, "", nil
  92. }