containerd 源码导读-基础篇

Containerd 源码基础结构分享关键的服务基础类

关键类

server

file: containerd/services/server/server.go

// Server is the containerd main daemon
type Server struct {
	grpcServer  *grpc.Server   
	ttrpcServer *ttrpc.Server
	tcpServer   *grpc.Server
	events      *exchange.Exchange //事件总线,有抽离成单独库,可以应用自己的代码中
	config      *srvconfig.Config //containerd 服务配置
	plugins     []*plugin.Plugin //service 插件,服务插件话,方便服务扩展,值得参考借鉴
}

//server 对象构造函数关键流程
func New(ctx context.Context, config *srvconfig.Config) (*Server, error){
//apply sets config settings on the server process
if err := apply(ctx, config); err != nil {
return nil, err
}

//装载服务插件
//在该子函数中注册了content 和 bolt 服务插件
plugins, err := LoadPlugins(ctx, config)
if err != nil {
return nil, err
}
//加载diff工具
for id, p := range config.StreamProcessors {
diff.RegisterProcessor(diff.BinaryHandler(id, p.Returns, p.Accepts, p.Path, p.Args))
}
//配置grpc的handler 拦截器
serverOpts := []grpc.ServerOption{
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
}

//创建server 用于IO通信
ttrpcServer, err := newTTRPCServer()
if err != nil {
return nil, err
}
grpcServer = grpc.NewServer(serverOpts...)
tcpServer  = grpc.NewServer(tcpServerOpts...)
//创建三个分别需要注册到对应server的service数组
grpcServices  []plugin.Service
tcpServices   []plugin.TCPService
ttrpcServices []plugin.TTRPCService
//创建server对象
s = &Server{
grpcServer:  grpcServer,
tcpServer:   tcpServer,
ttrpcServer: ttrpcServer,
events:      exchange.NewExchange(),
config:      config,
}
//初始化插件
initContext := plugin.NewContext(
ctx,
p,
initialized,
config.Root,
config.State,
)
initContext.Events = s.events //事件总线,用于插件间事件通信
initContext.Address = config.GRPC.Address //grpc接口通信
initContext.TTRPCAddress = config.TTRPC.Address//ttrpc 接口通信
result := p.Init(initContext)
if err := initialized.Add(result); err != nil {
return nil, errors.Wrapf(err, "could not add plugin result to plugin set")
}

instance, err := result.Instance()
//根据service插件实现的接口,将service对象实例放到不同数组中
// check for grpc services that should be registered with the server
if src, ok := instance.(plugin.Service); ok {
grpcServices = append(grpcServices, src)
}
if src, ok := instance.(plugin.TTRPCService); ok {
ttrpcServices = append(ttrpcServices, src)
}
if service, ok := instance.(plugin.TCPService); ok {
tcpServices = append(tcpServices, service)
}

//向server 注册服务(service)
// register services after all plugins have been initialized
for _, service := range grpcServices {
if err := service.Register(grpcServer); err != nil {
return nil, err
}
}
for _, service := range ttrpcServices {
if err := service.RegisterTTRPC(ttrpcServer); err != nil {
return nil, err
}
}
for _, service := range tcpServices {
if err := service.RegisterTCP(tcpServer); err != nil {
return nil, err
}
}
//对象构造完毕,返回containerd daemond server
return server
}

插件相关类

file:containerd/plugin/plugin.go

//全局默认的创建信息登记对象,通过func Register(r *Registration){}函数收录到登记对象中
var register = struct {
sync.RWMutex
r []*Registration
}{}

//插件登记信息
// Registration contains information for registering a plugin
type Registration struct {
// Type of the plugin
Type Type //插件类型
// ID of the plugin
ID string //插件ID
// Config specific to the plugin
Config interface{}
// Requires is a list of plugins that the registered plugin requires to be available
Requires []Type //插件依赖链

// InitFn is called when initializing a plugin. The registration and
// context are passed in. The init function may modify the registration to
// add exports, capabilities and platform support declarations.
InitFn func(*InitContext) (interface{}, error) //插件初始函数,返回关联到该插件对象的实例对象,需要使用插件化对象的外部构造函数
// Disable the plugin from loading
Disable bool 
}

插件初始上下文类以及插件类 file:containerd/plugin/context.go

