reader.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. package file
  2. import (
  3. "io"
  4. "bufio"
  5. "bytes"
  6. )
  7. // Message represents a reader event with timestamp, content and actual number
  8. // of bytes read from input before decoding.
  9. //type Message struct {
  10. // Ts time.Time // timestamp the content was read
  11. // Content []byte // actual content read
  12. // Bytes int // total number of bytes read to generate the message
  13. // //Fields common.MapStr // optional fields that can be added by reader
  14. //}
  // Reader is the minimal contract for a source that yields one message per
  // call to Next.
  //
  // Next returns the message content, an integer count and an error. For
  // LineReader, the implementation in this file, the count is the number of
  // raw bytes taken from the input to produce the content.
  type Reader interface {
  	Next() (content []byte, n int, err error)
  }
  // LineReader reads newline-terminated lines from an underlying io.Reader
  // through a bufio.Reader. Values are created with NewLineReader.
  type LineReader struct {
  	// reader is the input exactly as handed to NewLineReader.
  	reader io.Reader

  	// rb is the buffered wrapper around reader; Next reads lines from it.
  	rb *bufio.Reader

  	// bufferSize is the size that was requested for rb.
  	bufferSize int

  	// nl is the line terminator and nlSize is its length in bytes.
  	nl     []byte
  	nlSize int

  	// scan is a bufio.Scanner over rb that NewLineReader configures with
  	// ScanLines as its split function. Next does not read through it.
  	scan *bufio.Scanner
  }
  26. // New creates a new reader object
  27. func NewLineReader(input io.Reader, bufferSize int) (*LineReader, error) {
  28. nl := []byte{'\n'}
  29. r := &LineReader{
  30. reader: input,
  31. bufferSize: bufferSize,
  32. nl: nl,
  33. nlSize: len(nl),
  34. }
  35. r.rb = bufio.NewReaderSize(input, r.bufferSize)
  36. r.scan = bufio.NewScanner(r.rb)
  37. r.scan.Split(ScanLines)
  38. return r, nil
  39. }
  // ScanLines is a split function for bufio.Scanner that emits only complete,
  // newline-terminated lines.
  //
  // If data contains a '\n', token is everything before the first one and
  // advance covers the token plus that newline. The token is returned as-is,
  // so a '\r' directly before the newline is kept. If data contains no '\n',
  // the result is (0, nil, nil) whatever the value of atEOF, which means bytes
  // after the last newline are never turned into a token. err is always nil.
  func ScanLines(data []byte, atEOF bool) (advance int, token []byte, err error) {
  	end := bytes.IndexByte(data, '\n')
  	if end < 0 {
  		// No newline available (this also covers empty data at EOF):
  		// consume nothing and produce no token.
  		return 0, nil, nil
  	}
  	return end + 1, data[:end], nil
  }
  51. // Next reads the next line until the new line character
  52. func (r *LineReader) Next() ([]byte, int, error) {
  53. body, err := r.rb.ReadBytes('\n')
  54. advance := len(body)
  55. //if err == io.EOF && advance > 0 {
  56. // return body, advance, err
  57. //}
  58. // remove '\n'
  59. if len(body) > 0 && body[len(body)-1] == '\n' {
  60. body = body[0:len(body)-1]
  61. }
  62. // remove '\r'
  63. if len(body) > 0 && body[len(body)-1] == '\r' {
  64. body = body[0: len(body)-1]
  65. }
  66. return body, advance, err
  67. }