123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- package deliver
- import (
- "encoding/binary"
- "fmt"
- "math/rand"
- "net"
- "sync"
- "time"
- "go-common/library/log"
- )
// _magicBuf is the two-byte marker that warpData places at the start of
// every outgoing frame.
var _magicBuf = []byte{0xAC, 0xBE}

// _bufpool recycles the byte slices used to build outgoing frames. Its New
// function is installed by init.
var _bufpool sync.Pool
- func init() {
- rand.Seed(time.Now().UnixNano())
- _bufpool = sync.Pool{New: func() interface{} {
- return make([]byte, 0, 4096)
- }}
- }
- func freeBuf(buf []byte) {
- buf = buf[:0]
- _bufpool.Put(buf)
- }
- func getBuf() []byte {
- return _bufpool.Get().([]byte)
- }
// Deliver ships span data to dapper-service over a TCP connection.
type Deliver struct {
	// servers lists the candidate addresses; one is picked at random on
	// every dial.
	servers []string
	// readFn supplies the next payload to send.
	readFn func() ([]byte, error)

	// conn is the current connection; it is replaced on every (re)dial.
	conn *net.TCPConn

	// dataCh carries payloads from the fetch goroutine to the sender.
	dataCh chan []byte
	// closeCh tells the send loop to stop.
	closeCh chan struct{}
	// closed is set by Close and polled by fetch.
	// NOTE(review): read and written from different goroutines without
	// synchronization.
	closed bool
}
- // New Deliver
- func New(servers []string, readFn func() ([]byte, error)) (*Deliver, error) {
- if len(servers) == 0 {
- return nil, fmt.Errorf("no server provide")
- }
- d := &Deliver{
- servers: servers,
- readFn: readFn,
- closeCh: make(chan struct{}, 1),
- dataCh: make(chan []byte),
- }
- return d, d.start()
- }
- func (d *Deliver) start() error {
- if err := d.dial(); err != nil {
- return err
- }
- go d.fetch()
- go d.loop()
- return nil
- }
- func (d *Deliver) fetch() {
- for {
- if d.closed {
- return
- }
- data, err := d.readFn()
- if err != nil {
- log.Error("deliver read data error: %s", err)
- continue
- }
- d.dataCh <- data
- }
- }
- func (d *Deliver) loop() {
- for {
- select {
- case <-d.closeCh:
- return
- case data := <-d.dataCh:
- data = warpData(data)
- send:
- _, err := d.conn.Write(data)
- if err == nil {
- freeBuf(data)
- continue
- }
- d.reDial()
- goto send
- }
- }
- }
- // Close deliver
- func (d *Deliver) Close() error {
- if d.closed {
- return fmt.Errorf("already closed")
- }
- d.closed = true
- d.closeCh <- struct{}{}
- timer := time.NewTimer(50 * time.Millisecond)
- select {
- case data := <-d.dataCh:
- // write last data to conn
- _, err := d.conn.Write(data)
- return fmt.Errorf("write last data error: %s", err)
- case <-timer.C:
- return nil
- }
- return nil
- }
- func (d *Deliver) reDial() {
- if d.conn != nil {
- d.conn.Close()
- }
- for {
- if err := d.dial(); err != nil {
- log.Error("redial error: %s, retry after second", err)
- time.Sleep(time.Second)
- }
- break
- }
- }
- func (d *Deliver) dial() error {
- server := chioceServer(d.servers)
- conn, err := net.Dial("tcp", server)
- if err != nil {
- return fmt.Errorf("dial tcp://%s error: %s", server, err)
- }
- d.conn = conn.(*net.TCPConn)
- d.conn.SetKeepAlive(true)
- return nil
- }
// chioceServer returns one entry of servers picked uniformly at random.
// It panics when servers is empty.
func chioceServer(servers []string) string {
	idx := rand.Intn(len(servers))
	return servers[idx]
}
- func warpData(data []byte) []byte {
- buf := getBuf()
- buf = append(buf, _magicBuf...)
- buf = append(buf, []byte{0, 0, 0, 0, 0, 0}...)
- binary.BigEndian.PutUint32(buf[2:6], uint32(len(data)+2))
- buf = append(buf, data...)
- return buf
- }
|