GRPC 客户端组件涉及服务发现组件,负载均衡组件,以及客户端本身;服务发现组件和客户端为相互关联关系。
0x01 resolver
服务发现组件和客户端的UML 类图如下:
在GRPC-go resolver组件代码中($projectPath/resolver/resolver.go) 关键代码如下:
type Address struct{
Addr string
ServerName string
Attributes *attributes.Attributes
Metadata interface{}
}
//resolver对象的输出,ClientConn对象的输入
type State struct{
Addresses []Address
}
//ClientConn 接口定义
type ClientConn interface{
//客户端地址更新接口
UpdateState(State)
}
从上述代码可以看到,client通过UpdateState 方法更新地址信息;
type Resolver interface{
ResolveNow(ResolveNowOptions)
}
type Builder interface{
Build(target Target, cc ClientConn, opts BuildOptions)(Resolver,error)
}
通过上述代码,当前可知,在构建resolver对象的时候将ClientConn对象注入到了 resolver对象中。这样,resolver在解析到地址的时候,通过client的UpdateState方法将State 输入到client中。 通过($projectPath/internal/resolver/dns_resolver.go)dns resolver代码进行验证:
func(b *dnsBuilder)Build(targe resolver.Targe,cc resolver.ClientConn,opts resolver.BuildOptions)(resolver.Resolver, error){
//...
//dnsResolver在构造的时候包含resolver.ClientConn对象
d :=&dnsResolver{
host:host,
port:port,
ctx:ctx,
cancel:cancel,
cc:cc,
rn:make(chan struct{},1),
disableServiceConfig:opts.DisableServiceConfig,
}
//...
}
func(d *dnsResolver)watcher(){
defer d.wg.Done()
for {
select {
case <-d.ctx.Done():
return
case <-d.rn:
}
state, err := d.lookup()
if err != nil {
d.cc.ReportError(err)
} else {
//调用con 对象,更新地址列表
d.cc.UpdateState(*state)
}
// Sleep to prevent excessive re-resolutions. Incoming resolution requests
// will be queued in d.rn.
t := time.NewTimer(minDNSResRate)
select {
case <-t.C:
case <-d.ctx.Done():
t.Stop()
return
}
}
}
从上面的代码可以看到,resolver对象正是通过client的UpdateState方法将地址信息输入到client对象中。 client 侧是如何实现Resolver.ClientConn方法的呢?通过阅读$projectPath/resolver_conn_wrapper.go 我们可以看到client侧的实现
//client resolver 胶水层代码,聚合处理client的服务发现功能。粘接ClientConn 和resolver.Resolver 对象。
type ccResolverWrapper struct{
cc *ClientConn
resolverMu sync.Mutex
resolver resolver.Resolver
Done *grpcsync.Event
curState resolver.State
pollingMu sync.Mutex
polling chan struct{}
}
//构造ccResolverWrapper 胶水层时,将ClientConn 和解析器构造器一起注入。
func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder)(ccResolverWrapper,error)
//该方法触发服务发现,立即进行地址解析。
func(ccr *ccResolverWrapper)ResolveNow(o resolver.ResolverNowOptions){
//...
//触发解析器立即解析。
ccr.resolver.ResolverNow(o)
//...
}
//通过调用ClientConn 的updateResolverState 方法完成地址更新
func(ccr *ccResolverWrapper)UpdateState(s resolver.State){
ccr.poll(ccr.cc.updateResolverState(ccr.curState,nil))
}
总结
1.resolve.Address 对象是resolver解析服务最小单元地址信息。
2.resolve.State 对象是client和resolver通信的对象,resolver通过client.UpdateState方法更新地址信息,client 通过resolver.ResolveNow触发服务发现。
3.resolver 和 ccResolverWrapper 为关联关系。
4.grpc的client通过 ccResolverWrapper 胶水对象聚合了client的服务发现逻辑。