Kubernetes Informer 源码解析与深度使用 [2/4]: informers 包源码解析与 Informer 在实际使用中的最佳实践
06 May 21 16:28 +0000

本文分 4 部分,分别是:

(1). cache 包源码解析与 Informer 的使用;

(2). informers 包源码解析与 SharedInformerFactory 的使用,以及 Informer 在实际使用中的最佳实践;

(3). 实现自定义资源 (CRD) Informer;

(4). dynamic 包源码解析与 DynamicSharedInformerFactory 的使用;

这里是第 2 部分。

本文使用的所有代码都可以在 articles/archive/dive-into-kubernetes-informer at main · wbsnail/articles · GitHub 找到。

前言

前一部分中,我给大家分析了 kubernetes/client-go 仓库中的 cache 包的主要代码,并梳理了 Informer 的工作原理和实现方式,这一部分我将着重分析 informers 包,并介绍 SharedInformerFactory 的正确打开方式。

👮‍♂️ 如果你没有看过前一部分,建议先看完它。

informers 包源码解析

工厂模式

在前一部分中我们知道,cache 包暴露的 Informer 创建方法有以下 5 个:

  • New
  • NewInformer
  • NewIndexerInformer
  • NewSharedInformer
  • NewSharedIndexInformer

它们有着不同程度的抽象和封装,NewSharedIndexInformer 是其中抽象程度最低,封装程度最高的一个,但即使是 NewSharedIndexInformer,也没有封装具体的资源类型,需要接收 ListerWatcher 和 Indexers 作为参数:

func NewSharedIndexInformer(
	lw ListerWatcher,
	exampleObject runtime.Object,
	defaultEventHandlerResyncPeriod time.Duration,
	indexers Indexers,
) SharedIndexInformer {}

就是说要构建 Informer,我们还得先构建 ListWatcher 和 Indexers。对于多种多样的资源类型来说,应该如何设计以更好地封装,提高代码的复用性呢?informers 包采用了工厂模式:为每种内置的资源类型创建对应的 Informer 工厂类,要使用某种资源 Informer 的时候直接使用工厂类构建。

package main

import (
	"fmt"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"
)

func main() {
	fmt.Println("----- 9-shared-informer-factory -----")

	// mustClientset 用于创建 kubernetes.Interface 实例,
	// 代码在前一部分中;
	// 第 2 个参数是 defaultResync,是构建新 Informer 时默认的 resyncPeriod,
	// resyncPeriod 在前一部分中介绍过了;
	informerFactory := informers.NewSharedInformerFactoryWithOptions(
		mustClientset(), 0, informers.WithNamespace("tmp"))
	configMapsInformer := informerFactory.Core().V1().ConfigMaps().Informer()
	configMapsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			configMap, ok := obj.(*corev1.ConfigMap)
			if !ok {
				return
			}
			fmt.Printf("created: %s\n", configMap.Name)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			configMap, ok := newObj.(*corev1.ConfigMap)
			if !ok {
				return
			}
			fmt.Printf("updated: %s\n", configMap.Name)
		},
		DeleteFunc: func(obj interface{}) {
			configMap, ok := obj.(*corev1.ConfigMap)
			if !ok {
				return
			}
			fmt.Printf("deleted: %s\n", configMap.Name)
		},
	})

	stopCh := make(chan struct{})
	defer close(stopCh)

	fmt.Println("Start syncing....")

	go informerFactory.Start(stopCh)

	<-stopCh
}

输出类似于:

----- 9-shared-informer-factory -----
Start syncing....
created: demo1
created: demo
deleted: demo
created: demo

本文使用的所有代码都可以在 articles/archive/dive-into-kubernetes-informer at main · wbsnail/articles · GitHub 找到。

上面的代码结构应该非常清晰,不用过多解释。值得一提的是 NewSharedInformerFactoryWithOptions 使用了函数式选项 (functional options) 模式:

// SharedInformerOption defines the functional option type for SharedInformerFactory.
type SharedInformerOption func(*sharedInformerFactory) *sharedInformerFactory

func NewSharedInformerFactoryWithOptions(
	client kubernetes.Interface,
	defaultResync time.Duration,
	options ...SharedInformerOption,
) SharedInformerFactory {}

