Golang: Database: database/sql

27th April 2021 at 7:23pm
Golang: Database

Golang 的 database/sql 中并没有具体数据库的实现,而是定义了一套接口规范,要求 DB 的 driver 实现必须遵循。Driver 由第三方实现,官方维护了一套 driver 列表。这篇文章描述 database/sql 中的接口和机制。

顶层对象 sql.DB

sql.DB 对象通过 sql.Open() 函数获得:

db, err := sql.Open("pgx", connectionString)

它不代表一个数据库连接;它仅代表一个数据库实体,不管它是通过 TCP 连接的,还是本地文件亦或放在内存中。它的职责是:

  • 通过数据库 driver 打开和关闭与数据库的连接
  • 维护连接池

实际的连接会 lazily 发生在有查询等操作时。

设计文档中提到:

Handle concurrency well. Users shouldn’t need to care about the database’s per-connection thread safety issues (or lack thereof), and shouldn’t have to maintain their own free pools of connections. The ‘sql’ package should deal with that bookkeeping as needed. Given an *sql.DB, it should be possible to share that instance between multiple goroutines, without any extra synchronization.

sql.DB 并不是设计用来频繁 Open()Close() 的。它屏蔽了底层连接管理,使得用户不用去自己维护连接池;并且可以在不同的 go routine 间共享,而不需要使用额外的同步机制。

DB 的 接口文档 提到:

DB is a database handle representing a pool of zero or more underlying connections. It’s safe for concurrent use by multiple goroutines.

The sql package creates and frees connections automatically; it also maintains a free pool of idle connections. If the database has a concept of per-connection state, such state can be reliably observed within a transaction (Tx) or connection (Conn). Once DB.Begin is called, the returned Tx is bound to a single connection. Once Commit or Rollback is called on the transaction, that transaction’s connection is returned to DB’s idle connection pool. The pool size can be controlled with SetMaxIdleConns.

Driver 的实现

Driver 如何注册自己

比如 jackc/pgx 库,使用它时需要 import

import (
    "database/sql"
    _ "github.com/jackc/pgx/v4/stdlib"
)

import 的过程中 pgx 会去调用 database/sql 接口去做注册:

// File: $HOME/go/pkg/mod/github.com/jackc/pgx/v4@v4.10.0/stdlib/sql.go
func init() {
    pgxDriver = &Driver{
        configs: make(map[string]*pgx.ConnConfig),
    }
    fakeTxConns = make(map[*pgx.Conn]*sql.Tx)
    sql.Register("pgx", pgxDriver)
    // ...
}

如何选择 database/sql 接口与 driver 自定义的接口

Driver 往往也自己定义了一套接口,而且一般会比 database/sql 功能更多或者性能更佳。

如果你需要一套代码对接多种数据库,同时不需要用到 driver 自己接口提供的额外能力时,建议只用 database/sql 的接口。这也减少了学习成本。

简单代码

package main

import (
    "database/sql"
    "fmt"
    _ "github.com/jackc/pgx/v4/stdlib"
    "log"
    "net/url"
    "os"
)

func main() {
    u := &url.URL{
        Scheme: "postgresql",
        User:   url.UserPassword("squirrel", "iloveice"),
        Host:   "localhost",
        Path:   "squirrel",
    }

    db, err := sql.Open("pgx", u.String())
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    var greeting string
    err = db.QueryRow("select 'Hello, world!'").Scan(&greeting)
    if err != nil {
        fmt.Fprintf(os.Stderr, "QueryRow failed: %v\n", err)
        os.Exit(1)
    }

    fmt.Println(greeting)
}

查询请求

var (
    id int
    name string
)
rows, err := db.Query("select id, name from users where id = ?", 1)
if err != nil {
    log.Fatal(err)
}
defer rows.Close()

err = rows.Err()
if err != nil {
    log.Fatal(err)
}

for rows.Next() {
    err := rows.Scan(&id, &name)
    if err != nil {
        log.Fatal(err)
    }
    log.Println(id, name)
}

关键点:

  • db.Query 提交查询语句
  • rows.Next()rows.Scan() 配合遍历和读取结果
  • rows.Err() 可以在未读取数据的前提下得到查询错误,但它只能得到类似数据库 server 返回的错误。对于 sql.ErrNoRows,如果你还没有 Scan(),这个方法不会返回错误;如果 Scan() 了,它也会返回错误。如果 rows.Err() 不为 nil,调用 rows.Scan() 也会抛出
  • rows.Close() 会使得当前数据库请求使用的连接被释放回连接池。如果你用 rows.Next() 遍历完集合,rows.Close() 会被隐式地调用;但是假如没有遍历完,那么 defer rows.Close() 能保证函数结束后有调用。在已经是 close 的情况下再调用 rows.Close() 也不会出错