// InitContext is used for plugin inititalization
type InitContext struct {
Context      context.Context
Root         string
State        string
Config       interface{}
Address      string
TTRPCAddress string
Events       *exchange.Exchange
Meta *Meta // plugins can fill in metadata at init.
plugins *Set
}
//
// Plugin represents an initialized plugin, used with an init context.
type Plugin struct {
Registration *Registration // registration, as initialized
Config       interface{}   // config, as initialized
Meta         *Meta
instance interface{} //插件化服务的实例对象
err      error // will be set if there was an error initializing the plugin
}

插件的注入通过目录containerd/cmd/containerd/各个文件利用go 包加载调用init函数的特性,向插件系统注册插件例如:

//导入包
import (
_ "github.com/containerd/containerd/diff/walking/plugin"
)
//通过init 函数注册插件
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.DiffPlugin,
ID:   "walking",
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
md, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}

ic.Meta.Platforms = append(ic.Meta.Platforms, platforms.DefaultSpec())
cs := md.(*metadata.DB).ContentStore()

return diffPlugin{
Comparer: walking.NewWalkingDiff(cs),
Applier:  apply.NewFileSystemApplier(cs),
}, nil
},
})
}

在目录containerd/containerd/services下可见,各个服务通过插件化组织。服务通过通信层插件,和服务层插件区分开,下面两个分类,区分开了插件的层次,通信层插件绑定本地服务

// ServicePlugin implements a internal service
	ServicePlugin Type = "io.containerd.service.v1" //服务插件
// GRPCPlugin implements a grpc service
	GRPCPlugin Type = "io.containerd.grpc.v1" //通信插件
//例如image服务	
//通信层插件 file:containerd/services/images/service.go
func init() {
	plugin.Register(&plugin.Registration{
		Type: plugin.GRPCPlugin, //grpc 通信插件
		ID:   "images",
		Requires: []plugin.Type{
			plugin.ServicePlugin, //依赖service插件
		},
		InitFn: func(ic *plugin.InitContext) (interface{}, error) {
			plugins, err := ic.GetByType(plugin.ServicePlugin)
			if err != nil {
				return nil, err
			}
			p, ok := plugins[services.ImagesService]
			if !ok {
				return nil, errors.New("images service not found")
			}
			i, err := p.Instance()
			if err != nil {
				return nil, err
			}
			return &service{local: i.(imagesapi.ImagesClient)}, nil
		},
	})
}

//通信层插件,和服务插件通信
type service struct {
	local imagesapi.ImagesClient
}
//服务插件 file:containerd/services/images/local.go
func init() {
	plugin.Register(&plugin.Registration{
		Type: plugin.ServicePlugin,
		ID:   services.ImagesService,
		Requires: []plugin.Type{
			plugin.MetadataPlugin,
			plugin.GCPlugin,
		},
		InitFn: func(ic *plugin.InitContext) (interface{}, error) {
			m, err := ic.Get(plugin.MetadataPlugin)
			if err != nil {
				return nil, err
			}
			g, err := ic.Get(plugin.GCPlugin)
			if err != nil {
				return nil, err
			}

			return &local{
				store:     metadata.NewImageStore(m.(*metadata.DB)),
				publisher: ic.Events,
				gc:        g.(gcScheduler),
			}, nil
		},
	})
}

type gcScheduler interface {
	ScheduleAndWait(context.Context) (gc.Stats, error)
}
//service 类,service类在应用层依赖 store 存储类,gc类,和事件总线类
type local struct {
	store     images.Store
	gc        gcScheduler
	publisher events.Publisher
}

事件总线 Exchange

file: containerd/events/exchange/exchange.go

// Exchange broadcasts events
type Exchange struct {
broadcaster *goevents.Broadcaster //广播器
}
//Exchange 类实现了下面三个接口,用于事件的转发,订阅,和发布
// Publisher posts the event.
type Publisher interface {
Publish(ctx context.Context, topic string, event Event) error
}

// Forwarder forwards an event to the underlying event bus
type Forwarder interface {
Forward(ctx context.Context, envelope *Envelope) error
}

// Subscriber allows callers to subscribe to events
type Subscriber interface {
Subscribe(ctx context.Context, filters ...string) (ch <-chan *Envelope, errs <-chan error)
}

file:containerd/events/events.go

//信封对象,用于对事件消息的封装
// Envelope provides the packaging for an event.
type Envelope struct {
Timestamp time.Time //事件投递时间戳
Namespace string //命名空间
Topic     string //事件topic
Event     *types.Any //事件内容
}

以上是containerd 代码结构组织方式,以及几个关键类;结合实际代码阅读,理解containerd的代码组织结构,为后面继续深入阅读containerd的应用层代码打下基础,同时可以学习到代码结构插件化带来的扩展性,具有实际的参考意义。

参考

containerd