接收的选项 (options) 会被依次执行,通过这种方式对 SharedInformerFactory 实例的参数进行调整。

如果没有调整的需要,可以直接使用默认方法:

func NewSharedInformerFactory(
	client kubernetes.Interface,
	defaultResync time.Duration,
) SharedInformerFactory {}

👨‍🔧 默认的 namespace 是所有 namespace。

通过 SharedInformerFactory,我们可以很方便地构建各种资源的 Informer 实例,比如:

configMapsInformer := informerFactory.Core().V1().ConfigMaps().Informer()

注意很重要的一点:对每种资源类型构建的 Informer 会被缓存,对相同资源重复调用 Informer() 返回的是同一个 Informer 实例。

除了构建 Informer 外,SharedInformerFactory 还支持构建 Lister,比如:

configMapLister := informerFactory.Core().V1().ConfigMaps().Lister()

返回的 Lister 是针对具体资源类型的,使用时不需要转换类型,用起来会比 Indexer 方便不少,比如说 ConfigMapLister:

// ConfigMapLister helps list ConfigMaps.
// All objects returned here must be treated as read-only.
type ConfigMapLister interface {
	// List lists all ConfigMaps in the indexer.
	// Objects returned here must be treated as read-only.
	List(selector labels.Selector) (ret []*v1.ConfigMap, err error)
	// ConfigMaps returns an object that can list and get ConfigMaps.
	ConfigMaps(namespace string) ConfigMapNamespaceLister
}

// ConfigMapNamespaceLister helps list and get ConfigMaps.
// All objects returned here must be treated as read-only.
type ConfigMapNamespaceLister interface {
	// List lists all ConfigMaps in the indexer for a given namespace.
	// Objects returned here must be treated as read-only.
	List(selector labels.Selector) (ret []*v1.ConfigMap, err error)
	// Get retrieves the ConfigMap from the indexer for a given namespace and name.
	// Objects returned here must be treated as read-only.
	Get(name string) (*v1.ConfigMap, error)
}

在所有针对特定资源类型的方法外,SharedInformerFactory 还暴露以下 4 个方法:

	InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
	ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
	Start(stopCh <-chan struct{})
	WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool

InformerFor 用于获取指定资源类型的 Informer。注意!通过此方法构建的 Informer 也会被缓存,下次对相同资源类型的调用就不会调用 newFunc,因此会影响使用默认方法获取的 Informer,大多数情况下只在 informers 包内部调用,还是以 ConfigMap 举例:

func (f *configMapInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&corev1.ConfigMap{}, f.defaultInformer)
}

ForResource 用于根据 schema.GroupVersionResource 动态构建 Informer,但它返回的 Informer 和 Lister 是通用类型,使用起来没有具体类型那么方便;并且从实现上来看,就是一个 switch 语句,实际还是根据类型生成了具体资源类型的 Informer 和 Lister,并没有对第三方 CRD 的支持,估计是本来想实现 dynamic 包的功能,但是完成度不高,存在意义暂时不明 🕵️‍♂️

剩下就是 Start 和 WaitForCacheSync,这两个方法的作用想必你也能猜到,就是统一启动所有 Informers 和等待所有 Informers 首次同步完成。

Informer 最佳实践

上面关于 SharedInformerFactory 的演示代码能够正常运行,但是还存在一些问题:

(1). 处理事件是同步操作,有可能造成阻塞(在上一部分关于 cache 包的源码分析中可知,Informer 收到事件后调用处理程序是同步操作);

(2). 处理事件没有失败重试机制;

(3). 退出不够优雅,有可能造成状态异常;

(4). resyncPeriod 设置为 0,运行过程中有可能累积与服务器的不一致;

对于 (1), (2) 和 (3),都可以通过引入工作队列 (workqueue) 解决。对于 (4),kube-controller-manager (--min-resync-period) 采用的重新同步间隔是 12h - 24h(在此范围内随机,避免所有资源类型同时同步造成激增的压力),可以认为这是一个比较合理的值。

在 client-go 中内置了 workqueue 的实现,它被广泛使用于 kube-controller-manager 中,我们直接使用它来实现工作队列:

File

