网络通信服务代码阅读套路

涉及到网络通信的代码阅读套路

0x01 分层架构

1.1 架构定义

分层架构模式也叫做N层架构,分层架构模式中,每层都有特定的角色和职能。一般RPC 框架都是分层架构。例如GRPC,dubbo; RPC 框架可以分为transport 通信层,RPC方法路由层,业务service层。

1.2 上下层间的通信方式

分层架构中,上下层是需要通信的,一般上层和下层通信的时候,通过直接调用下层的方法即可。但是也有下层要主动需要和上层通信的情况,例如某个业务请求到了,或者某个事件发生了。下层和上层的通信主要有下面两种方式:

1.2.1 回调

上层可以通过想下层注册回调函数,类似于下层委托(函数指针)上层的方法处理请求。在下层事件发生或者网络包到达的时候调用上层的注册函数。

1.2.2 发布订阅模式

发布订阅模式可以基于观察者模式,消息队列的方式来实现;观察者模式可以立即调用订阅状态变化的对象方法,上下层有耦合。消息队列者可以解耦两者;上层者可以通过自己的实际的消息消费能力去处理消息,但是一般业务框架都需要立即响应请求,所以发布订阅模式RPC 框架中不业务请求中使用。

0x02 代码分层

通信层

通信层负责处理网络通信;责任为通过系统网络调用发起各种方式(TCP/HTTP/HTTPS)的网络包传输;如果用的TCP 则需要自己处理自定义的packet,需要校验包大小,是否收包完整;网络包一般分为head 和body, head会带一些token,方法路由信息,trace信息等公共使用的信息。如果使用的HTTP、HTTP2则有标准库来帮忙处理收包是否完整;TCP 自定义包协议格式推荐学习BaiduStd 协议格式。

路由层

路由层需要通过网络层解包后从头部信息中拿到方法信息。例如GRPC 其通信包头中含有消息需要路由到的服务名称和方法名称;这一层的路由非常简单,不会像网络路由那样复杂,直接通过map hash查找即可;一般情况下在RPC服务起来的时候,向server注入service对象,可以显示的方法绑定或者通过反射自动绑定。推荐反射,方式注入service方法,这样简单舒心,使用protobuffer,更是舒服,有协议编译工具直接生成桩代码;在网络层通知路由层消息到达后,路由层再回调service层对应的业务方法;路由层一般会负责网络body包的decoding 和 encoding,压缩和解压缩,还会有一些方法拦截器在路由层调用service层方法前调用。

业务层

业务层负责实现业务代码,响应客户端的请求。操作数据库数据等等;

0x03 GRPC 代码分层

3.1 transport 层

GRPC transport通信层代码分为client端和server端;下面说说server端的代码:
//${GRPC_PROJECT}/internal/transport/handler_server.go
   //这就是transport对象
   type serverHandlerTransport struct {
   //....
   }
   //下面这个这个方法就是传输层回调上层-路由层的方法。func(*Stream) 就是回调的函数指针
   func (ht *serverHandlerTransport) HandleStreams(startStream 	func(*Stream), traceCtx func(context.Context, string) context.Context) {
   // With this transport type there will be exactly 1 stream: this HTTP request.
   }
   

3.2 router层

向server 注入方法路由
//${GRPC_PROJECT}/server.go
//服务描述有由protobuf通过service描述生成方法桩代码
//ServiceDesc信息就是来自桩代码 
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.printf("RegisterService(%q)", sd.ServiceName)
	if s.serve {
		logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
	}
	//
	if _, ok := s.services[sd.ServiceName]; ok {
		logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
	}
	//服务信息
	info := &serviceInfo{
		serviceImpl: ss,
		//一元方法描述map
		methods:     make(map[string]*MethodDesc),
		//流式方法描述map
		streams:     make(map[string]*StreamDesc),
		mdata:       sd.Metadata,
	}
	//填入一元方法描述
	for i := range sd.Methods {
		d := &sd.Methods[i]
		info.methods[d.MethodName] = d
	}
	//填入流式方法描述
	for i := range sd.Streams {
		d := &sd.Streams[i]
		info.streams[d.StreamName] = d
	}
	s.services[sd.ServiceName] = info
}

路由注入transport的回调方法: ${GRPC_PROJECT}/server.go

func (s *Server) serveStreams(st transport.ServerTransport) {
	defer st.Close()
	var wg sync.WaitGroup

	var roundRobinCounter uint32
	//st 为servertransport对象,stream 则为transport层给上层回调函数的传参代表一次请求
	st.HandleStreams(func(stream *transport.Stream) {
		wg.Add(1)
		//如果设置了工作池,则消息通过chan传递出去;
		if s.opts.numServerWorkers > 0 {
			data := &serverWorkerData{st: st, wg: &wg, stream: stream}
			select {
			case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
			default:
				// If all stream workers are busy, fallback to the default code path.  
				go func() {
					s.handleStream(st, stream, s.traceInfo(st, stream))
					wg.Done()
				}()
			}
		} else {
		//默认情况下直接go出协程处理
			go func() {
				defer wg.Done()
				s.handleStream(st, stream, s.traceInfo(st, stream))
			}()
		}
	}, func(ctx context.Context, method string) context.Context {
		if !EnableTracing {
			return ctx
		}
		tr := trace.New("grpc.Recv."+methodFamily(method), method)
		return trace.NewContext(ctx, tr)
	})
	wg.Wait()
}

路由查找对应的service方法: //${GRPC_PROJECT}/server.go

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
	//.... 省略
	service := sm[:pos]
	method := sm[pos+1:]

	srv, knownService := s.services[service]
	if knownService {
		if md, ok := srv.methods[method]; ok {
			//一元请求消息
			//1. 解压缩
			//2. decode
			//3. 通过下面的语句调用service层注册的service方法。
			//reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
			s.processUnaryRPC(t, stream, srv, md, trInfo)
			return
		}
		if sd, ok := srv.streams[method]; ok {
		//流式消息
			s.processStreamingRPC(t, stream, srv, sd, trInfo)
			return
		}
	}
	//....省略
}

0x04 总结

1.通过概念描述介绍和对GRPC 代码的拆解,我们可以看到涉及到服务通信的代码如何通过分层思路去阅读。

2.每个层负责的职责不一样,通过回调解耦了上下层;所以可以做多通信层,单路由或者多路由和单service层的代码。例如:nsqd 提供HTTP和TCP 两种通信方式,但是其service层是公用的。