Skip to content

Seate‐go data source proxy design

JayLiu edited this page Jan 20, 2024 · 1 revision

Seata 的4 种分布式事务解决方案,分别是 AT 模式、TCC 模式、Saga 模式和 XA 模式,这次我们来主要讲讲Seata中的AT模式。 AT 模式是一种无侵入的分布式事务解决方案。在 AT 模式下,用户只需关注自己的“业务 SQL”,用户的 “业务 SQL” 作为一阶段,Seata 框架会自动生成事务的二阶段提交和回滚操作。可以发现AT模式的特点,只需关注自己的业务sql。 对业务无入侵的一种分布式事务模式。Seata是怎么实现的呢?在Java体系中,Seata通过对用户的DataSource进行代理,将用户执行的相关SQL拦截并执行AT模式的相关逻辑。通过这种方式将AT模式具体逻辑对用户进行屏蔽,尽可能使用户保留原有的编码习惯。 image.png

那么在Golang体系下,我们如何设计,使得用户可以在Golang中体验到与Java一样的无侵入式的AT分布式事务模式?

Go的数据库Driver设计

Go 语言中,所有关系型数据库的客户端都需要实现如下所示的驱动接口:

type Driver interface {
	Open(name string) (Conn, error)
}

type Conn interface {
	Prepare(query string) (Stmt, error)
	Close() error
	Begin() (Tx, error)
}

database/sql/driver.Driver 接口中只包含一个 Open 方法,该方法接收一个数据库连接串作为输入参数并返回对应数据库的连接,这个返回的连接仍然是一个接口,整个database/sql/driver库中的全部接口可以构成如下所示的树形结构: Screenshot 2022-07-17 at 10.13.44 PM.png 我们可以看到,driver.Conn 是整个操作数据库的核心接口,而 driver.Conn 统一从 driver.Driver 创建出来,因此在Go体系下。要实现无侵入的AT模式的话,我们可以对 driver.Driver 接口进行代理,对 driver.Driver 创建出的 driver.Conn 实例统一返回Seata的代理 driver.Conn。

  • 针对 driver.Tx、driver.Stmt、driver.Conn 三个核心接口进行代理
  • 对于 Execer、Queryer、ConnPrepare 三类核心接口,做AOP逻辑,对方法执行前以及方法执行后,插入SQLHook,在SQLHook中实现AT模式下数据的前后镜像
  • 在 driver.Conn 开启事务之后,注入Seata内部的事务跟踪对象,记录整个事务过程中产生的所有数据镜像信息
  • 拦截 drvier.Tx 的 Commit 以及 Rollback,处理undolog的flush以及上报事务提交结果至TC-Server

因此,我们可以在Seata-Go中增加类似sql.Open的函数,然后返回一个被处理过的*sql.DB实例

// 打开数据库
func Open(driverName, dataSourceName string, opts ...seataOption) (*gosql.DB, error) {

	targetDriver := strings.ReplaceAll(driverName, "seata-", "")
    // 打开真实的数据库连接
	db, err := gosql.Open(targetDriver, dataSourceName)
	if err != nil {
		return nil, err
	}

	v := reflect.ValueOf(db)
	if v.Kind() == reflect.Ptr {
		v = v.Elem()
	}

    // 获取到 sql.DB 中的 connector 对象
	field := v.FieldByName("connector")

	connector, _ := GetUnexportedField(field).(driver.Connector)

	dbType := types.ParseDBType(targetDriver)
	if dbType == types.DBType_Unknown {
		return nil, errors.New("unsuppoer db type")
	}
    
    // 将用户的真实DB进行注册,同时并返回一个经过代理的 Connector
	proxy, err := regisResource(connector, dbType, db, dataSourceName, opts...)
	if err != nil {
		return nil, err
	}
    
    // 重写 sql.DB 中的 Connector 属性
	SetUnexportedField(field, proxy)
	return db, nil
}

通过上述的方式,我们将*sql.DB 中的 driver.Connector 成员实例替换为了Seata的 driver.Connector 代理对象。

AT模式下,SQL执行的流程

既然要执行AT模式,那么首先就需要用户触发一个开启事务的动作,当调用了 sql.DB.BeginTx 时,最终会调用到 driver.Conn.BeginTx,这个时候 Seata-Go 就会通过先前的 driver.Connector 代理对象,返回一个被代理过的 driver.Tx

func (c *Conn) Begin() (driver.Tx, error) {

	tx, err := c.conn.Begin()
	if err != nil {
		return nil, err
	}
    
    // 创建一个 事务上下文,仅在 driver 中流转,主要记录整个事务过程中涉及的一切
    // 1. 前后数据镜像
    // 2. 分支事务ID
    // 3. 资源ID
    // 4. SQL中涉及的 lockey 列表
    // 5. 事务ID
	c.txCtx = types.NewTxCtx()
	c.txCtx.DBType = c.res.dbType
	c.txCtx.TxOpt = driver.TxOptions{}

	return newTx(
		withDriverConn(c),
		withTxCtx(c.txCtx),
		withOriginTx(tx),
	)
}

当开启一个 driver.Tx 之后,则会有一个 driver.Conn 和当前这个 driver.Tx 进行绑定,因此接下来所有这个事务的操作,其发生的SQL都将被 Seata-Go 的代理 driver.Conn 对象所代理执行。 这里便举 driver.Conn 中执行 Exec 方法

// 执行用户的 SQL
func (c *Conn) Exec(query string, args []driver.Value) (driver.Result, error) {
	conn, ok := c.conn.(driver.Execer)
	if !ok {
		return nil, driver.ErrSkip
	}
    
    // 如果当前的 Conn 存在与一个事务对象中
	if c.txCtx != nil {
        // 首先根据 DB 类型以及 SQL 构建出对应的 Executor
		executor, err := exec.BuildExecutor(c.res.dbType, query)
		if err != nil {
			return nil, err
		}
        
        // 设置好当前SQL执行的上下文信息
        // 1. 事务上下文
        // 2. SQL语句
        // 3. SQL参数
		execCtx := &exec.ExecContext{
			TxCtx:  c.txCtx,
			Query:  query,
			Values: args,
		}
        
        // 将用户的SQL进行代理执行
		ret, err := executor.ExecWithValue(context.Background(), execCtx,
			func(ctx context.Context, query string, args []driver.Value) (types.ExecResult, error) {

				ret, err := conn.Exec(query, args)
				if err != nil {
					return nil, err
				}

				return types.NewResult(types.WithResult(ret)), nil
			})

		if err != nil {
			return nil, err
		}

		return ret.GetResult(), nil
	}

	return conn.Exec(query, args)

}

当前 executor执行用户的SQL操作时,首先会触发SQLHook的前置操作,当SQL之前完毕之后,又会触发SQLHook的后置逻辑

// Exec
func (e *BaseExecutor) ExecWithNamedValue(ctx context.Context, execCtx *ExecContext, f CallbackWithNamedValue) (types.ExecResult, error) {
    // 执行 SQLHook 的前置动作,可以通过这个Hook将AT模式所需要的前置镜像生产出来,
    // 并放入到 TransactionContext 中
	for i := range e.is {
		e.is[i].Before(ctx, execCtx)
	}

	defer func() {
		for i := range e.is {
			e.is[i].After(ctx, execCtx)
		}
	}()

	if e.ex != nil {
		return e.ex.ExecWithNamedValue(ctx, execCtx, f)
	}

	return f(ctx, execCtx.Query, execCtx.NamedValues)
}
Clone this wiki locally