考虑上述这些情况,并确定了实现方案后,我设计了一个比较完善的控制器:树懒控制器 (SlothController) 🙉 :

树懒们会在收到更新事件时会报告到日志输出,但是每次执行任务前它们都要先睡一觉,而且有一定概率睡过头导致任务失败。如果多次睡过头导致某个任务失败,树懒们还会选择放弃这次任务。你不可以打断树懒的睡眠,在收到中断 (INTERRUPT) 信号后,需要等树懒们都处理完手头的工作才会退出(即使最终结果是失败)。

代码如下,建议从 main 函数开始阅读:

package main

import (
	"flag"
	"fmt"
	"github.com/spongeprojects/magicconch"
	"k8s.io/apimachinery/pkg/util/rand"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	listerscorev1 "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
	"os"
	"os/signal"
	"sync"
	"time"
)

// SlothController 树懒控制器!
type SlothController struct {
	factory           informers.SharedInformerFactory
	configMapLister   listerscorev1.ConfigMapLister
	configMapInformer cache.SharedIndexInformer
	queue             workqueue.RateLimitingInterface
	processingItems   *sync.WaitGroup

	// maxRetries 树懒们需要重试多少次才会放弃
	maxRetries int
	// chanceOfFailure 树懒们处理任务有多少概率失败(百分比)
	chanceOfFailure int
	// nap 树懒睡一觉要多久
	nap time.Duration
}

func NewController(
	factory informers.SharedInformerFactory,
	queue workqueue.RateLimitingInterface,
	chanceOfFailure int,
	nap time.Duration,
) *SlothController {
	configMapLister := factory.Core().V1().ConfigMaps().Lister()
	configMapInformer := factory.Core().V1().ConfigMaps().Informer()
	configMapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(old interface{}, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				// 只简单地把 key 放到队列中
				klog.Infof("[%s] received", key)
				queue.Add(key)
			}
		},
	})

	return &SlothController{
		factory:           factory,
		configMapLister:   configMapLister,
		configMapInformer: configMapInformer,
		queue:             queue,
		maxRetries:        3,
		chanceOfFailure:   chanceOfFailure,
		nap:               nap,
		processingItems:   &sync.WaitGroup{},
	}
}

// Run 开始运行控制器直到出错或 stopCh 关闭
func (c *SlothController) Run(sloths int, stopCh chan struct{}) error {
	// runtime.HandleCrash 是 Kubernetes 官方提供的 panic recover 方法,
	// 提供一个 panic recover 的统一入口,
	// 默认只是记录日志,该 panic 还是 panic。
	defer runtime.HandleCrash()
	// 退出前关闭队列让树懒们不要再接手新任务
	defer c.queue.ShutDown()

	klog.Info("SlothController starting...")

	go c.factory.Start(stopCh)

	// 等待首次同步完成
	for _, ok := range c.factory.WaitForCacheSync(stopCh) {
		if !ok {
			return fmt.Errorf("timed out waiting for caches to sync")
		}
	}

	for i := 0; i < sloths; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	// 等待 stopCh 关闭
	<-stopCh

	// 等待正在执行中的任务完成
	klog.Info("waiting for processing items to finish...")
	c.processingItems.Wait()

	return nil
}

func (c *SlothController) runWorker() {
	for c.processNextItem() {
	}
}

// processNextItem 用于等待和处理队列中的新任务
func (c *SlothController) processNextItem() bool {
	// 阻塞住,等待新任务中...
	key, shutdown := c.queue.Get()
	if shutdown {
		return false // 队列已进入退出状态,不要继续处理
	}

	c.processingItems.Add(1)

	// 任务完成后,无论成功与否,都记得标记完成,因为尽管有多只树懒,
	// 但对相同 key 的多个任务是不会并行处理的,
	// 如果相同 key 有多个事件,不要阻塞处理。
	defer c.queue.Done(key)

	result := c.processItem(key)
	c.handleErr(key, result)

	c.processingItems.Done()

	return true
}

// processItem 用于同步处理一个任务
func (c *SlothController) processItem(key interface{}) error {
	// 处理任务很慢,因为树懒很懒
	time.Sleep(c.nap)

	if rand.Intn(100) < c.chanceOfFailure {
		// 睡过头啦!
		return fmt.Errorf("zzz... ")
	}

	klog.Infof("[%s] processed.", key)
	return nil
}

