Golang: Database: database/sql: API Usage

 15th December 2021 at 12:03pm

简单代码

package main

import (
    "database/sql"
    "fmt"
    _ "github.com/jackc/pgx/v4/stdlib"
    "log"
    "net/url"
    "os"
)

func main() {
    u := &url.URL{
        Scheme: "postgresql",
        User:   url.UserPassword("squirrel", "iloveice"),
        Host:   "localhost",
        Path:   "squirrel",
    }

    db, err := sql.Open("pgx", u.String())
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    var greeting string
    err = db.QueryRow("select 'Hello, world!'").Scan(&greeting)
    if err != nil {
        fmt.Fprintf(os.Stderr, "QueryRow failed: %v\n", err)
        os.Exit(1)
    }

    fmt.Println(greeting)
}

查询请求

var (
    id int
    name string
)
rows, err := db.Query("select id, name from users where id = ?", 1)
if err != nil {
    log.Fatal(err)
}
defer rows.Close()

err = rows.Err()
if err != nil {
    log.Fatal(err)
}

for rows.Next() {
    err := rows.Scan(&id, &name)
    if err != nil {
        log.Fatal(err)
    }
    log.Println(id, name)
}

关键点:

  • db.Query 提交查询语句
  • rows.Next()rows.Scan() 配合遍历和读取结果
  • rows.Err() 可以在未读取数据的前提下得到查询错误,但它只能得到类似数据库 server 返回的错误。对于 sql.ErrNoRows,如果你还没有 Scan(),这个方法不会返回错误;如果 Scan() 了,它也会返回错误。如果 rows.Err() 不为 nil,调用 rows.Scan() 也会抛出
  • rows.Close() 会使得当前数据库请求使用的连接被释放回连接池。如果你用 rows.Next() 遍历完集合,rows.Close() 会被隐式地调用;但是假如没有遍历完,那么 defer rows.Close() 能保证函数结束后有调用。在已经是 close 的情况下再调用 rows.Close() 也不会出错

如果 db.Query 中有多个 SELECT 语句,可以用 rows.NextResultSet()

Rows.Scan()文档)是其中比较复杂的部分。它的功能比较多样,可以把一个 SQL 类型值转换为 Go 的类型值。特别的一点是,如果 SQL 中是字符串(比如 VARCHAR),而且字符串存的是数字,你可以把它 scan 进一个数字类型,Go 会帮你做这个转换(比如 strconv.ParseInt())。

Prepared 查询

stmt, err := db.Prepare("select id, name from users where id = ?")
if err != nil {
    log.Fatal(err)
}
defer stmt.Close()
rows, err := stmt.Query(1)
if err != nil {
    log.Fatal(err)
}
defer rows.Close()
for rows.Next() {
    // ...
}
if err = rows.Err(); err != nil {
    log.Fatal(err)
}

不同数据库的 prepare 语句中的点位符不一样:

MySQL               PostgreSQL            Oracle
=====               ==========            ======
WHERE col = ?       WHERE col = $1        WHERE col = :col
VALUES(?, ?, ?)     VALUES($1, $2, $3)    VALUES(:val1, :val2, :val3)

单行查询

var name string
err = db.QueryRow("select name from users where id = ?", 1).Scan(&name)
if err != nil {
    if err == sql.ErrNoRows {
        // there were no rows, but otherwise no error occurred
    } else {
        log.Fatal(err)
    }
}
fmt.Println(name)
stmt, err := db.Prepare("select name from users where id = ?")
if err != nil {
    log.Fatal(err)
}
defer stmt.Close()
var name string
err = stmt.QueryRow(1).Scan(&name)
if err != nil {
    if err == sql.ErrNoRows {
        // there were no rows, but otherwise no error occurred
    } else {
        log.Fatal(err)
    }
}
fmt.Println(name)

查询的错误是在 Scan() 处抛出的,而不是 QueryRow()

修改数据

INSERT, UPDATE, DELETE 这种不返回纪录的语句,用 Exec() 来执行:

_, err := db.Exec("DELETE FROM users")  // OK

搭配 prepared statement:

stmt, err := db.Prepare("INSERT INTO users(name) VALUES(?)")
if err != nil {
    log.Fatal(err)
}
res, err := stmt.Exec("Dolly")
if err != nil {
    log.Fatal(err)
}

lastId, err := res.LastInsertId()
if err != nil {
    log.Fatal(err)
}
rowCnt, err := res.RowsAffected()
if err != nil {
    log.Fatal(err)
}
log.Printf("ID = %d, affected = %d\n", lastId, rowCnt)

不要用 db.Query() 来做这类操作,它会返回 Rows 结构,会期望你调用 Rows.Close() 释放连接。但这种情况下可能会忘掉这个调用。

事务

db, err := sql.Open("pgx", u.String())
if err != nil {
    log.Fatal(err)
}
defer db.Close()

tx, err := db.Begin()
if err != nil {
    log.Fatal(err)
}
// 这是一个 trick。一般函数实现中,当出错时就 return err 给调用方了,
// 此时 `tx.Rollback()` 就会被自动调用。虽然这个例子中用了 `log.Fatal()` 
// 而不是 return err。如果一个 tx 已经 Commit,那调用 Rollback 时会返回
// ErrTxDone。忽略即可。
defer tx.Rollback()