如果 db.Query 中有多个 SELECT 语句,可以用 rows.NextResultSet()

Rows.Scan()文档)是其中比较复杂的部分。它的功能比较多样,可以把一个 SQL 类型值转换为 Go 的类型值。特别的一点是,如果 SQL 中是字符串(比如 VARCHAR),而且字符串存的是数字,你可以把它 scan 进一个数字类型,Go 会帮你做这个转换(比如 strconv.ParseInt())。

Prepared 查询

stmt, err := db.Prepare("select id, name from users where id = ?")
if err != nil {
    log.Fatal(err)
}
defer stmt.Close()
rows, err := stmt.Query(1)
if err != nil {
    log.Fatal(err)
}
defer rows.Close()
for rows.Next() {
    // ...
}
if err = rows.Err(); err != nil {
    log.Fatal(err)
}

不同数据库的 prepare 语句中的点位符不一样:

MySQL               PostgreSQL            Oracle
=====               ==========            ======
WHERE col = ?       WHERE col = $1        WHERE col = :col
VALUES(?, ?, ?)     VALUES($1, $2, $3)    VALUES(:val1, :val2, :val3)

单行查询

var name string
err = db.QueryRow("select name from users where id = ?", 1).Scan(&name)
if err != nil {
    if err == sql.ErrNoRows {
        // there were no rows, but otherwise no error occurred
    } else {
        log.Fatal(err)
    }
}
fmt.Println(name)
stmt, err := db.Prepare("select name from users where id = ?")
if err != nil {
    log.Fatal(err)
}
defer stmt.Close()
var name string
err = stmt.QueryRow(1).Scan(&name)
if err != nil {
    if err == sql.ErrNoRows {
        // there were no rows, but otherwise no error occurred
    } else {
        log.Fatal(err)
    }
}
fmt.Println(name)

查询的错误是在 Scan() 处抛出的,而不是 QueryRow()

修改数据

INSERT, UPDATE, DELETE 这种不返回纪录的语句,用 Exec() 来执行:

_, err := db.Exec("DELETE FROM users")  // OK

搭配 prepared statement:

stmt, err := db.Prepare("INSERT INTO users(name) VALUES(?)")
if err != nil {
    log.Fatal(err)
}
res, err := stmt.Exec("Dolly")
if err != nil {
    log.Fatal(err)
}

lastId, err := res.LastInsertId()
if err != nil {
    log.Fatal(err)
}
rowCnt, err := res.RowsAffected()
if err != nil {
    log.Fatal(err)
}
log.Printf("ID = %d, affected = %d\n", lastId, rowCnt)

不要用 db.Query() 来做这类操作,它会返回 Rows 结构,会期望你调用 Rows.Close() 释放连接。但这种情况下可能会忘掉这个调用。

事务

db, err := sql.Open("pgx", u.String())
if err != nil {
    log.Fatal(err)
}
defer db.Close()

tx, err := db.Begin()
if err != nil {
    log.Fatal(err)
}
// 这是一个 trick。一般函数实现中,当出错时就 return err 给调用方了,
// 此时 `tx.Rollback()` 就会被自动调用。虽然这个例子中用了 `log.Fatal()` 
// 而不是 return err。如果一个 tx 已经 Commit,那调用 Rollback 时会返回
// ErrTxDone。忽略即可。
defer tx.Rollback()

_, err = tx.Exec(`
CREATE TABLE films3 (
    code        char(5) PRIMARY KEY,
    title       varchar(40) NOT NULL,
);`)
if err != nil {
    log.Fatal(err)
}

err = tx.Commit()
if err != nil {
    log.Fatal(err)
}

事务的实现,是从 db.Begin() 获得一个 Tx 对象,以 tx.Commit()tx.Rollback() 结束。隔离级别由 driver 决定,或者由 db.BeginTx() 传参设置。

在事务中,不要使用 db.Exec() 去做修改操作,而应该使用 tx.Exec()db.Exec() 做的操作不在事务范围内。

错误处理

各接口返回的 err 有错误信息。但具体的错误码跟 driver 是绑定的,database/sql 不做规范。比如:

