apiserver 中的路由注册

在哪里进行的路由注册?

我们以 core API 为例 (也称为 legacy API),kube-apiserver 从启动开始,到开始注册 go-restful 之前的代码路径是:

-> cmd/kube-apiserver/app/server.go: func CreateServerChain()
	-> cmd/kube-apiserver/app/server.go: func CreateKubeAPIServer()
		-> pkg/controlplane/instance.go: func (c *completedConfig) New()
			-> k8s.io/apiserver/pkg/server/config.go func (c completedConfig) New()
				-> k8s.io/apiserver/pkg/server/handler.go NewAPIServerHandler()
					# 这里会初始化 restful.Container
				-> k8s.io/apiserver/pkg/server/config.go installAPI()
					# 这里会添加 profile, metric 等固定的 API
			-> pkg/controlplane/instance.go: func (m *Instance) InstallLegacyAPI()
				-> k8s.io/apiserver/pkg/server/genericapiserver.go: func (s *GenericAPIServer) InstallLegacyAPIGroup()
					-> k8s.io/apiserver/pkg/server/genericapiserver.go: func (s *GenericAPIServer) installAPIResources()
						-> k8s.io/apiserver/pkg/endpoints/groupversion.go: func (g *APIGroupVersion) InstallREST()
							-> k8s.io/apiserver/pkg/endpoints/installer.go: func (a *APIInstaller) Install()
								# 这里会创建 restful.WebService 对象
								-> k8s.io/apiserver/pkg/endpoints/installer.go: func (a *APIInstaller) registerResourceHandlers()
									# 这个函数很长,大概有 800 行,就是根据 API 对象的信息,向 restful.WebService 中添加路由。
							# 将得到的 restful.WebService 添加到 restful.Container 中。

上面是大概的流程结束之后,就会开始运行 apiserver,大概流程是如下:

-> cmd/kube-apiserver/app/server.go: Run()
	-> k8s.io/kube-aggregator/pkg/apiserver/apiserver.go: func (s *APIAggregator) PrepareRun()
		-> k8s.io/apiserver/pkg/server/genericapiserver.go: func (s *GenericAPIServer) PrepareRun()
	-> k8s.io/kube-aggregator/pkg/apiserver/apiserver.go: func (s preparedAPIAggregator) Run()
		-> k8s.io/apiserver/pkg/server/genericapiserver.go: func (s preparedGenericAPIServer) Run()
			# 这里最终根据 GenericAPIServer.APIServerHandler 来创建 http server
			# GenericAPIServer.APIServerHandler 则会将请求路由到它内部的 restful.Container 中,
			# 这个 container 包含了我们注册的 API

注册了哪些路由?

上一小节提到了,每个资源的 restful.WebService 中注册的路由都在如下方法中实现:

k8s.io/apiserver/pkg/endpoints/installer.go: func (a *APIInstaller) registerResourceHandlers()

这个函数的主要工具就是根据 APIGroupVersion 的信息生成需要添加到 restful.WebService 中的 route 内容,最主要部分就是指定 pathhandler,如下代码所示:

			route := ws.GET(action.Path).To(handler).	

因为 kubernetes 的所有资源的 API 都是统一的,所以你可以在这个函数里看到所有 API 的路由实现。

路由的 handler 在哪里?

找到一个路由后,我们就知道了 path,解析来还需要知道它是如何被 handle 的,也就是要找到 handler 的实现。

只要你继续跟进 func (a *APIInstaller) registerResourceHandlers() 的代码,就会发现,所有的 handler 都是在

k8s.io/apiserver/pkg/endpoints/handlers 这个模块中实现的。比如资源的 List 接口,就是在如下位置实现:

k8s.io/apiserver/pkg/endpoints/handlers/get.go: func ListResource()

在这个函数里,你可以看到 List 的实现,以及 Watch 的实现。

Handler 和资源的实现是如何关联起来的?

上面提到的功能,都是 apiserver 统一实现的,也就是说,每个资源都不需要自己实现这些部分。每个资源需要实现的部分,主要是数据操作部分。

Registry

这就要提到 registry 这个概念了,这个 registry 是 kubernetes 项目内部的代码上的概念,不是容器镜像那个概念。

在代码中可以找到这个概念的官方说明:

// Package registry contains the generic implementation of the storage and system logic.
package registry // import "k8s.io/apiserver/pkg/registry"

再简化一点的说,就是 kubernetes 项目中的 model 层。因为 k8s 使用 etcd 作为存储,所以就是一个使用 etcd 作为存储的 model 层。

Storage Interface

k8s.io/apiserver/pkg/registry/rest/rest.go 这个文件定义了存储的接口,代码中的一段注释说明了这个接口的定义:

// Storage interfaces need to be separated into two groups; those that operate
// on collections and those that operate on individually named items.
// Collection interfaces:
// (Method: Current -> Proposed)
//    GET: Lister -> CollectionGetter
//    WATCH: Watcher -> CollectionWatcher
//    CREATE: Creater -> CollectionCreater
//    DELETE: (n/a) -> CollectionDeleter
//    UPDATE: (n/a) -> CollectionUpdater
//
// Single item interfaces:
// (Method: Current -> Proposed)
//    GET: Getter -> NamedGetter
//    WATCH: (n/a) -> NamedWatcher
//    CREATE: (n/a) -> NamedCreater
//    DELETE: Deleter -> NamedDeleter
//    UPDATE: Update -> NamedUpdater

还有一个 Storage 的 interface:

// Storage is a generic interface for RESTful storage services.
// Resources which are exported to the RESTful API of apiserver need to implement this interface. It is expected
// that objects may implement any of the below interfaces.
type Storage interface {
	// New returns an empty object that can be used with Create and Update after request data has been put into it.
	// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
	New() runtime.Object

	// Destroy cleans up its resources on shutdown.
	// Destroy has to be implemented in thread-safe way and be prepared
	// for being called more than once.
	Destroy()
}

这个类型,就是传递给上面那个添加路由函数的第二个参数:

func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
}

因此,每个通过这个方法添加到 apiserver 的资源,都实现了 rest.Storage 这个接口。并且,还有可能实现其他的接口,如果有实现,就会添加对应的 API。因此,我们要看一个资源如何实现自己的 API 时,应该是去找它的 storage 实现。

Legacy API Storage

上面提到的这个方法 pkg/controlplane/instance.go: func (m *Instance) InstallLegacyAPI() 会在创建 apiserver 的时候被调用到,其中会添加我们最熟悉的那些资源的 storage,比如 pod。我们来具体看一下 pod 的 storage 添加的过程:

-> pkg/controlplane/instance.go: func (m *Instance) InstallLegacyAPI()
	-> pkg/registry/core/rest/storage_ore.go: func (c LegacyRESTStorageProvider) NewLegacyRESTStorage()
		-> pkg/registry/core/pod/storage/storage.go: func NewStorage()

Pod 的 NewStorage() 函数会返回一个 PodStorage 对象:

// PodStorage includes storage for pods and all sub resources
type PodStorage struct {
	Pod                 *REST
	Binding             *BindingREST
	LegacyBinding       *LegacyBindingREST
	Eviction            *EvictionREST
	Status              *StatusREST
	EphemeralContainers *EphemeralContainersREST
	Log                 *podrest.LogREST
	Proxy               *podrest.ProxyREST
	Exec                *podrest.ExecREST
	Attach              *podrest.AttachREST
	PortForward         *podrest.PortForwardREST
}

这个对象里的 Pod 成员,就是实现了 pod 常规 API 所需的 storage 接口:

// REST implements a RESTStorage for pods
type REST struct {
	*genericregistry.Store
	proxyTransport http.RoundTripper
}

它的大部分方法都是内嵌的 genericregistry.Store 实现的,所以你可以在 Store 对象中找到 List 方法的实现:

// List returns a list of items matching labels and field according to the
// store's PredicateFunc.
func (e *Store) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {

这个方法就是实现了 k8s.io/apiserver/pkg/registry/rest/rest.go: ListerInterface中的 List 方法:

// Lister is an object that can retrieve resources that match the provided field and label criteria.
type Lister interface {
	// NewList returns an empty object that can be used with the List call.
	// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
	NewList() runtime.Object
	// List selects resources in the storage which match to the selector. 'options' can be nil.
	List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error)
	// TableConvertor ensures all list implementers also implement table conversion
	TableConvertor
}

Pod 是一个很复杂的资源,还有很多其他的 storage 实现,以支持更多的 API:

# pkg/registry/core/rest/storage_ore.go: func (c LegacyRESTStorageProvider) NewLegacyRESTStorage()

	storage := map[string]rest.Storage{}
	if resource := "pods"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
		storage[resource] = podStorage.Pod
		storage[resource+"/attach"] = podStorage.Attach
		storage[resource+"/status"] = podStorage.Status
		storage[resource+"/log"] = podStorage.Log
		storage[resource+"/exec"] = podStorage.Exec
		storage[resource+"/portforward"] = podStorage.PortForward
		storage[resource+"/proxy"] = podStorage.Proxy
		storage[resource+"/binding"] = podStorage.Binding
		if podStorage.Eviction != nil {
			storage[resource+"/eviction"] = podStorage.Eviction
		}
		storage[resource+"/ephemeralcontainers"] = podStorage.EphemeralContainers

	}

总结

Kubernetes 的 API 实现代码很复杂,看代码是很容易被绕晕的。

Storage 本身还有 cache 的实现,以及 etcd 访问的实现,本文并没有涉及。


知识共享许可协议本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可。