Containerd 源码基础结构分享关键的服务基础类
关键类
server
file: containerd/services/server/server.go
// Server is the containerd main daemon
type Server struct {
grpcServer *grpc.Server
ttrpcServer *ttrpc.Server
tcpServer *grpc.Server
events *exchange.Exchange //事件总线,有抽离成单独库,可以应用自己的代码中
config *srvconfig.Config //containerd 服务配置
plugins []*plugin.Plugin //service 插件,服务插件话,方便服务扩展,值得参考借鉴
}
//server 对象构造函数关键流程
func New(ctx context.Context, config *srvconfig.Config) (*Server, error){
//apply sets config settings on the server process
if err := apply(ctx, config); err != nil {
return nil, err
}
//装载服务插件
//在该子函数中注册了content 和 bolt 服务插件
plugins, err := LoadPlugins(ctx, config)
if err != nil {
return nil, err
}
//加载diff工具
for id, p := range config.StreamProcessors {
diff.RegisterProcessor(diff.BinaryHandler(id, p.Returns, p.Accepts, p.Path, p.Args))
}
//配置grpc的handler 拦截器
serverOpts := []grpc.ServerOption{
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
}
//创建server 用于IO通信
ttrpcServer, err := newTTRPCServer()
if err != nil {
return nil, err
}
grpcServer = grpc.NewServer(serverOpts...)
tcpServer = grpc.NewServer(tcpServerOpts...)
//创建三个分别需要注册到对应server的service数组
grpcServices []plugin.Service
tcpServices []plugin.TCPService
ttrpcServices []plugin.TTRPCService
//创建server对象
s = &Server{
grpcServer: grpcServer,
tcpServer: tcpServer,
ttrpcServer: ttrpcServer,
events: exchange.NewExchange(),
config: config,
}
//初始化插件
initContext := plugin.NewContext(
ctx,
p,
initialized,
config.Root,
config.State,
)
initContext.Events = s.events //事件总线,用于插件间事件通信
initContext.Address = config.GRPC.Address //grpc接口通信
initContext.TTRPCAddress = config.TTRPC.Address//ttrpc 接口通信
result := p.Init(initContext)
if err := initialized.Add(result); err != nil {
return nil, errors.Wrapf(err, "could not add plugin result to plugin set")
}
instance, err := result.Instance()
//根据service插件实现的接口,将service对象实例放到不同数组中
// check for grpc services that should be registered with the server
if src, ok := instance.(plugin.Service); ok {
grpcServices = append(grpcServices, src)
}
if src, ok := instance.(plugin.TTRPCService); ok {
ttrpcServices = append(ttrpcServices, src)
}
if service, ok := instance.(plugin.TCPService); ok {
tcpServices = append(tcpServices, service)
}
//向server 注册服务(service)
// register services after all plugins have been initialized
for _, service := range grpcServices {
if err := service.Register(grpcServer); err != nil {
return nil, err
}
}
for _, service := range ttrpcServices {
if err := service.RegisterTTRPC(ttrpcServer); err != nil {
return nil, err
}
}
for _, service := range tcpServices {
if err := service.RegisterTCP(tcpServer); err != nil {
return nil, err
}
}
//对象构造完毕,返回containerd daemond server
return server
}
插件相关类
file:containerd/plugin/plugin.go
//全局默认的创建信息登记对象,通过func Register(r *Registration){}函数收录到登记对象中
var register = struct {
sync.RWMutex
r []*Registration
}{}
//插件登记信息
// Registration contains information for registering a plugin
type Registration struct {
// Type of the plugin
Type Type //插件类型
// ID of the plugin
ID string //插件ID
// Config specific to the plugin
Config interface{}
// Requires is a list of plugins that the registered plugin requires to be available
Requires []Type //插件依赖链
// InitFn is called when initializing a plugin. The registration and
// context are passed in. The init function may modify the registration to
// add exports, capabilities and platform support declarations.
InitFn func(*InitContext) (interface{}, error) //插件初始函数,返回关联到该插件对象的实例对象,需要使用插件化对象的外部构造函数
// Disable the plugin from loading
Disable bool
}
插件初始上下文类以及插件类 file:containerd/plugin/context.go
// InitContext is used for plugin inititalization
type InitContext struct {
Context context.Context
Root string
State string
Config interface{}
Address string
TTRPCAddress string
Events *exchange.Exchange
Meta *Meta // plugins can fill in metadata at init.
plugins *Set
}
//
// Plugin represents an initialized plugin, used with an init context.
type Plugin struct {
Registration *Registration // registration, as initialized
Config interface{} // config, as initialized
Meta *Meta
instance interface{} //插件化服务的实例对象
err error // will be set if there was an error initializing the plugin
}
插件的注入通过目录containerd/cmd/containerd/各个文件利用go 包加载调用init函数的特性,向插件系统注册插件例如:
//导入包
import (
_ "github.com/containerd/containerd/diff/walking/plugin"
)
//通过init 函数注册插件
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.DiffPlugin,
ID: "walking",
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
md, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
ic.Meta.Platforms = append(ic.Meta.Platforms, platforms.DefaultSpec())
cs := md.(*metadata.DB).ContentStore()
return diffPlugin{
Comparer: walking.NewWalkingDiff(cs),
Applier: apply.NewFileSystemApplier(cs),
}, nil
},
})
}
在目录containerd/containerd/services下可见,各个服务通过插件化组织。服务通过通信层插件,和服务层插件区分开,下面两个分类,区分开了插件的层次,通信层插件绑定本地服务
// ServicePlugin implements a internal service
ServicePlugin Type = "io.containerd.service.v1" //服务插件
// GRPCPlugin implements a grpc service
GRPCPlugin Type = "io.containerd.grpc.v1" //通信插件
//例如image服务
//通信层插件 file:containerd/services/images/service.go
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.GRPCPlugin, //grpc 通信插件
ID: "images",
Requires: []plugin.Type{
plugin.ServicePlugin, //依赖service插件
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
plugins, err := ic.GetByType(plugin.ServicePlugin)
if err != nil {
return nil, err
}
p, ok := plugins[services.ImagesService]
if !ok {
return nil, errors.New("images service not found")
}
i, err := p.Instance()
if err != nil {
return nil, err
}
return &service{local: i.(imagesapi.ImagesClient)}, nil
},
})
}
//通信层插件,和服务插件通信
type service struct {
local imagesapi.ImagesClient
}
//服务插件 file:containerd/services/images/local.go
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.ServicePlugin,
ID: services.ImagesService,
Requires: []plugin.Type{
plugin.MetadataPlugin,
plugin.GCPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
g, err := ic.Get(plugin.GCPlugin)
if err != nil {
return nil, err
}
return &local{
store: metadata.NewImageStore(m.(*metadata.DB)),
publisher: ic.Events,
gc: g.(gcScheduler),
}, nil
},
})
}
type gcScheduler interface {
ScheduleAndWait(context.Context) (gc.Stats, error)
}
//service 类,service类在应用层依赖 store 存储类,gc类,和事件总线类
type local struct {
store images.Store
gc gcScheduler
publisher events.Publisher
}
事件总线 Exchange
file: containerd/events/exchange/exchange.go
// Exchange broadcasts events
type Exchange struct {
broadcaster *goevents.Broadcaster //广播器
}
//Exchange 类实现了下面三个接口,用于事件的转发,订阅,和发布
// Publisher posts the event.
type Publisher interface {
Publish(ctx context.Context, topic string, event Event) error
}
// Forwarder forwards an event to the underlying event bus
type Forwarder interface {
Forward(ctx context.Context, envelope *Envelope) error
}
// Subscriber allows callers to subscribe to events
type Subscriber interface {
Subscribe(ctx context.Context, filters ...string) (ch <-chan *Envelope, errs <-chan error)
}
file:containerd/events/events.go
//信封对象,用于对事件消息的封装
// Envelope provides the packaging for an event.
type Envelope struct {
Timestamp time.Time //事件投递时间戳
Namespace string //命名空间
Topic string //事件topic
Event *types.Any //事件内容
}
以上是containerd 代码结构组织方式,以及几个关键类;结合实际代码阅读,理解containerd的代码组织结构,为后面继续深入阅读containerd的应用层代码打下基础,同时可以学习到代码结构插件化带来的扩展性,具有实际的参考意义。