GRPC-go 客户端组件介绍-balancer

GRPC 客户端组件涉及服务发现组件,负载均衡组件,以及客户端本身;负载组件和客户端为相互关联关系。balancer的组成有两部分:baseBalancer 和picker;baseBalancer 管理保存当前可用是conn,baseBalancer根据当前可用链接生成picker,picker则实现了具体的负载算法;

0x01 balancer

整体的类图如下:

1.1 ClientConn

6大编程原则中有一条为接口隔离原则。而GRPC client 在resolver 和balancer 中分开定义了两个ClientConn 接口;在隔离情况下,分别从resolver的视角和balancer的视角去认知了ClientConn; balancer 和resolver 并不直接交互,而是两个组件通过client来协调通信。 下面是balancer视角下ClientConn 接口的定义,下面的方法是balancer 需要ClientConn 提供的能力

//${GRPC_PROJECT}/balancer/balancer.go
//State 对象是 balancer 和 ClienConn  交互通信的数据对象
type State struct {
	ConnectivityState connectivity.State
	Picker Picker
}

//balancer 视角下的clientConn 接口,需要ClientConn提供下面接口的能力来支持balancer工作
type ClientConn interface {
	//balancer在获得新的地址解析结果后调用ClientConn增加新的链接
	NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
	//balancer调用Client 移除已经失效的链接
	RemoveSubConn(SubConn)
	//balancer 调用ClientConn更新新的picker对象
	UpdateState(State)
	//balancer 告诉ClientConn重新解析地址
	ResolveNow(resolver.ResolveNowOptions)
	//目标客户端名称(该接口已经弃用)
	Target() string
}

//${GRPC_PROJECT}/balancer_conn_wrappers.go
//ccBalancerWrapper 实现上面的ClientConn 接口来,并和具体的balancer关联,
//ccBalancerWrapper 聚合了ClientConn 和balancer
type ccBalancerWrapper struct {
	cc         *ClientConn
	balancerMu sync.Mutex // synchronizes calls to the balancer
	//关联的balancer
	balancer   balancer.Balancer
	//scBuffer 用于ClientConn 在通信过程中感知链接发生变化的时候通知
	//balancer更新链接状态
	scBuffer   *buffer.Unbounded
	done       *grpcsync.Event

	mu       sync.Mutex
	subConns map[*acBalancerWrapper]struct{}
}

1.2 balancer

下面是balancer的接口定义,ClientConn需要balancer提供下面接口的能力来支持ClientConn 进行负载

//${GRPC_PROJECT}/balancer/balancer.go
type Balancer interface {
	//client 调用,更新链接的状态,如果返回ErrBadResolverState
	//client 会立即调用ResolveNow 重新解析地址
	UpdateClientConnState(ClientConnState) error
	//ClientConn 告诉balancer解析地址发生错误
	ResolverError(error)
	//上面两个接口是地址解析相关的,下面是单个网络链接相关接口。
	//ClientConn 某个链接状态
	UpdateSubConnState(SubConn, SubConnState)
	//关闭连接
	Close()
}
//${GRPC_PROJECT}/balancer/base/balancer.go
//下面是基础balancer的定义
type baseBalancer struct {
	//关联的rpc 客户端
	cc            balancer.ClientConn
	//选择器的构建对象
	pickerBuilder PickerBuilder

	csEvltr *balancer.ConnectivityStateEvaluator
	state   connectivity.State
	//当前解析地址和对应的链接map
	subConns map[resolver.Address]balancer.SubConn
	//负载的链接和其状态map
	scStates map[balancer.SubConn]connectivity.State
	//当前的选择器
	picker   balancer.Picker
	config   Config

	resolverErr error // the last error reported by the resolver; cleared on successful resolution
	connErr     error // the last connection error; cleared upon leaving TransientFailure
}



//根据当前的链接情况,生成一个负载选择器
//调用regeneratePicker的函数通过balancer中定义的ClientConn 接口实现对象(balancerWrapper)传递负载选中器到ClientConn 对象中,再包装成pickerWrapper 对象
func (b *baseBalancer) regeneratePicker() {
	if b.state == connectivity.TransientFailure {
		b.picker = NewErrPicker(b.mergeErrors())
		return
	}
	readySCs := make(map[balancer.SubConn]SubConnInfo)

	// Filter out all ready SCs from full subConn map.
	for addr, sc := range b.subConns {
		if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
			readySCs[sc] = SubConnInfo{Address: addr}
		}
	}
	b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
}

1.3 picker

//${GRPC_PROEJCT}/balancer/balancer.go
type Picker interface {
	//不同的负载算法实现Pick方法,返回负载选择结果
	Pick(info PickInfo) (PickResult, error)
}
//下面是轮询负载器的实现
func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	p.mu.Lock()
	sc := p.subConns[p.next]
	p.next = (p.next + 1) % len(p.subConns)
	p.mu.Unlock()
	return balancer.PickResult{SubConn: sc}, nil
}

0x02 串连client,resolver,balancer

4.1 Client

