pool.go 8.9 KB


  1. package lancerlogstream
  2. import (
  3. "net"
  4. "time"
  5. "sync"
  6. "errors"
  7. "math/rand"
  8. "expvar"
  9. "go-common/library/log"
  10. "go-common/app/service/ops/log-agent/pkg/bufio"
  11. xtime "go-common/library/time"
  12. )
  13. var (
  14. ErrAddrListNil = errors.New("addrList can't be nil")
  15. ErrPoolSize = errors.New("Pool size should be no greater then length of addr list")
  16. )
  17. type LancerBufConn struct {
  18. conn net.Conn
  19. wr *bufio.Writer
  20. enabled bool
  21. ctime time.Time
  22. }
  23. type connPool struct {
  24. c *ConnPoolConfig
  25. invalidUpstreams map[string]interface{}
  26. invalidUpstreamsLock sync.RWMutex
  27. validBufConnChan chan *LancerBufConn
  28. invalidBufConnChan chan *LancerBufConn
  29. connCounter map[string]int
  30. connCounterLock sync.RWMutex
  31. newConnLock sync.Mutex
  32. }
  33. type ConnPoolConfig struct {
  34. Name string `tome:"name"`
  35. AddrList []string `tome:"addrList"`
  36. DialTimeout xtime.Duration `tome:"dialTimeout"`
  37. IdleTimeout xtime.Duration `tome:"idleTimeout"`
  38. BufSize int `tome:"bufSize"`
  39. PoolSize int `tome:"poolSize"`
  40. }
  41. func (c *ConnPoolConfig) ConfigValidate() (error) {
  42. if c == nil {
  43. return errors.New("Config of pool is nil")
  44. }
  45. if len(c.AddrList) == 0 {
  46. return errors.New("pool addr list can't be empty")
  47. }
  48. if time.Duration(c.DialTimeout) == 0 {
  49. c.DialTimeout = xtime.Duration(time.Second * 5)
  50. }
  51. if time.Duration(c.IdleTimeout) == 0 {
  52. c.IdleTimeout = xtime.Duration(time.Minute * 15)
  53. }
  54. if c.BufSize == 0 {
  55. c.BufSize = 1024 * 1024 * 2 // 2M by default
  56. }
  57. if c.PoolSize == 0 {
  58. c.PoolSize = len(c.AddrList)
  59. }
  60. return nil
  61. }
  62. // newConn make a connection to lancer
  63. func (cp *connPool) newConn() (conn net.Conn, err error) {
  64. cp.newConnLock.Lock()
  65. defer cp.newConnLock.Unlock()
  66. for {
  67. if addr, err := cp.randomOneUpstream(); err == nil {
  68. if conn, err := net.DialTimeout("tcp", addr, time.Duration(cp.c.DialTimeout)); err == nil && conn != nil {
  69. log.Info("connect to %s success", addr)
  70. cp.connCounterAdd(addr)
  71. return conn, nil
  72. } else {
  73. cp.markUpstreamInvalid(addr)
  74. continue
  75. }
  76. } else {
  77. return nil, err
  78. }
  79. }
  80. }
  81. // newBufConn 创建一个buf连接, buf连接绑定一个conn(无论连接是否可用)
  82. func (cp *connPool) newBufConn() (bufConn *LancerBufConn, err error) {
  83. bufConn = new(LancerBufConn)
  84. bufConn.wr = bufio.NewWriterSize(nil, cp.c.BufSize)
  85. if err := cp.setConn(bufConn); err == nil {
  86. bufConn.enabled = true
  87. } else {
  88. bufConn.enabled = false
  89. }
  90. return bufConn, nil
  91. }
  92. // flushBufConn 定期flush buffer
  93. func (cp *connPool) flushBufConn() {
  94. for {
  95. bufConn, _ := cp.getBufConn()
  96. bufConn.conn.SetWriteDeadline(time.Now().Add(time.Second * 5))
  97. if err := bufConn.wr.Flush(); err != nil {
  98. log.Error("Error when flush to %s: %s", bufConn.conn.RemoteAddr().String(), err)
  99. bufConn.enabled = false
  100. }
  101. cp.putBufConn(bufConn)
  102. time.Sleep(time.Second * 5)
  103. }
  104. }
  105. // initConnPool 初始化conn pool对象
  106. func initConnPool(c *ConnPoolConfig) (cp *connPool, err error) {
  107. if err = c.ConfigValidate(); err != nil {
  108. return nil, err
  109. }
  110. if len(c.AddrList) == 0 {
  111. return nil, ErrAddrListNil
  112. }
  113. if c.PoolSize > len(c.AddrList) {
  114. return nil, ErrPoolSize
  115. }
  116. rand.Seed(time.Now().Unix())
  117. cp = new(connPool)
  118. cp.c = c
  119. cp.validBufConnChan = make(chan *LancerBufConn, cp.c.PoolSize)
  120. cp.invalidBufConnChan = make(chan *LancerBufConn, cp.c.PoolSize)
  121. cp.invalidUpstreams = make(map[string]interface{})
  122. cp.connCounter = make(map[string]int)
  123. cp.initPool()
  124. go cp.maintainUpstream()
  125. go cp.flushBufConn()
  126. go cp.maintainBufConnPool()
  127. expvar.Publish("conn_pool"+cp.c.Name, expvar.Func(cp.connPoolStatus))
  128. return cp, nil
  129. }
  130. // connableUpstreams 返回可以建立连接的upstream列表
  131. func (cp *connPool) connableUpstreams() ([]string) {
  132. list := make([]string, 0)
  133. cp.invalidUpstreamsLock.RLock()
  134. defer cp.invalidUpstreamsLock.RUnlock()
  135. for _, addr := range cp.c.AddrList {
  136. if _, ok := cp.invalidUpstreams[addr]; !ok {
  137. if count, ok := cp.connCounter[addr]; ok && count == 0 {
  138. list = append(list, addr)
  139. }
  140. }
  141. }
  142. return list
  143. }
  144. // write write []byte to BufConn
  145. func (bc *LancerBufConn) write(p []byte) (int, error) {
  146. bc.conn.SetWriteDeadline(time.Now().Add(time.Second * 5))
  147. return bc.wr.Write(p)
  148. }
  149. // randomOneUpstream 随机返回一个可以建立连接的upstream
  150. func (cp *connPool) randomOneUpstream() (s string, err error) {
  151. list := cp.connableUpstreams()
  152. if len(list) == 0 {
  153. err = errors.New("No valid upstreams")
  154. return
  155. }
  156. return list[rand.Intn(len(list))], nil
  157. }
  158. // initPool 初始化poolSize个数的bufConn
  159. func (cp *connPool) initPool() {
  160. for _, addr := range cp.c.AddrList {
  161. cp.connCounter[addr] = 0
  162. }
  163. for i := 0; i < cp.c.PoolSize; i++ {
  164. if bufConn, err := cp.newBufConn(); err == nil {
  165. cp.putBufConn(bufConn)
  166. }
  167. }
  168. }
  169. // novalidUpstream check if there is no validUpstream
  170. func (cp *connPool) novalidUpstream() bool {
  171. return len(cp.invalidUpstreams) == len(cp.c.AddrList)
  172. }
  173. //GetConn 从pool中取一个BufConn
  174. func (cp *connPool) getBufConn() (*LancerBufConn, error) {
  175. for {
  176. select {
  177. case bufConn := <-cp.validBufConnChan:
  178. if !bufConn.enabled {
  179. cp.putInvalidBufConn(bufConn)
  180. continue
  181. }
  182. return bufConn, nil
  183. case <-time.After(10 * time.Second):
  184. log.Warn("timeout when get conn from conn pool")
  185. continue
  186. }
  187. }
  188. }
  189. // setConn 为bufConn绑定一个新的Conn
  190. func (cp *connPool) setConn(bufConn *LancerBufConn) (error) {
  191. if bufConn.conn != nil {
  192. if bufConn.enabled == false {
  193. cp.markUpstreamInvalid(bufConn.conn.RemoteAddr().String())
  194. }
  195. cp.connCounterDel(bufConn.conn.RemoteAddr().String())
  196. bufConn.conn.Close()
  197. bufConn.conn = nil
  198. bufConn.enabled = false
  199. }
  200. if conn, err := cp.newConn(); err == nil {
  201. bufConn.conn = conn
  202. bufConn.wr.Reset(conn)
  203. bufConn.ctime = time.Now()
  204. bufConn.enabled = true
  205. return nil
  206. } else {
  207. bufConn.enabled = false
  208. return err
  209. }
  210. }
  211. //putBufConn 把BufConn放回到pool中
  212. func (cp *connPool) putBufConn(bufConn *LancerBufConn) {
  213. if bufConn.enabled == false {
  214. cp.putInvalidBufConn(bufConn)
  215. return
  216. }
  217. if bufConn.ctime.Add(time.Duration(cp.c.IdleTimeout)).Before(time.Now()) {
  218. bufConn.wr.Flush()
  219. cp.putInvalidBufConn(bufConn)
  220. return
  221. }
  222. cp.putValidBufConn(bufConn)
  223. }
  224. // putValidBufConn 把 bufConn放到可用的pool中
  225. func (cp *connPool) putValidBufConn(bufConn *LancerBufConn) {
  226. select {
  227. case cp.validBufConnChan <- bufConn:
  228. return
  229. default:
  230. log.Warn("BufConnChan full, discard,this shouldn't happen")
  231. return
  232. }
  233. }
  234. // putInvalidBufConn 把bufConn放到不可用的pool中
  235. func (cp *connPool) putInvalidBufConn(bufConn *LancerBufConn) {
  236. select {
  237. case cp.invalidBufConnChan <- bufConn:
  238. return
  239. default:
  240. log.Warn("invalidBufConnChan full, discard,this shouldn't happen")
  241. return
  242. }
  243. }
  244. // maintainBufConnPool 维护BufConnPool状态
  245. func (cp *connPool) maintainBufConnPool() {
  246. for {
  247. select {
  248. case bufConn := <-cp.invalidBufConnChan:
  249. cp.setConn(bufConn)
  250. cp.putBufConn(bufConn)
  251. }
  252. time.Sleep(time.Second * 1)
  253. }
  254. }
  255. //markConnInvalid会将链接关闭并且将相应upstreamserver设置为不可用
  256. func (cp *connPool) markUpstreamInvalid(addr string) (err error) {
  257. log.Error("mark upstream %s invalid", addr)
  258. cp.invalidUpstreamsLock.Lock()
  259. cp.invalidUpstreams[addr] = nil
  260. cp.invalidUpstreamsLock.Unlock()
  261. return
  262. }
  263. // markUpstreamValid 将某一addr设置为不可用
  264. func (cp *connPool) markUpstreamValid(addr string) (err error) {
  265. log.Info("%s is valid again", addr)
  266. cp.invalidUpstreamsLock.Lock()
  267. delete(cp.invalidUpstreams, addr)
  268. cp.invalidUpstreamsLock.Unlock()
  269. return
  270. }
  271. // connCounterAdd 连接数+1
  272. func (cp *connPool) connCounterAdd(addr string) {
  273. cp.connCounterLock.Lock()
  274. defer cp.connCounterLock.Unlock()
  275. if _, ok := cp.connCounter[addr]; ok {
  276. cp.connCounter[addr] += 1
  277. } else {
  278. cp.connCounter[addr] = 1
  279. }
  280. return
  281. }
  282. //connCounterDel 连接数-1
  283. func (cp *connPool) connCounterDel(addr string) {
  284. cp.connCounterLock.Lock()
  285. defer cp.connCounterLock.Unlock()
  286. if _, ok := cp.connCounter[addr]; ok {
  287. cp.connCounter[addr] -= 1
  288. }
  289. }
  290. // connPoolStatus 返回connPool状态
  291. func (cp *connPool) connPoolStatus() interface{} {
  292. status := make(map[string]interface{})
  293. status["conn_num"] = cp.connCounter
  294. status["invalidUpstreams"] = cp.invalidUpstreams
  295. return status
  296. }
  297. // maintainUpstream 维护upstream的健康状态
  298. func (cp *connPool) maintainUpstream() {
  299. for {
  300. cp.invalidUpstreamsLock.RLock()
  301. tryAddrs := make([]string, 0, len(cp.invalidUpstreams))
  302. for k := range cp.invalidUpstreams {
  303. tryAddrs = append(tryAddrs, k)
  304. }
  305. cp.invalidUpstreamsLock.RUnlock()
  306. for _, addr := range tryAddrs {
  307. if conn, err := net.DialTimeout("tcp", addr, time.Duration(cp.c.DialTimeout)); err == nil && conn != nil {
  308. conn.Close()
  309. cp.markUpstreamValid(addr)
  310. }
  311. }
  312. time.Sleep(time.Second * 10)
  313. }
  314. }
  315. //ReleaseConnPool 释放连接池中所有链接
  316. func (cp *connPool) ReleaseConnPool() {
  317. log.Info("Release Conn Pool")
  318. close(cp.validBufConnChan)
  319. close(cp.invalidBufConnChan)
  320. for conn := range cp.validBufConnChan {
  321. conn.enabled = false
  322. conn.wr.Flush()
  323. conn.conn.Close()
  324. }
  325. }