if driverErr, ok := err.(*mysql.MySQLError); ok {
    if driverErr.Number == mysqlerr.ER_ACCESS_DENIED_ERROR {
        // Handle the permission-denied error
    }
}

查询单行数据时,有一个特殊的错误,用来判断有没有查到数据:

var name string
err = db.QueryRow("select name from users where id = ?", 1).Scan(&name)
if err != nil {
    if err == sql.ErrNoRows {
        // there were no rows, but otherwise no error occurred
    } else {
        log.Fatal(err)
    }
}
fmt.Println(name)

当出现连接错误时,database/sql 会自动重试(重试一次,一共试两次)。次数定义在 database/sql/sql.go 中的 maxBadConnRetries

处理 null

当某一列的数值可能是 null 时,你要用 sql 包中对应的 Null 类型:

for rows.Next() {
    var s sql.NullString
    err := rows.Scan(&s)
    // check err
    if s.Valid {
       // use s.String
    } else {
       // NULL value
    }
}

Null 类型并不是很多,比如没有 sql.NullUint64。如果需要,你可以拷贝 sql 包中的代码做类似的实现。

另外的方法是,通过 SQL 中的函数(比如 COALESCE):

rows, err := db.Query(`
    SELECT
        name,
        COALESCE(other_field, '') as otherField
    WHERE id = ?
`, 42)

for rows.Next() {
    err := rows.Scan(&name, &otherField)
    // ..
    // If `other_field` was NULL, `otherField` is now an empty string. This works with other data types as well.
}

注意事项

批量操作database/sql 还不支持批量操作。有一个 issue 在关注这个问题。

可能会导致资源耗尽的操作

  • 频繁地 sql.Open()db.Close(),会导致单个 sql.DB 维持的连接池无法被复用,可能导致机器的网络资源被消耗完(比如一堆 TCP 连接保持在 TIME_WAIT 状态)
  • 查询数据时,没有遍历完所有行,也没有调用 rows.Close(),导致连接无法返回连接池
  • 使用 Query() 去做 INSERT, UPDATE, DELETE 等操作,却不对返回的 rowsClose() 操作。这种情况下应该用 Exec() 代替 Query()
  • Prepared statements 在数据库层面,需要 driver 跟 DB server 有三次交互。频繁的使用 prepared statement,配合 sql 包本身的重试机制,可能会引起问题(但我觉得问题不大)

单次执行多条语句时连接复用问题:当多次请求(Query() / Exec())时,如果不同语句之间有上下依赖关系(比如 MySQL 中,经常有先使用 USE 语句,再做其他操作的),要注意不同语句执行时是否使用的是同个数据库连接:

  • 在事务内用 Tx 对象发起的请求,使用同个连接
  • 不在事务中时,
    • 直接用 DB 对象发起的请求,每次请求都随机使用的连接池中的连接,不保证是同样的
    • 需要保证同一连接的话,使用 db.Conn() 来获得一个连接
conn, err := db.Conn(ctx)
if err != nil {
    log.Fatal(err)
}
defer conn.Close() // Return the connection to the pool.
id := 41
result, err := conn.ExecContext(ctx, `UPDATE balances SET balance = balance + 10 WHERE user_id = ?;`, id)
if err != nil {
    log.Fatal(err)
}

没有多语句支持:这样的语句的效果是不确定的,看 DB server 如何处理:

_, err := db.Exec("DELETE FROM tbl1; DELETE FROM tbl2") // Error/unpredictable result

它可能报错,也可能不报错;可能两条语句都被执行,也可能只执行一条。

事务内 rows 未被关闭时,连接无法复用:比如这样的代码:

tx, err := db.Begin()
rows, err := tx.Query("select * from tbl1") // Uses tx's connection
for rows.Next() {
    err = rows.Scan(&myvariable)
    // ERROR! tx's connection is already busy!
    tx.Query("select * from tbl2 where id = ?", myvariable)
}

rows 没有被遍历完,或者没有调用 rows.Close() 之前,你是无法使用 tx 做新的查询的,因为连接还没有被释放回去。

即使不在事务中,如果你在循环内用 sql.DB 做一次新的查询,也无法复用上一次查询的连接:

rows, err := db.Query("select * from tbl1") // Uses connection 1
for rows.Next() {
    err = rows.Scan(&myvariable)
    // The following line will NOT use connection 1, which is already in-use
    db.Query("select * from tbl2 where id = ?", myvariable)
}

参考