123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364 |
- package databus
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "math/rand"
- "net/url"
- "sync"
- "sync/atomic"
- "time"
- "go-common/library/cache/redis"
- "go-common/library/conf/env"
- "go-common/library/container/pool"
- "go-common/library/log"
- "go-common/library/naming"
- "go-common/library/naming/discovery"
- "go-common/library/net/netutil"
- "go-common/library/net/trace"
- "go-common/library/stat/prom"
- xtime "go-common/library/time"
- )
// _appid is the application id handed to discovery.Build when resolving
// databus instances.
const _appid = "middleware.databus"
- type dial func() (redis.Conn, error)
- // Config databus config.
- type Config struct {
- Key string
- Secret string
- Group string
- Topic string
- Action string // shoule be "pub" or "sub" or "pubsub"
- Buffer int
- Name string // redis name, for trace
- Proto string
- Addr string
- Auth string
- Active int // pool
- Idle int // pool
- DialTimeout xtime.Duration
- ReadTimeout xtime.Duration
- WriteTimeout xtime.Duration
- IdleTimeout xtime.Duration
- Direct bool
- }
// _family names the trace family used by Send; _scheme is the URL scheme an
// instance address must carry to be dialed by dialInstance.
const (
	_family = "databus"
	_scheme = "databus"
)

// Values compared against Config.Action.
const (
	_actionSub = "sub"
	_actionPub = "pub"
	_actionAll = "pubsub"
)

// Commands issued over the redis connection: _cmdPub publishes a message,
// _cmdSub pulls a batch of messages.
const (
	_cmdPub = "SET"
	_cmdSub = "MGET"
)

// _authFormat is filled with key, secret, group, topic and action, in that
// order, to build the connection password.
const _authFormat = "%s:%s@%s/topic=%s&role=%s"

