涉及到网络通信的代码阅读套路
0x01 分层架构
1.1 架构定义
分层架构模式也叫做N层架构,分层架构模式中,每层都有特定的角色和职能。一般RPC 框架都是分层架构。例如GRPC,dubbo; RPC 框架可以分为transport 通信层,RPC方法路由层,业务service层。
1.2 上下层间的通信方式
分层架构中,上下层是需要通信的,一般上层和下层通信的时候,通过直接调用下层的方法即可。但是也有下层要主动需要和上层通信的情况,例如某个业务请求到了,或者某个事件发生了。下层和上层的通信主要有下面两种方式:
1.2.1 回调
上层可以通过想下层注册回调函数,类似于下层委托(函数指针)上层的方法处理请求。在下层事件发生或者网络包到达的时候调用上层的注册函数。
1.2.2 发布订阅模式
发布订阅模式可以基于观察者模式,消息队列的方式来实现;观察者模式可以立即调用订阅状态变化的对象方法,上下层有耦合。消息队列者可以解耦两者;上层者可以通过自己的实际的消息消费能力去处理消息,但是一般业务框架都需要立即响应请求,所以发布订阅模式RPC 框架中不业务请求中使用。
0x02 代码分层
通信层
通信层负责处理网络通信;责任为通过系统网络调用发起各种方式(TCP/HTTP/HTTPS)的网络包传输;如果用的TCP 则需要自己处理自定义的packet,需要校验包大小,是否收包完整;网络包一般分为head 和body, head会带一些token,方法路由信息,trace信息等公共使用的信息。如果使用的HTTP、HTTP2则有标准库来帮忙处理收包是否完整;TCP 自定义包协议格式推荐学习BaiduStd 协议格式。
路由层
路由层需要通过网络层解包后从头部信息中拿到方法信息。例如GRPC 其通信包头中含有消息需要路由到的服务名称和方法名称;这一层的路由非常简单,不会像网络路由那样复杂,直接通过map hash查找即可;一般情况下在RPC服务起来的时候,向server注入service对象,可以显示的方法绑定或者通过反射自动绑定。推荐反射,方式注入service方法,这样简单舒心,使用protobuffer,更是舒服,有协议编译工具直接生成桩代码;在网络层通知路由层消息到达后,路由层再回调service层对应的业务方法;路由层一般会负责网络body包的decoding 和 encoding,压缩和解压缩,还会有一些方法拦截器在路由层调用service层方法前调用。
业务层
业务层负责实现业务代码,响应客户端的请求。操作数据库数据等等;
0x03 GRPC 代码分层
3.1 transport 层
GRPC transport通信层代码分为client端和server端;下面说说server端的代码:
//${GRPC_PROJECT}/internal/transport/handler_server.go
//这就是transport对象
type serverHandlerTransport struct {
//....
}
//下面这个这个方法就是传输层回调上层-路由层的方法。func(*Stream) 就是回调的函数指针
func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
// With this transport type there will be exactly 1 stream: this HTTP request.
}
3.2 router层
向server 注入方法路由
//${GRPC_PROJECT}/server.go
//服务描述有由protobuf通过service描述生成方法桩代码
//ServiceDesc信息就是来自桩代码
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
s.printf("RegisterService(%q)", sd.ServiceName)
if s.serve {
logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
}
//
if _, ok := s.services[sd.ServiceName]; ok {
logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
}
//服务信息
info := &serviceInfo{
serviceImpl: ss,
//一元方法描述map
methods: make(map[string]*MethodDesc),
//流式方法描述map
streams: make(map[string]*StreamDesc),
mdata: sd.Metadata,
}
//填入一元方法描述
for i := range sd.Methods {
d := &sd.Methods[i]
info.methods[d.MethodName] = d
}
//填入流式方法描述
for i := range sd.Streams {
d := &sd.Streams[i]
info.streams[d.StreamName] = d
}
s.services[sd.ServiceName] = info
}
路由注入transport的回调方法: ${GRPC_PROJECT}/server.go
func (s *Server) serveStreams(st transport.ServerTransport) {
defer st.Close()
var wg sync.WaitGroup
var roundRobinCounter uint32
//st 为servertransport对象,stream 则为transport层给上层回调函数的传参代表一次请求
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
//如果设置了工作池,则消息通过chan传递出去;
if s.opts.numServerWorkers > 0 {
data := &serverWorkerData{st: st, wg: &wg, stream: stream}
select {
case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
default:
// If all stream workers are busy, fallback to the default code path.
go func() {
s.handleStream(st, stream, s.traceInfo(st, stream))
wg.Done()
}()
}
} else {
//默认情况下直接go出协程处理
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}
}, func(ctx context.Context, method string) context.Context {
if !EnableTracing {
return ctx
}
tr := trace.New("grpc.Recv."+methodFamily(method), method)
return trace.NewContext(ctx, tr)
})
wg.Wait()
}
路由查找对应的service方法: //${GRPC_PROJECT}/server.go
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
//.... 省略
service := sm[:pos]
method := sm[pos+1:]
srv, knownService := s.services[service]
if knownService {
if md, ok := srv.methods[method]; ok {
//一元请求消息
//1. 解压缩
//2. decode
//3. 通过下面的语句调用service层注册的service方法。
//reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
if sd, ok := srv.streams[method]; ok {
//流式消息
s.processStreamingRPC(t, stream, srv, sd, trInfo)
return
}
}
//....省略
}
0x04 总结
1.通过概念描述介绍和对GRPC 代码的拆解,我们可以看到涉及到服务通信的代码如何通过分层思路去阅读。
2.每个层负责的职责不一样,通过回调解耦了上下层;所以可以做多通信层,单路由或者多路由和单service层的代码。例如:nsqd 提供HTTP和TCP 两种通信方式,但是其service层是公用的。