Fork me on GitHub

版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc

1.引子

前几天一个同事在MySQL实例上执行一些数据处理,程序大致如下:

modify1.go

package main

import (
    "context"
    "database/sql"
    "fmt"
    _ "github.com/go-sql-driver/mysql"
    "log"
    "time"
)

func main() {
    db, err := sql.Open("mysql",
        "testdb_user:12345678@tcp(192.168.2.100:25037)/testdb?charset=utf8&loc=Asia%2FShanghai&parseTime=true")
    if err != nil {
        fmt.Println("===0===", err)
    }
    defer db.Close()

    tx, err := db.Begin()
    if err != nil {
        fmt.Println("===0===", err)
    }

  // 第1个SQL
    rows, err := tx.QueryContext(context.Background(), "select mtime, money from test where money > ?", 1)
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()
    // 迭代查询结果
    for rows.Next() { 
        var mtime time.Time
        var money int
        if err := rows.Scan(&mtime, &money); err != nil {
            // Check for a scan error.
            // Query rows will be closed with defer.
            log.Fatal(err)
        }
        log.Println(mtime, money)
        // 第2个SQL
        result, err := tx.ExecContext(context.Background(),
            "insert into test2(`mtime`, `money`) values (?, ?)", mtime, money)
        if err != nil {
            log.Fatal(err)
        }
        rowsAffected, _ := result.RowsAffected()
        lastInsertIdresult, _ := result.LastInsertId()
        log.Println(rowsAffected, lastInsertIdresult)
    }
    tx.Commit()
}

报错信息如下:

2023/06/18 16:40:02 2016-03-06 01:00:00 +0800 CST 2
[mysql] 2023/06/18 16:40:02 packets.go:446: busy buffer
[mysql] 2023/06/18 16:40:02 connection.go:173: bad connection
2023/06/18 16:40:02 driver: bad connection

2. 错误原因

modify1.go 创建了一个事务,事务执行的逻辑是在test表中中执行了一个查询语句(第1个SQL),然后迭代查询结果,将数据插入到表test2中。

错误产生的原因是:

一个事务在它的执行期间,只会与一个连接绑定。如果在事务执行期间,连接断开,事务就会自动回滚。

为了方便写入Query和处理Result, mysqlConn连接引入一个buffer, 这个buffer类似于bufio.Reader / Writer 。

type mysqlConn struct {
    buf              buffer  // 就是这个
    netConn          net.Conn
    rawConn          net.Conn // underlying connection when netConn is TLS connection.
    affectedRows     uint64
    insertId         uint64
    ...
}

A buffer which is used for both reading and writing.
This is possible since communication on each connection is synchronous.
In other words, we can’t write and read simultaneously on the same connection.

type buffer struct {
    buf     []byte // buf is a byte buffer who's length and capacity are equal.
    nc      net.Conn  // 实际的网络连接
    ...
}
  • 读取和写入的缓冲区都是这个buffer。

  • 这是可能的,因为每个连接上的通信都是同步的

  • 换句话说,我们不能在同一个连接上同时写入和读取。

从设计上讲,一个连接中上的SQL执行,只能是串行执行,SQL1执行完成之后,再执行SQL2。

SQL1:send Query
SQL1:parse Result
SQL2:send Query
SQL2:parse Result

如果SQL1的结果如果没有读干净,那么会被当做SQL2的执行结果去解析,这必然引起程序错乱。

这种情况非常类似于TCP网络传输中的”粘包问题”。

为避免这个问题,mysql client sdk在执行下一条SQL语句时,会先检查缓冲区是否为空,如果不为空,则一定有SQL交错执行的问题。

此时抛出异常

    ErrBusyBuffer        = errors.New("busy buffer")

3. 如何避免

3.1 方法1

将多条SQL放在不同的连接中执行

func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error)
func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (Result, error)

modify2.go

package main

import (
    "context"
    "database/sql"
    "fmt"
    _ "github.com/go-sql-driver/mysql"
    "log"
    "time"
)

func main() {
    db, err := sql.Open("mysql",
        "testdb_user:12345678@tcp(192.168.2.100:25037)/testdb?charset=utf8&loc=Asia%2FShanghai&parseTime=true")
    if err != nil {
        fmt.Println("===0===", err)
    }
    defer db.Close()

    rows, err := db.QueryContext(context.Background(), "select mtime, money from test where money > ?", 1)
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()
    for rows.Next() {
        var mtime time.Time
        var money int
        if err := rows.Scan(&mtime, &money); err != nil {
            // Check for a scan error.
            // Query rows will be closed with defer.
            log.Fatal(err)
        }
        log.Println(mtime, money)
        //db.ExecContext()
        result, err := db.ExecContext(context.Background(),
            "insert into test2(`mtime`, `money`) values (?, ?)", mtime, money)
        if err != nil {
            log.Fatal(err)
        }
        rowsAffected, _ := result.RowsAffected()
        lastInsertIdresult, _ := result.LastInsertId()
        log.Println(rowsAffected, lastInsertIdresult)
    }
}

3.2 方法2

在一个事务内部,先执行完第1条语句,再执行其它语句

modify3.go

package main

import (
    "context"
    "database/sql"
    "fmt"
    _ "github.com/go-sql-driver/mysql"
    "log"
    "time"
)

type Record struct {
    Money int
    Mtime time.Time
}

func main() {
    db, err := sql.Open("mysql",
        "testdb_user:12345678@tcp(192.168.2.100:25037)/testdb?charset=utf8&loc=Asia%2FShanghai&parseTime=true")
    if err != nil {
        fmt.Println("===0===", err)
    }
    defer db.Close()

    tx, err := db.Begin()
    if err != nil {
        fmt.Println("===0===", err)
    }

    rows, err := tx.QueryContext(context.Background(), "select mtime, money from test where money > ?", 1)
    if err != nil {
        log.Fatal(err)
    }

    defer rows.Close()
    records := make([]Record, 0)
    for rows.Next() {
        var mtime time.Time
        var money int
        if err := rows.Scan(&mtime, &money); err != nil {
            log.Fatal(err)
        }
        log.Println(mtime, money)
        records = append(records, Record{Mtime: mtime, Money: money})
    }

    for _, record := range records {
        //db.ExecContext()
        result, err := tx.ExecContext(context.Background(),
            "insert into test2(`mtime`, `money`) values (?, ?)", record.Mtime, record.Money)
        if err != nil {
            log.Fatal(err)
        }
        rowsAffected, _ := result.RowsAffected()
        lastInsertIdresult, _ := result.LastInsertId()
        log.Println(rowsAffected, lastInsertIdresult)
    }
    tx.Commit()
}

参考资料

1.MySQL:网络断开后执行的事务怎么处理

2.如果MySQL事务中发生了网络异常

3.MySQL的SQL预处理(Prepared)

4.Socket粘包问题的3种解决方案

5.怎么解决TCP网络传输「粘包」问题?


微信公众号

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据