// handleErr 用于检查任务处理结果,在必要的时候重试
func (c *SlothController) handleErr(key interface{}, result error) {
	if result == nil {
		// 每次执行成功后清空重试记录。
		c.queue.Forget(key)
		return
	}

	if c.queue.NumRequeues(key) < c.maxRetries {
		klog.Warningf("error processing %s: %v", key, result)
		// 执行失败,重试
		c.queue.AddRateLimited(key)
		return
	}

	// 重试次数过多,日志记录错误,同时也别忘了清空重试记录
	c.queue.Forget(key)
	// runtime.HandleError 是 Kubernetes 官方提供的错误响应方法,
	// 提供一个错误响应的统一入口。
	runtime.HandleError(fmt.Errorf("max retries exceeded, "+
		"dropping item %s out of the queue: %v", key, result))
}

func main() {
	fmt.Println("----- 10-sloth-controller -----")

	var sloths int
	var chanceOfFailure int
	var napInSecond int
	flag.IntVar(&sloths, "sloths", 1,
		"number of sloths")
	flag.IntVar(&chanceOfFailure, "chance-of-failure", 0,
		"chance of failure in percentage")
	flag.IntVar(&napInSecond, "nap", 0,
		"how long should the sloth nap (in seconds)")
	flag.Parse()

	kubeconfig := os.Getenv("KUBECONFIG")
	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	magicconch.Must(err)

	clientset, err := kubernetes.NewForConfig(config)
	magicconch.Must(err)

	// 创建 SharedInformerFactory
	defaultResyncPeriod := time.Hour * 12
	informerFactory := informers.NewSharedInformerFactoryWithOptions(
		clientset, defaultResyncPeriod, informers.WithNamespace("tmp"))

	// 使用默认配置创建 RateLimitingQueue,这种队列支持重试,同时会记录重试次数
	rateLimiter := workqueue.DefaultControllerRateLimiter()
	queue := workqueue.NewRateLimitingQueue(rateLimiter)

	controller := NewController(informerFactory, queue, chanceOfFailure,
		time.Duration(napInSecond)*time.Second)

	stopCh := make(chan struct{})

	// 响应中断信号 (Ctrl+C)
	interrupted := make(chan os.Signal)
	signal.Notify(interrupted, os.Interrupt)

	go func() {
		<-interrupted // 第 1 次收到中断信号时关闭 stopCh
		close(stopCh)
		<-interrupted // 第 2 次收到中断信号时直接退出
		os.Exit(1)
	}()

	if err := controller.Run(sloths, stopCh); err != nil {
		klog.Errorf("SlothController exit with error: %s", err)
	} else {
		klog.Info("SlothController exit")
	}
}

下面演示一下树懒控制器工作的样子,先确认 1 只树懒,不睡觉的情况下能否正常工作:

❯ sloth-controller --sloths=1 --chance-of-failure=0 --nap=0
I0508 00:45:41.142394   49248 main.go:79] SlothController starting...
I0508 00:45:46.813814   49248 main.go:52] [tmp/demo] received
I0508 00:45:46.813852   49248 main.go:141] [tmp/demo] processed.
I0508 00:45:47.333631   49248 main.go:52] [tmp/demo1] received
I0508 00:45:47.333674   49248 main.go:141] [tmp/demo1] processed.

可见在不睡觉的情况下处理任务还是很快的,只用了微秒级的时间。

接下来给树懒们加入一些嗜睡基因 (--nap=5):

❯ sloth-controller --sloths=1 --chance-of-failure=0 --nap=5
I0508 00:49:49.672573   49494 main.go:79] SlothController starting...
I0508 00:50:02.637114   49494 main.go:52] [tmp/demo] received
I0508 00:50:03.123127   49494 main.go:52] [tmp/demo1] received
I0508 00:50:07.637208   49494 main.go:141] [tmp/demo] processed.
I0508 00:50:12.641724   49494 main.go:141] [tmp/demo1] processed.

注意到处理任务的时间变长了,而且任务是排队处理的,总共花费了 10 秒多时间,因为只有 1 只树懒。