_, err = tx.Exec(`
CREATE TABLE films3 (
    code        char(5) PRIMARY KEY,
    title       varchar(40) NOT NULL,
);`)
if err != nil {
    log.Fatal(err)
}

err = tx.Commit()
if err != nil {
    log.Fatal(err)
}

事务的实现,是从 db.Begin() 获得一个 Tx 对象,以 tx.Commit()tx.Rollback() 结束。隔离级别由 driver 决定,或者由 db.BeginTx() 传参设置。

在事务中,不要使用 db.Exec() 去做修改操作,而应该使用 tx.Exec()db.Exec() 做的操作不在事务范围内。

错误处理

各接口返回的 err 有错误信息。但具体的错误码跟 driver 是绑定的,database/sql 不做规范。比如:

if driverErr, ok := err.(*mysql.MySQLError); ok {
    if driverErr.Number == mysqlerr.ER_ACCESS_DENIED_ERROR {
        // Handle the permission-denied error
    }
}

查询单行数据时,有一个特殊的错误,用来判断有没有查到数据:

var name string
err = db.QueryRow("select name from users where id = ?", 1).Scan(&name)
if err != nil {
    if err == sql.ErrNoRows {
        // there were no rows, but otherwise no error occurred
    } else {
        log.Fatal(err)
    }
}
fmt.Println(name)

当出现连接错误时,database/sql 会自动重试(重试一次,一共试两次)。次数定义在 database/sql/sql.go 中的 maxBadConnRetries

处理 null

当某一列的数值可能是 null 时,你要用 sql 包中对应的 Null 类型:

for rows.Next() {
    var s sql.NullString
    err := rows.Scan(&s)
    // check err
    if s.Valid {
       // use s.String
    } else {
       // NULL value
    }
}

Null 类型并不是很多,比如没有 sql.NullUint64。如果需要,你可以拷贝 sql 包中的代码做类似的实现。

另外的方法是,通过 SQL 中的函数(比如 COALESCE):

rows, err := db.Query(`
    SELECT
        name,
        COALESCE(other_field, '') as otherField
    WHERE id = ?
`, 42)

for rows.Next() {
    err := rows.Scan(&name, &otherField)
    // ..
    // If `other_field` was NULL, `otherField` is now an empty string. This works with other data types as well.
}

注意事项

批量操作database/sql 还不支持批量操作。有一个 issue 在关注这个问题。

可能会导致资源耗尽的操作

  • 频繁地 sql.Open()db.Close(),会导致单个 sql.DB 维持的连接池无法被复用,可能导致机器的网络资源被消耗完(比如一堆 TCP 连接保持在 TIME_WAIT 状态)
  • 查询数据时,没有遍历完所有行,也没有调用 rows.Close(),导致连接无法返回连接池
  • 使用 Query() 去做 INSERT, UPDATE, DELETE 等操作,却不对返回的 rowsClose() 操作。这种情况下应该用 Exec() 代替 Query()
  • Prepared statements 在数据库层面,需要 driver 跟 DB server 有三次交互。频繁的使用 prepared statement,配合 sql 包本身的重试机制,可能会引起问题(但我觉得问题不大)

单次执行多条语句时连接复用问题:当多次请求(Query() / Exec())时,如果不同语句之间有上下依赖关系(比如 MySQL 中,经常有先使用 USE 语句,再做其他操作的),要注意不同语句执行时是否使用的是同个数据库连接:

  • 在事务内用 Tx 对象发起的请求,使用同个连接
  • 不在事务中时,
    • 直接用 DB 对象发起的请求,每次请求都随机使用的连接池中的连接,不保证是同样的
    • 需要保证同一连接的话,使用 db.Conn() 来获得一个连接
conn, err := db.Conn(ctx)
if err != nil {
    log.Fatal(err)
}
defer conn.Close() // Return the connection to the pool.
id := 41
result, err := conn.ExecContext(ctx, `UPDATE balances SET balance = balance + 10 WHERE user_id = ?;`, id)
if err != nil {
    log.Fatal(err)
}

没有多语句支持:这样的语句的效果是不确定的,看 DB server 如何处理:

_, err := db.Exec("DELETE FROM tbl1; DELETE FROM tbl2") // Error/unpredictable result

它可能报错,也可能不报错;可能两条语句都被执行,也可能只执行一条。

事务内 rows 未被关闭时,连接无法复用:比如这样的代码:

tx, err := db.Begin()
rows, err := tx.Query("select * from tbl1") // Uses tx's connection
for rows.Next() {
    err = rows.Scan(&myvariable)
    // ERROR! tx's connection is already busy!
    tx.Query("select * from tbl2 where id = ?", myvariable)
}

rows 没有被遍历完,或者没有调用 rows.Close() 之前,你是无法使用 tx 做新的查询的,因为连接还没有被释放回去。

即使不在事务中,如果你在循环内用 sql.DB 做一次新的查询,也无法复用上一次查询的连接:

rows, err := db.Query("select * from tbl1") // Uses connection 1
for rows.Next() {
    err = rows.Scan(&myvariable)
    // The following line will NOT use connection 1, which is already in-use
    db.Query("select * from tbl2 where id = ?", myvariable)
}