GRPC 客户端组件涉及服务发现组件,负载均衡组件,以及客户端本身;负载组件和客户端为相互关联关系。balancer的组成有两部分:baseBalancer 和picker;baseBalancer 管理保存当前可用是conn,baseBalancer根据当前可用链接生成picker,picker则实现了具体的负载算法;
0x01 balancer
整体的类图如下:
1.1 ClientConn
6大编程原则中有一条为接口隔离原则。而GRPC client 在resolver 和balancer 中分开定义了两个ClientConn 接口;在隔离情况下,分别从resolver的视角和balancer的视角去认知了ClientConn; balancer 和resolver 并不直接交互,而是两个组件通过client来协调通信。 下面是balancer视角下ClientConn 接口的定义,下面的方法是balancer 需要ClientConn 提供的能力
//${GRPC_PROJECT}/balancer/balancer.go
//State 对象是 balancer 和 ClienConn 交互通信的数据对象
type State struct {
ConnectivityState connectivity.State
Picker Picker
}
//balancer 视角下的clientConn 接口,需要ClientConn提供下面接口的能力来支持balancer工作
type ClientConn interface {
//balancer在获得新的地址解析结果后调用ClientConn增加新的链接
NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
//balancer调用Client 移除已经失效的链接
RemoveSubConn(SubConn)
//balancer 调用ClientConn更新新的picker对象
UpdateState(State)
//balancer 告诉ClientConn重新解析地址
ResolveNow(resolver.ResolveNowOptions)
//目标客户端名称(该接口已经弃用)
Target() string
}
//${GRPC_PROJECT}/balancer_conn_wrappers.go
//ccBalancerWrapper 实现上面的ClientConn 接口来,并和具体的balancer关联,
//ccBalancerWrapper 聚合了ClientConn 和balancer
type ccBalancerWrapper struct {
cc *ClientConn
balancerMu sync.Mutex // synchronizes calls to the balancer
//关联的balancer
balancer balancer.Balancer
//scBuffer 用于ClientConn 在通信过程中感知链接发生变化的时候通知
//balancer更新链接状态
scBuffer *buffer.Unbounded
done *grpcsync.Event
mu sync.Mutex
subConns map[*acBalancerWrapper]struct{}
}
1.2 balancer
下面是balancer的接口定义,ClientConn需要balancer提供下面接口的能力来支持ClientConn 进行负载
//${GRPC_PROJECT}/balancer/balancer.go
type Balancer interface {
//client 调用,更新链接的状态,如果返回ErrBadResolverState
//client 会立即调用ResolveNow 重新解析地址
UpdateClientConnState(ClientConnState) error
//ClientConn 告诉balancer解析地址发生错误
ResolverError(error)
//上面两个接口是地址解析相关的,下面是单个网络链接相关接口。
//ClientConn 某个链接状态
UpdateSubConnState(SubConn, SubConnState)
//关闭连接
Close()
}
//${GRPC_PROJECT}/balancer/base/balancer.go
//下面是基础balancer的定义
type baseBalancer struct {
//关联的rpc 客户端
cc balancer.ClientConn
//选择器的构建对象
pickerBuilder PickerBuilder
csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State
//当前解析地址和对应的链接map
subConns map[resolver.Address]balancer.SubConn
//负载的链接和其状态map
scStates map[balancer.SubConn]connectivity.State
//当前的选择器
picker balancer.Picker
config Config
resolverErr error // the last error reported by the resolver; cleared on successful resolution
connErr error // the last connection error; cleared upon leaving TransientFailure
}
//根据当前的链接情况,生成一个负载选择器
//调用regeneratePicker的函数通过balancer中定义的ClientConn 接口实现对象(balancerWrapper)传递负载选中器到ClientConn 对象中,再包装成pickerWrapper 对象
func (b *baseBalancer) regeneratePicker() {
if b.state == connectivity.TransientFailure {
b.picker = NewErrPicker(b.mergeErrors())
return
}
readySCs := make(map[balancer.SubConn]SubConnInfo)
// Filter out all ready SCs from full subConn map.
for addr, sc := range b.subConns {
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
readySCs[sc] = SubConnInfo{Address: addr}
}
}
b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
}
1.3 picker
//${GRPC_PROEJCT}/balancer/balancer.go
type Picker interface {
//不同的负载算法实现Pick方法,返回负载选择结果
Pick(info PickInfo) (PickResult, error)
}
//下面是轮询负载器的实现
func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
p.mu.Lock()
sc := p.subConns[p.next]
p.next = (p.next + 1) % len(p.subConns)
p.mu.Unlock()
return balancer.PickResult{SubConn: sc}, nil
}
0x02 串连client,resolver,balancer
4.1 Client
${GRPC_PROJECT}/clientconn.go
//省略了部分代码,只列出关键对象
type ClientConn struct {
//链接状态管理器
csMgr *connectivityStateManager
//链接选择器
blockingpicker *pickerWrapper
//地址解析器
resolverWrapper *ccResolverWrapper
//当前链接集合
conns map[*addrConn]struct{}
//负载均衡生成器
balancerWrapper *ccBalancerWrapper
}
//下面的结构体描述了实实在在的一条网络连接
type addrConn struct {
//关联的具体客户端
cc *ClientConn
生成的balancer.SubConn 对象
acbw balancer.SubConn
//关联的传输层对象
transport transport.ClientTransport // The current transport.
//当前链接的地址
curAddr resolver.Address // The current address.
//所有解析到的地址
addrs []resolver.Address // All addresses that the resolver resolved
//链接的状态
state connectivity.State
}
2.1 ClientConn 的构造
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(), //链接选择器pickerWrapper
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(), //首次解析地址事件
}
// Determine the resolver to use.
cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
//获得解析器构造类对象,找不到配置的,就使用默认的resolver构造器
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
if resolverBuilder == nil {
cc.parsedTarget = resolver.Target{
Scheme: resolver.GetDefaultScheme(),
Endpoint: target,
}
resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
if resolverBuilder == nil {
return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
}
}
//填充balancer构造的参数信息
cc.balancerBuildOpts = balancer.BuildOptions{
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
}
// Build the resolver.
rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
if err != nil {
return nil, fmt.Errorf("failed to build resolver: %v", err)
}
2.2 balancer 的构造和 client,resolver,balancer三者间的协作
接上一篇讲resolver的时候,resolver在起来后会立即去解析地址,在解析到地址信息后会通过ccResolverWrapper对象的UpdateState(s resolver.State)方法想ClientConn传递地址信息。ccResolverWrapper 正是通过ClientConn 下面的方法更新地址信息。
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
//当发生地址解析错误的时候ClientConn 会告知balancer地址解析出错了。
if err != nil {
cc.maybeApplyDefaultServiceConfig(nil)
if cc.balancerWrapper != nil {
cc.balancerWrapper.resolverError(err)
}
cc.mu.Unlock()
return balancer.ErrBadResolverState
}
//简化部分代码,如果cc.balancerWrapper 为空,则会调用下的方法创建ccBalancerWrapper 对象。
//func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address){}
bw := cc.balancerWrapper
cc.mu.Unlock()
//下面的句子则是最终会调用到balancer的UpdateClientConnState(ClientConnState) error 方法告诉balancer 解析到新的地址了
uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
if ret == nil {
ret = uccsErr // prefer ErrBadResolver state since any other error is
// currently meaningless to the caller.
}
2.3 balancer.SubConn 对象构建
通过上面的小节,我们串联了地址的解析到balancer更新新的地址信息,下面就来看看如何生成一个网络通信连接
//${GRPC_PROJECT}/balancer/base/balancer.go
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error{
//省略...
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
if err != nil {
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
}
b.subConns[a] = sc
b.scStates[sc] = connectivity.Idle
sc.Connect() //如果改地址没有subConn 对象,则创建一个,并建立连接
}
//省略...
从上面的代码我们可以看到balancer 通过ClientConn 创建了连接对象。并且调用链接对象的Connect方法发起网络链接。
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
//省略... 创建新的链接对象
ac, err := ccb.cc.newAddrConn(addrs, opts)
if err != nil {
return nil, err
}
acbw := &acBalancerWrapper{ac: ac}
acbw.ac.mu.Lock()
ac.acbw = acbw
acbw.ac.mu.Unlock()
ccb.subConns[acbw] = struct{}{}
return acbw, nil
}
//最终调用到ClientConn 结构体的newAddrconn 方法,创建addrConn 对象。
//func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error){}
//addrConn 会去管理链接,包括建立链接,状态管理
2.4 链接状态的更新
上面小节提到了AddrConn 会管理链接的状态,下面看看状态是如何传递到balancer的。 //${GRPC_PROJEC}/clientconn.go
// Note: this requires a lock on ac.mu.
//addrConn的connecting/ready/shutdown/TransientFailure 状态都会通过下面的方法
//调用ClientConn的方法,最后传递到balancer对象。
func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
//省略...
ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
}
0x03 总结
- ClientConn 对象并不直接和resolver 和 balancer 直接关联,而是通过对应的warpper 包装,warpper 聚合了ClientConn 和 balancer,并实现对应的resolver 视角下的ClientConn接口。
- resolver 解析到地址后将地址信息通过 ClientConn 将地址信息传递给balancer 对象,balancer对象根据 地址信息移除或增加新的链接
- addrConn 会自己管理链接状态,并将状态传递给balancer。
- picker 就不具体讲了,pickerWrapper 包了一个blockingCh ,阻塞选择器,只有在有链接的情况下才去pick