client-go提供了Informer机制,client可以指定监听的资源对象类型,然后Informer:
- 首先,全量list所有的该类型的对象;
- 然后,监听该类型对象的变化(Add/Update/Delete),并调用用户注册的Handler;
一. 实现机制
由sharedIndexInformer类型实现,整体实现机制如下图所示:
- Reflector负责向apiserver list&watch资源对象;
- Reflector list&watch到的对象及其变化,被放入DeltaFIFO队列;
-
Informer负责pop DeltaFIFO队列的变化事件,然后:
- 更新对象到Indexer;
- 调用用户注册的ResourceEventHandler;
二. 源码框架
整体对外接口:sharedInformerFactory,通过map让各种resourceType的informer可以共享;
客户端调用创建出来的informer对象:sharedIndexInformer,其中包括:
- indexer Indexer: 索引&存储;
-
controller Controller:
- 调用Reflector&FIFO&indexer完成事件的分发、LocalStore的维护;
- 将Reflector&FIFO的逻辑串联起来;
- processor *sharedProcessor: 利用分发的事件,调用client的ResourceEventHandler;
- listwatcher ListerWatcher: 被reflector.Run()调用,进行资源对象的List&Watch;
- objectType runtime.Object: 监听的对象类型;
三. 源码:sharedInformerFactory如何share informer
client创建Informer的过程:
- 先创建sharedInformerFactory;
- 再用factory创建informer;
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 10*time.Second, informers.WithNamespace(""))
factory.Core().V1().Pods().Informer()
sharedInformerFactory的结构:
- informers:保存了所有已创建的sharedIndexInformer;
- startedInformers: 保存了所有已启动的sharedIndexInformer:
// staging/src/k8s.io/client-go/informers/factory.go
type sharedInformerFactory struct {
...
informers map[reflect.Type]cache.SharedIndexInformer
startedInformers map[reflect.Type]bool
...
}
底层client-go创建podInformer:
// staging/src/k8s.io/client-go/informers/core/v1/interface.go
// Pods returns a PodInformer.
func (v *version) Pods() PodInformer {
return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
// staging/src/k8s.io/client-go/informers/core/v1/pod.go
func (f *podInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
InformerFor()执行创建,并将podInformer添加到factory:
- 若objectType的informer已经存在,则直接返回;
- 否则,创建informer,并添加到factory.informers;
// staging/src/k8s.io/client-go/informer/factory.go
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
.......
informerType := reflect.TypeOf(obj)
informer, exists := f.informers[informerType]
//obj的informer已经存在
if exists {
return informer
}
.....
//创建informer并添加到informers
informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer
return informer
}
三. 源码:sharedIndexInformer.Run()流程
Run()做初始化并调用其它组件run:
- 创建fifo,它对应delta存储;
- 创建controller;
- 调用process.run(): 执行client的ResourceEventHandler;
- 调用controller.Run(): 执行list&watch–>更新fifo–>[更新indexer,分发notification];
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh
四. 源码:controller.Run()流程
controller整合了reflector/fifo/indexer等各大组件:
- 使用reflector.Run()进行list&watcher,将变更存入fifo;
-
使用processLoop()进行fifo.Pop(),处理变更:
- 使用indexer进行对象的更新;
- 使用processor进行变更通知的分发;
// staging/src/k8s.io/client-go/tools/cache/controller.go
func (c *controller) Run(stopCh
1. reflector.Run()
调用ListAndWatch()监听apiserver然后进行fifo的更新;
- 首先,进行全量的List(),然后将结果全量更新至fifo(fifo.Replace());
-
然后,启动resync的定时器,定期进行resync,即将LocalStore的数据再放入DeltaFIFO进行处理;
- resync的原因:deltaFIFO中的事件回调,可能存在处理失败的情况,定时的resync机制让失败的事件有了重新onUpdate处理的机会;
- 最后,进行watch监听,然后通过watchHandler()处理watch到的事件(即进行fifo.Add/Update/Delete);
// staging/src/k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) Run(stopCh
func (r *Reflector) ListAndWatch(stopCh fifo.Replace()全量更新
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
r.setLastSyncResourceVersion(resourceVersion)
return nil
}(); err != nil {
return err
}
// 2)定期进行resync
resyncerrc := make(chan error, 1)
go func() {
resyncCh, cleanup := r.resyncChan()
for {
select {
case
重点看一下watcherHandler(): 监听到事件后,进行store的增删改;
// staging/src/k8s.io/client-go/tools/cache/reflector.go
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh
2. controller.processLoop()
processLoop()的工作:
- Pop fifo中的事件;
- 事件被c.config.Process函数处理;
- 处理结果出现RetryOnError时,将事件重新入队;
// staging/src/k8s.io/client-go/tools/cache/controller.go
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) // pop出来的事件被Process函数处理
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
事件处理函数:c.config.Process = informer.HandleDeleta()
-
处理从fifo pop出来的变更:
- 一是更新indexer;
- 二是向processor分发notification;
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
// 事件类型=Sync/Replaced/Add/Updated
case Sync, Replaced, Added, Updated:
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
// 若LocalStore中已存在,则更新;
if err := s.indexer.Update(d.Object); err != nil {
return err
}
isSync := false
switch {
case d.Type == Sync:
// Sync events are only propagated to listeners that requested resync
isSync = true
}
// 向processor分发UPDATE notification
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
// 若LocalStore中不存在,则添加;
if err := s.indexer.Add(d.Object); err != nil {
return err
}
// 向processor分发ADD notification
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
// 事件类型=Deleted
case Deleted:
// 删除LocalStore中的对象
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
// 向processor分发delete notification
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
五. 源码:processor.run()流程
processor的类型是sharedProcessor;
processor.run()主要工作:
- 接收从fifo.Pop()–>informer.HandleDelta()分发出来的notification;
- 调用ResourceEventHandler进行notification的处理;
sharedProcessor中包含N个processorListner,而processorListener封装了ResourceEventHandler;
也就是说,sharedProcessor中包括N个ResourceEventHandler的处理;
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
type sharedProcessor struct {
...
listeners []*processorListener
syncingListeners []*processorListener
}
type processorListener struct {
nextCh chan interface{}
addCh chan interface{}
handler ResourceEventHandler
...
}
process.run()调用每个processorListener.run()/pop():
- 由processorListener.pop()进行notification的接收;
- 由processorListener.run()调用ResourceEventHandler进行notification的处理;
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) run(stopCh
processorListener.run()
- 消费nextCh中的notification,调用ResourceEventHandler完成事件的处理;
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) run() {
stopCh := make(chan struct{})
wait.Until(func() {
for next := range p.nextCh { // 消费notification
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
close(stopCh)
}, 1*time.Second, stopCh)
}
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net