• kube-apiserver源码分析(一)之 NewAPIServerCommand
  • 1. Main
  • 2. NewAPIServerCommand
  • 3. NewServerRunOptions
    • 3.1. ServerRunOptions
    • 3.2. NewServerRunOptions
    • 3.3. Complete
  • 3. AddFlagSet
    • 3.1. Flags
  • 4. Run
    • 4.1. CreateServerChain
    • 4.2. PrepareRun
    • 4.3. preparedGenericAPIServer.Run
  • 5. 总结

    kube-apiserver源码分析(一)之 NewAPIServerCommand

    以下代码分析基于 kubernetes v1.12.0 版本。

    本文主要分析kube-apiservercmd部分的代码,即NewAPIServerCommand相关的代码。更多具体的逻辑待后续文章分析。

    kube-apiservercmd部分目录代码结构如下:

    1. kube-apiserver
    2. ├── apiserver.go # kube-apiserver的main入口
    3. └── app
    4. ├── aggregator.go
    5. ├── apiextensions.go
    6. ├── options # 初始化kube-apiserver使用到的option
    7. ├── options.go # 包括:NewServerRunOptions、Flags等
    8. ├── options_test.go
    9. └── validation.go
    10. ├── server.go # 包括:NewAPIServerCommand、Run、CreateServerChain、Complete等

    1. Main

    此部分代码位于cmd/kube-apiserver/apiserver.go

    1. func main() {
    2. rand.Seed(time.Now().UTC().UnixNano())
    3. command := app.NewAPIServerCommand(server.SetupSignalHandler())
    4. // TODO: once we switch everything over to Cobra commands, we can go back to calling
    5. // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
    6. // normalize func and add the go flag set by hand.
    7. pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
    8. pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
    9. // utilflag.InitFlags()
    10. logs.InitLogs()
    11. defer logs.FlushLogs()
    12. if err := command.Execute(); err != nil {
    13. fmt.Fprintf(os.Stderr, "error: %v\n", err)
    14. os.Exit(1)
    15. }
    16. }

    核心代码:

    1. // 初始化APIServerCommand
    2. command := app.NewAPIServerCommand(server.SetupSignalHandler())
    3. // 执行Execute
    4. err := command.Execute()

    2. NewAPIServerCommand

    此部分的代码位于/cmd/kube-apiserver/app/server.go

    NewAPIServerCommand即Cobra命令行框架的构造函数,主要包括三部分:

    • 构造option
    • 添加Flags
    • 执行Run函数

    完整代码如下:

    此部分代码位于cmd/kube-apiserver/app/server.go

    1. // NewAPIServerCommand creates a *cobra.Command object with default parameters
    2. func NewAPIServerCommand(stopCh <-chan struct{}) *cobra.Command {
    3. s := options.NewServerRunOptions()
    4. cmd := &cobra.Command{
    5. Use: "kube-apiserver",
    6. Long: `The Kubernetes API server validates and configures data
    7. for the api objects which include pods, services, replicationcontrollers, and
    8. others. The API Server services REST operations and provides the frontend to the
    9. cluster's shared state through which all other components interact.`,
    10. RunE: func(cmd *cobra.Command, args []string) error {
    11. verflag.PrintAndExitIfRequested()
    12. utilflag.PrintFlags(cmd.Flags())
    13. // set default options
    14. completedOptions, err := Complete(s)
    15. if err != nil {
    16. return err
    17. }
    18. // validate options
    19. if errs := completedOptions.Validate(); len(errs) != 0 {
    20. return utilerrors.NewAggregate(errs)
    21. }
    22. return Run(completedOptions, stopCh)
    23. },
    24. }
    25. fs := cmd.Flags()
    26. namedFlagSets := s.Flags()
    27. for _, f := range namedFlagSets.FlagSets {
    28. fs.AddFlagSet(f)
    29. }
    30. usageFmt := "Usage:\n %s\n"
    31. cols, _, _ := apiserverflag.TerminalSize(cmd.OutOrStdout())
    32. cmd.SetUsageFunc(func(cmd *cobra.Command) error {
    33. fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine())
    34. apiserverflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols)
    35. return nil
    36. })
    37. cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
    38. fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
    39. apiserverflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
    40. })
    41. return cmd
    42. }

    核心代码:

    1. // 构造option
    2. s := options.NewServerRunOptions()
    3. // 添加flags
    4. fs := cmd.Flags()
    5. namedFlagSets := s.Flags()
    6. for _, f := range namedFlagSets.FlagSets {
    7. fs.AddFlagSet(f)
    8. }
    9. // set default options
    10. completedOptions, err := Complete(s)
    11. // Run
    12. Run(completedOptions, stopCh)

    3. NewServerRunOptions

    NewServerRunOptions基于默认的参数构造ServerRunOptions结构体。ServerRunOptions是apiserver运行的配置信息。具体结构体定义如下。

    3.1. ServerRunOptions

    其中主要的配置如下:

    • GenericServerRunOptions
    • Etcd
    • SecureServing
    • KubeletConfig
    1. // ServerRunOptions runs a kubernetes api server.
    2. type ServerRunOptions struct {
    3. GenericServerRunOptions *genericoptions.ServerRunOptions
    4. Etcd *genericoptions.EtcdOptions
    5. SecureServing *genericoptions.SecureServingOptionsWithLoopback
    6. InsecureServing *genericoptions.DeprecatedInsecureServingOptionsWithLoopback
    7. Audit *genericoptions.AuditOptions
    8. Features *genericoptions.FeatureOptions
    9. Admission *kubeoptions.AdmissionOptions
    10. Authentication *kubeoptions.BuiltInAuthenticationOptions
    11. Authorization *kubeoptions.BuiltInAuthorizationOptions
    12. CloudProvider *kubeoptions.CloudProviderOptions
    13. StorageSerialization *kubeoptions.StorageSerializationOptions
    14. APIEnablement *genericoptions.APIEnablementOptions
    15. AllowPrivileged bool
    16. EnableLogsHandler bool
    17. EventTTL time.Duration
    18. KubeletConfig kubeletclient.KubeletClientConfig
    19. KubernetesServiceNodePort int
    20. MaxConnectionBytesPerSec int64
    21. ServiceClusterIPRange net.IPNet // TODO: make this a list
    22. ServiceNodePortRange utilnet.PortRange
    23. SSHKeyfile string
    24. SSHUser string
    25. ProxyClientCertFile string
    26. ProxyClientKeyFile string
    27. EnableAggregatorRouting bool
    28. MasterCount int
    29. EndpointReconcilerType string
    30. ServiceAccountSigningKeyFile string
    31. }

    3.2. NewServerRunOptions

    NewServerRunOptions初始化配置结构体。

    1. // NewServerRunOptions creates a new ServerRunOptions object with default parameters
    2. func NewServerRunOptions() *ServerRunOptions {
    3. s := ServerRunOptions{
    4. GenericServerRunOptions: genericoptions.NewServerRunOptions(),
    5. Etcd: genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(kubeoptions.DefaultEtcdPathPrefix, nil)),
    6. SecureServing: kubeoptions.NewSecureServingOptions(),
    7. InsecureServing: kubeoptions.NewInsecureServingOptions(),
    8. Audit: genericoptions.NewAuditOptions(),
    9. Features: genericoptions.NewFeatureOptions(),
    10. Admission: kubeoptions.NewAdmissionOptions(),
    11. Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(),
    12. Authorization: kubeoptions.NewBuiltInAuthorizationOptions(),
    13. CloudProvider: kubeoptions.NewCloudProviderOptions(),
    14. StorageSerialization: kubeoptions.NewStorageSerializationOptions(),
    15. APIEnablement: genericoptions.NewAPIEnablementOptions(),
    16. EnableLogsHandler: true,
    17. EventTTL: 1 * time.Hour,
    18. MasterCount: 1,
    19. EndpointReconcilerType: string(reconcilers.LeaseEndpointReconcilerType),
    20. KubeletConfig: kubeletclient.KubeletClientConfig{
    21. Port: ports.KubeletPort,
    22. ReadOnlyPort: ports.KubeletReadOnlyPort,
    23. PreferredAddressTypes: []string{
    24. // --override-hostname
    25. string(api.NodeHostName),
    26. // internal, preferring DNS if reported
    27. string(api.NodeInternalDNS),
    28. string(api.NodeInternalIP),
    29. // external, preferring DNS if reported
    30. string(api.NodeExternalDNS),
    31. string(api.NodeExternalIP),
    32. },
    33. EnableHttps: true,
    34. HTTPTimeout: time.Duration(5) * time.Second,
    35. },
    36. ServiceNodePortRange: kubeoptions.DefaultServiceNodePortRange,
    37. }
    38. s.ServiceClusterIPRange = kubeoptions.DefaultServiceIPCIDR
    39. // Overwrite the default for storage data format.
    40. s.Etcd.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf"
    41. return &s
    42. }

    3.3. Complete

    当kube-apiserver的flags被解析后,调用Complete完成默认配置。

    此部分代码位于cmd/kube-apiserver/app/server.go

    1. // Should be called after kube-apiserver flags parsed.
    2. func Complete(s *options.ServerRunOptions) (completedServerRunOptions, error) {
    3. var options completedServerRunOptions
    4. // set defaults
    5. if err := s.GenericServerRunOptions.DefaultAdvertiseAddress(s.SecureServing.SecureServingOptions); err != nil {
    6. return options, err
    7. }
    8. if err := kubeoptions.DefaultAdvertiseAddress(s.GenericServerRunOptions, s.InsecureServing.DeprecatedInsecureServingOptions); err != nil {
    9. return options, err
    10. }
    11. serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange)
    12. if err != nil {
    13. return options, fmt.Errorf("error determining service IP ranges: %v", err)
    14. }
    15. s.ServiceClusterIPRange = serviceIPRange
    16. if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts(s.GenericServerRunOptions.AdvertiseAddress.String(), []string{"kubernetes.default.svc", "kubernetes.default", "kubernetes"}, []net.IP{apiServerServiceIP}); err != nil {
    17. return options, fmt.Errorf("error creating self-signed certificates: %v", err)
    18. }
    19. if len(s.GenericServerRunOptions.ExternalHost) == 0 {
    20. if len(s.GenericServerRunOptions.AdvertiseAddress) > 0 {
    21. s.GenericServerRunOptions.ExternalHost = s.GenericServerRunOptions.AdvertiseAddress.String()
    22. } else {
    23. if hostname, err := os.Hostname(); err == nil {
    24. s.GenericServerRunOptions.ExternalHost = hostname
    25. } else {
    26. return options, fmt.Errorf("error finding host name: %v", err)
    27. }
    28. }
    29. glog.Infof("external host was not specified, using %v", s.GenericServerRunOptions.ExternalHost)
    30. }
    31. s.Authentication.ApplyAuthorization(s.Authorization)
    32. // Use (ServiceAccountSigningKeyFile != "") as a proxy to the user enabling
    33. // TokenRequest functionality. This defaulting was convenient, but messed up
    34. // a lot of people when they rotated their serving cert with no idea it was
    35. // connected to their service account keys. We are taking this oppurtunity to
    36. // remove this problematic defaulting.
    37. if s.ServiceAccountSigningKeyFile == "" {
    38. // Default to the private server key for service account token signing
    39. if len(s.Authentication.ServiceAccounts.KeyFiles) == 0 && s.SecureServing.ServerCert.CertKey.KeyFile != "" {
    40. if kubeauthenticator.IsValidServiceAccountKeyFile(s.SecureServing.ServerCert.CertKey.KeyFile) {
    41. s.Authentication.ServiceAccounts.KeyFiles = []string{s.SecureServing.ServerCert.CertKey.KeyFile}
    42. } else {
    43. glog.Warning("No TLS key provided, service account token authentication disabled")
    44. }
    45. }
    46. }
    47. if s.Etcd.StorageConfig.DeserializationCacheSize == 0 {
    48. // When size of cache is not explicitly set, estimate its size based on
    49. // target memory usage.
    50. glog.V(2).Infof("Initializing deserialization cache size based on %dMB limit", s.GenericServerRunOptions.TargetRAMMB)
    51. // This is the heuristics that from memory capacity is trying to infer
    52. // the maximum number of nodes in the cluster and set cache sizes based
    53. // on that value.
    54. // From our documentation, we officially recommend 120GB machines for
    55. // 2000 nodes, and we scale from that point. Thus we assume ~60MB of
    56. // capacity per node.
    57. // TODO: We may consider deciding that some percentage of memory will
    58. // be used for the deserialization cache and divide it by the max object
    59. // size to compute its size. We may even go further and measure
    60. // collective sizes of the objects in the cache.
    61. clusterSize := s.GenericServerRunOptions.TargetRAMMB / 60
    62. s.Etcd.StorageConfig.DeserializationCacheSize = 25 * clusterSize
    63. if s.Etcd.StorageConfig.DeserializationCacheSize < 1000 {
    64. s.Etcd.StorageConfig.DeserializationCacheSize = 1000
    65. }
    66. }
    67. if s.Etcd.EnableWatchCache {
    68. glog.V(2).Infof("Initializing cache sizes based on %dMB limit", s.GenericServerRunOptions.TargetRAMMB)
    69. sizes := cachesize.NewHeuristicWatchCacheSizes(s.GenericServerRunOptions.TargetRAMMB)
    70. if userSpecified, err := serveroptions.ParseWatchCacheSizes(s.Etcd.WatchCacheSizes); err == nil {
    71. for resource, size := range userSpecified {
    72. sizes[resource] = size
    73. }
    74. }
    75. s.Etcd.WatchCacheSizes, err = serveroptions.WriteWatchCacheSizes(sizes)
    76. if err != nil {
    77. return options, err
    78. }
    79. }
    80. // TODO: remove when we stop supporting the legacy group version.
    81. if s.APIEnablement.RuntimeConfig != nil {
    82. for key, value := range s.APIEnablement.RuntimeConfig {
    83. if key == "v1" || strings.HasPrefix(key, "v1/") ||
    84. key == "api/v1" || strings.HasPrefix(key, "api/v1/") {
    85. delete(s.APIEnablement.RuntimeConfig, key)
    86. s.APIEnablement.RuntimeConfig["/v1"] = value
    87. }
    88. if key == "api/legacy" {
    89. delete(s.APIEnablement.RuntimeConfig, key)
    90. }
    91. }
    92. }
    93. options.ServerRunOptions = s
    94. return options, nil
    95. }

    3. AddFlagSet

    AddFlagSet主要的作用是通过外部传入的flag的具体值,解析的时候传递给option的结构体,最终给apiserver使用。

    其中NewAPIServerCommand关于AddFlagSet的相关代码如下:

    1. fs := cmd.Flags()
    2. namedFlagSets := s.Flags()
    3. for _, f := range namedFlagSets.FlagSets {
    4. fs.AddFlagSet(f)
    5. }

    3.1. Flags

    Flags完整代码如下:

    此部分代码位于cmd/kube-apiserver/app/options/options.go

    1. // Flags returns flags for a specific APIServer by section name
    2. func (s *ServerRunOptions) Flags() (fss apiserverflag.NamedFlagSets) {
    3. // Add the generic flags.
    4. s.GenericServerRunOptions.AddUniversalFlags(fss.FlagSet("generic"))
    5. s.Etcd.AddFlags(fss.FlagSet("etcd"))
    6. s.SecureServing.AddFlags(fss.FlagSet("secure serving"))
    7. s.InsecureServing.AddFlags(fss.FlagSet("insecure serving"))
    8. s.InsecureServing.AddUnqualifiedFlags(fss.FlagSet("insecure serving")) // TODO: remove it until kops stops using `--address`
    9. s.Audit.AddFlags(fss.FlagSet("auditing"))
    10. s.Features.AddFlags(fss.FlagSet("features"))
    11. s.Authentication.AddFlags(fss.FlagSet("authentication"))
    12. s.Authorization.AddFlags(fss.FlagSet("authorization"))
    13. s.CloudProvider.AddFlags(fss.FlagSet("cloud provider"))
    14. s.StorageSerialization.AddFlags(fss.FlagSet("storage"))
    15. s.APIEnablement.AddFlags(fss.FlagSet("api enablement"))
    16. s.Admission.AddFlags(fss.FlagSet("admission"))
    17. // Note: the weird ""+ in below lines seems to be the only way to get gofmt to
    18. // arrange these text blocks sensibly. Grrr.
    19. fs := fss.FlagSet("misc")
    20. fs.DurationVar(&s.EventTTL, "event-ttl", s.EventTTL,
    21. "Amount of time to retain events.")
    22. fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged,
    23. "If true, allow privileged containers. [default=false]")
    24. fs.BoolVar(&s.EnableLogsHandler, "enable-logs-handler", s.EnableLogsHandler,
    25. "If true, install a /logs handler for the apiserver logs.")
    26. // Deprecated in release 1.9
    27. fs.StringVar(&s.SSHUser, "ssh-user", s.SSHUser,
    28. "If non-empty, use secure SSH proxy to the nodes, using this user name")
    29. fs.MarkDeprecated("ssh-user", "This flag will be removed in a future version.")
    30. // Deprecated in release 1.9
    31. fs.StringVar(&s.SSHKeyfile, "ssh-keyfile", s.SSHKeyfile,
    32. "If non-empty, use secure SSH proxy to the nodes, using this user keyfile")
    33. fs.MarkDeprecated("ssh-keyfile", "This flag will be removed in a future version.")
    34. fs.Int64Var(&s.MaxConnectionBytesPerSec, "max-connection-bytes-per-sec", s.MaxConnectionBytesPerSec, ""+
    35. "If non-zero, throttle each user connection to this number of bytes/sec. "+
    36. "Currently only applies to long-running requests.")
    37. fs.IntVar(&s.MasterCount, "apiserver-count", s.MasterCount,
    38. "The number of apiservers running in the cluster, must be a positive number. (In use when --endpoint-reconciler-type=master-count is enabled.)")
    39. fs.StringVar(&s.EndpointReconcilerType, "endpoint-reconciler-type", string(s.EndpointReconcilerType),
    40. "Use an endpoint reconciler ("+strings.Join(reconcilers.AllTypes.Names(), ", ")+")")
    41. // See #14282 for details on how to test/try this option out.
    42. // TODO: remove this comment once this option is tested in CI.
    43. fs.IntVar(&s.KubernetesServiceNodePort, "kubernetes-service-node-port", s.KubernetesServiceNodePort, ""+
    44. "If non-zero, the Kubernetes master service (which apiserver creates/maintains) will be "+
    45. "of type NodePort, using this as the value of the port. If zero, the Kubernetes master "+
    46. "service will be of type ClusterIP.")
    47. fs.IPNetVar(&s.ServiceClusterIPRange, "service-cluster-ip-range", s.ServiceClusterIPRange, ""+
    48. "A CIDR notation IP range from which to assign service cluster IPs. This must not "+
    49. "overlap with any IP ranges assigned to nodes for pods.")
    50. fs.Var(&s.ServiceNodePortRange, "service-node-port-range", ""+
    51. "A port range to reserve for services with NodePort visibility. "+
    52. "Example: '30000-32767'. Inclusive at both ends of the range.")
    53. // Kubelet related flags:
    54. fs.BoolVar(&s.KubeletConfig.EnableHttps, "kubelet-https", s.KubeletConfig.EnableHttps,
    55. "Use https for kubelet connections.")
    56. fs.StringSliceVar(&s.KubeletConfig.PreferredAddressTypes, "kubelet-preferred-address-types", s.KubeletConfig.PreferredAddressTypes,
    57. "List of the preferred NodeAddressTypes to use for kubelet connections.")
    58. fs.UintVar(&s.KubeletConfig.Port, "kubelet-port", s.KubeletConfig.Port,
    59. "DEPRECATED: kubelet port.")
    60. fs.MarkDeprecated("kubelet-port", "kubelet-port is deprecated and will be removed.")
    61. fs.UintVar(&s.KubeletConfig.ReadOnlyPort, "kubelet-read-only-port", s.KubeletConfig.ReadOnlyPort,
    62. "DEPRECATED: kubelet port.")
    63. fs.DurationVar(&s.KubeletConfig.HTTPTimeout, "kubelet-timeout", s.KubeletConfig.HTTPTimeout,
    64. "Timeout for kubelet operations.")
    65. fs.StringVar(&s.KubeletConfig.CertFile, "kubelet-client-certificate", s.KubeletConfig.CertFile,
    66. "Path to a client cert file for TLS.")
    67. fs.StringVar(&s.KubeletConfig.KeyFile, "kubelet-client-key", s.KubeletConfig.KeyFile,
    68. "Path to a client key file for TLS.")
    69. fs.StringVar(&s.KubeletConfig.CAFile, "kubelet-certificate-authority", s.KubeletConfig.CAFile,
    70. "Path to a cert file for the certificate authority.")
    71. // TODO: delete this flag in 1.13
    72. repair := false
    73. fs.BoolVar(&repair, "repair-malformed-updates", false, "deprecated")
    74. fs.MarkDeprecated("repair-malformed-updates", "This flag will be removed in a future version")
    75. fs.StringVar(&s.ProxyClientCertFile, "proxy-client-cert-file", s.ProxyClientCertFile, ""+
    76. "Client certificate used to prove the identity of the aggregator or kube-apiserver "+
    77. "when it must call out during a request. This includes proxying requests to a user "+
    78. "api-server and calling out to webhook admission plugins. It is expected that this "+
    79. "cert includes a signature from the CA in the --requestheader-client-ca-file flag. "+
    80. "That CA is published in the 'extension-apiserver-authentication' configmap in "+
    81. "the kube-system namespace. Components receiving calls from kube-aggregator should "+
    82. "use that CA to perform their half of the mutual TLS verification.")
    83. fs.StringVar(&s.ProxyClientKeyFile, "proxy-client-key-file", s.ProxyClientKeyFile, ""+
    84. "Private key for the client certificate used to prove the identity of the aggregator or kube-apiserver "+
    85. "when it must call out during a request. This includes proxying requests to a user "+
    86. "api-server and calling out to webhook admission plugins.")
    87. fs.BoolVar(&s.EnableAggregatorRouting, "enable-aggregator-routing", s.EnableAggregatorRouting,
    88. "Turns on aggregator routing requests to endpoints IP rather than cluster IP.")
    89. fs.StringVar(&s.ServiceAccountSigningKeyFile, "service-account-signing-key-file", s.ServiceAccountSigningKeyFile, ""+
    90. "Path to the file that contains the current private key of the service account token issuer. The issuer will sign issued ID tokens with this private key. (Requires the 'TokenRequest' feature gate.)")
    91. return fss
    92. }

    4. Run

    Run以常驻的方式运行apiserver。

    主要内容如下:

    1. 构造一个聚合的server结构体。
    2. 执行PrepareRun。
    3. 最终执行Run。

    此部分代码位于cmd/kube-apiserver/app/server.go

    1. // Run runs the specified APIServer. This should never exit.
    2. func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
    3. // To help debugging, immediately log version
    4. glog.Infof("Version: %+v", version.Get())
    5. server, err := CreateServerChain(completeOptions, stopCh)
    6. if err != nil {
    7. return err
    8. }
    9. return server.PrepareRun().Run(stopCh)
    10. }

    4.1. CreateServerChain

    构造聚合的Server。

    基本流程如下:

    1. 首先生成config对象,包括kubeAPIServerConfigapiExtensionsConfig
    2. 再通过config生成server对象,包括apiExtensionsServerkubeAPIServer
    3. 执行apiExtensionsServerkubeAPIServerPrepareRun部分。
    4. 生成聚合的config对象aggregatorConfig
    5. 基于aggregatorConfigkubeAPIServerapiExtensionsServer生成聚合的serveraggregatorServer

    此部分代码位于cmd/kube-apiserver/app/server.go

    1. // CreateServerChain creates the apiservers connected via delegation.
    2. func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error) {
    3. nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
    4. if err != nil {
    5. return nil, err
    6. }
    7. kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
    8. if err != nil {
    9. return nil, err
    10. }
    11. // If additional API servers are added, they should be gated.
    12. apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount)
    13. if err != nil {
    14. return nil, err
    15. }
    16. apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
    17. if err != nil {
    18. return nil, err
    19. }
    20. kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, admissionPostStartHook)
    21. if err != nil {
    22. return nil, err
    23. }
    24. // otherwise go down the normal path of standing the aggregator up in front of the API server
    25. // this wires up openapi
    26. kubeAPIServer.GenericAPIServer.PrepareRun()
    27. // This will wire up openapi for extension api server
    28. apiExtensionsServer.GenericAPIServer.PrepareRun()
    29. // aggregator comes last in the chain
    30. aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
    31. if err != nil {
    32. return nil, err
    33. }
    34. aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
    35. if err != nil {
    36. // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
    37. return nil, err
    38. }
    39. if insecureServingInfo != nil {
    40. insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
    41. if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
    42. return nil, err
    43. }
    44. }
    45. return aggregatorServer.GenericAPIServer, nil
    46. }

    4.2. PrepareRun

    PrepareRun主要执行一些API安装操作。

    此部分的代码位于vendor/k8s.io/apiserver/pkg/server/genericapiserver.go

    1. // PrepareRun does post API installation setup steps.
    2. func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
    3. if s.swaggerConfig != nil {
    4. routes.Swagger{Config: s.swaggerConfig}.Install(s.Handler.GoRestfulContainer)
    5. }
    6. if s.openAPIConfig != nil {
    7. routes.OpenAPI{
    8. Config: s.openAPIConfig,
    9. }.Install(s.Handler.GoRestfulContainer, s.Handler.NonGoRestfulMux)
    10. }
    11. s.installHealthz()
    12. // Register audit backend preShutdownHook.
    13. if s.AuditBackend != nil {
    14. s.AddPreShutdownHook("audit-backend", func() error {
    15. s.AuditBackend.Shutdown()
    16. return nil
    17. })
    18. }
    19. return preparedGenericAPIServer{s}
    20. }

    4.3. preparedGenericAPIServer.Run

    preparedGenericAPIServer.Run运行一个安全的http server。具体的实现逻辑待后续文章分析。

    此部分代码位于vendor/k8s.io/apiserver/pkg/server/genericapiserver.go

    1. // Run spawns the secure http server. It only returns if stopCh is closed
    2. // or the secure port cannot be listened on initially.
    3. func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
    4. err := s.NonBlockingRun(stopCh)
    5. if err != nil {
    6. return err
    7. }
    8. <-stopCh
    9. err = s.RunPreShutdownHooks()
    10. if err != nil {
    11. return err
    12. }
    13. // Wait for all requests to finish, which are bounded by the RequestTimeout variable.
    14. s.HandlerChainWaitGroup.Wait()
    15. return nil
    16. }

    核心函数:

    1. err := s.NonBlockingRun(stopCh)

    preparedGenericAPIServer.Run主要是调用NonBlockingRun函数,最终运行一个http server。该部分逻辑待后续文章分析。

    5. 总结

    NewAPIServerCommand采用了Cobra命令行框架,该框架使用主要包含以下部分:

    • 构造option参数,提供给执行主体(例如 本文的server)作为配置参数使用。
    • 添加Flags,主要用来通过传入的flags参数最终解析成option中使用的结构体属性。
    • 执行Run函数,执行主体的运行逻辑部分(核心部分)。

    其中Run函数的主要内容如下:

    1. 构造一个聚合的server结构体。
    2. 执行PrepareRun。
    3. 最终执行preparedGenericAPIServer.Run。

    preparedGenericAPIServer.Run主要是调用NonBlockingRun函数,最终运行一个http server。NonBlockingRun的具体逻辑待后续文章再单独分析。

    参考:

    • https://github.com/kubernetes/kubernetes/tree/v1.12.0/cmd/kube-apiserver
    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/cmd/kube-apiserver/app/server.go
    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/cmd/kube-apiserver/app/aggregator.go
    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/cmd/kube-apiserver/app/options/options.go