加入更多的树懒 (--sloths=3):

❯ sloth-controller --sloths=3 --chance-of-failure=0 --nap=5
I0508 00:51:18.519972   49662 main.go:79] SlothController starting...
I0508 00:51:22.299195   49662 main.go:52] [tmp/demo] received
I0508 00:51:22.827831   49662 main.go:52] [tmp/demo1] received
I0508 00:51:27.302323   49662 main.go:141] [tmp/demo] processed.
I0508 00:51:27.827984   49662 main.go:141] [tmp/demo1] processed.

现在任务处理效率变高了,实现了并行处理任务的功能。

我们尝试打断一下树懒控制器的运行:

❯ sloth-controller --sloths=3 --chance-of-failure=0 --nap=5
I0508 00:52:39.233788   49771 main.go:79] SlothController starting...
I0508 00:52:47.551730   49771 main.go:52] [tmp/demo] received
I0508 00:52:48.184259   49771 main.go:52] [tmp/demo1] received
^C
I0508 00:52:48.817727   49771 main.go:98] waiting for processing items to finish...
I0508 00:52:52.556681   49771 main.go:141] [tmp/demo] processed.
I0508 00:52:53.188094   49771 main.go:141] [tmp/demo1] processed.
I0508 00:52:53.188194   49771 main.go:217] SlothController exit

注意到,在发出中断指令后,程序等到两个进行中的任务都完成后才退出,实现了优雅退出的功能。

接下来给树懒们加入一些更加强效的嗜睡基因,让他们有可能无法处理任务 (--chance-of-failure=50):

❯ sloth-controller --sloths=3 --chance-of-failure=50 --nap=5
I0508 00:58:25.991831   50040 main.go:79] SlothController starting...
I0508 00:58:29.627866   50040 main.go:52] [tmp/demo] received
W0508 00:58:34.630694   50040 main.go:154] error processing tmp/demo: zzz...
I0508 00:58:39.637953   50040 main.go:141] [tmp/demo] processed.

表现和预期的一样,出现了失败的任务,并且失败的任务进行了重试,最终执行成功,只不过花费了双倍的时间…

最后我们给树懒们加入一些极度强效的嗜睡基因 (--chance-of-failure=99) 😰

❯ sloth-controller --sloths=3 --chance-of-failure=99 --nap=5
I0508 01:00:58.172928   50221 main.go:79] SlothController starting...
I0508 01:01:12.565151   50221 main.go:52] [tmp/demo] received
W0508 01:01:17.565633   50221 main.go:154] error processing tmp/demo: zzz...
W0508 01:01:22.574243   50221 main.go:154] error processing tmp/demo: zzz...
W0508 01:01:27.588450   50221 main.go:154] error processing tmp/demo: zzz...
E0508 01:01:32.613327   50221 main.go:164] max retries exceeded, dropping item tmp/demo out of the queue: zzz...

没有办法,在经过 3 次尝试并失败后,任务被放弃不再重试,但没有影响到控制器整体运行。

通过以上的演示,可以看到树懒控制器完全实现了前面提到的功能,是一个非常可靠的控制器(当然前提是树懒不睡觉)。

👩‍⚕️ 实验过程中没有任何动物受到伤害。

总结

在以上的内容中,我们分析了 informers 包的源代码,并演示了 Informer 在实际使用中的最佳实践。

接下来,在后续章节中,我还是带大家继续解析 Informer 相关源代码,同时介绍自定义资源 (CRD) Informer 的实现和动态 Informer 相关的内容。

下一部分:实现自定义资源 (CRD) Informer

参考资料

Bitnami Engineering: A deep dive into Kubernetes controllers

Bitnami Engineering: Kubewatch, an example of Kubernetes custom controller

Dynamic Kubernetes Informers | FireHydrant

client-go/main.go at master · kubernetes/client-go · GitHub

GitHub - kubernetes/sample-controller: Repository for sample controller. Complements sample-apiserver

Kubernetes Deep Dive: Code Generation for CustomResources

How to generate client codes for Kubernetes Custom Resource Definitions (CRD) | by Roger Liang | ITNEXT


Loading comments...