// Lifecycle states stored in Databus.closed.
const (
	_open   int32 = 0
	_closed int32 = 1
)
- var (
- // ErrAction action error.
- ErrAction = errors.New("action unknown")
- // ErrFull chan full
- ErrFull = errors.New("chan full")
- // ErrNoInstance no instances
- ErrNoInstance = errors.New("no databus instances found")
- bk = netutil.DefaultBackoffConfig
- stats = prom.LibClient
- )
- // Message Data.
- type Message struct {
- Key string `json:"key"`
- Value json.RawMessage `json:"value"`
- Topic string `json:"topic"`
- Partition int32 `json:"partition"`
- Offset int64 `json:"offset"`
- Timestamp int64 `json:"timestamp"`
- d *Databus
- }
- // Commit ack message.
- func (m *Message) Commit() (err error) {
- m.d.lock.Lock()
- if m.Offset >= m.d.marked[m.Partition] {
- m.d.marked[m.Partition] = m.Offset
- }
- m.d.lock.Unlock()
- return nil
- }
- // Databus databus struct.
- type Databus struct {
- conf *Config
- d dial
- p *redis.Pool
- dis naming.Resolver
- msgs chan *Message
- lock sync.RWMutex
- marked map[int32]int64
- idx int64
- closed int32
- }
- // New new a databus.
- func New(c *Config) *Databus {
- if c.Buffer == 0 {
- c.Buffer = 1024
- }
- d := &Databus{
- conf: c,
- msgs: make(chan *Message, c.Buffer),
- marked: make(map[int32]int64),
- closed: _open,
- }
- if !c.Direct && env.DeployEnv != "" && env.DeployEnv != env.DeployEnvDev {
- d.dis = discovery.Build(_appid)
- e := d.dis.Watch()
- select {
- case <-e:
- d.disc()
- case <-time.After(10 * time.Second):
- panic("init discovery err")
- }
- go d.discoveryproc(e)
- log.Info("init databus discvoery info successfully")
- }
- if c.Action == _actionSub || c.Action == _actionAll {
- if d.dis == nil {
- d.d = d.dial
- } else {
- d.d = d.dialInstance
- }
- go d.subproc()
- }
- if c.Action == _actionPub || c.Action == _actionAll {
- // new pool
- d.p = d.redisPool(c)
- if d.dis != nil {
- d.p.New = func(ctx context.Context) (io.Closer, error) {
- return d.dialInstance()
- }
- }
- }
- return d
- }
- func (d *Databus) redisPool(c *Config) *redis.Pool {
- config := &redis.Config{
- Name: c.Name,
- Proto: c.Proto,
- Addr: c.Addr,
- Auth: fmt.Sprintf(_authFormat, c.Key, c.Secret, c.Group, c.Topic, c.Action),
- DialTimeout: c.DialTimeout,
- ReadTimeout: c.ReadTimeout,
- WriteTimeout: c.WriteTimeout,
- }
- config.Config = &pool.Config{
- Active: c.Active,
- Idle: c.Idle,
- IdleTimeout: c.IdleTimeout,
- }
- stat := redis.DialStats(statfunc)
- return redis.NewPool(config, stat)
- }
- func statfunc(cmd string, err *error) func() {
- now := time.Now()
- return func() {
- stats.Timing(fmt.Sprintf("databus:%s", cmd), int64(time.Since(now)/time.Millisecond))
- if err != nil && *err != nil {
- stats.Incr("databus", (*err).Error())
- }
- }
- }
- func (d *Databus) redisOptions() []redis.DialOption {
- cnop := redis.DialConnectTimeout(time.Duration(d.conf.DialTimeout))
- rdop := redis.DialReadTimeout(time.Duration(d.conf.ReadTimeout))
- wrop := redis.DialWriteTimeout(time.Duration(d.conf.WriteTimeout))
- auop := redis.DialPassword(fmt.Sprintf(_authFormat, d.conf.Key, d.conf.Secret, d.conf.Group, d.conf.Topic, d.conf.Action))
- stat := redis.DialStats(statfunc)
- return []redis.DialOption{cnop, rdop, wrop, auop, stat}
- }
- func (d *Databus) dial() (redis.Conn, error) {
- return redis.Dial(d.conf.Proto, d.conf.Addr, d.redisOptions()...)
- }
- func (d *Databus) dialInstance() (redis.Conn, error) {
- if insMap, ok := d.dis.Fetch(context.Background()); ok {
- ins, ok := insMap[env.Zone]
- if !ok || len(ins) == 0 {
- for _, is := range insMap {
- ins = append(ins, is...)
- }
- }
- if len(ins) > 0 {
- var in *naming.Instance
- if d.conf.Action == "pub" {
- i := atomic.AddInt64(&d.idx, 1)
- in = ins[i%int64(len(ins))]
- } else {
- in = ins[rand.Intn(len(ins))]
- }
- for _, addr := range in.Addrs {
- u, err := url.Parse(addr)
- if err == nil && u.Scheme == _scheme {
- return redis.Dial("tcp", u.Host, d.redisOptions()...)
- }
- }
- }
- }
- if d.conf.Proto != "" && d.conf.Addr != "" {
- log.Warn("Databus: no instances(%s,%s) found in discovery,Use config(%s,%s)", _appid, env.Zone, d.conf.Proto, d.conf.Addr)
- return redis.Dial(d.conf.Proto, d.conf.Addr, d.redisOptions()...)
- }
- return nil, ErrNoInstance
- }
- func (d *Databus) disc() {
- if d.p != nil {
- op := d.p
- np := d.redisPool(d.conf)
- np.New = func(ctx context.Context) (io.Closer, error) {
- return d.dialInstance()
- }
- d.p = np
- op.Close()
- op = nil
- log.Info("discovery event renew redis pool group(%s) topic(%s)", d.conf.Group, d.conf.Topic)
- }
- if insMap, ok := d.dis.Fetch(context.Background()); ok {
- if ins, ok := insMap[env.Zone]; ok && len(ins) > 0 {
- log.Info("get databus instances len(%d)", len(ins))
- }
- }
- }
- func (d *Databus) discoveryproc(e <-chan struct{}) {
- if d.dis == nil {
- return
- }
- for {
- <-e
- d.disc()
- }
- }
- func (d *Databus) subproc() {
- var (
- err error
- r []byte
- res [][]byte
- c redis.Conn
- retry int
- commited = make(map[int32]int64)
- commit = make(map[int32]int64)
- )
- for {
- if atomic.LoadInt32(&d.closed) == _closed {
- if c != nil {
- c.Close()
- }
- close(d.msgs)
- return
- }
- if err != nil {
- time.Sleep(bk.Backoff(retry))
- retry++
- } else {
- retry = 0
- }
- if c == nil || c.Err() != nil {
- if c, err = d.d(); err != nil {
- log.Error("redis.Dial(%s@%s) group(%s) retry error(%v)", d.conf.Proto, d.conf.Addr, d.conf.Group, err)
- continue
- }
- }
- d.lock.RLock()
- for k, v := range d.marked {
- if commited[k] != v {
- commit[k] = v
- }
- }
- d.lock.RUnlock()
- // TODO pipeline commit offset
- for k, v := range commit {
- if _, err = c.Do("SET", k, v); err != nil {
- c.Close()
- log.Error("group(%s) conn.Do(SET,%d,%d) commit error(%v)", d.conf.Group, k, v, err)
- break
- }
- delete(commit, k)
- commited[k] = v
- }
- if err != nil {
- continue
- }
- // pull messages
- if res, err = redis.ByteSlices(c.Do(_cmdSub, "")); err != nil {
- c.Close()
- log.Error("group(%s) conn.Do(MGET) error(%v)", d.conf.Group, err)
- continue
- }
- for _, r = range res {
- msg := &Message{d: d}
- if err = json.Unmarshal(r, msg); err != nil {
- log.Error("json.Unmarshal(%s) error(%v)", r, err)
- continue
- }
- d.msgs <- msg
- }
- }
- }
- // Messages get message chan.
- func (d *Databus) Messages() <-chan *Message {
- return d.msgs
- }
- // Send send message to databus.
- func (d *Databus) Send(c context.Context, k string, v interface{}) (err error) {
- var b []byte
- // trace info
- if t, ok := trace.FromContext(c); ok {
- t = t.Fork(_family, _cmdPub)
- t.SetTag(trace.String(trace.TagAddress, d.conf.Addr), trace.String(trace.TagComment, k))
- defer t.Finish(&err)
- }
- // send message
- if b, err = json.Marshal(v); err != nil {
- log.Error("json.Marshal(%v) error(%v)", v, err)
- return
- }
- conn := d.p.Get(context.TODO())
- if _, err = conn.Do(_cmdPub, k, b); err != nil {
- log.Error("conn.Do(%s,%s,%s) error(%v)", _cmdPub, k, b, err)
- }
- conn.Close()
- return
- }
- // Close close databus conn.
- func (d *Databus) Close() (err error) {
- if !atomic.CompareAndSwapInt32(&d.closed, _open, _closed) {
- return
- }
- if d.p != nil {
- d.p.Close()
- }
- return nil
- }
|