Kubernetes 代码笔记 -- 1
apiserver 中的路由注册
在哪里进行的路由注册?
我们以 core API 为例 (也称为 legacy API),kube-apiserver 从启动开始,到开始注册 go-restful 之前的代码路径是:
-> cmd/kube-apiserver/app/server.go: func CreateServerChain()
-> cmd/kube-apiserver/app/server.go: func CreateKubeAPIServer()
-> pkg/controlplane/instance.go: func (c *completedConfig) New()
-> k8s.io/apiserver/pkg/server/config.go func (c completedConfig) New()
-> k8s.io/apiserver/pkg/server/handler.go NewAPIServerHandler()
# 这里会初始化 restful.Container
-> k8s.io/apiserver/pkg/server/config.go installAPI()
# 这里会添加 profile, metric 等固定的 API
-> pkg/controlplane/instance.go: func (m *Instance) InstallLegacyAPI()
-> k8s.io/apiserver/pkg/server/genericapiserver.go: func (s *GenericAPIServer) InstallLegacyAPIGroup()
-> k8s.io/apiserver/pkg/server/genericapiserver.go: func (s *GenericAPIServer) installAPIResources()
-> k8s.io/apiserver/pkg/endpoints/groupversion.go: func (g *APIGroupVersion) InstallREST()
-> k8s.io/apiserver/pkg/endpoints/installer.go: func (a *APIInstaller) Install()
# 这里会创建 restful.WebService 对象
-> k8s.io/apiserver/pkg/endpoints/installer.go: func (a *APIInstaller) registerResourceHandlers()
# 这个函数很长,大概有 800 行,就是根据 API 对象的信息,向 restful.WebService 中添加路由。
# 将得到的 restful.WebService 添加到 restful.Container 中。
上面是大概的流程结束之后,就会开始运行 apiserver,大概流程是如下:
-> cmd/kube-apiserver/app/server.go: Run()
-> k8s.io/kube-aggregator/pkg/apiserver/apiserver.go: func (s *APIAggregator) PrepareRun()
-> k8s.io/apiserver/pkg/server/genericapiserver.go: func (s *GenericAPIServer) PrepareRun()
-> k8s.io/kube-aggregator/pkg/apiserver/apiserver.go: func (s preparedAPIAggregator) Run()
-> k8s.io/apiserver/pkg/server/genericapiserver.go: func (s preparedGenericAPIServer) Run()
# 这里最终根据 GenericAPIServer.APIServerHandler 来创建 http server
# GenericAPIServer.APIServerHandler 则会将请求路由到它内部的 restful.Container 中,
# 这个 container 包含了我们注册的 API
注册了哪些路由?
上一小节提到了,每个资源的 restful.WebService
中注册的路由都在如下方法中实现:
k8s.io/apiserver/pkg/endpoints/installer.go: func (a *APIInstaller) registerResourceHandlers()
这个函数的主要工具就是根据 APIGroupVersion
的信息生成需要添加到 restful.WebService
中的 route 内容,最主要部分就是指定 path
和 handler
,如下代码所示:
route := ws.GET(action.Path).To(handler).
因为 kubernetes 的所有资源的 API 都是统一的,所以你可以在这个函数里看到所有 API 的路由实现。
路由的 handler 在哪里?
找到一个路由后,我们就知道了 path,解析来还需要知道它是如何被 handle 的,也就是要找到 handler 的实现。
只要你继续跟进 func (a *APIInstaller) registerResourceHandlers()
的代码,就会发现,所有的 handler 都是在
k8s.io/apiserver/pkg/endpoints/handlers
这个模块中实现的。比如资源的 List 接口,就是在如下位置实现:
k8s.io/apiserver/pkg/endpoints/handlers/get.go: func ListResource()
在这个函数里,你可以看到 List 的实现,以及 Watch 的实现。
Handler 和资源的实现是如何关联起来的?
上面提到的功能,都是 apiserver 统一实现的,也就是说,每个资源都不需要自己实现这些部分。每个资源需要实现的部分,主要是数据操作部分。
Registry
这就要提到 registry 这个概念了,这个 registry 是 kubernetes 项目内部的代码上的概念,不是容器镜像那个概念。
在代码中可以找到这个概念的官方说明:
// Package registry contains the generic implementation of the storage and system logic.
package registry // import "k8s.io/apiserver/pkg/registry"
再简化一点的说,就是 kubernetes 项目中的 model 层。因为 k8s 使用 etcd 作为存储,所以就是一个使用 etcd 作为存储的 model 层。
Storage Interface
k8s.io/apiserver/pkg/registry/rest/rest.go 这个文件定义了存储的接口,代码中的一段注释说明了这个接口的定义:
// Storage interfaces need to be separated into two groups; those that operate
// on collections and those that operate on individually named items.
// Collection interfaces:
// (Method: Current -> Proposed)
// GET: Lister -> CollectionGetter
// WATCH: Watcher -> CollectionWatcher
// CREATE: Creater -> CollectionCreater
// DELETE: (n/a) -> CollectionDeleter
// UPDATE: (n/a) -> CollectionUpdater
//
// Single item interfaces:
// (Method: Current -> Proposed)
// GET: Getter -> NamedGetter
// WATCH: (n/a) -> NamedWatcher
// CREATE: (n/a) -> NamedCreater
// DELETE: Deleter -> NamedDeleter
// UPDATE: Update -> NamedUpdater
还有一个 Storage
的 interface:
// Storage is a generic interface for RESTful storage services.
// Resources which are exported to the RESTful API of apiserver need to implement this interface. It is expected
// that objects may implement any of the below interfaces.
type Storage interface {
// New returns an empty object that can be used with Create and Update after request data has been put into it.
// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
New() runtime.Object
// Destroy cleans up its resources on shutdown.
// Destroy has to be implemented in thread-safe way and be prepared
// for being called more than once.
Destroy()
}
这个类型,就是传递给上面那个添加路由函数的第二个参数:
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
}
因此,每个通过这个方法添加到 apiserver 的资源,都实现了 rest.Storage
这个接口。并且,还有可能实现其他的接口,如果有实现,就会添加对应的 API。因此,我们要看一个资源如何实现自己的 API 时,应该是去找它的 storage 实现。
Legacy API Storage
上面提到的这个方法 pkg/controlplane/instance.go: func (m *Instance) InstallLegacyAPI()
会在创建 apiserver 的时候被调用到,其中会添加我们最熟悉的那些资源的 storage,比如 pod。我们来具体看一下 pod 的 storage 添加的过程:
-> pkg/controlplane/instance.go: func (m *Instance) InstallLegacyAPI()
-> pkg/registry/core/rest/storage_ore.go: func (c LegacyRESTStorageProvider) NewLegacyRESTStorage()
-> pkg/registry/core/pod/storage/storage.go: func NewStorage()
Pod 的 NewStorage()
函数会返回一个 PodStorage
对象:
// PodStorage includes storage for pods and all sub resources
type PodStorage struct {
Pod *REST
Binding *BindingREST
LegacyBinding *LegacyBindingREST
Eviction *EvictionREST
Status *StatusREST
EphemeralContainers *EphemeralContainersREST
Log *podrest.LogREST
Proxy *podrest.ProxyREST
Exec *podrest.ExecREST
Attach *podrest.AttachREST
PortForward *podrest.PortForwardREST
}
这个对象里的 Pod
成员,就是实现了 pod 常规 API 所需的 storage 接口:
// REST implements a RESTStorage for pods
type REST struct {
*genericregistry.Store
proxyTransport http.RoundTripper
}
它的大部分方法都是内嵌的 genericregistry.Store
实现的,所以你可以在 Store
对象中找到 List
方法的实现:
// List returns a list of items matching labels and field according to the
// store's PredicateFunc.
func (e *Store) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
这个方法就是实现了 k8s.io/apiserver/pkg/registry/rest/rest.go: ListerInterface
中的 List
方法:
// Lister is an object that can retrieve resources that match the provided field and label criteria.
type Lister interface {
// NewList returns an empty object that can be used with the List call.
// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
NewList() runtime.Object
// List selects resources in the storage which match to the selector. 'options' can be nil.
List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error)
// TableConvertor ensures all list implementers also implement table conversion
TableConvertor
}
Pod 是一个很复杂的资源,还有很多其他的 storage 实现,以支持更多的 API:
# pkg/registry/core/rest/storage_ore.go: func (c LegacyRESTStorageProvider) NewLegacyRESTStorage()
storage := map[string]rest.Storage{}
if resource := "pods"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
storage[resource] = podStorage.Pod
storage[resource+"/attach"] = podStorage.Attach
storage[resource+"/status"] = podStorage.Status
storage[resource+"/log"] = podStorage.Log
storage[resource+"/exec"] = podStorage.Exec
storage[resource+"/portforward"] = podStorage.PortForward
storage[resource+"/proxy"] = podStorage.Proxy
storage[resource+"/binding"] = podStorage.Binding
if podStorage.Eviction != nil {
storage[resource+"/eviction"] = podStorage.Eviction
}
storage[resource+"/ephemeralcontainers"] = podStorage.EphemeralContainers
}
总结
Kubernetes 的 API 实现代码很复杂,看代码是很容易被绕晕的。
Storage 本身还有 cache 的实现,以及 etcd 访问的实现,本文并没有涉及。