sql.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715
  1. package tidb
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "go-common/library/naming"
  10. "go-common/library/net/netutil/breaker"
  11. "go-common/library/net/trace"
  12. "github.com/go-sql-driver/mysql"
  13. "github.com/pkg/errors"
  14. )
  15. const (
  16. _family = "tidb_client"
  17. )
  18. var (
  19. // ErrStmtNil prepared stmt error
  20. ErrStmtNil = errors.New("sql: prepare failed and stmt nil")
  21. // ErrNoRows is returned by Scan when QueryRow doesn't return a row.
  22. // In such a case, QueryRow returns a placeholder *Row value that defers
  23. // this error until a Scan.
  24. ErrNoRows = sql.ErrNoRows
  25. // ErrTxDone transaction done.
  26. ErrTxDone = sql.ErrTxDone
  27. )
  28. // DB database.
  29. type DB struct {
  30. conf *Config
  31. conns []*conn
  32. idx int64
  33. dis naming.Resolver
  34. appid string
  35. mutex sync.RWMutex
  36. breakerGroup *breaker.Group
  37. }
  38. // conn database connection
  39. type conn struct {
  40. *sql.DB
  41. breaker breaker.Breaker
  42. conf *Config
  43. addr string
  44. }
  45. // Tx transaction.
  46. type Tx struct {
  47. db *conn
  48. tx *sql.Tx
  49. t trace.Trace
  50. c context.Context
  51. cancel func()
  52. }
  53. // Row row.
  54. type Row struct {
  55. err error
  56. *sql.Row
  57. db *conn
  58. query string
  59. args []interface{}
  60. t trace.Trace
  61. cancel func()
  62. }
  63. // Scan copies the columns from the matched row into the values pointed at by dest.
  64. func (r *Row) Scan(dest ...interface{}) (err error) {
  65. if r.t != nil {
  66. defer r.t.Finish(&err)
  67. }
  68. if r.err != nil {
  69. err = r.err
  70. } else if r.Row == nil {
  71. err = ErrStmtNil
  72. }
  73. if err != nil {
  74. return
  75. }
  76. err = r.Row.Scan(dest...)
  77. if r.cancel != nil {
  78. r.cancel()
  79. }
  80. r.db.onBreaker(&err)
  81. if err != ErrNoRows {
  82. err = errors.Wrapf(err, "addr: %s, query %s args %+v", r.db.addr, r.query, r.args)
  83. }
  84. return
  85. }
  86. // Rows rows.
  87. type Rows struct {
  88. *sql.Rows
  89. cancel func()
  90. }
  91. // Close closes the Rows, preventing further enumeration. If Next is called
  92. // and returns false and there are no further result sets,
  93. // the Rows are closed automatically and it will suffice to check the
  94. // result of Err. Close is idempotent and does not affect the result of Err.
  95. func (rs *Rows) Close() (err error) {
  96. err = errors.WithStack(rs.Rows.Close())
  97. if rs.cancel != nil {
  98. rs.cancel()
  99. }
  100. return
  101. }
  102. // Stmt prepared stmt.
  103. type Stmt struct {
  104. db *conn
  105. tx bool
  106. query string
  107. stmt atomic.Value
  108. t trace.Trace
  109. }
  110. // Stmts random prepared stmt.
  111. type Stmts struct {
  112. query string
  113. sts map[string]*Stmt
  114. mu sync.RWMutex
  115. db *DB
  116. }
  117. // Open opens a database specified by its database driver name and a
  118. // driver-specific data source name, usually consisting of at least a database
  119. // name and connection information.
  120. func Open(c *Config) (db *DB, err error) {
  121. db = &DB{conf: c, breakerGroup: breaker.NewGroup(c.Breaker)}
  122. cfg, err := mysql.ParseDSN(c.DSN)
  123. if err != nil {
  124. return
  125. }
  126. var dsns []string
  127. if cfg.Net == "discovery" {
  128. db.appid = cfg.Addr
  129. for _, addr := range db.disc() {
  130. dsns = append(dsns, genDSN(c.DSN, addr))
  131. }
  132. } else {
  133. dsns = append(dsns, c.DSN)
  134. }
  135. cs := make([]*conn, 0, len(dsns))
  136. for _, dsn := range dsns {
  137. r, err := db.connectDSN(dsn)
  138. if err != nil {
  139. return db, err
  140. }
  141. cs = append(cs, r)
  142. }
  143. db.conns = cs
  144. return
  145. }
  146. func (db *DB) connectDSN(dsn string) (c *conn, err error) {
  147. d, err := connect(db.conf, dsn)
  148. if err != nil {
  149. return
  150. }
  151. addr := parseDSNAddr(dsn)
  152. brk := db.breakerGroup.Get(addr)
  153. c = &conn{DB: d, breaker: brk, conf: db.conf, addr: addr}
  154. return
  155. }
  156. func connect(c *Config, dataSourceName string) (*sql.DB, error) {
  157. d, err := sql.Open("mysql", dataSourceName)
  158. if err != nil {
  159. err = errors.WithStack(err)
  160. return nil, err
  161. }
  162. d.SetMaxOpenConns(c.Active)
  163. d.SetMaxIdleConns(c.Idle)
  164. d.SetConnMaxLifetime(time.Duration(c.IdleTimeout))
  165. return d, nil
  166. }
  167. func (db *DB) conn() (c *conn) {
  168. db.mutex.RLock()
  169. c = db.conns[db.index()]
  170. db.mutex.RUnlock()
  171. return
  172. }
  173. // Begin starts a transaction. The isolation level is dependent on the driver.
  174. func (db *DB) Begin(c context.Context) (tx *Tx, err error) {
  175. return db.conn().begin(c)
  176. }
  177. // Exec executes a query without returning any rows.
  178. // The args are for any placeholder parameters in the query.
  179. func (db *DB) Exec(c context.Context, query string, args ...interface{}) (res sql.Result, err error) {
  180. return db.conn().exec(c, query, args...)
  181. }
  182. // Prepare creates a prepared statement for later queries or executions.
  183. // Multiple queries or executions may be run concurrently from the returned
  184. // statement. The caller must call the statement's Close method when the
  185. // statement is no longer needed.
  186. func (db *DB) Prepare(query string) (*Stmt, error) {
  187. return db.conn().prepare(query)
  188. }
  189. // Prepared creates a prepared statement for later queries or executions.
  190. // Multiple queries or executions may be run concurrently from the returned
  191. // statement. The caller must call the statement's Close method when the
  192. // statement is no longer needed.
  193. func (db *DB) Prepared(query string) (s *Stmts) {
  194. s = &Stmts{query: query, sts: make(map[string]*Stmt), db: db}
  195. for _, c := range db.conns {
  196. st := c.prepared(query)
  197. s.mu.Lock()
  198. s.sts[c.addr] = st
  199. s.mu.Unlock()
  200. }
  201. return
  202. }
  203. // Query executes a query that returns rows, typically a SELECT. The args are
  204. // for any placeholder parameters in the query.
  205. func (db *DB) Query(c context.Context, query string, args ...interface{}) (rows *Rows, err error) {
  206. return db.conn().query(c, query, args...)
  207. }
  208. // QueryRow executes a query that is expected to return at most one row.
  209. // QueryRow always returns a non-nil value. Errors are deferred until Row's
  210. // Scan method is called.
  211. func (db *DB) QueryRow(c context.Context, query string, args ...interface{}) *Row {
  212. return db.conn().queryRow(c, query, args...)
  213. }
  214. func (db *DB) index() int {
  215. if len(db.conns) == 1 {
  216. return 0
  217. }
  218. v := atomic.AddInt64(&db.idx, 1)
  219. return int(v) % len(db.conns)
  220. }
  221. // Close closes the databases, releasing any open resources.
  222. func (db *DB) Close() (err error) {
  223. db.mutex.RLock()
  224. defer db.mutex.RUnlock()
  225. for _, d := range db.conns {
  226. if e := d.Close(); e != nil {
  227. err = errors.WithStack(e)
  228. }
  229. }
  230. return
  231. }
  232. // Ping verifies a connection to the database is still alive, establishing a
  233. // connection if necessary.
  234. func (db *DB) Ping(c context.Context) (err error) {
  235. if err = db.conn().ping(c); err != nil {
  236. return
  237. }
  238. return
  239. }
  240. func (db *conn) onBreaker(err *error) {
  241. if err != nil && *err != nil && *err != sql.ErrNoRows && *err != sql.ErrTxDone {
  242. db.breaker.MarkFailed()
  243. } else {
  244. db.breaker.MarkSuccess()
  245. }
  246. }
  247. func (db *conn) begin(c context.Context) (tx *Tx, err error) {
  248. now := time.Now()
  249. t, ok := trace.FromContext(c)
  250. if ok {
  251. t = t.Fork(_family, "begin")
  252. t.SetTag(trace.String(trace.TagAddress, db.addr), trace.String(trace.TagComment, ""))
  253. defer func() {
  254. if err != nil {
  255. t.Finish(&err)
  256. }
  257. }()
  258. }
  259. if err = db.breaker.Allow(); err != nil {
  260. stats.Incr("tidb:begin", "breaker")
  261. return
  262. }
  263. _, c, cancel := db.conf.TranTimeout.Shrink(c)
  264. rtx, err := db.BeginTx(c, nil)
  265. stats.Timing("tidb:begin", int64(time.Since(now)/time.Millisecond))
  266. if err != nil {
  267. err = errors.WithStack(err)
  268. cancel()
  269. return
  270. }
  271. tx = &Tx{tx: rtx, t: t, db: db, c: c, cancel: cancel}
  272. return
  273. }
  274. func (db *conn) exec(c context.Context, query string, args ...interface{}) (res sql.Result, err error) {
  275. now := time.Now()
  276. if t, ok := trace.FromContext(c); ok {
  277. t = t.Fork(_family, "exec")
  278. t.SetTag(trace.String(trace.TagAddress, db.addr), trace.String(trace.TagComment, query))
  279. defer t.Finish(&err)
  280. }
  281. if err = db.breaker.Allow(); err != nil {
  282. stats.Incr("tidb:exec", "breaker")
  283. return
  284. }
  285. _, c, cancel := db.conf.ExecTimeout.Shrink(c)
  286. res, err = db.ExecContext(c, query, args...)
  287. cancel()
  288. db.onBreaker(&err)
  289. stats.Timing("tidb:exec", int64(time.Since(now)/time.Millisecond))
  290. if err != nil {
  291. err = errors.Wrapf(err, "addr: %s exec:%s, args:%+v", db.addr, query, args)
  292. }
  293. return
  294. }
  295. func (db *conn) ping(c context.Context) (err error) {
  296. now := time.Now()
  297. if t, ok := trace.FromContext(c); ok {
  298. t = t.Fork(_family, "ping")
  299. t.SetTag(trace.String(trace.TagAddress, db.addr), trace.String(trace.TagComment, ""))
  300. defer t.Finish(&err)
  301. }
  302. if err = db.breaker.Allow(); err != nil {
  303. stats.Incr("tidb:ping", "breaker")
  304. return
  305. }
  306. _, c, cancel := db.conf.ExecTimeout.Shrink(c)
  307. err = db.PingContext(c)
  308. cancel()
  309. db.onBreaker(&err)
  310. stats.Timing("tidb:ping", int64(time.Since(now)/time.Millisecond))
  311. if err != nil {
  312. err = errors.WithStack(err)
  313. }
  314. return
  315. }
  316. func (db *conn) prepare(query string) (*Stmt, error) {
  317. stmt, err := db.Prepare(query)
  318. if err != nil {
  319. err = errors.Wrapf(err, "addr: %s prepare %s", db.addr, query)
  320. return nil, err
  321. }
  322. st := &Stmt{query: query, db: db}
  323. st.stmt.Store(stmt)
  324. return st, nil
  325. }
  326. func (db *conn) prepared(query string) (stmt *Stmt) {
  327. stmt = &Stmt{query: query, db: db}
  328. s, err := db.Prepare(query)
  329. if err == nil {
  330. stmt.stmt.Store(s)
  331. return
  332. }
  333. return
  334. }
  335. func (db *conn) query(c context.Context, query string, args ...interface{}) (rows *Rows, err error) {
  336. now := time.Now()
  337. if t, ok := trace.FromContext(c); ok {
  338. t = t.Fork(_family, "query")
  339. t.SetTag(trace.String(trace.TagAddress, db.addr), trace.String(trace.TagComment, query))
  340. defer t.Finish(&err)
  341. }
  342. if err = db.breaker.Allow(); err != nil {
  343. stats.Incr("tidb:query", "breaker")
  344. return
  345. }
  346. _, c, cancel := db.conf.QueryTimeout.Shrink(c)
  347. rs, err := db.DB.QueryContext(c, query, args...)
  348. db.onBreaker(&err)
  349. stats.Timing("tidb:query", int64(time.Since(now)/time.Millisecond))
  350. if err != nil {
  351. err = errors.Wrapf(err, "addr: %s, query:%s, args:%+v", db.addr, query, args)
  352. cancel()
  353. return
  354. }
  355. rows = &Rows{Rows: rs, cancel: cancel}
  356. return
  357. }
  358. func (db *conn) queryRow(c context.Context, query string, args ...interface{}) *Row {
  359. now := time.Now()
  360. t, ok := trace.FromContext(c)
  361. if ok {
  362. t = t.Fork(_family, "queryrow")
  363. t.SetTag(trace.String(trace.TagAddress, db.addr), trace.String(trace.TagComment, query))
  364. }
  365. if err := db.breaker.Allow(); err != nil {
  366. stats.Incr("tidb:queryrow", "breaker")
  367. return &Row{db: db, t: t, err: err}
  368. }
  369. _, c, cancel := db.conf.QueryTimeout.Shrink(c)
  370. r := db.DB.QueryRowContext(c, query, args...)
  371. stats.Timing("tidb:queryrow", int64(time.Since(now)/time.Millisecond))
  372. return &Row{db: db, Row: r, query: query, args: args, t: t, cancel: cancel}
  373. }
  374. // Close closes the statement.
  375. func (s *Stmt) Close() (err error) {
  376. stmt, ok := s.stmt.Load().(*sql.Stmt)
  377. if ok {
  378. err = errors.WithStack(stmt.Close())
  379. }
  380. return
  381. }
  382. func (s *Stmt) prepare() (st *sql.Stmt) {
  383. var ok bool
  384. if st, ok = s.stmt.Load().(*sql.Stmt); ok {
  385. return
  386. }
  387. var err error
  388. if st, err = s.db.Prepare(s.query); err == nil {
  389. s.stmt.Store(st)
  390. }
  391. return
  392. }
  393. // Exec executes a prepared statement with the given arguments and returns a
  394. // Result summarizing the effect of the statement.
  395. func (s *Stmt) Exec(c context.Context, args ...interface{}) (res sql.Result, err error) {
  396. now := time.Now()
  397. if s.tx {
  398. if s.t != nil {
  399. s.t.SetTag(trace.String(trace.TagAnnotation, s.query))
  400. }
  401. } else if t, ok := trace.FromContext(c); ok {
  402. t = t.Fork(_family, "exec")
  403. t.SetTag(trace.String(trace.TagAddress, s.db.addr), trace.String(trace.TagComment, s.query))
  404. defer t.Finish(&err)
  405. }
  406. if err = s.db.breaker.Allow(); err != nil {
  407. stats.Incr("tidb:stmt:exec", "breaker")
  408. return
  409. }
  410. stmt := s.prepare()
  411. if stmt == nil {
  412. err = ErrStmtNil
  413. return
  414. }
  415. _, c, cancel := s.db.conf.ExecTimeout.Shrink(c)
  416. res, err = stmt.ExecContext(c, args...)
  417. cancel()
  418. s.db.onBreaker(&err)
  419. stats.Timing("tidb:stmt:exec", int64(time.Since(now)/time.Millisecond))
  420. if err != nil {
  421. err = errors.Wrapf(err, "addr: %s exec:%s, args:%+v", s.db.addr, s.query, args)
  422. }
  423. return
  424. }
  425. // Query executes a prepared query statement with the given arguments and
  426. // returns the query results as a *Rows.
  427. func (s *Stmt) Query(c context.Context, args ...interface{}) (rows *Rows, err error) {
  428. if s.tx {
  429. if s.t != nil {
  430. s.t.SetTag(trace.String(trace.TagAnnotation, s.query))
  431. }
  432. } else if t, ok := trace.FromContext(c); ok {
  433. t = t.Fork(_family, "query")
  434. t.SetTag(trace.String(trace.TagAddress, s.db.addr), trace.String(trace.TagComment, s.query))
  435. defer t.Finish(&err)
  436. }
  437. if err = s.db.breaker.Allow(); err != nil {
  438. stats.Incr("tidb:stmt:query", "breaker")
  439. return
  440. }
  441. stmt := s.prepare()
  442. if stmt == nil {
  443. err = ErrStmtNil
  444. return
  445. }
  446. now := time.Now()
  447. _, c, cancel := s.db.conf.QueryTimeout.Shrink(c)
  448. rs, err := stmt.QueryContext(c, args...)
  449. s.db.onBreaker(&err)
  450. stats.Timing("tidb:stmt:query", int64(time.Since(now)/time.Millisecond))
  451. if err != nil {
  452. err = errors.Wrapf(err, "addr: %s, query:%s, args:%+v", s.db.addr, s.query, args)
  453. cancel()
  454. return
  455. }
  456. rows = &Rows{Rows: rs, cancel: cancel}
  457. return
  458. }
  459. // QueryRow executes a prepared query statement with the given arguments.
  460. // If an error occurs during the execution of the statement, that error will
  461. // be returned by a call to Scan on the returned *Row, which is always non-nil.
  462. // If the query selects no rows, the *Row's Scan will return ErrNoRows.
  463. // Otherwise, the *Row's Scan scans the first selected row and discards the rest.
  464. func (s *Stmt) QueryRow(c context.Context, args ...interface{}) (row *Row) {
  465. now := time.Now()
  466. row = &Row{db: s.db, query: s.query, args: args}
  467. if s.tx {
  468. if s.t != nil {
  469. s.t.SetTag(trace.String(trace.TagAnnotation, s.query))
  470. }
  471. } else if t, ok := trace.FromContext(c); ok {
  472. t = t.Fork(_family, "queryrow")
  473. t.SetTag(trace.String(trace.TagAddress, s.db.addr), trace.String(trace.TagComment, s.query))
  474. row.t = t
  475. }
  476. if row.err = s.db.breaker.Allow(); row.err != nil {
  477. stats.Incr("tidb:stmt:queryrow", "breaker")
  478. return
  479. }
  480. stmt := s.prepare()
  481. if stmt == nil {
  482. return
  483. }
  484. _, c, cancel := s.db.conf.QueryTimeout.Shrink(c)
  485. row.Row = stmt.QueryRowContext(c, args...)
  486. row.cancel = cancel
  487. stats.Timing("tidb:stmt:queryrow", int64(time.Since(now)/time.Millisecond))
  488. return
  489. }
  490. func (s *Stmts) prepare(conn *conn) (st *Stmt) {
  491. if conn == nil {
  492. conn = s.db.conn()
  493. }
  494. s.mu.RLock()
  495. st = s.sts[conn.addr]
  496. s.mu.RUnlock()
  497. if st == nil {
  498. st = conn.prepared(s.query)
  499. s.mu.Lock()
  500. s.sts[conn.addr] = st
  501. s.mu.Unlock()
  502. }
  503. return
  504. }
  505. // Exec executes a prepared statement with the given arguments and returns a
  506. // Result summarizing the effect of the statement.
  507. func (s *Stmts) Exec(c context.Context, args ...interface{}) (res sql.Result, err error) {
  508. return s.prepare(nil).Exec(c, args...)
  509. }
  510. // Query executes a prepared query statement with the given arguments and
  511. // returns the query results as a *Rows.
  512. func (s *Stmts) Query(c context.Context, args ...interface{}) (rows *Rows, err error) {
  513. return s.prepare(nil).Query(c, args...)
  514. }
  515. // QueryRow executes a prepared query statement with the given arguments.
  516. // If an error occurs during the execution of the statement, that error will
  517. // be returned by a call to Scan on the returned *Row, which is always non-nil.
  518. // If the query selects no rows, the *Row's Scan will return ErrNoRows.
  519. // Otherwise, the *Row's Scan scans the first selected row and discards the rest.
  520. func (s *Stmts) QueryRow(c context.Context, args ...interface{}) (row *Row) {
  521. return s.prepare(nil).QueryRow(c, args...)
  522. }
  523. // Close closes the statement.
  524. func (s *Stmts) Close() (err error) {
  525. for _, st := range s.sts {
  526. if err = errors.WithStack(st.Close()); err != nil {
  527. return
  528. }
  529. }
  530. return
  531. }
  532. // Commit commits the transaction.
  533. func (tx *Tx) Commit() (err error) {
  534. err = tx.tx.Commit()
  535. tx.cancel()
  536. tx.db.onBreaker(&err)
  537. if tx.t != nil {
  538. tx.t.Finish(&err)
  539. }
  540. if err != nil {
  541. err = errors.WithStack(err)
  542. }
  543. return
  544. }
  545. // Rollback aborts the transaction.
  546. func (tx *Tx) Rollback() (err error) {
  547. err = tx.tx.Rollback()
  548. tx.cancel()
  549. tx.db.onBreaker(&err)
  550. if tx.t != nil {
  551. tx.t.Finish(&err)
  552. }
  553. if err != nil {
  554. err = errors.WithStack(err)
  555. }
  556. return
  557. }
  558. // Exec executes a query that doesn't return rows. For example: an INSERT and
  559. // UPDATE.
  560. func (tx *Tx) Exec(query string, args ...interface{}) (res sql.Result, err error) {
  561. now := time.Now()
  562. if tx.t != nil {
  563. tx.t.SetTag(trace.String(trace.TagAnnotation, fmt.Sprintf("exec %s", query)))
  564. }
  565. res, err = tx.tx.ExecContext(tx.c, query, args...)
  566. stats.Timing("tidb:tx:exec", int64(time.Since(now)/time.Millisecond))
  567. if err != nil {
  568. err = errors.Wrapf(err, "addr: %s exec:%s, args:%+v", tx.db.addr, query, args)
  569. }
  570. return
  571. }
  572. // Query executes a query that returns rows, typically a SELECT.
  573. func (tx *Tx) Query(query string, args ...interface{}) (rows *Rows, err error) {
  574. if tx.t != nil {
  575. tx.t.SetTag(trace.String(trace.TagAnnotation, fmt.Sprintf("query %s", query)))
  576. }
  577. now := time.Now()
  578. defer func() {
  579. stats.Timing("tidb:tx:query", int64(time.Since(now)/time.Millisecond))
  580. }()
  581. rs, err := tx.tx.QueryContext(tx.c, query, args...)
  582. if err == nil {
  583. rows = &Rows{Rows: rs}
  584. } else {
  585. err = errors.Wrapf(err, "addr: %s, query:%s, args:%+v", tx.db.addr, query, args)
  586. }
  587. return
  588. }
  589. // QueryRow executes a query that is expected to return at most one row.
  590. // QueryRow always returns a non-nil value. Errors are deferred until Row's
  591. // Scan method is called.
  592. func (tx *Tx) QueryRow(query string, args ...interface{}) *Row {
  593. if tx.t != nil {
  594. tx.t.SetTag(trace.String(trace.TagAnnotation, fmt.Sprintf("queryrow %s", query)))
  595. }
  596. now := time.Now()
  597. defer func() {
  598. stats.Timing("tidb:tx:queryrow", int64(time.Since(now)/time.Millisecond))
  599. }()
  600. r := tx.tx.QueryRowContext(tx.c, query, args...)
  601. return &Row{Row: r, db: tx.db, query: query, args: args}
  602. }
  603. // Stmt returns a transaction-specific prepared statement from an existing statement.
  604. func (tx *Tx) Stmt(stmt *Stmt) *Stmt {
  605. if stmt == nil {
  606. return nil
  607. }
  608. as, ok := stmt.stmt.Load().(*sql.Stmt)
  609. if !ok {
  610. return nil
  611. }
  612. ts := tx.tx.StmtContext(tx.c, as)
  613. st := &Stmt{query: stmt.query, tx: true, t: tx.t, db: tx.db}
  614. st.stmt.Store(ts)
  615. return st
  616. }
  617. // Stmts returns a transaction-specific prepared statement from an existing statement.
  618. func (tx *Tx) Stmts(stmt *Stmts) *Stmt {
  619. return tx.Stmt(stmt.prepare(tx.db))
  620. }
  621. // Prepare creates a prepared statement for use within a transaction.
  622. // The returned statement operates within the transaction and can no longer be
  623. // used once the transaction has been committed or rolled back.
  624. // To use an existing prepared statement on this transaction, see Tx.Stmt.
  625. func (tx *Tx) Prepare(query string) (*Stmt, error) {
  626. if tx.t != nil {
  627. tx.t.SetTag(trace.String(trace.TagAnnotation, fmt.Sprintf("prepare %s", query)))
  628. }
  629. stmt, err := tx.tx.Prepare(query)
  630. if err != nil {
  631. err = errors.Wrapf(err, "addr: %s prepare %s", tx.db.addr, query)
  632. return nil, err
  633. }
  634. st := &Stmt{query: query, tx: true, t: tx.t, db: tx.db}
  635. st.stmt.Store(stmt)
  636. return st, nil
  637. }
  638. // parseDSNAddr parse dsn name and return addr.
  639. func parseDSNAddr(dsn string) (addr string) {
  640. if dsn == "" {
  641. return
  642. }
  643. cfg, err := mysql.ParseDSN(dsn)
  644. if err != nil {
  645. return
  646. }
  647. addr = cfg.Addr
  648. return
  649. }
  650. func genDSN(dsn, addr string) (res string) {
  651. cfg, err := mysql.ParseDSN(dsn)
  652. if err != nil {
  653. return
  654. }
  655. cfg.Addr = addr
  656. cfg.Net = "tcp"
  657. res = cfg.FormatDSN()
  658. return
  659. }