${GRPC_PROJECT}/clientconn.go

//省略了部分代码,只列出关键对象
type ClientConn struct {
 //链接状态管理器
	csMgr        *connectivityStateManager
	//链接选择器
	blockingpicker    *pickerWrapper
	//地址解析器
	resolverWrapper *ccResolverWrapper
	//当前链接集合
	conns           map[*addrConn]struct{}
	//负载均衡生成器
	balancerWrapper *ccBalancerWrapper
}
//下面的结构体描述了实实在在的一条网络连接
type addrConn struct {
	//关联的具体客户端
	cc     *ClientConn
	生成的balancer.SubConn 对象
	acbw   balancer.SubConn
	//关联的传输层对象
	transport transport.ClientTransport // The current transport.
	//当前链接的地址
	curAddr resolver.Address   // The current address.
	//所有解析到的地址
	addrs   []resolver.Address // All addresses that the resolver resolved 
	//链接的状态
	state connectivity.State
}

2.1 ClientConn 的构造

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(), //链接选择器pickerWrapper
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(), //首次解析地址事件
	}
	
	// Determine the resolver to use.
	cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
	//获得解析器构造类对象,找不到配置的,就使用默认的resolver构造器
	resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
	if resolverBuilder == nil {
		cc.parsedTarget = resolver.Target{
			Scheme:   resolver.GetDefaultScheme(),
			Endpoint: target,
		}
		resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
		if resolverBuilder == nil {
			return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
		}
	}
	//填充balancer构造的参数信息
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}
	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder) 
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}

2.2 balancer 的构造和 client,resolver,balancer三者间的协作

接上一篇讲resolver的时候,resolver在起来后会立即去解析地址,在解析到地址信息后会通过ccResolverWrapper对象的UpdateState(s resolver.State)方法想ClientConn传递地址信息。ccResolverWrapper 正是通过ClientConn 下面的方法更新地址信息。

func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
//当发生地址解析错误的时候ClientConn 会告知balancer地址解析出错了。
if err != nil {
		cc.maybeApplyDefaultServiceConfig(nil)
		if cc.balancerWrapper != nil {
			cc.balancerWrapper.resolverError(err)
		}
		cc.mu.Unlock()
		return balancer.ErrBadResolverState
	}
//简化部分代码,如果cc.balancerWrapper 为空,则会调用下的方法创建ccBalancerWrapper 对象。
//func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address){} 

bw := cc.balancerWrapper
cc.mu.Unlock()
//下面的句子则是最终会调用到balancer的UpdateClientConnState(ClientConnState) error 方法告诉balancer 解析到新的地址了
uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
	if ret == nil {
		ret = uccsErr // prefer ErrBadResolver state since any other error is
		// currently meaningless to the caller.
	}

2.3 balancer.SubConn 对象构建

通过上面的小节,我们串联了地址的解析到balancer更新新的地址信息,下面就来看看如何生成一个网络通信连接

//${GRPC_PROJECT}/balancer/base/balancer.go
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error{
	//省略...
	sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
			if err != nil {
				logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
				continue
			}
			b.subConns[a] = sc
			b.scStates[sc] = connectivity.Idle
			sc.Connect() //如果改地址没有subConn 对象,则创建一个,并建立连接
}
//省略...

从上面的代码我们可以看到balancer 通过ClientConn 创建了连接对象。并且调用链接对象的Connect方法发起网络链接。

func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
	//省略... 创建新的链接对象
	ac, err := ccb.cc.newAddrConn(addrs, opts)
	if err != nil {
		return nil, err
	}
	acbw := &acBalancerWrapper{ac: ac}
	acbw.ac.mu.Lock()
	ac.acbw = acbw
	acbw.ac.mu.Unlock()
	ccb.subConns[acbw] = struct{}{}
	return acbw, nil
}
//最终调用到ClientConn 结构体的newAddrconn 方法,创建addrConn 对象。
//func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error){}
//addrConn 会去管理链接,包括建立链接,状态管理

2.4 链接状态的更新

上面小节提到了AddrConn 会管理链接的状态,下面看看状态是如何传递到balancer的。 //${GRPC_PROJEC}/clientconn.go

// Note: this requires a lock on ac.mu.
//addrConn的connecting/ready/shutdown/TransientFailure 状态都会通过下面的方法
//调用ClientConn的方法,最后传递到balancer对象。
func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
	//省略...
	ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
}

0x03 总结

  1. ClientConn 对象并不直接和resolver 和 balancer 直接关联,而是通过对应的warpper 包装,warpper 聚合了ClientConn 和 balancer,并实现对应的resolver 视角下的ClientConn接口。
  2. resolver 解析到地址后将地址信息通过 ClientConn 将地址信息传递给balancer 对象,balancer对象根据 地址信息移除或增加新的链接
  3. addrConn 会自己管理链接状态,并将状态传递给balancer。
  4. picker 就不具体讲了,pickerWrapper 包了一个blockingCh ,阻塞选择器,只有在有链接的情况下才去pick